Decoupling Systems with Apache Kafka, Schema Registry and Avro

As your Apache Kafka® deployment starts to grow, the benefits of using a schema registry quickly become compelling. Confluent Schema Registry, which is included in the Confluent Platform, enables you to achieve strong decoupling of the systems you integrate via Kafka, in turn allowing your teams to be more agile and create applications that are more robust to change.

Why you want a schema registry

Kafka producers and consumers are already decoupled in the sense that they do not communicate with one another directly; instead, information transfer happens via Kafka topics. But they are coupled in the sense that consumers need to know what the data they are reading represents in order to make sense of it—but this is something that is controlled by the producer! In the same way that Kafka acts as an intermediary for your data, Schema Registry can act as an intermediary for the types of data that you publish to Kafka.

In any non-trivial system, requirements change over time and the schemas needed to satisfy them change accordingly; that is, schemas evolve. This evolution makes this form of coupling even stronger. Not only do consumers need to understand the type of data they are reading today, they also depend on any changes that might be made to this type over time. You can reduce this coupling by defining up front the constraints on how schemas are allowed to evolve, and by managing these constraints using Schema Registry as well. Producers agree to adhere to the published evolution rules, and consumers are written to be able to handle any schema that is consistent with them. The downside of this, of course, is that it limits flexibility and adds some overhead due to additional formality. However, the payoff in terms of reduced coupling is usually much greater.

Confluent Schema Registry is built around the Apache Avro serialization format. One of the reasons Avro was selected as the recommended format for use with Confluent Platform is that it has flexible, well-defined rules around schema evolution. Jay Kreps expands on this and other appealing aspects of Avro in his article Why Avro for Kafka Data?

Side Note: Avro is often compared with Protobuf and Thrift, two other commonly used serialization formats. A key way in which Avro differs from these formats is that it requires that the schema used to serialize data be available when parsing it. Protobuf and Thrift do not: the serialized data includes enough meta-information to allow it to be parsed without a schema. But this meta-information is only a partial description of the data. For example, it doesn’t include field names or doc strings, so it doesn’t replace the role of a schema registry.

Putting it into practice with .NET

In this blog post, we’ll walk through a practical example of how to use Avro and Schema Registry from C#/.NET. We’ll assume that you have a Kafka cluster and Schema Registry up and running already (for more information, refer to the Confluent Platform Quick Start). We’ll also assume that you’re set up to build and run C# applications, either targeting the .NET Framework or .NET Core on your platform of choice.

The .NET Avro serializer and deserializer allow you to work with Avro data in one of two ways:

  1. Via specific classes generated using the avrogen tool
  2. Using the GenericRecord class to read and write records with any schema, without needing to pre-generate any class files

We’ll demonstrate how to use both, starting with GenericRecord, then the specific compiler-generated classes. In general, you’ll find working with the specific classes much simpler, and you should prefer this where possible. Use GenericRecord in scenarios where you need to work dynamically with data of any type.

First, let’s define a schema for a simple log message type. We’ll specify this over two files: one for the LogMessage itself, and the other for the LogLevel enumeration that it depends on:

{
  "namespace": "MessageTypes",
  "type": "enum", 
  "doc": "Enumerates the set of allowable log levels.",
  "name": "LogLevel", 
  "symbols": ["None", "Verbose", "Info", "Warning", "Error"]
}

{
  "namespace": "MessageTypes",
  "type": "record",
  "doc": "A simple log message type as used by this blog post.",
  "name": "LogMessage",
  "fields": [
    { "name": "IP", "type": "string" },
    { "name": "Message", "type": "string" },
    { "name": "Severity", "type": MessageTypes.LogLevel }
  ]
}

The Avro format allows you to specify documentation along with your schemas using the doc attribute. You should make good use of this feature. Schemas should be self-contained—someone using your schema should not need to consult any other source of information. Pro tip: you can auto-generate user-readable documentation for your schemas using the Avrodoc tool.

Below is an example of how to produce a LogMessage message to Kafka conforming to the above schema. (Note: The complete source code for all examples is available on GitHub.)

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl }
};

using (var producer = new Producer<Null, GenericRecord>(
    config, null, new AvroSerializer<GenericRecord>()))
{
    var logLevelSchema = (EnumSchema)Schema.Parse(
        File.ReadAllText("LogLevel.avsc"));

    // Substitute the LogLevel definition into the LogMessage schema text before parsing it.
    var logMessageSchema = (RecordSchema)Schema
        .Parse(File.ReadAllText("LogMessage.V1.avsc")
            .Replace(
                "MessageTypes.LogLevel",
                File.ReadAllText("LogLevel.avsc")));

    var record = new GenericRecord(logMessageSchema);
    record.Add("IP", "127.0.0.1");
    record.Add("Message", "a test log message");
    record.Add("Severity", new GenericEnum(logLevelSchema, "Error"));

    producer.ProduceAsync("log-messages", null, record)
        .ContinueWith(dr => Console.WriteLine(dr.Result.Error
            ? $"error producing message: {dr.Result.Error.Reason}"
            : $"produced to: {dr.Result.TopicPartitionOffset}"));

    // Wait up to 30 seconds for outstanding messages to be delivered.
    producer.Flush(TimeSpan.FromSeconds(30));
}

There are a few things worth noting:

  • In addition to the required producer property bootstrap.servers, you’ll need to specify a value for schema.registry.url, which is used by the Avro serializer to communicate with Schema Registry.
  • The Avro specification does not provide a way for one schema file to reference another. We need something like this, however, to avoid duplicating the LogLevel schema definition: the LogLevel schema must also be supplied on its own when constructing the GenericEnum Severity value. Instead of relying on Avro, we use string substitution in our C# program. The LogMessage schema specified above is not a valid Avro schema until this substitution has been made.
  • By default, the serializer will automatically register new schemas with Schema Registry where compatibility rules allow. Since we haven’t yet registered any other schema against this topic, our LogMessage schema will be automatically registered before the message is produced.
  • Behind the scenes, each schema registered with Schema Registry is given a unique ID that is prepended to the serialized data in each message sent to Kafka. When a consumer reads a message, it uses this ID to associate the message with the schema that was used to write it. If the consumer encounters data that was serialized with a schema it has not seen yet, it will contact Schema Registry to retrieve the schema.
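
The framing described in the last point can be sketched in a few lines. This is a minimal illustration, assuming the Confluent wire format of a single zero "magic" byte, followed by the schema ID as a 4-byte big-endian integer, followed by the Avro-encoded payload. The WireFormat class and its method names are hypothetical helpers for this post; the Avro serializer and deserializer do this work for you.

```csharp
using System;

// Hypothetical helper, for illustration only: the Avro serializer and
// deserializer perform this framing internally.
public static class WireFormat
{
    private const byte MagicByte = 0;  // assumed framing marker
    private const int HeaderSize = 5;  // 1 magic byte + 4-byte schema ID

    // Prepends the header to an already Avro-encoded payload.
    public static byte[] Frame(int schemaId, byte[] avroPayload)
    {
        var message = new byte[HeaderSize + avroPayload.Length];
        message[0] = MagicByte;
        // Schema ID in big-endian (network) byte order.
        message[1] = (byte)(schemaId >> 24);
        message[2] = (byte)(schemaId >> 16);
        message[3] = (byte)(schemaId >> 8);
        message[4] = (byte)schemaId;
        avroPayload.CopyTo(message, HeaderSize);
        return message;
    }

    // Extracts the schema ID a consumer would use to look up the writer schema.
    public static int ReadSchemaId(byte[] message)
    {
        if (message.Length < HeaderSize || message[0] != MagicByte)
            throw new FormatException("not a Schema Registry framed message");
        return (message[1] << 24) | (message[2] << 16) | (message[3] << 8) | message[4];
    }
}
```

Because only the ID travels with each message, the per-message overhead is five bytes regardless of how large the schema is.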

You can verify that the schema was added to Schema Registry as follows:

curl http://localhost:8081/subjects

> ["log-messages-value"]

Notice that the schema was registered under the name log-messages-value, which is the topic name concatenated with -value. This name is referred to as a subject. Corresponding to any given topic, there are potentially two subjects registered in Schema Registry—one corresponding to the message value and one corresponding to the message key (i.e., the topic name appended with -key).

You can interact with Schema Registry further using its REST API. Something else you might want to do is set the subject compatibility to FULL (by default, it’s BACKWARD):

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "FULL"}' \
    http://localhost:8081/config/log-messages-value
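
If you would rather make this call from your application than from the command line, the same request can be built with HttpClient. The sketch below only mirrors the curl command above; the SchemaRegistryRequests class and its method names are hypothetical, and it assumes the default subject naming (topic name plus -value or -key) described earlier.

```csharp
using System;
using System.Net.Http;
using System.Text;

// Hypothetical helper names; only the endpoint, verb, and media type are
// taken from the curl command in this post.
public static class SchemaRegistryRequests
{
    private const string MediaType = "application/vnd.schemaregistry.v1+json";

    // Subject under which a topic's value (or key) schema is registered.
    public static string SubjectFor(string topic, bool isKey = false)
        => topic + (isKey ? "-key" : "-value");

    // Builds (but does not send) the PUT request that sets the
    // compatibility level for a topic's value subject.
    public static HttpRequestMessage SetCompatibility(
        string registryUrl, string topic, string level)
    {
        var subject = Uri.EscapeDataString(SubjectFor(topic));
        var uri = new Uri($"{registryUrl.TrimEnd('/')}/config/{subject}");
        var body = $"{{\"compatibility\": \"{level}\"}}";
        return new HttpRequestMessage(HttpMethod.Put, uri)
        {
            Content = new StringContent(body, Encoding.UTF8, MediaType)
        };
    }
}
```

To send it, pass the result to HttpClient.SendAsync, for example: await client.SendAsync(SchemaRegistryRequests.SetCompatibility("http://localhost:8081", "log-messages", "FULL")).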

Full compatibility enforces that any schema written to this subject is both forwards and backwards compatible with older schemas:

  • A schema is backward compatible if it can be used to read data written with a previous version of the schema.
  • A schema is forward compatible if previous versions of the schema can be used to read data written by it.

From the perspective of decoupling producers and consumers, forwards compatibility is very desirable because it ensures consumers will be able to read new versions of data without being updated. Backwards compatibility is important for data destined for archival storage because it ensures that the latest schema can be used to read all historical data.

It’s common to want to evolve a schema by adding another field. Conveniently, the new schema will be both forwards and backwards compatible with the old one, provided you specify a default value for the added field. For a full set of forwards/backwards compatibility rules, refer to the documentation on data serialization and evolution.
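
Before rolling out a changed schema, you can ask Schema Registry whether it satisfies the subject's compatibility setting. The sketch below builds a request for the REST API's compatibility endpoint and reads the is_compatible flag from the response. The CompatibilityCheck class is a hypothetical helper, and the code assumes System.Text.Json is available (built into .NET Core 3.0 and later, otherwise a NuGet package).

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;

// Hypothetical helper, not part of the Confluent client library.
public static class CompatibilityCheck
{
    // Builds (but does not send) a request asking whether schemaJson is
    // compatible with the latest schema registered under the subject.
    public static HttpRequestMessage Build(
        string registryUrl, string subject, string schemaJson)
    {
        // The schema is sent as a JSON *string*, so it must be escaped
        // rather than embedded as a nested object.
        var body = JsonSerializer.Serialize(
            new Dictionary<string, string> { ["schema"] = schemaJson });
        var uri = new Uri(
            $"{registryUrl.TrimEnd('/')}/compatibility/subjects/" +
            $"{Uri.EscapeDataString(subject)}/versions/latest");
        return new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = new StringContent(
                body, Encoding.UTF8, "application/vnd.schemaregistry.v1+json")
        };
    }

    // Reads the is_compatible flag from the registry's JSON response.
    public static bool IsCompatible(string responseJson)
    {
        using (var doc = JsonDocument.Parse(responseJson))
            return doc.RootElement.GetProperty("is_compatible").GetBoolean();
    }
}
```

A check like this fits naturally into a build pipeline, so that an incompatible schema is rejected before any producer starts writing with it.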

OK, let’s add a Tags field to our original schema, and this time we’ll define the LogLevel enum inline:

{ 
  "namespace": "MessageTypes",
  "type": "record",
  "doc": "A simple log message type as used by this blog post.",
  "name": "LogMessage",
  "fields": [
    { "name": "IP", "type": "string" },
    { "name": "Message", "type": "string" },
    { "name": "Tags",
      "type": { "type": "map",
                "values": "string" },
      "default": {} },
    { "name": "Severity",
      "type": { "namespace": "MessageTypes",
                "type": "enum",
                "doc": "Enumerates the set of allowable log levels.",
                "name": "LogLevel",
                "symbols": ["None", "Verbose", "Info", "Warning", "Error"]}}
  ]
}

Instead of making use of the GenericRecord class, let’s now use the avrogen tool to generate a C# class corresponding to the new schema. avrogen is available as a .NET Core Global Tool (requires .NET Core 2.1 or above). It can be installed as follows:

dotnet tool install -g Confluent.Apache.Avro.AvroGen

Now, you can run the tool in your project directory:

avrogen -s LogMessage.V2.avsc .

And then use the generated class directly with AvroSerializer:

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl }
};

using (var producer = new Producer<Null, MessageTypes.LogMessage>(
    config, null, new AvroSerializer<MessageTypes.LogMessage>()))
{
    producer.ProduceAsync(
        "log-messages", null,
        new MessageTypes.LogMessage
        {
            IP = "192.168.0.1",
            Message = "a test message 2",
            Severity = MessageTypes.LogLevel.Info,
            Tags = new Dictionary<string, string> { { "location", "CA" } }
        });
    producer.Flush(TimeSpan.FromSeconds(30));
}

Or with AvroDeserializer:

var consumerConfig = new Dictionary<string, object>
{
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl },
    { "auto.offset.reset", "beginning" }
};

using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(
    consumerConfig, null, new AvroDeserializer<MessageTypes.LogMessage>()))
{
    consumer.OnConsumeError
        += (_, error) => Console.WriteLine($"an error occurred: {error.Error.Reason}");

    consumer.OnMessage
        += (_, msg) => Console.WriteLine($"{msg.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")}: [{msg.Value.Severity}] {msg.Value.Message}");

    consumer.Subscribe("log-messages");

    while (true)
    {
        consumer.Poll(TimeSpan.FromSeconds(1));
    }
}

When used in the context of the consumer, the MessageTypes.LogMessage schema is referred to as the reader schema. Messages read by the consumer may contain data that was serialized using this schema or potentially any other schema (referred to as the writer schema). In the event that the reader and writer schemas are different, Avro schema resolution rules will be used to reconcile the difference. If the schemas are incompatible, the consumed message will be delivered via the OnConsumeError event rather than the OnMessage event.
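
To see schema resolution in isolation, without Kafka or Schema Registry involved, you can write a record with one schema and read it back with another. The sketch below assumes the GenericDatumWriter and GenericDatumReader classes from the Apache Avro C# library (the same library that provides GenericRecord). The two schemas are cut-down, hypothetical versions of LogMessage, and SchemaResolutionDemo is an illustrative name.

```csharp
using System;
using System.IO;
using Avro;
using Avro.Generic;
using Avro.IO;

public static class SchemaResolutionDemo
{
    // Writer schema: a cut-down LogMessage without the Tags field.
    private const string V1 = @"{
        ""type"": ""record"", ""name"": ""LogMessage"", ""namespace"": ""MessageTypes"",
        ""fields"": [ { ""name"": ""Message"", ""type"": ""string"" } ] }";

    // Reader schema: adds Tags with a default, so older data stays readable.
    private const string V2 = @"{
        ""type"": ""record"", ""name"": ""LogMessage"", ""namespace"": ""MessageTypes"",
        ""fields"": [
            { ""name"": ""Message"", ""type"": ""string"" },
            { ""name"": ""Tags"",
              ""type"": { ""type"": ""map"", ""values"": ""string"" },
              ""default"": {} } ] }";

    public static GenericRecord WriteV1ReadAsV2(string message)
    {
        var writerSchema = (RecordSchema)Schema.Parse(V1);
        var readerSchema = (RecordSchema)Schema.Parse(V2);

        var record = new GenericRecord(writerSchema);
        record.Add("Message", message);

        using (var stream = new MemoryStream())
        {
            // Serialize using the writer schema only.
            new GenericDatumWriter<GenericRecord>(writerSchema)
                .Write(record, new BinaryEncoder(stream));
            stream.Position = 0;

            // Deserialize, resolving the writer schema against the reader schema.
            var reader = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
            return reader.Read(null, new BinaryDecoder(stream));
        }
    }
}
```

The returned record should carry the original Message value together with a Tags entry filled in from the reader schema's default. Removing the default from V2 should make the read fail instead, which is the kind of incompatibility the compatibility settings are designed to catch before data is written.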

With the above examples, we’ve shown how straightforward it is to use Confluent Schema Registry and Avro serialized data with your .NET applications. People just starting out with Kafka often choose to work with a simple text-based serialization format such as JSON, but as your Kafka deployment starts to grow in complexity, there is a lot to be gained from using a data format that provides strong schema evolution rules and by employing a central schema registry to manage them.

As an early employee at Confluent, Matt Howlett has worked on many of Confluent’s community and enterprise products, including Confluent Control Center, Schema Registry, REST Proxy, and client libraries in various languages. Prior to joining Confluent, Matt spent many years developing materials tracking and optimization systems for large mining companies in Australia. His first exposure to distributed systems was in the computer games industry, where he worked on the server of a massively multiplayer online game engine.
