Streaming PostgreSQL Changes to Kafka with Debezium
Introduction: Why Send Changes to Kafka
In modern distributed systems, keeping multiple services in sync and maintaining data consistency across microservices can be challenging. In a microservices architecture, it's crucial to have an efficient way to propagate database changes to other services in real time. One effective solution is to publish database changes to a message broker like Apache Kafka. Kafka acts as an intermediary that allows various services to subscribe to these changes and react accordingly. This approach ensures real-time data synchronization, reduces the complexity of direct service-to-service communication, and enhances the overall scalability and fault tolerance of the system.
Use Cases for Publishing Database Changes to Kafka
- Real-Time Analytics: Feeding database changes to a real-time analytics system to provide up-to-the-minute insights.
- Event-Driven Architecture: Enabling services to react to database changes, triggering workflows or business processes.
- Cache Invalidation: Automatically invalidating or updating cache entries based on database changes to ensure consistency.
- Data Replication: Replicating data across different data stores or geographic regions for redundancy and high availability.
- Audit Logging: Keeping a comprehensive audit log of all changes made to the database for compliance and debugging purposes.
What is Debezium?
Debezium is an open-source distributed platform that captures database changes and streams them to Kafka in real-time. It leverages the database's transaction log to detect changes and publish them as events in Kafka topics. Debezium supports various databases, including PostgreSQL, MySQL, and MongoDB, making it a versatile choice for change data capture (CDC) needs.
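To make this more concrete, a change event Debezium publishes for a newly inserted row looks roughly like the following. This is a simplified sketch: the actual message also carries schema metadata and additional source fields, and its exact shape depends on the connector version and converter settings.
{
  "before": null,
  "after": { "id": 1, "name": "Sample Product", "price": 19.99 },
  "source": { "connector": "postgresql", "db": "mydb", "schema": "public", "table": "product" },
  "op": "c",
  "ts_ms": 1717000000000
}
Here "op" indicates the operation type ("c" for create, "u" for update, "d" for delete), and "before"/"after" hold the row state around the change.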
PostgreSQL Configuration: Logical WAL Replication
In this article, we will be using PostgreSQL as our database with logical WAL replication enabled. You can host the database with any provider or manage it yourself. For a convenient deployment option, consider cloud-based solutions like Rapidapp, which offers managed PostgreSQL databases with built-in logical WAL replication, simplifying setup and maintenance.
Create a free database with built-in logical WAL replication in Rapidapp in seconds here
If you choose to maintain your own PostgreSQL database, you can enable logical WAL replication with the following PostgreSQL configuration.
...
wal_level = logical
...
You can find more details about the WAL level in the PostgreSQL Documentation.
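If you are running your own instance, a quick way to check and enable this from psql is shown below. This is a sketch assuming superuser access; note that changing wal_level only takes effect after a server restart.
-- Check the current WAL level
SHOW wal_level;
-- Switch to logical decoding (requires superuser and a restart)
ALTER SYSTEM SET wal_level = logical;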
Deploying Debezium Connect with PostgreSQL Connection
There are several ways to deploy Debezium Connect; here we will use Docker to spin up a container running Debezium Connect as follows.
docker run --rm --name debezium \
-e BOOTSTRAP_SERVERS=<bootstrap_servers> \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=connect_configs \
-e OFFSET_STORAGE_TOPIC=connect_offsets \
-e STATUS_STORAGE_TOPIC=connect_statuses \
-e ENABLE_DEBEZIUM_SCRIPTING='true' \
-e CONNECT_SASL_MECHANISM=SCRAM-SHA-256 \
-e CONNECT_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";' \
-p 8083:8083 debezium/connect:2.7
BOOTSTRAP_SERVERS
: The Kafka bootstrap server(s) that Debezium Connect will use. If you are using Upstash's managed Kafka, you can find this value on the Upstash dashboard.
CONNECT_SASL_JAAS_CONFIG
: The JAAS configuration containing the security module and the username/password pair. You can omit this (together with CONNECT_SASL_MECHANISM and CONNECT_SECURITY_PROTOCOL) if your Kafka cluster does not require authentication. If you are using Kafka from Upstash, you can find the username and password on the Kafka cluster details page.
CONFIG_STORAGE_TOPIC
: This environment variable is used to specify the Kafka topic where Debezium will store the connector properties.
OFFSET_STORAGE_TOPIC
: This environment variable is used to specify the Kafka topic where Debezium will store the connector offsets.
STATUS_STORAGE_TOPIC
: This environment variable is used to specify the Kafka topic where Debezium will store the connector statuses.
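Once the container is up, you can verify that Debezium Connect is reachable by calling the Kafka Connect REST API root endpoint, which returns version information:
curl http://localhost:8083/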
Debezium Connect is now running, but it is empty: no source (PostgreSQL in our case) is being tracked yet, and no data is being sent to the sink, which is Kafka.
We will also leverage two SaaS solutions:
- Rapidapp for PostgreSQL: To quickly set up and manage our PostgreSQL database.
Create a free database in Rapidapp Starter in seconds here
- Upstash Kafka: A managed Apache Kafka service that will act as our message broker.
Create a free Kafka cluster in Upstash here
Adding Debezium Connector
You can add a new connector to Debezium Connect by using its REST API as follows.
curl --location 'http://localhost:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<pg_host>",
"database.port": "<pg_port>",
"database.user": "<pg_user>",
"database.password": "<pg_pass>",
"database.dbname": "<pg_db>",
"database.server.id": "<unique_id>",
"table.include.list": "<schema.table_name>",
"topic.prefix": "<pg_topic>",
"plugin.name": "pgoutput",
"kafka.bootstrap.servers": "<kafka_host>:<kafka_port>",
"kafka.topic.prefix": "<kafka_topic_prefix>"
}
}'
Line 7: This tells Debezium which connector class to use for the source, in our case the PostgreSQL connector.
Lines 8-12: PostgreSQL connection properties. If you are using Rapidapp, you can find these details on the Connection Properties tab of the database details page.
Line 13: A unique database server id that Debezium uses to differentiate the sources.
Line 14: The list of tables that will be monitored by Debezium.
Line 16: This field tells Debezium which logical decoding plugin to use to read changes from the PostgreSQL write-ahead log (WAL).
Once the connector is created, you can verify it by listing the available connectors with the following:
curl -XGET http://localhost:8083/connectors
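You can also check the health of the connector and its tasks via the status endpoint (the connector name matches the one used in the create request above):
curl -XGET http://localhost:8083/connectors/postgres-connector/status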
Step-by-Step Spring Boot Application Setup
In this section, we will implement a simple Spring Boot CRUD application where every modification in the PostgreSQL
database is automatically synchronized to Kafka. This is especially useful when other services are interested in
those changes. In our case, we will be maintaining Product
information in the PostgreSQL database. Let's get started!
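As a rough sketch of the entity we will be working with (assuming Spring Data JPA; the field names here are illustrative):
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

// Minimal Product entity; once its table is added to table.include.list,
// Debezium will stream inserts, updates and deletes on it to Kafka.
@Entity
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private Double price;

    // getters and setters omitted for brevity
}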