Analysing Changes with Debezium and Kafka Streams

Change Data Capture (CDC) is an excellent way to introduce streaming analytics into your existing database, and using Debezium enables you to send your change data through Apache Kafka®. Although most CDC systems give you two versions of a record, as it was before and as it is after the change, it can be difficult to work with if you’re maintaining a replica somewhere else. The Debezium MongoDB CDC Connector gives you just the record-by-record changes that allow you to do exactly what you desire, especially if the change delta itself is of analytical value. This blog post looks at how to combine Kafka Streams and tables to maintain a replica within Kafka and how to tailor the output record of a stream.

Imagine you have an application that collects prices for items that you have added to a wishlist. You set a target price that you’re willing to pay and add links to a couple of different stores that carry the item. You then store each item as a document in a MongoDB collection, such as this:

{
  	"item": "Software Engineering at Google",
  	"target_prize": 20,
  	"store": "amazon.co.uk",
  	"price": 31.99,
  	"last_check": "2020-05-24T16:05:43Z",
  	"url": "https://www.amazon.co.uk/dp/1492082791/"
}

The application can check whether a new price meets the target price, but you now realise that you haven’t kept track of the changes over time. Fortunately, in the background, MongoDB keeps track of each and every change as part of its replication ability. You can even go as far as to consider it a continual stream of changes.

Change Data Capture with Debezium

Debezium is a collection of Kafka Connect connectors for different databases. As a Kafka Connect plugin, Debezium requires downloading the Debezium MongoDB CDC Connector and adding it to Connect’s plugin.path. Once the plugin is present, you can configure a connector for the MongoDB collection:

echo '{
  "name": "product-source",
  "config": {
	"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
	"mongodb.hosts": "MONGO_REPLICA_SET/MONGO_HOST:MONGO_PORT",
	"mongodb.name": "mongocdc",
	"mongodb.ssl.enabled": false,
	"mongodb.user": "MONGO_USERNAME",
	"mongodb.password": "MONGO_PASSWORD",
	"collection.whitelist": "wishlist.product",
	"key.converter": "org.apache.kafka.connect.json.JsonConverter",
	"value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}' | curl -X POST -d @- http://KAFKA_CONNECT_HOST:8083/connectors --header "Content-Type:application/json"

This configuration chooses the product collection in the wishlist database but you can also use wildcards to choose multiple collections. The connector will get to work and start sending documents to a topic it has created called mongocdc.wishlist.product. If you take a look at the new topic below, you can see the document inside Debezium’s transport envelope:

{
   "schema":{
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "after"
         },
         ...
         {
            "type": "struct",
            "fields": [...],
            "optional": true,
            "field": "transaction"
         }
      ],
      "optional": false,
      "name": "mongocdc.wishlist.product.Envelope"
   },
   "payload": {
      "after": "{\"_id\": {\"$oid\": \"5ee39875c94bffb83b95288e\"},\"item\": \"Software Engineering at Google\",\"target_prize\": 20,\"store\": \"amazon.co.uk\",\"price\": 31.99,\"last_check\": \"2020-05-24T16:05:43Z\",\"url\": \"https://www.amazon.co.uk/dp/1492082791/\"}",
      "patch": null,
      "filter": null,
      "source": {
         "version": "1.1.2.Final",
         "connector": "mongodb",
         "name": "mongocdc",
         "ts_ms": 1591974225000,
         "snapshot": "last",
         "db": "wishlist",
         "rs": "mongo-replica-set",
         "collection": "product",
         "ord": 1,
         "h": 0,
         "tord": null
      },
      "op": "r",
      "ts_ms": 1591974232360,
      "transaction": null
   }
}

The envelope consists of a schema (abbreviated here) and a payload. You can see an after property in the payload that contains the complete document as a string. You’re now up and running, at least until you encounter your first change.

{
   "schema": {...},
   "payload": {
      "after": null,
      "patch": "{\"$v\": 1,\"$set\": {\"last_check\": {\"$date\": 1591974366044},\"price\": 24.9}}",
      "filter": "{\"_id\": {\"$oid\": \"5ee39875c94bffb83b95288e\"}}",
      "source": {
         "version": "1.1.2.Final",
         "connector": "mongodb",
         "name": "mongocdc",
         "ts_ms": 1591974366000,
         "snapshot": "false",
         "db": "wishlist",
         "rs": "mongo-replica-set",
         "collection": "product",
         "ord": 1,
         "h": 0,
         "tord": null
      },
      "op": "u",
      "ts_ms": 1591974365467,
      "transaction": null
   }
}

Here’s the challenge. Unlike the Debezium CDC connectors for other databases, you only get a patch. While you could write your consumers to handle this, it would have two negative consequences. First, every consumer would need to read the whole topic in order to get the initial complete version of the document and all changes that have happened so far. Second, you’d have duplicate logic handling the merge of the sequence of patches.

Kafka Connect lets you specify transformations, some of which Debezium includes; however, you need more than a transformation: you need a way to maintain state between changes for each document in the collection. Thankfully, you can solve both problems using Kafka Streams. Before taking a closer look at the solution, below is a quick review of some key Kafka Streams concepts.

Streams and tables with Kafka Streams

At its core, Kafka Streams provides two different abstractions on top of regular Kafka topics: streams and tables. The best way to distinguish between them is to identify the purpose they serve for the data contained in them. A stream is a sequence of events that typically needs to be processed in a certain order, representing an element-by-element evolution of the whole dataset. A table, on the other hand, is a snapshot of the whole dataset at a particular point in time.

There is an interesting interplay between streams and tables: Processing one produces the other. For example, if you accumulate all the events up to a point in time, you produce a table. Should you then emit each change of this table, you produce a new stream. It’s this interplay that solves both challenges—two birds with one stone, as they say.

Using Debezium, changes to the MongoDB collection are emitted into a topic representing a stream of changes. With Kafka Streams, you accumulate these into a table by applying each patch as it arrives, and as the table changes, it emits the complete record as a new stream. This means that new consumers can begin reading the merged stream at any point as it will always contain complete records, and consumers do not need to maintain their own document state or merge logic.

Implementing Kafka Streams for Debezium

Writing a Kafka Streams application is fairly easy compared to connectors and consumers. You provide some configuration, such as where Kafka can be found, and use a StreamsBuilder to create the stream (KStream) and table (KTable). Using SQL-like syntax, you can manipulate the data to achieve your goal and finally begin execution.

public static void main(String[] args) {
   String source = "mongocdc.wishlist.product";
   String dest = "product";
   String kafka = "localhost:9092";

   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, source + "-cdc");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());

   StreamsBuilder builder = new StreamsBuilder();
   KStream<Bytes, Bytes> cdc = builder.stream(source);
   KTable<Bytes, Bytes> table = builder.table(dest);

   cdc.leftJoin(table, (left, right) -> merge(right, left))
      .groupByKey()
      .reduce((agg, bytes) -> bytes, Materialized.as(dest + ".table"))
      .toStream().to(dest);

   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
}

Most of the code is fairly straightforward configuration; however, the four lines of SQL-like syntax are where all the work happens and are worth some further explanation.

  cdc.leftJoin(table, (left, right) -> merge(right, left))
      .groupByKey()
      .reduce((agg, bytes) -> bytes, Materialized.as(dest + ".table"))
      .toStream().to(dest);

You can take the KStream, cdc; perform a left join with the KTable, table; and apply your own merge function (explained below) for each joined row. A left join is required as you will receive new documents that do not yet exist in the table, which is the right side of your join. In order for a record in the stream to match a row in the table, the key from the underlying topic is used.

The result of the join and merge is the complete document with the patch applied, but this currently exists only within your application. In order to save your changes, group the data by key for partitioning purposes and reduce the group in order to materialize the stream as your table. You can then send the results off in a new stream.
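The look-up, merge, store, and emit cycle described above can be modelled without Kafka at all. The following is a minimal sketch in plain Java, not the Kafka Streams implementation: the class JoinMergeSketch and its methods are hypothetical names, a HashMap stands in for the KTable, and a List stands in for the output stream. For simplicity it drops a row and emits nothing when the merge returns null; how deletes propagate through the real topology is not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, Kafka-free model of "join each change with the current row,
// merge, store the result, and emit it".
final class JoinMergeSketch {
    private final Map<String, String> table = new HashMap<>();  // stands in for the KTable
    private final List<String> output = new ArrayList<>();      // stands in for the output stream
    private final BinaryOperator<String> merge;                 // (oldValue, change) -> newValue

    JoinMergeSketch(BinaryOperator<String> merge) {
        this.merge = merge;
    }

    void onChange(String key, String change) {
        // A left join keeps the change even when the key is not in the table yet,
        // in which case oldValue is null.
        String oldValue = table.get(key);
        String newValue = merge.apply(oldValue, change);
        if (newValue == null) {
            table.remove(key);           // simplification: drop the row, emit nothing
            return;
        }
        table.put(key, newValue);        // keep the complete value for the next change
        output.add(newValue);            // emit the complete value downstream
    }

    String lookup(String key) {
        return table.get(key);
    }

    List<String> output() {
        return output;
    }
}
```

Feeding the sketch a new key followed by a change to the same key shows why the left join matters: the first event finds no existing row, yet it still has to pass through the merge so that it can become the row the second event is merged into.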

The merge function is specific to your data source, the Debezium-processed MongoDB changelog. As the Debezium events shown earlier illustrate, you’ll receive a JSON document with the raw MongoDB data contained in a metadata-rich envelope. The key element is the payload, because it contains both the actual data from MongoDB and the operation (op) that occurred in MongoDB to cause this data event.

static Bytes merge(Bytes oldValue, Bytes newValue) {
   try {
      if (newValue == null) {
         return null;
      }

      ObjectMapper mapper = new ObjectMapper();
      JsonNode newjson = mapper.readTree(new String(newValue.get(), StandardCharsets.UTF_8));
      JsonNode payload = newjson.get("payload");
      String data = null;

      switch (payload.get("op").asText()) {
         case "c":
         case "r":
            // Create and read both carry the complete document in "after"
            data = payload.get("after").asText();
            break;
         case "u":
            // Update carries only a patch, which is applied to the stored document
            data = update(
               new String(oldValue.get(), StandardCharsets.UTF_8),
               payload.get("patch").asText()
            );
            break;
         case "d":
            data = null;
            break;
      }

      if (data != null) {
         return new Bytes(data.getBytes(StandardCharsets.UTF_8));
      } else {
         return null;
      }
   } catch (IOException e) {
      e.printStackTrace();
      return null;
   }
}

The operation, op, takes one of four values. Both c (create) and r (read) operations are handled in the same way, that is, as a new and complete document contained in the after element. d (delete) is also easy. You simply return null to remove the document. u (update) requires more work to apply the patch, as seen in the update function below.

static String update(String oldjson, String patch) throws IOException {
   ObjectMapper mapper = new ObjectMapper();
   ObjectNode after = (ObjectNode) mapper.readTree(oldjson);
   JsonNode json = mapper.readTree(patch);

   if (json.has("$set")) {
      JsonNode set = json.get("$set");
      for (Iterator<String> it = set.fieldNames(); it.hasNext(); ) {
         String key = it.next();
         after.put(key, tidyValue(set.get(key)));
      }
   }

   if (json.has("$unset")) {
      JsonNode unset = json.get("$unset");
      for (Iterator<String> it = unset.fieldNames(); it.hasNext(); ) {
         String key = it.next();
         after.remove(key);
      }
   }

   return after.toString();
}

The patch value is from MongoDB itself and varies depending on the version used, in this case, 4.2. It could contain two lists of key/value pairs, one called $set and the other $unset. With $set, simply make sure there is a key in your document that is set to the value contained in the patch. For $unset, you just need to remove the key if it’s still present in your document.
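To make the $set and $unset semantics concrete without a JSON library, here is a small sketch that applies them to a document held as a plain Java Map. The class PatchSketch and the method applyPatch are hypothetical names used only for illustration; the post's own code works on Jackson nodes instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of $set/$unset semantics on a Map-based document.
final class PatchSketch {
    static Map<String, Object> applyPatch(Map<String, Object> document,
                                          Map<String, Object> set,
                                          Map<String, Object> unset) {
        // Work on a copy so the caller's document is left untouched.
        Map<String, Object> result = new LinkedHashMap<>(document);
        if (set != null) {
            result.putAll(set);              // $set: add or overwrite each key
        }
        if (unset != null) {
            for (String key : unset.keySet()) {
                result.remove(key);          // $unset: drop the key; a missing key is a no-op
            }
        }
        return result;
    }
}
```

Only the keys of the $unset map matter in this sketch; the values attached to them are ignored.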

One last thing to address is how a timestamp from MongoDB arrives in the patch. You may have noticed that the initial document used an ISO 8601 date/time format, but the patch contains an epoch time keyed by $date. The tidyValue method handles the conversion to an ISO 8601 date/time so that your consumers receive a consistent format.

static String tidyValue(JsonNode value) {
   if (value.has("$date")) {
       Date date = new Date(Long.parseLong(value.get("$date").asText()));
       return sdf.format(date);
   } else return value.asText();
}
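The sdf formatter used above is not shown in this post. As an alternative sketch, the same epoch-to-ISO 8601 conversion can be written with java.time, which avoids sharing a SimpleDateFormat instance (that class is not thread-safe). The class DateSketch and the method epochMillisToIso are hypothetical names, and truncating to whole seconds in UTC is an assumption about the desired output format.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: converts the epoch milliseconds found under "$date"
// into an ISO 8601 UTC timestamp.
final class DateSketch {
    static String epochMillisToIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .truncatedTo(ChronoUnit.SECONDS)  // drop milliseconds, as in the sample document
                .toString();                      // Instant.toString() uses the ISO 8601 instant format
    }
}
```

For the $date value in the patch shown earlier, DateSketch.epochMillisToIso(1591974366044L) returns "2020-06-12T15:06:06Z".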

Debezium connected to Kafka Streams

To run the Kafka Streams application, you need to do two things. First, create the destination topic named by dest, which is product in this example. You can then use Apache Maven to compile and run your application.

mvn compile exec:java \
  -Dexec.mainClass=com.github.gh_mlfowler.mongocdcdemo.MongoCDCKStream \
  -Dexec.cleanupDaemonThreads=false

Consumers of your dest topic will now receive the complete document with no Debezium envelope:

{
   "item": "Software Engineering at Google",
   "price": 30.99,
   "_id": {
      "$oid": "5ecd8edd9b47ccc632c1aed8"
   },
   "store": "amazon.co.uk",
   "last_check": "2020-05-24T17:05:43",
   "target_prize": 20,
   "url":"https://www.amazon.co.uk/dp/1492082791/"
}

With some tweaking to your merge logic, you can actually emit the more typical before and after (sometimes also called old and new) versions of other CDC systems. You need to make a small modification to the op switch statement to put the initial document into an after element in the resulting document:

case "r":
   JsonNode after = mapper.readTree(payload.get("after").asText());
   ObjectNode out = mapper.createObjectNode();
   out.set("after", after);
   data = out.toString();
   break;

For each new update, take the previous after and put it into a before element. Then apply the $set and $unset as before to what will now be stored in the after element.

static String update(String oldjson, String patch) throws IOException {

   ObjectMapper mapper = new ObjectMapper();
   ObjectNode out = (ObjectNode) mapper.readTree(oldjson);
   out.set("before", out.get("after"));

   ObjectNode after = out.get("after").deepCopy();
   JsonNode json = mapper.readTree(patch);

   if (json.has("$set")) {...}
   if (json.has("$unset")) {...}

   out.set("after", after);
   return out.toString();
}
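The deep copy in the function above is what keeps before and after independent of each other. A Map-based sketch of the same rotation makes that visible; BeforeAfterSketch and rotate are hypothetical names, and only $set is handled here for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of rotating "after" into "before" on each update.
final class BeforeAfterSketch {
    static Map<String, Map<String, Object>> rotate(Map<String, Map<String, Object>> envelope,
                                                   Map<String, Object> set) {
        Map<String, Object> previous = envelope.get("after");
        // Copy before patching; patching `previous` directly would also
        // change what ends up under "before".
        Map<String, Object> next = new HashMap<>(previous);
        next.putAll(set);

        Map<String, Map<String, Object>> out = new HashMap<>();
        out.put("before", previous);   // the old "after" becomes "before"
        out.put("after", next);        // the patched copy becomes the new "after"
        return out;
    }
}
```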

Once you start processing updates to an initial document, your consumers will see documents like this:

{
   "before":{
      "item":"Software Engineering at Google",
      "price":31.99,
      "_id":{
         "$oid":"5ecd8edd9b47ccc632c1aed8"
      },
      "store":"amazon.co.uk",
      "last_check":"2020-05-24T16:05:43",
      "target_prize":20,
      "url":"https://www.amazon.co.uk/dp/1492082791/"
    },
   "after":{
      "item":"Software Engineering at Google",
      "price":30.99,
      "_id":{
         "$oid":"5ecd8edd9b47ccc632c1aed8"
      },
      "store":"amazon.co.uk",
      "last_check":"2020-05-24T17:05:43",
      "target_prize":20,
      "url":"https://www.amazon.co.uk/dp/1492082791/"
   }
}

But why stop there? Why not introduce a third version, the delta? While the code is not shown here, you can get quite creative in what you generate processing the $set elements. For example, you could use the type of information provided by the Debezium envelope to implement per-type handling.

{
   "before":{
      ...
    },
   "delta":{
      "price": -1,
      "last_check":"PT1H"
   },
   "after":{
      ...
   }
}
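As a rough idea of how the two delta values in this example could be derived, here is a sketch using only the JDK. It is not the code from the demo repository: DeltaSketch, priceDelta, and timeDelta are hypothetical names, prices are passed as decimal strings to avoid floating-point rounding, and timestamps are assumed to carry a trailing Z as in the original MongoDB document.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

// Hypothetical per-type delta helpers.
final class DeltaSketch {
    // Numeric fields: the delta is simply after minus before.
    static BigDecimal priceDelta(String before, String after) {
        return new BigDecimal(after).subtract(new BigDecimal(before));
    }

    // Timestamp fields: the delta is an ISO 8601 duration such as "PT1H".
    static String timeDelta(String beforeIso, String afterIso) {
        return Duration.between(Instant.parse(beforeIso), Instant.parse(afterIso)).toString();
    }
}
```

With the values from the before and after documents, priceDelta("31.99", "30.99") is numerically equal to -1, and timeDelta("2020-05-24T16:05:43Z", "2020-05-24T17:05:43Z") returns "PT1H".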

Summary

If you’re interested in experimenting further, check out this demo environment using Vagrant, available on GitHub. Each of the three update strategies is implemented, which allows you to play with the output. A simple script modifies a document in MongoDB every minute, providing plenty of changes to observe.

By combining Debezium and Kafka Streams, you can enrich the change-only data from MongoDB with the historic document state to output complete documents for further consumption. Using Debezium’s envelope metadata, you’re able to access the typical before and after versions that other CDC systems generate as well as the delta version, which in some scenarios may be all you need.

To discover more Kafka connectors, be sure to visit the Confluent Hub.

  • Mike Fowler is principal data engineer at Claranet and is a Certified Data Management Professional (CDMP). Driven by a belief that humans should only do interesting things, Mike uses his years of experience to build reliable, scalable, and repeatable data pipelines and platforms. Mike holds 15 public cloud certifications across AWS, Azure, and Google, and he particularly enjoys taking customers on the journey from descriptive analytics to prescriptive analytics. Mike is an open source advocate having contributed to PostgreSQL, Terraform, and YAWL. He speaks regularly at conferences and meetups, sharing interesting ways to make great use of these technologies. When not parked behind a keyboard, he can be found amongst his five children, delighting them with his finest Dad jokes.
