Microservices architectures have been widely adopted among developers, with a great degree of success. However, drawbacks do exist. Data silos can arise when information processed by one microservice is not visible to other microservices.
The blog post no-more-silos is an excellent introduction to how silos in an application can be bridged with Kafka Connect and CDC. This blog post will review the advantages and disadvantages inherent to moving data from a database using JDBC and CDC, and then move on to explore the real use case of how a legacy bank used Kafka Connect to bridge the silos and keep multiple applications/databases in sync.
Kafka Connect users generally possess a good understanding of how connectors can be used to source/sink data in and out of Kafka using a no-code/low-code paradigm. Even so, customers can be unaware of the power and flexibility of Kafka Connect to solve some complex problems.
The JDBC source connector is probably the easiest way to get data from a database into Kafka. Using this connector with Kafka Connect allows you to fetch batches of data with a periodic SQL query, apply any necessary transformations, and then write the results to a Kafka topic.
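To make the idea of a periodic SQL query concrete, here is a minimal sketch of incrementing-style polling in plain Python. It is a conceptual illustration, not the connector's implementation: the `customers` table, its columns, and the `poll_once` helper are assumptions made for this example, and SQLite stands in for the real database.

```python
import sqlite3


def poll_once(conn, last_seen_id):
    """Fetch rows added since the last poll and return them with the new offset."""
    rows = conn.execute(
        "SELECT id, name FROM customers WHERE id > ? ORDER BY id",
        (last_seen_id,),
    ).fetchall()
    # If nothing new arrived, the offset stays where it was.
    new_offset = rows[-1][0] if rows else last_seen_id
    return rows, new_offset


# Hypothetical in-memory table standing in for the source database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO customers (name) VALUES (?)", [("Ada",), ("Grace",)])

first_batch, offset = poll_once(conn, 0)

# A new row arrives between polls; only that row is picked up next time.
conn.execute("INSERT INTO customers (name) VALUES (?)", ("Linus",))
second_batch, offset = poll_once(conn, offset)
```

Every poll is a query against the live database, which is exactly the load that motivates the log-based alternative.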
There is a different approach. The DBA is unlikely to appreciate Kafka Connect constantly pulling data out of the database with SQL, and often the database is already busy with more important work and has no room to accommodate a connector that keeps running queries against it.
Introducing CDC (change data capture), where Kafka Connect is configured to read from the database’s WAL (write-ahead log). Broadly put, relational databases use a transaction log (also called a binlog or redo log, depending on the database flavor) to which every event in the database is written. Update a row, insert a row, delete a row: it all goes to the database’s transaction log. CDC tools generally work by reading this transaction log to extract, at very low latency and with low impact, the events occurring on the database (or on a schema or table within it).
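As a rough illustration of what a log-based change event looks like, the sketch below uses the Debezium envelope fields `before`, `after`, and `op`. The sample rows are made up, and the `unwrap` helper is a simplified stand-in for the idea behind flattening an event to its new row state; the real transform has more options than shown here.

```python
def unwrap(change_event):
    """Return the row state after the change, or None when the row was deleted."""
    if change_event["op"] == "d":
        return None
    return change_event["after"]


# Hypothetical events for one customer row: create, update, then delete.
events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "Ada"}},
    {"op": "u", "before": {"id": 1, "name": "Ada"}, "after": {"id": 1, "name": "Ada L."}},
    {"op": "d", "before": {"id": 1, "name": "Ada L."}, "after": None},
]

states = [unwrap(e) for e in events]
```

Because each event comes from the transaction log, no query has to run against the tables to discover the change.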
Now we’ll dive into a classic use case: a legacy bank attempting to keep up with customers’ needs by continually adding capabilities to its products.
Online banking transactions have been made available. Customer data from online transactions gets stored on the PostgreSQL database, but over-the-phone transactions are recorded on a different database.
A more traditional approach might have the bank run a nightly job to keep the two databases in sync. This approach has various limitations, primarily:
This problem can be avoided using Kafka Connect. Here is a visualization of the high-level architecture:
In the above example, data is pulled from the two databases, SQL Server and PostgreSQL, using the Debezium CDC connector. Note that the tables have a similar (but not the same) data model and, of course, different data types specific to each database.
The two sink connectors now pull data from the respective topic and sink to the other database.
Look at the sequence of events closely, and you’ll see that it will get stuck in an infinite loop. Here is why:
You can break this loop using an SMT (single message transform), as described in the next section.
Source code for source and sink connectors can be found here.
This is a familiar problem for anyone who has worked with Confluent Replicator. Replicator supports an active-active setup across multiple data centers, where producers can write to both Kafka clusters (DC-1 and DC-2) and data from each cluster is copied to the other as a backup site. The resulting loop is avoided with some simple filtering. To enable this Replicator feature, configure provenance.header.enable=true. Replicator then puts provenance information in the message header after replication. Replicator will not replicate a message to a destination cluster if the cluster ID of the destination cluster matches the origin cluster ID in the provenance header and the destination topic name matches the origin topic name in the provenance header.
Now that you understand how Replicator handles an active-active setup, you can apply similar filtering to the databases. One option is to add a column to the database table that records where each row was created, and use it to filter out records that should not be copied.
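The rule described above can be sketched in a few lines of Python. This only models the decision, not Replicator itself: the `Provenance` type, the `should_replicate` function, and the cluster and topic names are all hypothetical names introduced for this example.

```python
from typing import NamedTuple, Optional


class Provenance(NamedTuple):
    """Origin information carried in a message header (illustrative shape)."""
    origin_cluster_id: str
    origin_topic: str


def should_replicate(provenance: Optional[Provenance],
                     dest_cluster_id: str,
                     dest_topic: str) -> bool:
    """Skip a message only when it would land back where it originated."""
    if provenance is None:
        # No provenance header: the message was produced locally, so copy it.
        return True
    back_to_origin = (
        provenance.origin_cluster_id == dest_cluster_id
        and provenance.origin_topic == dest_topic
    )
    return not back_to_origin


# A message that originated in DC-1 and has already been copied to DC-2.
copied = Provenance(origin_cluster_id="dc-1", origin_topic="payments")
```

Both conditions must match for a message to be skipped, so the same message can still be copied to a different topic or a third cluster.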
For example, you could add an SRC column with a default value of “PGSQL”, to indicate where each record was first created. (This column also becomes important for traceability once we start copying this record to different databases.)
Apply filter operations so that data produced in the current database (PostgreSQL) is copied to the destination (SQL Server), but once copied to the destination it is not sent back to the originating database (PostgreSQL), which breaks the infinite loop. Here is the sequence of events:
Here is a snippet of the PostgreSQL CDC source connector configuration, which filters out anything produced by the other database (SRC == “SQLSRV”):
"name": "postgres-cdc-azure-src",
"config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "oncloud.postgres.database.azure.com",
    "transforms": "unwrap, filterPostGres, ExtractField, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.filterPostGres.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterPostGres.filter.condition": "$[?(@.SRC == \"SQLSRV\")]",
    "transforms.filterPostGres.filter.type": "exclude",
    "transforms.filterPostGres.missing.or.null.behavior": "include",
We implemented this filter operation using an SMT (single message transform); here is the list of all available SMTs with a detailed explanation. You also need to account for the reverse case, when the web app writes to SQL Server. The source code for source and sink connectors can be found here.
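The effect of the exclude filter can be traced with a small simulation. This is a sketch under stated assumptions rather than the connectors' actual code: `keep_record` approximates an exclude filter whose missing-or-null behavior is "include", the database names in `EXCLUDED_SRC` and `OTHER` are labels chosen for this example, and `max_hops` is an artificial guard so that an unfiltered loop terminates.

```python
# Which SRC value each database's source connector refuses to publish.
EXCLUDED_SRC = {"postgres": "SQLSRV", "sqlserver": "PGSQL"}
# Where each database's topic is sunk to.
OTHER = {"postgres": "sqlserver", "sqlserver": "postgres"}


def keep_record(row, excluded_src):
    """Approximate an exclude filter on SRC that lets rows without SRC through."""
    src = row.get("SRC")
    if src is None:
        return True
    return src != excluded_src


def propagate(row, written_to, max_hops=10):
    """Follow one row from source connector to sink until a filter drops it."""
    hops = []
    db = written_to
    while len(hops) < max_hops and keep_record(row, EXCLUDED_SRC[db]):
        db = OTHER[db]
        hops.append(db)
    return hops


pg_row_hops = propagate({"id": 1, "SRC": "PGSQL"}, "postgres")
sql_row_hops = propagate({"id": 2, "SRC": "SQLSRV"}, "sqlserver")
untagged_hops = propagate({"id": 3}, "postgres")
```

A tagged row is copied exactly once, while a row without the SRC column keeps bouncing between the two databases until the guard stops it, which is why the column's default value matters.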
Now that you know keeping two databases in sync is possible using Kafka Connect, you can implement a three-way database sync.
The bank now wants to offer customers a new capability: smartwatch/credit card payments. Assuming this data is stored in a MySQL database, you now have to keep three databases in sync.
To implement this, you will need two more connectors and a little more filtering:
The same concept applies whether you need to keep two, three, or more databases in sync. Here is a diagram of events with three databases:
You need just one topic per database and table. To capture changes on the customers table in SQL Server, for example, you need only one topic, sqlserver.customer, which can then be written to any destination using the available sink connectors.
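One way to picture the three-database case is the simulation below. It rests on an assumption not spelled out in the text: each source connector publishes only rows that originated in its own database, and every other database upserts from that topic. The `MYSQL` tag, the `source_keeps` and `sync_all` helpers, and the in-memory tables are hypothetical and exist only for this sketch.

```python
SRC_TAG = {"postgres": "PGSQL", "sqlserver": "SQLSRV", "mysql": "MYSQL"}


def source_keeps(db, row):
    """Publish a row only if it originated in this database (or is untagged)."""
    src = row.get("SRC")
    return src is None or src == SRC_TAG[db]


def sync_all(tables):
    """One pass: each database publishes its native rows, the others upsert them."""
    # One topic per database and table, for example "sqlserver.customer".
    topics = {
        f"{db}.customer": [r for r in rows.values() if source_keeps(db, r)]
        for db, rows in tables.items()
    }
    for topic, records in topics.items():
        origin = topic.split(".")[0]
        for dest in tables:
            if dest != origin:
                for record in records:
                    tables[dest][record["id"]] = dict(record)  # upsert by key
    return topics


tables = {
    "postgres": {1: {"id": 1, "SRC": "PGSQL"}},
    "sqlserver": {2: {"id": 2, "SRC": "SQLSRV"}},
    "mysql": {3: {"id": 3, "SRC": "MYSQL"}},
}

first_pass = sync_all(tables)
second_pass = sync_all(tables)  # copied rows are filtered out at the source
```

After the first pass every database holds all three rows, and the second pass publishes nothing new because copied rows keep their original SRC value.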
You have now reviewed a real-life business problem, where Kafka Connect helped a bank keep multiple applications/databases in sync for a better customer experience.
A good application architecture provides the flexibility to share data created by your application with other applications while supporting data evolution. Kafka Connect, combined with Avro, addresses these needs and offers various benefits, such as:
Trying to keep multiple databases in sync can be tricky. This table includes a concise list of some of the problems you may encounter, and the solution to those problems.
Problem | Solution |
Databases use specific data types, for example date, datetime, smallint, and money. | Use an SMT to cast such database-specific data types to something supported by Avro. |
The destination database data model is different. | Kafka Connect supports transformations; we also encourage writing data in insert/upsert mode. |
When tables are linked by referential integrity, the parent table must always insert a record before the child table does. | This is a hard problem with several possible approaches. The simplest is to introduce some sort of delay for child tables, but this is error-prone. A more robust solution is to use ksqlDB to write to a topic only after the parent record is already present. |
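The parent-before-child rule in the last row can be illustrated with a small buffering gate. This is a plain Python stand-in for the idea, not ksqlDB and not how a ksqlDB query would be written; the `ParentFirstGate` class and the `id` and `parent_id` field names are assumptions made for this sketch.

```python
from collections import defaultdict


class ParentFirstGate:
    """Hold child records back until their parent record has been released."""

    def __init__(self):
        self.seen_parents = set()
        self.waiting = defaultdict(list)

    def on_parent(self, parent):
        """Release the parent, followed by any children that were waiting for it."""
        self.seen_parents.add(parent["id"])
        return [parent] + self.waiting.pop(parent["id"], [])

    def on_child(self, child):
        """Release the child only if its parent has already been released."""
        if child["parent_id"] in self.seen_parents:
            return [child]
        self.waiting[child["parent_id"]].append(child)
        return []


gate = ParentFirstGate()
released = []
released += gate.on_child({"id": 10, "parent_id": 1})  # arrives too early, held
released += gate.on_parent({"id": 1})                  # parent, then held child
released += gate.on_child({"id": 11, "parent_id": 1})  # parent known, passes
```

Unlike a fixed delay, the gate releases a child as soon as its parent is known and never earlier, at the cost of keeping state for the parents seen so far.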
Sample code is available in the Git repo. This environment is complex to set up, as you need two or maybe even three databases running in CDC mode. For a PoC you can take a more straightforward approach: use any single database (MySQL, in this example) and create three tables in different schemas (Oracle, Postgres, SQLServer) to mimic the three databases. You can also replace the CDC source connector with a JDBC source connector if you want to avoid CDC completely.
Here is an example:
Source code for this setup can be found here.
Here are some more recommended readings that you may find useful. Check out why stream processing is better than batch processing (ETL), and how CDC is a better way of fetching data from a database.