[Webinar] How to Protect Sensitive Data with CSFLE | Register Today

Combining CDC Transactional Messages Using Kafka Streams

A team from Thoughtworks was engaged to replicate selected data from a 25-year-old IBM mainframe system into scalable microservices that can support big surges in demand without placing any load on the mainframe. The client selected a commercial CDC (change data capture) product that can copy IBM Db2 database changes to Apache Kafka®.

We designed an event-driven pipeline where the first stage is a Kafka Streams application that combines individual database change messages into transaction event messages. It consumes CDC messages from 25 database tables in a single topic and produces event messages into two domain topics, keyed by a domain ID. All messages are serialized using Avro.

The Kafka Streams application has two responsibilities:

  • Combine messages belonging to the same database transaction. Any service that consumes the messages can process them as atomic changes.

  • Separate changes by business domain and produce events to domain-specific topics. A customer service, for example, does not need to consume messages from the licensing topic.

This article describes the Kafka Streams approach we used, illustrated using a small example with four database tables in one domain.

CDC messages

The CDC product produces one message per database row change. Each message value includes a header section with transactional information, including these fields:

  • operation

  • transactionId

  • transactionEventCounter

  • transactionLastEvent

The values of transactionEventCounter range from 1 to the number of change messages in that transaction. The message with the highest value of transactionEventCounter has transactionLastEvent value true.

An example truncated message, rendered as JSON, is:

{
  "header": {
    "table": "Addresses",
    "operation": "INSERT",
    "transactionId": "00000000014EB5EE9907000000000000",
    "transactionEventCounter": 1,
    "transactionLastEvent": false
  },
  "data": {
    "address_id": 581439110,
    "creation_date": "1996-03-01",
    "effective_date": "1996-03-01",
    "street_number": "25",
    "street_name": "HALE",
    "street_type": "STREET",
    // etc.
  }
}

This CDC product provides little control over message order because it has limited capability for defining message keys. We configured it to produce all messages to a single topic, partitioned randomly.

Data model

A simplified data model with four tables is used in this article. This simplified diagram shows the tables and their primary key columns:

Table groups

The tables are divided into groups, where those in a group have a common column:

Group

Table

Common column

Address

Addresses

address_id

Customer

Customers CustomerNames CustomerAddresses

customer_id

Why did we do this? We need to be able to process database transactions with any combination of database change messages. 

Each table group corresponds to an aggregate root in the domain-driven design sense and the common column contains the keys used when producing messages for those groups. The CustomerAddresses table is part of the Customer group because it also contains information about the type of customer address and is more naturally part of the customer aggregate. Each table belongs to only one table group.

Domain event messages

We designed a set of domain event messages for each table group:

Group

Event message

Key

Produced when

Address

AddressCreatedEvent

address_id

A row is inserted into Addresses

Address

AddressModifiedEvent

address_id

A row is updated in Addresses

Customer

CustomerCreatedEvent

customer_id

A row is inserted into Customers, along with any inserts into CustomerNames or CustomerAddresses

Customer

CustomerModifiedEvent

customer_id

A row is updated in Customers

A row is inserted or updated in CustomerNames or CustomerAddresses for an existing row in Customers (i.e., no insert into Customers)

The source system does not delete customer and address records.

The topology

Our Kafka Streams topology has three main stages, explained in detail below:

  1. Group the consumed messages by transaction ID.

  2. Aggregate the messages into objects with all the messages for each transaction.

  3. When all messages belonging to a transaction have been received, transform the transaction objects into well-contextualized event messages of the types described above, and produce them to sink topics.

Here is a simplified view of the three stages, followed by enhancements to cater for complexity in actual use. 

All code examples here are in Kotlin. The messages all have string keys and Avro-serialized values. 

Group messages by transaction ID

The first stage groups the incoming messages by transaction ID. This is necessary because the CDC product is not able to generate Kafka keys using transaction ID.

val grouped: KStream<String, SpecificRecord> = StreamsBuilder()
    .stream(
        sourceTopic,
        Consumed.with(Serdes.String(), sourceSerde)
    )
    .groupBy(
        { _, message -> transactionIdFrom(message) },
        Grouped.with(Serdes.String(), sourceSerde)
    )

In this code:

  • Consumed messages are deserialized using sourceSerde.

  • The function transactionIdFrom extracts the transaction ID from the transactional information included in each message.

Grouping messages here forces Kafka Streams to repartition messages but that is unavoidable, given the limited options in the CDC product for creating message keys.

Aggregate messages by transaction ID

This is the core of the topology, where the grouped messages are added to Transaction objects, keyed by transaction ID. The Transaction class is a container with zero or more records from any of the four tables we are processing, and fields that are used to determine when all messages from a transaction have been aggregated. See below for how we constrain the size of the state store.

const val STATE_STORE = "transactions"

val aggregated: KTable<String, Transaction> = grouped.aggregate(
    { TransactionBuilder.newTransaction() },
    { key, value, trans ->
        TransactionBuilder.addEvent(key, value, trans)
    },
    Materialized.`as`<String, Transaction,
                      KeyValueStore<Bytes, ByteArray>>(STATE_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(transactionSerde)
)

In this code:

  • TransactionBuilder.newTransaction() constructs a new Transaction object.

  • TransactionBuilder.addEvent() method adds an event message to a transaction and returns a new, immutable Transaction instance.

  • The transaction objects are materialized to a key-value state store called transactions.

  • The Transaction class is serialized using transactionSerde.

We do not use windowed aggregation because it is not a good fit for aggregating by transaction ID:

  • We have no expectations about how long it will take for all the messages in a transaction to be received.

  • There is no mechanism for calling back when a window closes so we still need to detect transactions that never complete (see below).

KTables continuously emit messages as they aggregate incoming messages but we only want to continue processing transactions that are complete. We use a filter that only allows complete transactions to pass.

val completeAggregations: KStream<String, Transaction> = aggregated
    .filter { _, trans -> TransactionBuilder.isComplete(trans) }
    .toStream()

In this code:

  • TransactionBuilder.iscomplete() function uses transactional information in the messages to determine if all messages for that transaction have been aggregated (see above). If all the messages up to the final one (with last-message flag set true) have been aggregated, in any order, then it is complete.

  • The KTable is converted back to a KStream for further processing.

Process completed transaction objects

completeAggregations
    .flatMap { key, trans -> transformTransaction(key, trans) }
    .to(sinkTopic, Produced.with(Serdes.String(), sinkSerde))

In this code:

  • The transformTransaction() function returns one or more KeyValue objects with an event message derived from the source transaction data. 

  • Message keys are customer ID for CustomerCreatedEvent and CustomerModifiedEvent messages, and address ID for AddressCreatedEvent and AddressModifiedEvent messages.

  • The resulting stream is written to the sink topic with messages serialized using sinkSerde.

We use flatMap because the contract for produced events is one event per address ID or customer ID and the application needs to process transactions that span multiple database rows. 

For example, a single transaction may insert a row into Addresses and CustomerAddresses, so we need to produce both AddressCreatedEvent and CustomerModifiedEvent messages.

Similarly, we need to deal with batch insert or update transactions. For example, if a batch process inserts multiple address rows, we need to produce that number of AddressCreatedEvent messages.

Complexity

The topology we use has more complexity to handle situations encountered in our real-world experience processing CDC transactions.

Incomplete transactions and state store size

One of our concerns was to know if transactions never complete: in other words, if not all the CDC messages for a transaction are consumed by our service, for whatever reason. 

Transactions that never complete will remain in the state store undetected. Our solution is to create a Processor implementation that schedules a Punctuator to periodically check for incomplete transactions.

A side effect of not using windowed aggregation is that the local key-value state store used for Transaction objects has no built-in mechanism to delete objects after they have been processed. We use the punctuator function for that purpose as well.

The processor’s init() method includes:

this.punctuator = context.schedule(
        Duration.ofSeconds(60),
        PunctuationType.WALL_CLOCK_TIME,
        this::cleanUpOldTransactions
)

The cleanup function:

private fun cleanUpOldTransactions(timestamp: Long) =
    stateStore.all().use {
        it.forEach { keyValue ->
            val trans = keyValue.value.value()
            if (TransactionBuilder.isOld(trans, timestamp)) {
                if (!TransactionBuilder.isComplete(trans)) {
                    signalExpiredTransaction(keyValue.key)
                }
                stateStore.delete(keyValue.key)
            }
        }
    }

In this code:

  • The punctuator checks every minute.

  • cleanUpOldTransactions iterates through the transactions in the state store.

  • TransactionBuilder.isOld() checks whether the transaction started more than an hour ago.

  • When an old, incomplete transaction is found, signalExpiredTransaction() generates an alert that prompts investigation and action.

  • The old transaction is deleted from the state store.

Correlation ID in headers

To trace messages through processing by Kafka Streams and other services, we created a correlation ID that is not part of the messages. It is passed in Kafka headers and included in log events. 

The Kafka Streams DSL does not provide access to message headers but has access to the Processor API, which does. Our solution is to call KStream.transformValues() immediately after consuming messages. An implementation of ValueTransformer has access to the ProcessorContext, where we add a correlation ID to Kafka message headers. The processor’s transform() method returns the message unchanged. 

The topology preserves headers through repartitioning and forwards them through aggregation. This means the complete Transaction object is in a message with headers from the last CDC message that was part of it.

Our correlation ID is made by using a truncated hash of the source transaction ID. This design enables us to correlate all Kafka messages originating in one database transaction. 

Initial load

For initial load, the CDC product produces “refresh” messages representing a point-in-time view of the database tables. These messages do not contain any transactional information.

Our application has a second topology for processing refresh messages (not shown here) that uses the same steps but groups and aggregates messages by the ID values common to table groups (i.e. customer_id and address_id). It produces messages for each table group: AddressSnapshotEvent and CustomerSnapshotEvent.

Duplicate CDC messages

We occasionally encountered duplicate CDC messages. The effect of these is to cause the topology to produce duplicate domain event messages (assuming the duplicate CDC message arrives before the Transaction is removed from the state store).

The contract with downstream services is for at-least-once delivery and for those services to treat domain events as idempotent.

Conclusion

We created a Kafka Streams application as the first stage in a pipeline that uses CDC to copy data from a Db2 database into highly available domain microservices. The application is responsible for combining messages from database transactions into discrete events, and for producing those events to domain-specific topics.

The Kafka Streams application uses a simple topology to aggregate database-row messages into transactional events for downstream processing. The topology includes a check that all messages for a transaction have been processed, generating an alert if messages are not consumed within a reasonable time period.

The application constructs a correlation ID from the transaction ID and adds it to the Kafka headers, to enable tracing of messages that originate from a single database transaction. It can also produce snapshot messages from an initial load of the data.

Learn More!

  • Michael is a lead developer at Thoughtworks with many years experience developing back-end services. He has a particular interest in event-driven architectures and observability of complex systems.

Did you like this blog post? Share it now