Kafka Connect – Debezium Source Connector for SQL Server

Overview

Configuring a distributed Kafka Connect cluster is a key step when using Apache Kafka for data integration. This blog walks you through setting up a Kafka Connect cluster in distributed mode and running the Debezium source connector for SQL Server against it.

Follow the steps below to set up a local distributed Connect cluster backed by a source Kafka cluster in Confluent Cloud.

Prerequisites

  1. Access to Confluent Cloud (API Key and API Secret; one way to create these is shown below)
  2. Confluent CLI 
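If you do not have an API key yet, one way to create one is with the Confluent CLI (the cluster ID below is a placeholder for your own cluster):

confluent login
confluent kafka cluster list
confluent api-key create --resource <cluster-id>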

Set up a local Connect worker with a Confluent Platform install

Download the 7.3.0 ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/.
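For example, on Linux the TAR distribution can be downloaded and extracted from the command line (the archive URL follows the standard Confluent packages layout; verify it against the download page):

curl -O https://packages.confluent.io/archive/7.3/confluent-7.3.0.tar.gz
tar -xzf confluent-7.3.0.tar.gz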

Then follow the instructions in the Distributed Cluster section below.

Once the package is extracted, you should see the standard Confluent Platform layout, with bin, etc, lib, and share directories at the top level.

Distributed Cluster:

Create a file named kafka-connect-distributed.properties in a distributed-config directory (create a directory named distributed-config under the etc directory of the extracted Confluent Platform, then create the file in that folder), with contents like the following:

bootstrap.servers=<cloud-bootstrap-servers>
  
group.id=connect-cluster

auto.create.topics.enable=true

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Connect clusters create three topics to manage offsets, configs, and status
# information. Note that these contribute towards the total partition limit quota.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL

producer.bootstrap.servers=<cloud-bootstrap-servers>
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL

database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>

database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.ssl.endpoint.identification.algorithm=https
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";

database.history.producer.security.protocol=SASL_SSL
database.history.producer.ssl.endpoint.identification.algorithm=https
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations).
plugin.path=/home/<user>/confluent-7.3.0/share/java,/home/<user>/confluent-7.3.0/share/confluent-hub-components

Note: Replace the placeholders enclosed in <> with your own values.
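Before starting the worker, it is worth confirming that the bootstrap servers and API key are valid. One quick check, reusing the same placeholders, is to list topics with the kafka-topics tool shipped with Confluent Platform and a small client properties file:

cat > /tmp/ccloud-client.properties <<EOF
bootstrap.servers=<cloud-bootstrap-servers>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
EOF

./bin/kafka-topics --bootstrap-server <cloud-bootstrap-servers> --command-config /tmp/ccloud-client.properties --list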

Once the Connect config is ready, install the Debezium source connector plugin in one of the following ways:

Using the Confluent Hub client:

confluent-hub install debezium/debezium-connector-sqlserver:<version-number>
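For example, to pull the latest version published on Confluent Hub (a specific version, such as the one shown in the plugin listing later in this post, can be pinned instead):

confluent-hub install debezium/debezium-connector-sqlserver:latest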

Manual installation:

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Note: keep the connector files under the plugin path /home/<user>/confluent-7.3.0/share/confluent-hub-components
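A manual install might look like the following, assuming the connector ZIP has already been downloaded to the current directory (the file name is a placeholder):

mkdir -p /home/<user>/confluent-7.3.0/share/confluent-hub-components
unzip debezium-debezium-connector-sqlserver-<version-number>.zip \
  -d /home/<user>/confluent-7.3.0/share/confluent-hub-components/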

Now we are ready to run the distributed Connect cluster with the following command:

> cd confluent-7.3.0
> ./bin/connect-distributed ./etc/distributed-config/kafka-connect-distributed.properties

Check that the distributed Connect cluster is running by calling its REST API on localhost:8083, which returns the result in JSON format:

curl -s -XGET "http://localhost:8083/connector-plugins"

Result:

[
	{
		"class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
		"type": "source",
		"version": "7.3.0"
	},
	{
		"class": "io.confluent.kafka.connect.datagen.DatagenConnector",
		"type": "source",
		"version": "null"
	},
	{
		"class": "io.debezium.connector.sqlserver.SqlServerConnector",
		"type": "source",
		"version": "1.9.3.Final"
	},
	{
		"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
		"type": "source",
		"version": "7.3.0-ce"
	},
	{
		"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
		"type": "source",
		"version": "7.3.0-ce"
	},
	{
		"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
		"type": "source",
		"version": "7.3.0-ce"
	}
]
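To confirm that the Debezium plugin in particular was picked up, the same endpoint can be filtered with jq:

curl -s -XGET "http://localhost:8083/connector-plugins" | jq '.[] | select(.class | contains("SqlServer"))'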

To list all running connectors:

curl -s -XGET "http://localhost:8083/connectors" | jq '.'

Result:

[
  "dev-my-source-connector"
]
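To inspect the state of a connector and its tasks from that list:

curl -s -XGET "http://localhost:8083/connectors/dev-my-source-connector/status" | jq '.'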

To create a new connector, prepare the connector config as JSON and use a PUT request, which creates the connector if it does not exist and updates it if it already does.
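Keep in mind that the Debezium SQL Server connector reads from SQL Server's change data capture (CDC) tables, so CDC must already be enabled on the source database and on each captured table. A minimal sketch using sqlcmd (connection details, schema, and table names are the same placeholders used below) looks like this:

sqlcmd -S <Database Server IP>,1433 -U <DB User> -P <DB Password> -d <Database Name> \
  -Q "EXEC sys.sp_cdc_enable_db"
sqlcmd -S <Database Server IP>,1433 -U <DB User> -P <DB Password> -d <Database Name> \
  -Q "EXEC sys.sp_cdc_enable_table @source_schema = N'<dbschema>', @source_name = N'<db table name>', @role_name = NULL"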

Sample PUT Request:

curl -i -X PUT -H 'Content-Type: application/json' \
http://localhost:8083/connectors/<connector name>/config \
-d '{		
		"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
		"kafka.auth.mode": "KAFKA_API_KEY",
		"kafka.api.key": "<api-key>",
		"kafka.api.secret": "<api-secret>",
		"database.hostname": "<Database Server IP>",
		"database.port": "1433",
		"database.user": "<DB User>",
		"database.password": "<DB Password>",
		"database.server.name": "<DB Server Name>",
		"table.include.list": "<dbschema>.<db table name>",
		"schema.whitelist": "<dbschema>",
		"database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
		"tasks.max": 1,
		"topic.prefix": "<topic prefix>",
		"database.names": "<Database Name>",
		"database.encrypt": false,
		"output.data.format": "JSON",
		"database.history.kafka.topic": "<history topic name>",
		"topic.creation.enable": true,
		"schema.history.internal.kafka.topic": "<schema topic name>",
		"topic.creation.default.replication.factor": 1,
		"trace.records.topic.replication.factor": 1,
		"topic.creation.default.partitions": 2,
		"topic.creation.default.cleanup.policy": "delete",
		"topic.creation.default.compression.type": "producer",
		"snapshot.mode": "initial",
		"schema.history.internal.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
		"producer.security.protocol": "SASL_SSL",
		"producer.sasl.mechanism": "PLAIN",
		"producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
		"consumer.security.protocol": "SASL_SSL",
		"consumer.sasl.mechanism": "PLAIN",
		"consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
		"database.history.consumer.security.protocol": "SASL_SSL",
		"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
		"database.history.consumer.sasl.mechanism": "PLAIN",
		"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
		"database.history.producer.security.protocol": "SASL_SSL",
		"database.history.producer.ssl.endpoint.identification.algorithm": "https",
		"database.history.producer.sasl.mechanism": "PLAIN",
		"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
		"sanitize.field.names": false,
		"time.precision.mode": "connect",
		"decimal.handling.mode": "double"
}'
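After the PUT call succeeds, the same REST API can be used to manage the connector, for example to restart it after a failure or to remove it:

curl -s -XPOST "http://localhost:8083/connectors/<connector name>/restart"
curl -s -XDELETE "http://localhost:8083/connectors/<connector name>"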
