mail@pastecode.io avatar
17 days ago
22 kB
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.nifi.cdc.mssql.processors;

import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.cdc.CDCException;
import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;

import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import java.math.BigDecimal;

@Tags({"sql", "jdbc", "cdc", "mssql"})
@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
        + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred. In a cluster, it is recommended to run "
        + "this processor on primary only.")
@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
        + "that it can continue from the same point in time if restarted.")
        @WritesAttribute(attribute = "tablename", description = "Name of the table this changeset was captured from."),
        @WritesAttribute(attribute = "mssqlcdc.row.count", description = "The number of rows in this changeset"),
        @WritesAttribute(attribute = "fullsnapshot", description = "Whether this was a full snapshot of the base table or not.")})
@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
        + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
        + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
    public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";

    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")

    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .displayName("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain connection to database")

    public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
            .displayName("CDC Table List")
            .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
                    + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")

    public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
            .displayName("Generate an Initial Source Table Snapshot")
            .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
                    + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
                    + "for extracting CDC changes.")
            .allowableValues("true", "false")

    public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor.Builder()
            .displayName("Changeset Row Limit")
            .description("If a very large change occurs on the source table, "
                    + "the generated changeset may be too large to quickly merge into a destination system. "
                    + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. "
                    + "The fullsnapshot attribute will be set to true when this happens.")

    public static final PropertyDescriptor INCLUDE_PREUPDATE_VALUES = new PropertyDescriptor.Builder()
            .displayName("Include Pre-Update Values")
            .description("When an update transaction occurs, should both the old values (operation=3) and the new values (operation=4) be returned. "
                    + "If true, both rows will always be in the same FlowFile and be sequential records, old values followed by new values. "
                    + "If you have unchangeable primary key(s) you do not need the old values to match-up the updated record to an old version, "
                    + "and you could reduce the number of rows to process by setting this property to false.")
            .allowableValues("true", "false")

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .description("Successfully created FlowFile from SQL query result set.")

    protected List<PropertyDescriptor> descriptors;
    protected Set<Relationship> relationships;

    protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>();

    // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
    protected Map<String, String> maxValueProperties;
    protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();

    public MSSQLCDCUtils getMssqlcdcUtils() {
        return mssqlcdcUtils;

    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        this.relationships = Collections.unmodifiableSet(relationships);

    public Set<Relationship> getRelationships() {
        return this.relationships;

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        if (!propertyDescriptorName.startsWith("initial.timestamp.")) {
            return null;

        return new PropertyDescriptor.Builder()
                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))

    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        ProcessSession session = processSessionFactory.createSession();

        final ComponentLog logger = getLogger();
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);

        final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean();
        final boolean includePreupdateValues = processContext.getProperty(INCLUDE_PREUPDATE_VALUES).asBoolean();
        final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).evaluateAttributeExpressions().asInteger();

        final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]);

        final String tablesProp = processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue();
        String[] tables;
        if (tablesProp == null || tablesProp.length() == 0) {
            tables = allTables;
        } else {
            tables = tablesProp

        final StateManager stateManager = processContext.getStateManager();
        final StateMap stateMap;
        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        } catch (final IOException ioe) {
            logger.error("Failed to retrieve observed current timestamp values from the State Manager. Will not perform "
                    + "query until this is accomplished.", ioe);

        // Make a mutable copy of the current state property map. This will be updated and eventually
        // set as the current state map (after the session has been committed)
        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());

        // If an initial max value for the table has been specified using properties, and this table is not in the state manager, sync it to the state property map
        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
            String maxPropKey = maxProp.getKey().toLowerCase();
            String tableName = getStateKey(INITIAL_TIMESTAMP_PROP_START, maxPropKey);
            if (!statePropertyMap.containsKey(tableName)) {
                String newMaxPropValue = maxProp.getValue();

                statePropertyMap.put(tableName, newMaxPropValue);

        //Build a capture plan for each table
        ArrayList<TableCapturePlan> tableCapturePlans = new ArrayList<>();
        try (final Connection con = dbcpService.getConnection()) {
            for (String t : tables) {
                final String tableKey = t.toLowerCase();
                if (!schemaCache.containsKey(tableKey)) {
                    throw new ProcessException("Unknown CDC enabled table named " + t + ". Known table names: " + String.join(", ", allTables));

                final MSSQLTableInfo tableInfo = schemaCache.get(tableKey);

                //Get Max Timestamp from state (if it exists)
                String sTime = null;
                if (statePropertyMap.containsKey(tableKey)) {
                    sTime = statePropertyMap.get(tableKey);
                    logger.info("Table `{}` using Timestamp '{}'", new Object[]{tableKey, sTime});
                } else {
                    logger.info("Table `{}` has no saved Timestamp", new Object[]{tableKey});

                final TableCapturePlan tableCapturePlan = new TableCapturePlan(tableInfo, fullSnapshotRowLimit, takeInitialSnapshot, includePreupdateValues, sTime);

                //Determine Plan Type
                tableCapturePlan.computeCapturePlan(con, getMssqlcdcUtils());


            for (TableCapturePlan capturePlan : tableCapturePlans) {
                final String selectQuery;

                if (capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC) {
                    selectQuery = getMssqlcdcUtils().getCDCSelectStatement(capturePlan.getTable(), capturePlan.getIncludePreupdateValues(), capturePlan.getMaxTime());
                } else if (capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT) {
                    selectQuery = getMssqlcdcUtils().getSnapshotSelectStatement(capturePlan.getTable());
                } else {
                    throw new ProcessException("Unknown Capture Plan type, '" + capturePlan.getPlanType() + "'.");

                logger.debug("SQL Statement for `{}`: {}", new Object[]{capturePlan.getTable(), selectQuery});

                FlowFile cdcFlowFile = session.create();
                try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
                    if (capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC && capturePlan.getMaxTime() != null) {
                        st.setString(1, capturePlan.getMaxTime());

                    final ResultSet resultSet = st.executeQuery();

                    final Map<String, String> attributes = new HashMap<>();
                    Timestamp maxTimestamp = null;
                    long rows = 0L;
                    int fieldCount;
                    try (final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null);
                         final OutputStream out = session.write(cdcFlowFile)) {
                        final RecordSchema writeSchema = resultSetRecordSet.getSchema();
                        fieldCount = writeSchema.getFieldCount();
                        try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, cdcFlowFile)) {

                            Record record;
                            while ((record = resultSetRecordSet.next()) != null) {
                                //final String start_lsn = String.valueOf(getMssqlcdcUtils().hexToDecimal(record.getValue("start_lsn").toString()));
                                //final String seqval = String.valueOf(getMssqlcdcUtils().hexToDecimal(record.getValue("seqval").toString()));
                                //record.setValue("start_lsn", start_lsn);
                                //record.setValue("seqval", seqval);
                                //String start_lsn_hex = new String(record.getValue("start_lsn").toString().replaceFirst("0x", ""));
                                //String seqval_hex = new String(record.getValue("seqval").toString().replaceFirst("0x", ""));
                                record.setValue("start_lsn", new BigDecimal(record.getValue("start_lsn"));
                                //record.setValue("seqval", new BigInteger(seqval_hex, 16).toString());

                                maxTimestamp = (Timestamp) record.getValue("tran_end_time");


                            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                        } catch (SchemaNotFoundException e) {

                    if (rows == 0) {

                    attributes.put("tablename", capturePlan.getTable().getSourceTableName());
                    attributes.put("mssqlcdc.row.count", Long.toString(rows));
                    attributes.put("maxvalue.tran_end_time", maxTimestamp.toString());
                    attributes.put("fullsnapshot", Boolean.toString(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT));

                    cdcFlowFile = session.putAllAttributes(cdcFlowFile, attributes);
                    session.transfer(cdcFlowFile, REL_SUCCESS);

                    statePropertyMap.put(capturePlan.getTable().getSourceTableName().toLowerCase(), maxTimestamp.toString());
                    stateManager.setState(statePropertyMap, Scope.CLUSTER);

                } catch (IOException e) {
                    throw new ProcessException("Failed to update cluster state with new timestamps.", e);
        } catch (SQLException e) {
            throw new ProcessException("Error working with MS SQL CDC Database.", e);

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;

    public void onScheduled(final ProcessContext context) {
        //Prefetch list of all CDC tables and their schemas.
        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        try (final Connection con = dbcpService.getConnection()) {
            List<MSSQLTableInfo> tableSchemas = getMssqlcdcUtils().getCDCTableList(con);

            for (MSSQLTableInfo ti : tableSchemas) {
                schemaCache.put(ti.getSourceTableName().toLowerCase(), ti);
        } catch (SQLException e) {
            throw new ProcessException("Unable to communicate with database in order to determine CDC tables", e);
        } catch (CDCException e) {
            throw new ProcessException(e.getMessage(), e);

        maxValueProperties = getDefaultMaxValueProperties(context.getProperties());

    protected static String getStateKey(String prefix, String tableName) {
        return tableName.toLowerCase().replace(prefix, "");

    protected Map<String, String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties) {
        final Map<String, String> defaultMaxValues = new HashMap<>();

        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
            final String key = entry.getKey().getName();

            if (!key.startsWith(INITIAL_TIMESTAMP_PROP_START)) {

            defaultMaxValues.put(key.substring(INITIAL_TIMESTAMP_PROP_START.length()), entry.getValue());

        return defaultMaxValues;
Leave a Comment