Overview
Configuring a distributed Kafka Connect cluster is an important step when working with Apache Kafka for data integration. This blog walks you through setting up a Kafka Connect cluster in distributed mode.
Follow the steps below to set up a local distributed Connect cluster backed by a Kafka cluster in Confluent Cloud.
Prerequisites
- Access to Confluent Cloud (API key and API secret; see the CLI sketch after this list)
- Confluent CLI
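If you do not already have an API key and the cluster's bootstrap endpoint at hand, one way to obtain them is through the Confluent CLI. A minimal sketch, assuming you are already logged in and that lkc-xxxxx stands in for your cluster ID:
# List environments and clusters to find the cluster ID
confluent environment list
confluent kafka cluster list
# Describe the cluster; the endpoint shown here is the <cloud-bootstrap-servers> value used below
confluent kafka cluster describe lkc-xxxxx
# Create an API key and secret scoped to that cluster
confluent api-key create --resource lkc-xxxxx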
Set up a local Connect worker with Confluent Platform
Download the 7.3.0 ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/.
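For example, on Linux the TAR distribution can be downloaded and extracted from the command line. This is a sketch; the archive URL below follows the packages.confluent.io pattern and should be confirmed against the download page:
# Download and unpack Confluent Platform 7.3.0
curl -O https://packages.confluent.io/archive/7.3/confluent-7.3.0.tar.gz
tar -xzf confluent-7.3.0.tar.gz
cd confluent-7.3.0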
Follow the instructions below for a distributed cluster.
Once the package is extracted, you should see the standard Confluent Platform layout, including the bin, etc, and share directories used in the steps below.
Distributed Cluster:
Create a kafka-connect-distributed.properties file in a distributed-config directory (create a directory named distributed-config under the etc directory of the extracted package, then create the file inside it), with contents like the following:
bootstrap.servers=<cloud-bootstrap-servers>
group.id=connect-cluster
auto.create.topics.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Connect clusters create three topics to manage offsets, configs, and status
# information. Note that these contribute towards the total partition limit quota.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL

producer.bootstrap.servers=<cloud-bootstrap-servers>
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL

database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.ssl.endpoint.identification.algorithm=https
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
database.history.producer.security.protocol=SASL_SSL
database.history.producer.ssl.endpoint.identification.algorithm=https
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations).
plugin.path=/home/<user>/confluent-7.3.0/share/java,/home/<user>/confluent-7.3.0/share/confluent-hub-components
Note: Replace the placeholder values enclosed in <> with your own.
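The worker will try to create the three internal topics on startup. If the API key it uses is not allowed to create topics in Confluent Cloud, you can pre-create them with the Confluent CLI. A sketch, using the same topic names as the properties above (connect-configs must have exactly one partition; connect-offsets mirrors the 3 partitions from the config, and the partition count for connect-status is illustrative):
# Pre-create Connect's internal topics (only needed if the worker cannot auto-create them)
confluent kafka topic create connect-offsets --partitions 3 --config cleanup.policy=compact
confluent kafka topic create connect-configs --partitions 1 --config cleanup.policy=compact
confluent kafka topic create connect-status --partitions 3 --config cleanup.policy=compact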
Once the Connect config is ready, install the Debezium source connector plugin using one of the following approaches:
Using the Confluent Hub client:
confluent-hub install debezium/debezium-connector-sqlserver:<version-number>
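For example, to install the version that appears later in this walkthrough (1.9.3.Final) from the root of the extracted package, pointing the Confluent Hub client at the plugin directory and worker config used here (the paths are assumptions based on the config above):
# Install the Debezium SQL Server connector into the local plugin path
./bin/confluent-hub install debezium/debezium-connector-sqlserver:1.9.3.Final \
  --component-dir ./share/confluent-hub-components \
  --worker-configs ./etc/distributed-config/kafka-connect-distributed.properties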
Manual installation:
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Note: Place the connector files under the plugin path /home/<user>/confluent-7.3.0/share/confluent-hub-components
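A minimal sketch of the manual route, assuming the downloaded archive is named debezium-debezium-connector-sqlserver-<version-number>.zip (adjust the file name to what you actually downloaded):
# Unpack the connector archive into the plugin path so the worker can find it
mkdir -p /home/<user>/confluent-7.3.0/share/confluent-hub-components
unzip debezium-debezium-connector-sqlserver-<version-number>.zip \
  -d /home/<user>/confluent-7.3.0/share/confluent-hub-components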
Now we are ready to run the distributed Connect worker with the following commands:
> cd confluent-7.3.0
> ./bin/connect-distributed ./etc/distributed-config/kafka-connect-distributed.properties
Check whether the distributed Connect cluster is running by querying the REST API on localhost:8083; the result is returned in JSON format:
curl -s -XGET "http://localhost:8083/connector-plugins"
Result:
[
  {
    "class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "type": "source",
    "version": "7.3.0"
  },
  {
    "class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "type": "source",
    "version": "null"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.9.3.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "7.3.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "7.3.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "7.3.0-ce"
  }
]
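If the list is long, jq can filter it down to the plugin you just installed, for example:
# Confirm the Debezium SQL Server connector is visible to the worker
curl -s -XGET "http://localhost:8083/connector-plugins" | jq '.[] | select(.class | contains("SqlServer"))'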
To list all running connectors:
curl -s -XGET "http://localhost:8083/connectors" | jq '.'
Result:
[ "dev-my-source-connector" ]
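To check the health of a specific connector and its tasks, query the status endpoint (the connector name below is the sample one from the list above):
# Shows the connector state and the state of each task (e.g. RUNNING or FAILED)
curl -s -XGET "http://localhost:8083/connectors/dev-my-source-connector/status" | jq '.'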
To create a new connector, prepare the connector config JSON and send it with a PUT request, which creates the connector if it does not exist and updates it if it does.
Sample PUT Request:
curl -i -X PUT -H 'Content-Type: application/json' \
  http://localhost:8083/connectors/<connector name>/config \
  -d '{
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<api-key>",
    "kafka.api.secret": "<api-secret>",
    "database.hostname": "<Database Server IP>",
    "database.port": "1433",
    "database.user": "<DB User>",
    "database.password": "<DB Password>",
    "database.server.name": "<DB Server Name>",
    "table.include.list": "<dbschema>.<db table name>",
    "schema.whitelist": "<dbschema>",
    "database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
    "tasks.max": 1,
    "topic.prefix": "<topic prefix>",
    "database.names": "<Database Name>",
    "database.encrypt": false,
    "output.data.format": "JSON",
    "database.history.kafka.topic": "<history topic name>",
    "topic.creation.enable": true,
    "schema.history.internal.kafka.topic": "<schema topic name>",
    "topic.creation.default.replication.factor": 1,
    "trace.records.topic.replication.factor": 1,
    "topic.creation.default.partitions": 2,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.compression.type": "producer",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
    "producer.security.protocol": "SASL_SSL",
    "producer.sasl.mechanism": "PLAIN",
    "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
    "consumer.security.protocol": "SASL_SSL",
    "consumer.sasl.mechanism": "PLAIN",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
    "sanitize.field.names": false,
    "time.precision.mode": "connect",
    "decimal.handling.mode": "double"
  }'
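Once the PUT request succeeds, a few more calls against the same REST API are useful for day-to-day management (same <connector name> placeholder as above; the restart query parameter is available on Connect workers based on Kafka 3.0 or later, which includes 7.3.0):
# Read back the stored configuration
curl -s -XGET "http://localhost:8083/connectors/<connector name>/config" | jq '.'
# Restart the connector and its tasks, e.g. after a config or plugin change
curl -s -XPOST "http://localhost:8083/connectors/<connector name>/restart?includeTasks=true"
# Delete the connector when it is no longer needed
curl -s -XDELETE "http://localhost:8083/connectors/<connector name>"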