Skip to main content

One post tagged with "Kafka"

View All Tags

Streaming PostgreSQL Changes to Kafka with Debezium

· 8 min read
Huseyin BABAL
Software Developer

Introduction: Why Send Changes to Kafka

In modern distributed systems, keeping multiple services in sync and maintaining data consistency across microservices can be challenging. When dealing with microservices architecture, it's crucial to have an efficient way to propagate changes in database to other services in real-time. One effective solution is to publish database changes to message broker like Apache Kafka. Kafka acts as an intermediary that allows various services to subscribe to these changes and react accordingly. This approach ensures real-time data synchronization, reduces the complexity of direct service-to-service communication, and enhances the overall scalability and fault tolerance of the system.

Use Cases for Publishing Database Changes to Kafka

  • Real-Time Analytics: Feeding database changes to a real-time analytics system to provide up-to-the-minute insights.
  • Event-Driven Architecture: Enabling services to react to database changes, triggering workflows or business processes.
  • Cache Invalidation: Automatically invalidating or updating cache entries based on database changes to ensure consistency.
  • Data Replication: Replicating data across different data stores or geographic regions for redundancy and high availability.
  • Audit Logging: Keeping a comprehensive audit log of all changes made to database for compliance and debugging purposes.

What is Debezium?

Debezium is an open-source distributed platform that captures database changes and streams them to Kafka in real-time. It leverages the database's transaction log to detect changes and publish them as events in Kafka topics. Debezium supports various databases, including PostgreSQL, MySQL, and MongoDB, making it a versatile choice for change data capture (CDC) needs.

PostgreSQL Configuration: Logical WAL Replication

In this article, we will be using PostgreSQL as our database with logical WAL replication enabled. You can maintain your database in any database management system. For a convenient deployment option, consider cloud-based solutions like Rapidapp, which offers managed PostgreSQL databases with built-in logical WAL replication, simplifying setup and maintenance.

tip

Create a free database with built-in logical WAL replication in Rapidapp in seconds here

If you choose to maintain your own PostgreSQL database, you can enable logical WAL replication with following PostgreSQL configuration.

postgresql.conf
...
wal_level = logical
...

You can see more details about WAL Level in PostgreSQL Documentation.

Deploying Debezium Connect with PostgreSQL Connection

There are several ways to deploy Debezium Connect, but we will use Docker for spin up a container to run Debezium Connect as follows.

docker run --rm --name debezium \
-e BOOTSTRAP_SERVERS=<bootstrap_servers> \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=connect_configs \
-e OFFSET_STORAGE_TOPIC=connect_offsets \
-e STATUS_STORAGE_TOPIC=connect_statuses \
-e ENABLE_DEBEZIUM_SCRIPTING='true' \
-e CONNECT_SASL_MECHANISM=SCRAM-SHA-256 \
-e CONNECT_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_SASL_JAAS_CONFIG='org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";' \
-p 8083:8083 debezium/connect:2.7

BOOTSTRAP_SERVERS: You can set bootstrap server for this env variable. You can find this on Upstash dashboard if you are using their managed Kafka.

CONNECT_SASL_JAAS_CONFIG: This part contains security module and username/password pair. You don't need to set this if you are not using Kafka with authentication. However, if you are using Kafka from Upstash, then you can find username and password values on Kafka cluster details page.

CONFIG_STORAGE_TOPIC: This environment variable is used to specify the Kafka topic where Debezium will store the connector properties.

OFFSET_STORAGE_TOPIC: This environment variable is used to specify the Kafka topic where Debezium will store the connector offsets.

STATUS_STORAGE_TOPIC: This environment variable is used to specify the Kafka topic where Debezium will store the connector statuses.

Debezium connect is ready, but it is empty which means, no source will be tracked which is PostgreSQL, and no data will be sent to sink which is Kafka in our case.

We will also leverage two SaaS solutions:

  • Rapidapp for PostgreSQL: To quickly set up and manage our PostgreSQL database.
tip

Create a free database in Rapidapp Starter in seconds here

  • Upstash Redis: A managed Redis service optimized for low-latency data caching.
tip

Create a free Redis database in Upstash here

Adding Debezium Connector

You can add new connector to Debezium Connect by using its REST API as follows.

curl --location 'http://localhost:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<pg_host>",
"database.port": "<pg_port>",
"database.user": "<pg_user>",
"database.password": "<pg_pass>",
"database.dbname": "<pg_db>",
"database.server.id": "<unique_id>",
"table.include.list": "<schema.table_name>",
"topic.prefix": "<pg_topic>",
"plugin.name": "pgoutput",
"kafka.bootstrap.servers": "<kafka_host>:<kafka_port>",
"kafka.topic.prefix": "<kafka_topic_prefix>"
}
}'

Line 7: This is needed to tell Debezium how to connect source.

Line 8-12: PostgreSQL connection properties, if you have used Rapidapp, you can grab details on Connection Properties tab in database details page

Line 13: This is the unique database server id which will be used by Debezium to differentiate th sources.

Line 14: This is the list of tables that will be monitored by Debezium.

Line 16: This field is used to tell Debezium which plugin should be used for this connector to serialize/deserialize data from PostgreSQL bin log.

Once the connector is created, you can verify it by listing available connectors with the following;

curl -XGET http://localhost:8083/connectors

Step-by-Step Spring Boot Application Setup

In this section, we will implement a simple Spring Boot CRUD application where whenever you do a modification in PostgreSQL database, it will be synchronized to Kafka automatically. This will be useful especially some other service is interested in those changes. In our case, we will be maintaining Product information in PostgreSQL database. Let's get started!

Project Initialization and Dependencies

We will be using Spring Boot and PostgreSQL to build the application. You can initialize a spring boot project by using Spring Boot CLI. Once installed, you can use following command to initialize a project with required dependencies.

spring init \
--dependencies=web,data-jpa,postgresql,lombok \
--type=maven-project \
--javaVersion=21 \
spring-pg-debezium

Line 2: web for implementing REST endpoints, data-jpa for database persistence, and postgresql for PostgreSQL driver.

Line 3: --type=maven-project for creating a Maven project.

Line 4: --javaVersion=21 we will use Java 21 in Google Cloud Run environment.

Implementing Entity and Repository

We have only one entity here, Product, which will be used to store product information. Let's create a new entity called Product as follows.

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
class Product {

@Id
@GeneratedValue
private Long id;

private String title;

@Column(name = "price", precision = 10, scale = 2)
private BigDecimal price;
}

Line 2: Automatically enable getter/setter methods by using Lombok

Line 3: Generate no-arg constructor

Line 4: Generate constructor with all instance variables

Line 13: Define price column that accepts value with 10 digits max and 2 decimal places e.g. 023.99

In order to manage Product entity in database, we will use following repository interface.

interface ProductRepository extends CrudRepository<Product, Integer>{}

Implementing Rest Endpoints

We have one root endpoint /api/v1/products inside one controller and implement 3 actions for create, update, and delete as follows

@RestController
@RequestMapping("/api/v1/products")
@RequiredArgsConstructor
class ProductController {

private final ProductRepository productRepository;

@PostMapping
void create(@RequestBody CreateProductRequest request) {
Product product = new Product();
product.setTitle(request.getTitle());
product.setPrice(request.getPrice());
productRepository.save(product);
}

@PatchMapping("/{id}")
void update(@RequestBody UpdateProductRequest request, @PathVariable("id") Long id) {
Product p = productRepository.findById(id).orElseThrow(() -> new EntityNotFoundException("Product not found"));
p.setPrice(request.getPrice());
productRepository.save(p);
}

@DeleteMapping("/{id}")
void delete(@PathVariable("id") Long id) {
productRepository.deleteById(id);
}
}

create method accepts a request CreateProductRequest which contains title, and price information as shown below.

@Data
@NoArgsConstructor
@AllArgsConstructor
class CreateProductRequest {

private String title;

private BigDecimal price;

}

update is used to update product price, and it accepts a request as follows.

@Data
@NoArgsConstructor
@AllArgsConstructor
class UpdateProductRequest {

private BigDecimal price;

}

Now we have persistence layer and rest endpoints ready and we are ready to configure application.

Application Configuration

This section contains application level configurations such as the application name, datasource, and jpa as shown below:

application.yaml
spring:
application:
name: spring-pg-debezium
datasource:
url: <connection-string-from-rapidapp|or your own managed postgres url>
username: <username>
password: <password>
jpa:
database-platform: org.hibernate.dialect.PostgreSQLDialect
hibernate:
ddl-auto: update

Line 5: Connection URL for the PostgreSQL database. You can obtain this from Rapidapp or your own managed PostgreSQL service. It should have a format like jdbc:postgresql://<host>:<port>/<database>?sslmode=require.

Running Application

You can run application as follows

./mvnw spring-boot:run

Demo

Once you perform any of the following request, you will see it will be published to Kafka cluster where you can consume and see the message.

Create Product

curl -XPOST -H "Content-Type: application/json" http://localhost:8080/api/v1/products -d '{"title": "Blue Iphone", "price": "37.3213"}'
``

### Update Product
```bash
curl -XPATCH -H "Content-Type: application/json" http://localhost:8080/api/v1/products/1 -d '{"price": "37.1213"}'

Delete Product

curl -XDELETE  http://localhost:8080/api/v1/products/1

Conclusion

Integrating Debezium with PostgreSQL and Kafka in a Spring Boot environment allows you to efficiently stream database changes to various services. This setup not only enhances data consistency and real-time processing capabilities but also simplifies the architecture of your microservices. By following this guide, you can leverage the power of change data capture to build responsive and scalable applications.

tip

You can find the complete source code for this project on GitHub.