Streaming PostgreSQL Changes to Kafka with Debezium
Introduction: Why Send Changes to Kafka
In modern distributed systems, keeping multiple services in sync and maintaining data consistency across microservices can be challenging. When dealing with a microservices architecture, it's crucial to have an efficient way to propagate database changes to other services in real-time. One effective solution is to publish database changes to a message broker like Apache Kafka. Kafka acts as an intermediary that allows various services to subscribe to these changes and react accordingly. This approach ensures real-time data synchronization, reduces the complexity of direct service-to-service communication, and enhances the overall scalability and fault tolerance of the system.
Use Cases for Publishing Database Changes to Kafka
- Real-Time Analytics: Feeding database changes to a real-time analytics system to provide up-to-the-minute insights.
- Event-Driven Architecture: Enabling services to react to database changes, triggering workflows or business processes.
- Cache Invalidation: Automatically invalidating or updating cache entries based on database changes to ensure consistency.
- Data Replication: Replicating data across different data stores or geographic regions for redundancy and high availability.
- Audit Logging: Keeping a comprehensive audit log of all changes made to the database for compliance and debugging purposes.
What is Debezium?
Debezium is an open-source distributed platform that captures database changes and streams them to Kafka in real-time. It leverages the database's transaction log to detect changes and publish them as events in Kafka topics. Debezium supports various databases, including PostgreSQL, MySQL, and MongoDB, making it a versatile choice for change data capture (CDC) needs.
PostgreSQL Configuration: Logical WAL Replication
In this article, we will be using PostgreSQL as our database with logical WAL replication enabled. You can host the database in any database management system. For a convenient deployment option, consider cloud-based solutions like Rapidapp, which offers managed PostgreSQL databases with built-in logical WAL replication, simplifying setup and maintenance.
Create a free database with built-in logical WAL replication in Rapidapp in seconds here
If you choose to maintain your own PostgreSQL database, you can enable logical WAL replication with the following PostgreSQL configuration.
...
wal_level = logical
...
You can find more details about WAL levels in the PostgreSQL Documentation.
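If you manage PostgreSQL yourself, you can verify the setting with psql. The connection details below are placeholders; note also that on self-managed instances the user Debezium connects with needs the REPLICATION attribute, as sketched here.
# Should print "logical" once the server has been restarted with the new setting
psql "postgresql://<user>:<password>@<host>:<port>/<database>" -c "SHOW wal_level;"

# Grant the replication attribute to the user the connector will use
psql "postgresql://<user>:<password>@<host>:<port>/<database>" \
  -c "ALTER USER <pg_user> WITH REPLICATION;"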
Deploying Debezium Connect with PostgreSQL Connection
There are several ways to deploy Debezium Connect, but we will use Docker to spin up a container running Debezium Connect as follows.
docker run --rm --name debezium \
-e BOOTSTRAP_SERVERS=<bootstrap_servers> \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=connect_configs \
-e OFFSET_STORAGE_TOPIC=connect_offsets \
-e STATUS_STORAGE_TOPIC=connect_statuses \
-e ENABLE_DEBEZIUM_SCRIPTING='true' \
-e CONNECT_SASL_MECHANISM=SCRAM-SHA-256 \
-e CONNECT_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";' \
-p 8083:8083 debezium/connect:2.7
- BOOTSTRAP_SERVERS: The Kafka bootstrap servers to connect to. If you are using Upstash's managed Kafka, you can find this value on the Upstash dashboard.
- CONNECT_SASL_JAAS_CONFIG: The security module and username/password pair (used together with CONNECT_SASL_MECHANISM and CONNECT_SECURITY_PROTOCOL). You don't need to set these if your Kafka cluster does not require authentication. If you are using Kafka from Upstash, you can find the username and password on the Kafka cluster details page.
- CONFIG_STORAGE_TOPIC: The Kafka topic where Debezium will store the connector configurations.
- OFFSET_STORAGE_TOPIC: The Kafka topic where Debezium will store the connector offsets.
- STATUS_STORAGE_TOPIC: The Kafka topic where Debezium will store the connector statuses.
Debezium Connect is now running, but it is empty: no source (PostgreSQL in our case) is being tracked yet, and no data is being sent to the sink (Kafka).
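Before wiring PostgreSQL to it, you can quickly confirm the Connect worker is reachable; the root endpoint of the Kafka Connect REST API returns version information.
curl http://localhost:8083/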
We will also leverage two SaaS solutions:
- Rapidapp for PostgreSQL: To quickly set up and manage our PostgreSQL database.
Create a free database in Rapidapp Starter in seconds here
- Upstash for Kafka: A managed Kafka service to which Debezium will publish the database change events.
Create a free Kafka cluster in Upstash here
Adding Debezium Connector
You can add a new connector to Debezium Connect by using its REST API as follows.
curl --location 'http://localhost:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<pg_host>",
    "database.port": "<pg_port>",
    "database.user": "<pg_user>",
    "database.password": "<pg_pass>",
    "database.dbname": "<pg_db>",
    "table.include.list": "<schema.table_name>",
    "topic.prefix": "<pg_topic>",
    "plugin.name": "pgoutput"
  }
}'
Line 7: Tells Debezium which connector class to use to connect to the source, in our case PostgreSQL.
Lines 8-12: PostgreSQL connection properties. If you used Rapidapp, you can grab these details on the Connection Properties tab of the database details page.
Line 13: The list of tables that Debezium will monitor for changes.
Line 14: The topic prefix; Debezium publishes change events to Kafka topics named <topic.prefix>.<schema>.<table>.
Line 15: Tells Debezium which logical decoding plugin to use to read changes from the PostgreSQL WAL; pgoutput is PostgreSQL's built-in output plugin.
Once the connector is created, you can verify it by listing the available connectors:
curl -XGET http://localhost:8083/connectors
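To go one step further than listing names, you can also query the status endpoint of the Kafka Connect REST API; a healthy connector and its tasks should report a RUNNING state.
curl http://localhost:8083/connectors/postgres-connector/status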
Step-by-Step Spring Boot Application Setup
In this section, we will implement a simple Spring Boot CRUD application in which every modification to the PostgreSQL database is automatically synchronized to Kafka. This is useful especially when some other service is interested in those changes. In our case, we will be maintaining Product information in the PostgreSQL database. Let's get started!
Project Initialization and Dependencies
We will be using Spring Boot and PostgreSQL to build the application. You can initialize a Spring Boot project by using the Spring Boot CLI. Once it is installed, you can use the following command to initialize a project with the required dependencies.
spring init \
--dependencies=web,data-jpa,postgresql,lombok \
--type=maven-project \
--javaVersion=21 \
spring-pg-debezium
Line 2: web for implementing REST endpoints, data-jpa for database persistence, postgresql for the PostgreSQL driver, and lombok for reducing boilerplate code.
Line 3: --type=maven-project for creating a Maven project.
Line 4: --javaVersion=21, since we will use Java 21 for this project.
Implementing Entity and Repository
We have only one entity here, Product, which will be used to store product information. Let's create the Product entity as follows.
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
class Product {

    @Id
    @GeneratedValue
    private Long id;

    private String title;

    @Column(name = "price", precision = 10, scale = 2)
    private BigDecimal price;
}
Line 2: Automatically generate getter/setter methods using Lombok.
Line 3: Generate a no-args constructor.
Line 4: Generate a constructor with all instance variables.
Line 13: Define the price column, which accepts values with up to 10 digits in total and 2 decimal places, e.g. 12345678.99.
In order to manage the Product entity in the database, we will use the following repository interface.
interface ProductRepository extends CrudRepository<Product, Long> {}
Implementing Rest Endpoints
We have one root endpoint /api/v1/products
inside one controller and implement 3 actions for
create, update, and delete as follows
@RestController
@RequestMapping("/api/v1/products")
@RequiredArgsConstructor
class ProductController {

    private final ProductRepository productRepository;

    @PostMapping
    void create(@RequestBody CreateProductRequest request) {
        Product product = new Product();
        product.setTitle(request.getTitle());
        product.setPrice(request.getPrice());
        productRepository.save(product);
    }

    @PatchMapping("/{id}")
    void update(@RequestBody UpdateProductRequest request, @PathVariable("id") Long id) {
        Product product = productRepository.findById(id)
                .orElseThrow(() -> new EntityNotFoundException("Product not found"));
        product.setPrice(request.getPrice());
        productRepository.save(product);
    }

    @DeleteMapping("/{id}")
    void delete(@PathVariable("id") Long id) {
        productRepository.deleteById(id);
    }
}
The create method accepts a CreateProductRequest, which contains title and price fields, as shown below.
@Data
@NoArgsConstructor
@AllArgsConstructor
class CreateProductRequest {
    private String title;
    private BigDecimal price;
}
The update method is used to update the product price, and it accepts a request as follows.
@Data
@NoArgsConstructor
@AllArgsConstructor
class UpdateProductRequest {
    private BigDecimal price;
}
Now that the persistence layer and REST endpoints are ready, we can configure the application.
Application Configuration
This section contains application-level configurations such as the application name, datasource, and JPA settings, as shown below:
spring:
  application:
    name: spring-pg-debezium
  datasource:
    url: <connection-string-from-rapidapp|or your own managed postgres url>
    username: <username>
    password: <password>
  jpa:
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    hibernate:
      ddl-auto: update
Line 5: The connection URL for the PostgreSQL database. You can obtain this from Rapidapp or your own managed PostgreSQL service. It should have a format like jdbc:postgresql://<host>:<port>/<database>?sslmode=require.
Running Application
You can run the application as follows:
./mvnw spring-boot:run
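If you prefer not to hardcode credentials in the configuration file, Spring Boot's relaxed binding lets you override them with environment variables at startup; a minimal sketch with placeholder values:
# Environment variables map to spring.datasource.* via relaxed binding
SPRING_DATASOURCE_URL='jdbc:postgresql://<host>:<port>/<database>?sslmode=require' \
SPRING_DATASOURCE_USERNAME=<username> \
SPRING_DATASOURCE_PASSWORD=<password> \
./mvnw spring-boot:run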
Demo
Once you perform any of the following requests, you will see it published to the Kafka cluster, where you can consume and inspect the message.
Create Product
curl -XPOST -H "Content-Type: application/json" http://localhost:8080/api/v1/products -d '{"title": "Blue Iphone", "price": "37.3213"}'
Update Product
curl -XPATCH -H "Content-Type: application/json" http://localhost:8080/api/v1/products/1 -d '{"price": "37.1213"}'
Delete Product
curl -XDELETE http://localhost:8080/api/v1/products/1
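To verify the end-to-end flow, you can attach any Kafka consumer to the change topic. Debezium names topics <topic.prefix>.<schema>.<table>, so assuming the default public schema and Hibernate's default table name product, a check with the stock Kafka console consumer could look like this (if your cluster requires SASL authentication, pass the matching settings via --consumer.config):
# Read all change events for the product table from the beginning of the topic
kafka-console-consumer.sh \
  --bootstrap-server <bootstrap_servers> \
  --topic <pg_topic>.public.product \
  --from-beginning
Each event carries the row state before and after the change plus an op field (c for create, u for update, d for delete), so downstream consumers can replay exactly what happened in PostgreSQL.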
Conclusion
Integrating Debezium with PostgreSQL and Kafka in a Spring Boot environment allows you to efficiently stream database changes to various services. This setup not only enhances data consistency and real-time processing capabilities but also simplifies the architecture of your microservices. By following this guide, you can leverage the power of change data capture to build responsive and scalable applications.
You can find the complete source code for this project on GitHub.