Using Data Contracts with Confluent Schema Registry

The Confluent Schema Registry plays a pivotal role in ensuring that producers and consumers in a streaming platform are able to communicate effectively. Ensuring the consistent use of schemas and their versions allows producers and consumers to easily interoperate, even when schemas evolve over time.

As streaming has become more ubiquitous, enterprises find that downstream components such as consumers are often tasked with handling data inconsistencies, incompatible changes, and expensive and complicated transformations in order to be able to process the data effectively. This has led to an effort to shift these responsibilities to the source of the data, such as the producer, an activity often referred to as shift-left. In the context of modern data architectures, such as streaming and data mesh, the effort to shift-left has led to an emphasis on the data contract. As a result, Confluent Schema Registry, both in Confluent Platform Enterprise and Confluent Cloud, now supports the use of data contracts.

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. The upstream component enforces the data contract, while the downstream component can assume that the data it receives conforms to the data contract. Data contracts are important because they provide transparency over dependencies and data usage in a streaming architecture. They help to ensure the consistency, reliability, and quality of the data in event streams, and they provide a single source of truth for understanding the data in motion.

In this article, we'll walk through an example of enhancing a schema to be a full-fledged data contract. Our example will involve the following actions:

  • Defining an initial schema for the data contract

  • Enhancing the data contract with business metadata

  • Adding data quality rules to the data contract

  • Specifying custom actions for the data contract

  • Adding migration rules to the data contract for a complex schema evolution

Defining an order schema

Let's create a simple Avro schema to represent a customer order.

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "state",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Assuming the above schema is in a file named order.avsc, upload it to a local Schema Registry as follows:

jq -n --rawfile schema order.avsc '{schema: $schema}' | 
  curl http://localhost:8081/subjects/orders-value/versions --json @-
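
The same registration call can be made from application code. The sketch below uses only the JDK's HTTP client and builds a request equivalent to the jq and curl pipeline above. The class and method names are made up for illustration, and the hand-written JSON escaping is an assumption that stands in for a real JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RegisterSchemaSketch {

  // Escapes a string so it can be embedded as a JSON string value.
  static String jsonString(String raw) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : raw.toCharArray()) {
      switch (c) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.append('"').toString();
  }

  // Wraps the schema text in an object with a single "schema" property,
  // which is what the jq command produces.
  static String registrationBody(String schemaText) {
    return "{\"schema\":" + jsonString(schemaText) + "}";
  }

  // Builds (but does not send) the POST request for the given subject.
  static HttpRequest registrationRequest(String registryUrl, String subject, String schemaText) {
    return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(registrationBody(schemaText)))
        .build();
  }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) after reading order.avsc with Files.readString. The response body should contain the schema ID that the producer needs in the next step.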

Let's try producing and consuming with our new schema using the Avro console producer and consumer, which are described in the Schema Registry documentation. First, start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer, and pass the schema ID that was returned during registration as the value of value.schema.id.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record in JSON format is sent via the producer, it will be received by the consumer.
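
One detail worth checking in real data is the unit of the timestamp field. Avro's timestamp-millis logical type counts milliseconds since the Unix epoch, while the sample value 1693591356 has the magnitude of a value in seconds. That makes no difference for this console round trip, but it matters for the timeliness check later in this article, which compares the field against System.currentTimeMillis(). The sketch below, with a made-up class name, shows how far apart the two readings are.

```java
import java.time.Instant;

final class TimestampUnitsSketch {

  // The sample value used in the console producer example.
  static final long SAMPLE = 1693591356L;

  // Reads the value the way timestamp-millis defines it: milliseconds since the epoch.
  static Instant asMillis(long value) {
    return Instant.ofEpochMilli(value);
  }

  // Reads the same digits as seconds since the epoch.
  static Instant asSeconds(long value) {
    return Instant.ofEpochSecond(value);
  }

  public static void main(String[] args) {
    System.out.println(asMillis(SAMPLE));  // prints 1970-01-20T14:26:31.356Z
    System.out.println(asSeconds(SAMPLE)); // prints 2023-09-01T18:02:36Z
  }
}
```

A producer that wants the 2023 instant under this schema would send 1693591356000 instead.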

Enhancing the data contract with metadata

As mentioned, in the effort to shift-left the responsibilities for the quality and timeliness of streaming data, data contracts are typically defined by the producers. By examining the data contract, a downstream consumer can determine which person or team is responsible for the data contract and what service level objectives (SLOs) the data contract is attempting to achieve. 

In a file named order_metadata.json, we declare that this contract is owned by the Orders Team, and that one of the service level objectives is to ensure that orders are available to downstream consumers no later than 10 seconds after the order timestamp. We'll make use of these metadata properties later in this article.

{
  "metadata": {
    "properties": {
       "owner": "OrdersTeam",
       "owner_email": "orders.team@acme.com",
       "slo_timeliness_secs": "10"
    } 
  }
}

Register the metadata directly with Schema Registry without specifying a schema. When the schema is omitted, Schema Registry creates a new schema version that reuses the schema of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_metadata.json

Adding data quality rules to the data contract

As shown in the previous section, data contracts can capture business metadata. They can also be used to capture integrity constraints or data quality rules to ensure the quality of the data. In a file named order_ruleset.json, let's add a rule that the price needs to be a positive number. The rule is specified as a Google Common Expression Language (CEL) expression.  

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0"
      }
    ]
  }
}

As with the metadata, register the rule set directly with Schema Registry without specifying a schema. A new schema version will be created with the schema (as well as the metadata) of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset.json

The above rule will cause all messages with a non-positive price to be rejected.

Let's try out our data quality rule. First start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": -1, "state": "Pending", "timestamp": 1693591356 }

We should see an error of the form: 

Expr failed: 'message.totalPriceCents > 0'
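
For readers who have not used CEL, the condition behaves like the plain-Java predicate below. This is only an analogue written for illustration: the class and method names are made up, the record is modeled as a Map rather than an Avro object, and the exception type is an assumption. Only the error text imitates the message shown above.

```java
import java.util.Map;

final class TotalPriceConditionSketch {

  // Plain-Java stand-in for the CEL expression "message.totalPriceCents > 0".
  static boolean checkTotalPrice(Map<String, ?> message) {
    Object value = message.get("totalPriceCents");
    return value instanceof Integer && (Integer) value > 0;
  }

  // Mimics a WRITE-mode condition: a false result stops the record from being produced.
  static void enforceOnWrite(Map<String, ?> message) {
    if (!checkTotalPrice(message)) {
      throw new IllegalArgumentException("Expr failed: 'message.totalPriceCents > 0'");
    }
  }
}
```

Note that a price of 0 is rejected as well as negative prices, because the rule requires a strictly positive value.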

To learn more about CEL, see Understanding CEL in Data Contract Rules.

Specifying custom rule executors and actions

The rule framework for data contracts is completely customizable. One can specify both custom rule executors and actions for a data contract.  

Below is the Java interface for rule executors. The "transform" method is used by rules of both kinds, CONDITION and TRANSFORM. For a CONDITION rule, it should return a Boolean value; for a TRANSFORM rule, it should return the transformed value.

public interface RuleExecutor extends RuleBase {
  String type();

  Object transform(RuleContext ctx, Object message) throws RuleException;
}

Below is the Java interface for rule actions. An exception is passed to the rule action if the rule executor failed, which includes a condition that returned false. After performing its work, the rule action should throw this exception if it exists.

public interface RuleAction extends RuleBase {
  String type();

  void run(RuleContext ctx, Object message, RuleException ex)
    throws RuleException;
}
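
The contract described above (do the work, then rethrow the exception if one was passed in) can be sketched without the Schema Registry libraries. Everything in the block below is a local stand-in written for illustration: SketchRuleException and SketchAction are hypothetical simplifications of RuleException and RuleAction, the RuleContext argument is omitted, and applyCondition is an assumed, simplified view of how a false condition turns into an exception.

```java
import java.util.ArrayList;
import java.util.List;

final class RuleContractSketch {

  // Local stand-in for the library's RuleException (hypothetical).
  static class SketchRuleException extends Exception {
    SketchRuleException(String message) {
      super(message);
    }
  }

  // Local stand-in mirroring the shape of RuleAction.run, minus the RuleContext.
  interface SketchAction {
    void run(Object message, SketchRuleException ex) throws SketchRuleException;
  }

  // An action that records every message it sees, then rethrows the failure if there was one.
  static final class RecordingAction implements SketchAction {
    final List<Object> seen = new ArrayList<>();

    @Override
    public void run(Object message, SketchRuleException ex) throws SketchRuleException {
      seen.add(message); // perform the side effect first
      if (ex != null) {
        throw ex; // then propagate the rule failure
      }
    }
  }

  // Simplified driver: a condition that returned false is handed to the action as an exception.
  static void applyCondition(boolean conditionResult, Object message, SketchAction action)
      throws SketchRuleException {
    SketchRuleException ex =
        conditionResult ? null : new SketchRuleException("condition returned false");
    action.run(message, ex);
  }
}
```

Rethrowing matters because an action that swallows the exception would let a record that failed its condition continue through the pipeline.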

Assume that on the consumer side we want to check the timeliness of data to see if it conforms to the declared service level objective. If it doesn't, we want to send an alert in the form of an email message. Below is a custom rule action to check the timeliness SLO. Note that the type of the action is specified as "EMAIL".  Also, the expression ctx.getParameter(name) will look for the value of a rule parameter with the given name. If not found, it will then look for the value of a metadata property with the given name.

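// Imports assume the javax.mail API; newer stacks may use the jakarta.mail package instead.
import io.confluent.kafka.schemaregistry.rules.RuleAction;
import io.confluent.kafka.schemaregistry.rules.RuleContext;
import io.confluent.kafka.schemaregistry.rules.RuleException;
import java.util.Map;
import java.util.Properties;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.Multipart;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import org.apache.avro.generic.GenericData;

public class EmailAction implements RuleAction {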

  private static final String USERNAME = "username";
  private static final String PASSWORD = "password";

  private String username;
  private String password;

  public String type() {
    return "EMAIL";
  }

  @Override
  public void configure(Map<String, ?> configs) {
    this.username = (String) configs.get(USERNAME);
    this.password = (String) configs.get(PASSWORD);
  }

  public void run(RuleContext ctx, Object message, RuleException ex)
    throws RuleException {
    if (!meetsSloTimeliness(ctx, message)) {
      sendMail(ctx, message);
    }
    if (ex != null) {
      throw ex;
    }
  }

  private boolean meetsSloTimeliness(RuleContext ctx, Object message)
    throws RuleException {
    try {
      String sloStr = ctx.getParameter("slo_timeliness_secs");
      int slo = Integer.parseInt(sloStr);
      long timestamp = (Long) ((GenericData.Record) message).get("timestamp");
      long now = System.currentTimeMillis();
      return now - timestamp <= slo * 1000L;
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }

  private void sendMail(RuleContext ctx, Object message)
      throws RuleException {
    try {
      String from = ctx.getParameter("mail.smtp.from");
      String to = ctx.getParameter("owner_email");
      String sloTimeliness = ctx.getParameter("slo_timeliness_secs");

      Properties props = new Properties();
      props.put("mail.smtp.host", ctx.getParameter("mail.smtp.host"));
      props.put("mail.smtp.port", ctx.getParameter("mail.smtp.port"));
      props.put("mail.smtp.auth", "true");
      props.put("mail.smtp.starttls.enable", "true");
      Session session = Session.getInstance(props, new Authenticator() {
        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
          return new PasswordAuthentication(username, password);
        }
      });

      Message mail = new MimeMessage(session);
      mail.setFrom(new InternetAddress(from));
      mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
      mail.setSubject("Mail Subject");

      String msg = "Order '" + ((GenericData.Record) message).get("orderId")
          + "' failed to meet the timeliness SLO of "
          + sloTimeliness + " seconds";

      MimeBodyPart mimeBodyPart = new MimeBodyPart();
      mimeBodyPart.setContent(msg, "text/html; charset=utf-8");
      Multipart multipart = new MimeMultipart();
      multipart.addBodyPart(mimeBodyPart);
      mail.setContent(multipart);

      Transport.send(mail);
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }
}
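
Two pieces of logic in EmailAction are easy to test in isolation: the parameter lookup order and the timeliness arithmetic. The sketch below restates both as pure functions over plain values. The class and method names are made up, and the lookup only models the order described above (rule parameter first, then metadata property). It is not the RuleContext implementation.

```java
import java.util.Map;

final class TimelinessSketch {

  // Looks up a rule parameter first, then falls back to a metadata property.
  static String resolve(String name, Map<String, String> ruleParams,
      Map<String, String> metadataProps) {
    String value = ruleParams.get(name);
    return value != null ? value : metadataProps.get(name);
  }

  // True when the record is read within sloSecs seconds of its timestamp.
  // Both time arguments are in milliseconds since the epoch.
  static boolean meetsSloTimeliness(long timestampMillis, long nowMillis, int sloSecs) {
    return nowMillis - timestampMillis <= sloSecs * 1000L;
  }
}
```

With the metadata from order_metadata.json, resolve("slo_timeliness_secs", ...) yields "10" unless a rule parameter with the same name overrides it, so a record read more than 10,000 milliseconds after its timestamp would trigger the email.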

Now that we've implemented a rule action to check the timeliness SLO, let's add it to the rule set. Note that the "mode" of the rule is "READ" since the timeliness check happens on the consumer. We use a CEL expression of "true" so that the condition always succeeds, and we set the "onSuccess" action to "EMAIL" so that the action is invoked for every record.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0"
      },
      {
        "name": "checkSloTimeliness",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "READ",
        "expr": "true",
        "params": {
          "mail.smtp.host": "smtp.acme.com",
          "mail.smtp.port": "25",
          "mail.smtp.from": "billing.team@acme.com"
        },
        "onSuccess": "EMAIL"
      }
    ]
  }
}

The above code is available in a GitHub repository.  After cloning the repository, register the schema with the following command:

mvn schema-registry:register

To run the consumer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ConsumerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties group-1 client-1 <emailUsername> <emailPassword>"

To generate random Order data, run the producer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ProducerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties client-1"

You should now see emails being sent for any records that do not meet the timeliness SLO.

Adding migration rules to the data contract

The consumers may be satisfied with the data contract so far, but perhaps one day the Orders Team decides that they want to change the schema in a backward-incompatible manner. For example, they decide to change the field named "state" to "status". In Avro, one possibility is to use an alias, but some Avro implementations might not support aliases. So for the sake of this article, we'll consider changing the name of an Avro field to be a backward-incompatible change (as is the case for Protobuf and JSON Schema).

To support breaking changes within a schema version history, we need a way to partition the history into subsets of versions that are compatible with one another. We can achieve this with the notion of a compatibility group. We choose an arbitrary name for a metadata property, such as "major_version", and then use that property to specify which data contracts belong to the same compatibility group.

First, we configure Schema Registry to only perform compatibility checks for schemas within a compatibility group.

curl http://localhost:8081/config/orders-value \
  -X PUT --json '{ "compatibilityGroup": "major_version" }'
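
Conceptually, a compatibility group narrows the set of earlier versions that a new version is compared against. The sketch below models that idea only. It is not Schema Registry's implementation, the class and method names are made up, and treating versions that lack the property as a group of their own is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class CompatibilityGroupSketch {

  // A registered version reduced to its number and metadata properties.
  static final class Version {
    final int number;
    final Map<String, String> properties;

    Version(int number, Map<String, String> properties) {
      this.number = number;
      this.properties = properties;
    }
  }

  // Returns the earlier version numbers a candidate would be checked against:
  // those whose group property has the same value as the candidate's.
  static List<Integer> versionsToCheck(List<Version> history,
      Map<String, String> candidateProps, String groupProperty) {
    String group = candidateProps.get(groupProperty);
    List<Integer> result = new ArrayList<>();
    for (Version v : history) {
      if (Objects.equals(v.properties.get(groupProperty), group)) {
        result.add(v.number);
      }
    }
    return result;
  }
}
```

In this model, the first version registered with a major_version of 2 has no earlier versions in its group, so nothing blocks a breaking change, while later versions in the same group are still checked against one another.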

Next, we change the field named "state" to "status", as specified in a file named "order2.avsc".  

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Register the schema, and pass a new metadata property with the name "major_version" and value "2", so that the schema need not be backward compatible with previous versions.

jq -n --rawfile schema order2.avsc '{ schema: $schema, metadata: { properties: { owner: "OrdersTeam", owner_email: "orders.team@acme.com", slo_timeliness_secs: "10", major_version: "2" } } }' |
  curl http://localhost:8081/subjects/orders-value/versions --json @-

If desired, we can now pin a client to the latest version of a specific compatibility group using the following client-side properties:

use.latest.with.metadata=major_version=2
latest.cache.ttl.sec=1800

Above we've specified that the client should check for a new latest version every 30 minutes (1800 seconds).
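
In a Java client, these two settings would typically be added to the configuration passed to the consumer or producer. A minimal sketch follows, assuming the property names shown above. The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.util.Properties;

final class PinnedClientConfigSketch {

  // Builds the two client-side properties that pin a client to a compatibility group.
  static Properties pinnedTo(String majorVersion, Duration cacheTtl) {
    Properties props = new Properties();
    props.put("use.latest.with.metadata", "major_version=" + majorVersion);
    props.put("latest.cache.ttl.sec", String.valueOf(cacheTtl.getSeconds()));
    return props;
  }
}
```

Calling pinnedTo("2", Duration.ofMinutes(30)) produces the same values as the listing above. The result can be merged into the rest of the client configuration with putAll.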

Now that our subject supports breaking changes in the version history as partitioned by compatibility groups, we need a way for the consumer to transform messages from the previous compatibility group to the current one. We can achieve this with migration rules using JSONata. Below, the UPGRADE rule allows new consumers to read old messages, while the DOWNGRADE rule allows old consumers to read new messages. If you plan to upgrade all consumers, you can omit the DOWNGRADE rule. In each migration rule, we use the JSONata function $sift() to remove the field with the old name, and then use $merge() to add an object containing the same value under the new name.

{
  "ruleSet": {
    "domainRules": [
      ...
    ],
    "migrationRules": [
      {
        "name": "changeStateToStatus",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'state'}), {'status': $.'state'}])"
      },
      {
        "name": "changeStatusToState",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "DOWNGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'status'}), {'state': $.'status'}])"
      }
    ]
  }
}

The new rule set, in a file named order_ruleset2.json, has both domain rules and migration rules and can be registered separately from the schema, as before.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset2.json

After updating the metadata and rule set, let's try out our migration rule. We'll produce a record using the original schema version with field "state" and see it transformed by the consumer to conform to the latest schema version with field "status".  

First, start the consumer. Note that we specify use.latest.version=true to tell the consumer that we want the record to conform to the latest version even if it was produced with an older version. As mentioned, we could alternatively specify use.latest.with.metadata=major_version=2. Either setting will cause the migration rule to be invoked.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092 \
  --property use.latest.version=true

In a separate terminal, start the producer. Note that we should pass the ID of the original schema that has a field named "state" instead of "status".

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record is sent via the producer, the following record is received by the consumer, where the JSONata transform has modified the record to have a field named "status" instead of "state".

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "status": "Pending", "timestamp": 1693591356 }
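
The effect of the two JSONata expressions can be mimicked in plain Java, which may help when reasoning about what a migration rule should produce. This is an analogue written for illustration with made-up names. It operates on a Map rather than on the Avro record that the rule engine transforms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RenameFieldSketch {

  // Copies every entry except `from` (the $sift step),
  // then adds the old value under `to` (the $merge step).
  static Map<String, Object> rename(Map<String, Object> record, String from, String to) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : record.entrySet()) {
      if (!e.getKey().equals(from)) {
        result.put(e.getKey(), e.getValue());
      }
    }
    result.put(to, record.get(from));
    return result;
  }

  // Analogue of the UPGRADE rule: old records gain "status" and lose "state".
  static Map<String, Object> upgrade(Map<String, Object> record) {
    return rename(record, "state", "status");
  }

  // Analogue of the DOWNGRADE rule: new records gain "state" and lose "status".
  static Map<String, Object> downgrade(Map<String, Object> record) {
    return rename(record, "status", "state");
  }
}
```

Because the two rules are inverses of each other, downgrading an upgraded record yields the original fields and values, which is a useful property to assert when writing migration rules in pairs.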

Combining compatibility groups with migration rules that transform data across those groups is a powerful way to evolve schemas in arbitrarily complex ways.

To learn more about JSONata, see Understanding JSONata.

Summary

The use of data contracts helps to shift-left the responsibility of ensuring data quality, interoperability, and compliance to the upstream component, which is the source of the data. By doing so, a data contract can provide a formal agreement between an upstream component and a downstream component for the structure and semantics of the data in motion. Today Confluent Schema Registry supports the use of data contracts to make streaming more reliable and powerful than ever before.

To learn more, see the documentation on Data Contracts for Schema Registry.

  • Robert Yokota is a software engineer at Confluent, currently working in the area of data governance. He previously worked at Microsoft.
