Kafka Connectors
Kafka is arguably the most common interface today for building event driven systems at scale.
I can probably write many many awesome things about Kafka but that’s a story for another time. What I want to talk about today is one of the greatest superpowers I think Kafka possesses: Kafka Connect (All hail the the awesome folks from Confluent who are responsible for developing this!)
Please note that Kafka Connectors I’ll be talking about below are not magic, you can code all the functionality yourselves but who doesn’t like to reduce the code overhead in their product by utilizing existing packages with support for some pretty awesome things, all out of the box.
In this article, we’ll be talking particulary about PostgreSQL Source Connector (developed & supported by RedHat team) & ElasticSearch Sink Connector (developed & supported by Confluent team). Except for these, there are a whole lot of other connectors available here.
Problems that can be solved using Connectors (relatively easily & faster):
- ETL Pipelines
- Building event driven systems
- Building capacity for event sourcing
- Maintaining Audit Trails
- Maintaining a search engine like ElasticSearch without writing any code (PostgreSQL ↔ Kafka ↔ ElasticSearch setup)
- And so much more!
Here’s how our setup will look like:
Pre-requisites:
It is assumed that you have already have the following set up & running:
- Kafka
- Kafka Connect (Reference Kafka Connect config file shared below)
- PostgreSQL
- ElasticSearch (Only required if you want to set up ElasticSearch sink connector)
I do not currently have the setup guides for the above tools because there is a lot of content already available out there, however I found it most difficult to find a working example of PostgreSQL ↔ Kafka ↔ ElasticSearch setup.
What we hope to achieve
By end of this blog, you’ll hopefully have:
- Data flowing from your PostgreSQL instance to Kafka cluster (using Debezium Connector by RedHat)
- Data flowing from your Kafka cluster to ElasticSearch cluster (using ES connector by Confluent)
Note: I am using Kafka Connect in standalone mode which is best suited for:
- testing purposes
- dev environment setups
- one-off jobs
In production however, it is advisable to use Kafka Connect in distributed mode.
Following changes have to be made in PostgreSQL config to enable Debezium to be able to read data from WALs
# postgresql.conf
# tells the server that it should use logical decoding with the write-ahead log
wal_level = logical
# tells the server that it should use a maximum of 1 separate processes for processing WAL changes
max_wal_senders = 1
# tells the server that it should allow a maximum of 1 replication slots to be created for streaming WAL changes
max_replication_slots = 1
Please note that from PostgreSQL 10+, pgoutput, the standard logical decoding plug-in is always present, meaning that no additional libraries must be installed, and the Debezium connector will interpret the raw replication event stream into change events directly. However, for PostgreSQL versions prior to PostgreSQL 10+, a logical decoding output plugin will have to be installed and configured in the PostgreSQL server, detailed guide for which can be found here.
Once the above configuration is done for PostgreSQL server, we can move on to creating Kafka Connectors.
Postgres Source Connector Properties File
# /etc/kafka/postgres-connector.properties
name=postgres-connector-dev
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=2
plugin.name=pgoutput
database.hostname=rdsurl.com
database.port=5200
database.user=db_user
database.password=db_pass
database.dbname=db_name
database.server.name=db_server_name
database.tcpKeepAlive=true
poll.interval.ms=100
heartbeat.interval.ms=1000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
errors.log.enable=true
errors.logs.include.messages=true
table.whitelist=public.table_name,public.table_name2
snapshot.mode=initial
snapshot.locking.mode=none
tombstones.on.delete=false
ElasticSearch Sink Connector Properties File
# /etc/kafka/es.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url:http://x.x.x.x:9000
tasks.max=1
connection.username=elastic_username
connection.password=elastic_pass
topics=db_name.public.table_name,db_name.public.table_name2
type.name=kafka-connect
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
transforms=unwrap,key
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.key.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.key.field=id
key.ignore=false
Kafka Connect Standalone Properties File
# /etc/kafka/connect-standalone.properties
group.id=connect-cluster-org_name_abc
bootstrap.servers=x.x.x.x:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# I had the source code for both Debezium package and ES connect package in
# /etc/kafka/connectors directory so I added that path below for Connect to be
# able to find those packages
plugin.path=/usr/share/java,/etc/kafka/connectors
Kafka Connect Service systemd File
# kafka-connect.service
[Unit]
Requires=kafka.service
After=kafka.service
[Service]
Type=simple
ExecStart=/bin/sh -c '/opt/kafka/bin/connect-standalone.sh /etc/kafka/connect-standalone.properties /etc/kafka/postgres-connector.properties /etc/kafka/es.properties > /tmp/kafka-connect.log 2>&1'
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
If you want to learn more about Kafka Connect, here’s a very good introductory article.
Please note that there are a lot of nuances behind every config item in the PSQL & ES config files above, explaining those all are beyond the scope of one article but the config above is supposed to help you just dive deep, get your hands dirty & see the magic of your data flowing from PostgreSQL to Kafka to ElasticSearch.
For your production setup, I’d highly recommend you to read about the configuration options available for both the connectors & tweak settings as per your use case.