
Managing Data Contracts: Helping Developers Codify “Shift Left”

Written By Sandon Jacobs

We live in a world of events. The phone in your pocket emits data about your location and receives a notification to order your morning coffee from your favorite shop en route to work. Your thermostat knows you’re out for the day and adjusts the temperature to save energy. Your refrigerator automatically orders a replacement water filter after serving a given amount of water. Railway sensors emit location events as cars pass by.

As a customer or consumer, we don’t give much thought to the format of these events. Our thoughts are more like:

  • “I REALLY need that extra shot of espresso this morning!”

  • “But if I take Elm Street this morning, is that speed trap still there?”

  • “I sure hope the fridge got a better deal on that filter than last time.”

Maybe it’s just me, but the data streaming engineer in me can’t help but think:

  • “Do they express those shots as a string or a numeric type?”

  • “How many drivers have to report the cop before the alert shows on the map?”

  • “Did the fridge get a software update that allows it to send discount codes to buy the filter?”

Ok, I don’t own a “connected fridge,” but you can see by these thoughts how the formatting of events can evolve over time. Remember the words of Heraclitus: “There is nothing permanent except change.” Even 2500 years ago, the guy knew a thing or two about event-driven design.

What is “Shift Left”?

Given the distributed, “produce once, consume many” nature of event-driven architecture, the logical place to isolate such changes is near the source, as far “left” as possible. Doing so means consumers can trust the data they encounter because of an agreed-upon data contract, covering structure, integrity constraints, metadata, security, and evolution.

These are the elements of an explicit agreement between a data producer and a data consumer, formalizing a data contract:

  • For starters, we have a schema, which defines the fields and their types. For example, customer_ssn is mandatory and must be a string.

  • Next, we see declarative constraints and data quality rules on the values of the fields. With these, we can enforce data quality in the stream itself: data that doesn’t adhere to the rules is automatically rejected.

  • Metadata about the schema adds clarity for discoverability purposes, for instance which system/application wrote this data.

  • Fields that contain sensitive information must be encrypted.

  • Finally, Schema Registry provides a method of cataloging the language of the data—organizing, validating, and tracking the evolution of data contracts in a centralized location.

By building quality controls into the pipeline, the teams producing the data become responsible for its quality: producers land clean data, and consumers can use it with confidence.

So let’s dive into some tools to enable developers to produce this high-quality data in their pipelines. Much of what we’ll discuss here has been covered in the Confluent Cloud Documentation on Stream Governance, and by my colleague Gilles Philippart here on YouTube. But as a microservices developer—whose “love languages” are Kotlin and Terraform—I’d like to codify some of these concepts. We’ll create Confluent Cloud assets, the nucleus of our data streaming solution, using Terraform. Then in Confluent Schema Registry, we’ll configure our subjects, apply schemas for the data in our Apache Kafka® topics, and evolve those schemas with migration rules.

At the time of writing, the features involving migration rules and metadata are only available in the Confluent client SDKs for JVM languages, Go, and .NET.

Developers, get in the game.

Application developers! We turn coffee into code. And in this case, we’ll turn schemas and migration rules into high-quality data streams. We’ll build a couple of simple Kafka client applications: one to produce and consume data with version 1 of a schema, and another to consume those same events with version 2 of the schema.

To start, clone the demo-scene repository and navigate to the data-contracts directory. Follow the steps outlined in the README document to create a Confluent Cloud environment, with a Kafka cluster and Kafka topic. From there, Apache Avro schemas are registered with Confluent Schema Registry, and then downloaded by the application modules to generate Java classes that are used to serialize events to Kafka. Let’s take a closer look at the modules in this directory.

Defining and registering schemas

The data-contracts demo includes a Gradle project called schemas, and as the name implies, our Apache Avro schemas live here. This Gradle project makes use of the schema-registry-plugin to interact with Confluent Schema Registry. For JVM developers, a similar plugin (maintained by Confluent) exists for Maven.
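Before the plugin can register anything, it needs to be applied to the build and pointed at a Schema Registry endpoint. Here’s a rough sketch of that wiring, with the caveat that the exact property names vary between plugin versions and the endpoint and credentials below are placeholders, so treat the demo’s build files and the plugin documentation as the source of truth. These connection settings live in the same schemaRegistry closure as the register directive shown next.

// build.gradle.kts (sketch): apply the community Schema Registry Gradle plugin.
plugins {
    id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "<plugin-version>"
}

// Point the plugin at the Confluent Cloud Schema Registry endpoint. The property
// names here are an assumption; check the docs for the plugin version you use.
schemaRegistry {
    url = "https://<SCHEMA_REGISTRY_ENDPOINT>"
    credentials {
        username = "<SCHEMA_REGISTRY_API_KEY>"
        password = "<SCHEMA_REGISTRY_API_SECRET>"
    }
}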

Using the Kotlin DSL for Gradle, the build.gradle.kts defines a schemaRegistry closure with a directive to register the schemas with their corresponding metadata and migration rules:

schemaRegistry {
   val baseBuildDir = "${project.projectDir}/src/main"
   val avroSchemaDir = "$baseBuildDir/avro"
   val rulesetDir = "$baseBuildDir/rulesets"
   val metadataDir = "$baseBuildDir/metadata"

   register {
       subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v1.avsc")
           .setMetadata("$metadataDir/membership_major_version_1.json")
       subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v2.avsc")
           .addLocalReference("validity-period", "$avroSchemaDir/validity_period.avsc")
           .setMetadata("$metadataDir/membership_major_version_2.json")
           .setRuleSet("$rulesetDir/membership_migration_rules.json")
   }
}

The schema-registry-plugin provides the registerSchemasTask to do just that:

./gradlew registerSchemasTask

As a result, we now have two versions of the membership-avro-value subject, with major_version specified as the compatibilityGroup. Each schema version carries a metadata value for the major_version key, and version 2 also includes the migration rules defined via the setRuleSet call in the build file. You can validate all of this in the Confluent Cloud console, in the “Data Contracts” tab of the environment.
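If you’d rather verify this from code than from the console, a minimal Kotlin sketch using the Schema Registry Java client might look like the following. The endpoint and credentials are placeholders, and it assumes the kafka-schema-registry-client library is on the classpath; the subject name comes from the build file above.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

fun main() {
    // Placeholder Confluent Cloud Schema Registry endpoint and API credentials.
    val configs = mapOf(
        "basic.auth.credentials.source" to "USER_INFO",
        "basic.auth.user.info" to "<SR_API_KEY>:<SR_API_SECRET>"
    )
    val client = CachedSchemaRegistryClient(listOf("https://<SR_ENDPOINT>"), 10, configs)

    // List every registered version of the subject created by registerSchemasTask.
    val versions = client.getAllVersions("membership-avro-value")
    println("Registered versions: $versions") // expecting [1, 2]

    // Inspect the latest version's version number and schema id.
    val latest = client.getLatestSchemaMetadata("membership-avro-value")
    println("Latest version: ${latest.version}, schema id: ${latest.id}")
}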

Application using schema version 1

Now let’s produce and consume data that adheres to the schemas we’ve registered, starting with version 1. A membership entity, version 1, is defined with this Avro schema:

{
 "name": "Membership",
 "namespace": "io.confluent.devrel",
 "type": "record",
 "fields": [
   {"name": "user_id", "type": "string"},
   {"name": "start_date", "type": {"type": "int", "logicalType": "date"}},
   {"name": "end_date", "type": {"type": "int", "logicalType": "date"}}
 ]
}

This is a “flat” structure, meaning that all fields are of a simple type like string or a numeric type. The module app-schema-v1 uses the schema-registry-plugin downloadSchemasTask to download version 1 of the membership-avro-value schema from the registry into its src/main/avro directory:

schemaRegistry {
   download {
       // download the membership avro schema, version 1
       subject("membership-avro-value", "${projectDir}/src/main/avro", 1)
   }
}
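Running the plugin’s download task pulls that schema version into the module, mirroring the register step from earlier:

./gradlew downloadSchemasTask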

This module also uses the gradle-avro-dependency-plugin to generate Java code from the downloaded schema:

./gradlew generateAvroJava

Wait, this is a Kotlin project…right? YES! And the interoperability with Java libraries is a key feature of the Kotlin language. This means that we can easily use these generated classes, as well as the classes of the Kafka client. Let’s get down to business with that, first in the form of a KafkaProducer:

class MembershipProducer: BaseProducer<String, Membership>(mapOf(
   ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
   ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
   ProducerConfig.CLIENT_ID_CONFIG to "membership-producer-app-v1",
   AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
   AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
   AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
))

Zooming in on the configuration of the KafkaProducer above (a short usage sketch follows this list), we see:

  • The value.serializer property is a schema-registry-aware implementation—KafkaAvroSerializer.

  • The latest.compatibility.strict setting tells the serializer to check for backward compatibility between the latest version of the schema with the provided metadata and the schema of the object being serialized (true is the default, but I explicitly set it here).

  • The serializer is configured not to blindly use the latest registered version of the schema (use.latest.version is set to false).

  • Instead, the serializer should use the latest version of the schema where the metadata (use.latest.with.metadata) contains major_version=1. Sound familiar?
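Here’s that usage sketch: a minimal, hedged example of building and sending a version 1 Membership event. The topic name membership-avro is an assumption (it lines up with the membership-avro-value subject under the default TopicNameStrategy), the generated class is assumed to expose java.time.LocalDate accessors for the date fields, and the demo itself wraps the send call in BaseProducer.

import io.confluent.devrel.Membership
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.LocalDate
import java.util.UUID

fun produceSampleMembership(producer: KafkaProducer<String, Membership>) {
    // Build a v1 Membership with the Avro-generated builder (LocalDate assumed
    // for the date logical types).
    val userId = UUID.randomUUID().toString()
    val membership = Membership.newBuilder()
        .setUserId(userId)
        .setStartDate(LocalDate.now().minusYears(1))
        .setEndDate(LocalDate.now().plusYears(1))
        .build()

    // Key by user_id; KafkaAvroSerializer serializes the value against the latest
    // schema whose metadata contains major_version=1 (topic name is an assumption).
    producer.send(ProducerRecord("membership-avro", userId, membership)) { metadata, exception ->
        if (exception != null) println("Send failed: ${exception.message}")
        else println("Produced to ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
    }
}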

In a similar fashion, we create a class to house our KafkaConsumer:

class MembershipConsumer: BaseConsumer<String, Membership>(mapOf(
   ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v1",
   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
   AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
   AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
   AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
))

The consumer configuration looks a lot like the producer’s (a sketch of the underlying poll loop follows this list):

  • The value.deserializer property is a schema-registry-aware implementation—KafkaAvroDeserializer.

  • The latest.compatibility.strict and use.latest.version values are identical.

  • The deserializer should use the latest version of the schema where the metadata (use.latest.with.metadata) contains major_version=1.
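For context, BaseConsumer in the demo wraps a plain KafkaConsumer. A rough, hedged sketch of the poll loop underneath looks something like this; the topic name is again an assumption, and the real class handles configuration, shutdown, and error handling:

import io.confluent.devrel.Membership
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

fun pollMemberships(consumer: KafkaConsumer<String, Membership>) {
    // Subscribe to the topic backing the membership-avro-value subject (name assumed).
    consumer.subscribe(listOf("membership-avro"))
    while (true) {
        // KafkaAvroDeserializer resolves each record against the latest schema whose
        // metadata contains major_version=1, then hands back a Membership instance.
        val records = consumer.poll(Duration.ofMillis(500))
        records.forEach { record ->
            println("Received Membership ${record.key()}, ${record.value()}")
        }
    }
}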

Running the main function of the ApplicationMain object in this module produces random records to the Kafka topic. It also starts a consumer on a separate thread to print the events to the console. Here is a sample output:

[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - Received Membership d0e65c83-b1c5-451d-b08b-8d1ed6fca8d6, {"user_id": "d0e65c83-b1c5-451d-b08b-8d1ed6fca8d6", "start_date": "2023-01-14", "end_date": "2025-05-28"}
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - Received Membership 940cf6fa-eb12-46af-87e8-5a9bc33df119, {"user_id": "940cf6fa-eb12-46af-87e8-5a9bc33df119", "start_date": "2023-05-23", "end_date": "2025-07-02"}

Consuming with schema version 2

Schemas are code, and thus they will change. In this case, we’ll evolve the membership schema to encapsulate the start_date and end_date fields in a new record type called ValidityPeriod. Sounds reusable, right? So ValidityPeriod is defined in its own schema like this:

{
 "name": "ValidityPeriod",
 "namespace": "io.confluent.devrel",
 "type": "record",
 "fields": [
   {"name": "from", "type": {"type": "int", "logicalType": "date"}},
   {"name": "to", "type": {"type": "int", "logicalType": "date"}}
 ]
}

Then the new version of Membership is altered to use the new record type:

{
 "name": "Membership",
 "namespace": "io.confluent.devrel",
 "type": "record",
 "fields": [
   { "name": "user_id", "type": "string" },
   { "name": "validity_period", "type": "io.confluent.devrel.ValidityPeriod" }
 ]
}

And if you’ve been around Avro for more than a few days, you’ll see this is a BREAKING CHANGE! We can’t do that…or can we? Confluent Schema Registry provides functionality to create data migration rules: essentially, mappings between compatibility groups of a schema. We can provide UPGRADE and DOWNGRADE rules, so that consumers can specify how to map between the “breaking” fields. The Confluent Cloud documentation includes a helpful illustration of how migration rules work.

This functionality is available to the Java, Go, and .NET clients of Confluent Schema Registry. For Java clients, add the following dependency:

In Maven:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-rules</artifactId>
    <version>7.8.0</version>
</dependency>

In Gradle (as in our example):

implementation("io.confluent:kafka-schema-rules:7.8.0")

Let’s revisit how the data contracts were registered in the schemas module.

schemaRegistry {
   val baseBuildDir = "${project.projectDir}/src/main"
   val avroSchemaDir = "$baseBuildDir/avro"
   val rulesetDir = "$baseBuildDir/rulesets"
   val metadataDir = "$baseBuildDir/metadata"

   register {
       subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v1.avsc")
           .setMetadata("$metadataDir/membership_major_version_1.json")
       subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v2.avsc")
           .addLocalReference("validity-period", "$avroSchemaDir/validity_period.avsc")
           .setMetadata("$metadataDir/membership_major_version_2.json")
           .setRuleSet("$rulesetDir/membership_migration_rules.json")
   }
}

Here we see the membership_v2.avsc schema file defined as a subject in the register closure. The setMetadata call applies major_version: 2 for this new version, addLocalReference pulls in the definition of the ValidityPeriod type, and setRuleSet points to a set of migration rules:

{
 "migrationRules": [
   {
     "name": "move_start_and_end_date_to_validity_period",
     "kind": "TRANSFORM",
     "type": "JSONATA",
     "mode": "UPGRADE",
     "expr": "$merge([$sift($, function($v, $k) {$k != 'start_date' and $k != 'end_date'}), {'validity_period': {'from':start_date,'to':end_date}}])"
   },
   {
     "name": "move_validity_period_to_start_date_and_end_date",
     "kind": "TRANSFORM",
     "type": "JSONATA",
     "mode": "DOWNGRADE",
     "expr": "$merge([$sift($, function($v, $k) {$k != 'validity_period'}), {'start_date': validity_period.from, 'end_date': validity_period.to}])"
   }
 ]
}

Taking a closer look at these migration rules, we see that they are of kind TRANSFORM, meaning they are used to transform the data into another structure. The mode attribute dictates the direction of the transformation, and the transformations themselves are defined as JSONata expressions. For more on JSONata, check out Robert Yokota’s blog post Understanding JSONata. While JSONata is typically used for migration rules, Google CEL expressions are more commonly used for data quality rules.

Here we see an UPGRADE rule (from major_version 1 to major_version 2), which states that if the event HAS start_date and end_date values, map those to the validity_period from and to fields, respectively. We also see a DOWNGRADE rule for clients deserializing the events with the version 1 schema. This rule says that if there is a validity_period, map the from and to fields of that object to the start_date and end_date on the root object.

Let’s use this version 2 schema to consume the version 1 events produced to the Kafka topic. Here is a consumer instance, configured to do just that:

class MembershipConsumer: BaseConsumer<String, Membership>(mapOf(
   ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v2",
   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
   AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
   AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=2"
))

The most noticeable difference between this consumer and the previous one is use.latest.with.metadata, denoting that we want the latest schema version whose metadata contains major_version=2. This consumer is able to deserialize all events with this new schema version because the client applies the UPGRADE migration rule. Here’s a sample console output:

[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - v2 - Received Membership 2c6382fb-792f-42ed-b5e7-21ef68b2984b, {"user_id": "2c6382fb-792f-42ed-b5e7-21ef68b2984b", "validity_period": {"from": "2024-05-15", "to": "2025-02-12"}}
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - v2 - Received Membership b0e34c68-208c-4771-be19-79689fc9ad28, {"user_id": "b0e34c68-208c-4771-be19-79689fc9ad28", "validity_period": {"from": "2022-11-06", "to": "2025-09-03"}}

So even the events produced with the version 1 schema are usable by clients using the version 2 schema. For more on the serializer/deserializer configurations we called out, see the Confluent Cloud documentation explanation of handling differences between preregistered and client-derived schemas.
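On the consuming side, no special handling is required to benefit from the upgrade: the deserializer hands back the version 2 generated class, so application code simply works with the nested ValidityPeriod. A tiny, hedged sketch, assuming the Avro-generated accessors follow the usual camelCase mangling:

import io.confluent.devrel.Membership

// Works for every event on the topic, whether it was produced with schema v1 or v2:
// the UPGRADE migration rule reshapes v1 payloads before deserialization completes.
fun describe(membership: Membership): String =
    "Membership ${membership.userId} is valid " +
        "from ${membership.validityPeriod.from} to ${membership.validityPeriod.to}"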

Nice… I like it… land the plane

How is this functionality important or useful? Let’s go back to the reasons why companies implement a microservices architecture. Among the reasons is the trait of autonomy: allowing different elements of the architecture to scale, evolve, and deploy independently, while adhering to a set of contracts. In event-driven microservices, these contracts are schemas and the related rules that impose data governance on the events exchanged by our services.

With this functionality, as implemented in Confluent Cloud with Stream Governance, data contracts can be collaboratively defined, implemented, and evolved. We can use CI/CD practices to test, validate, and register the contracts with Confluent Schema Registry. From there, applications can rely on tried-and-tested tooling to download those schemas and build new functionality against them. Most importantly, with migration rules as part of the data contract, consumers and producers can adopt new schema versions at their own deployment cadence, be that a fully automated CD pipeline or a monthly release.

Tell me more!

Again, I invite you to clone the demo-scene repository and step through the examples in the data-contracts directory. Create a Confluent Cloud account and use the promo code POPTOUT000MZG62 for a $400 credit in the first 30 days—plenty of time to explore this demo. Add new topics and schemas for your use cases to see if these patterns work for you.

Check out the Confluent Cloud documentation regarding Governing Data Streams. Zoom in on the section about Data Contracts, featuring the aforementioned video from my colleague Gilles Philippart.

And as always, leave your feedback on GitHub or my social media channels. I’d love to hear from you, whether you found this useful or you’d like to hear more.

Apache®, Apache Kafka®, and Kafka® are registered trademarks of the Apache Software Foundation.

  • Sandon Jacobs is a Developer Advocate at Confluent, based in Raleigh, NC. Sandon has two decades of experience designing and building applications, primarily with Java and Scala. His data streaming journey began while building data pipelines for real-time bidding on mobile advertising exchanges—and Apache Kafka was the platform to meet that need. Later experiences in television media and the energy sector led his teams to Kafka Streams and Kafka Connect, integrating data from various in-house and vendor sources to build canonical data models.

    Outside of work, Sandon is actively involved in his Indigenous tribal community. He serves on the NC American Indian Heritage Commission, and also as a powwow singer and emcee at many celebrations around North America. Follow Sandon on Twitter @SandonJacobs or Instagram @_sandonjacobs, where he posts about his powwow travels, family, golf, and more.
