
Schema Registry Clients in Action


There are plenty of materials available out there about Schema Registry. From Confluent alone, if you head to Confluent Developer and search “Schema Registry” you will discover an ever-growing repository of over 100 results including courses, articles, tutorials, blog posts, and more, providing comprehensive resources for enthusiasts and professionals alike.

If you are new to Schema Registry or don't know the difference between schema, schema type, subject, compatibility type, schema ID, and subject version, I would recommend starting with this free course: Schema Registry 101 by Danica Fine.

This article will show the bits and bytes of what happens behind the scenes in Apache Kafka® producer and consumer clients when communicating with the Schema Registry and serializing/deserializing messages.

We will briefly talk about setting data quality rules on the schema itself. They are crucial for maintaining high data integrity, quickly resolving data quality issues, and simplifying schema evolution. They are a part of the data contract concept in Confluent's Stream Governance solution on Confluent Cloud.

Finally, we will go through a quick demo in Python on how to use Confluent Schema Registry without Apache Kafka 😱. But don’t worry, the purpose is just to clarify that a given producer or consumer client independently communicates with at least two separate and different systems:

  1. Apache Kafka (TCP: Kafka Protocol).

  2. Schema Registry (HTTP: Schema Registry REST API).

The code examples shown in this article are all in Python, but Confluent has developed and maintains client libraries for Kafka in the following languages: Java, C/C++, .NET, and Go.

Why are the examples all in Python? Because that is the language I am most comfortable with and, in my opinion, it is easier to understand for readers not used to Java.

Schema Registry in a nutshell

Schema Registry is a component that lives outside and independently of the Kafka cluster. It is used to manage and store the schemas for the data exchanged between Kafka producers and consumers. Just to emphasize this point, Schema Registry neither stores nor processes the messages exchanged between producers and Kafka; it doesn't even serialize or deserialize them. Instead, it only handles metadata (subjects, versions, schemas, and schema IDs, among others).

What is a schema then? It is a defined data framework that allows producers and consumers to understand the type and structure of the data being exchanged. It is a blueprint that describes how the data should be formatted and what type of information it should contain. Essentially, it is the data contract that binds together the loosely coupled producers and consumers.

Schema Registry plays a crucial role in ensuring the quality, compatibility, and consistency of data formats within a Kafka ecosystem, especially in scenarios where different applications or services that do not communicate directly with each other may be producing and consuming data.

Schema Registry clients

The diagram below shows the four main Schema Registry clients:

  • Schema manager: Although a schema can be registered/managed by the producer clients themselves, it is good practice to have that done as part of a CI/CD pipeline, such as by using the Schema Registry Maven plugin. Using this method, the producer and consumer clients would have read-only access to the Schema Registry and hence “abide” by the data contract (schema) as defined, which will help ensure data quality and consistency.

  • Kafka cluster: Kafka brokers communicate with Schema Registry for validation through a process known as broker-side schema validation (enabled on a per-topic basis; on Confluent Cloud it is available on dedicated clusters). This process allows the broker to verify that data produced to a Kafka topic is using a valid schema ID in Schema Registry that is registered according to the subject naming strategy. No other validation is performed (schema structure, serialized data, etc.), only the schema ID and subject.

  • Producer client: A producer client needs to communicate with both Schema Registry and Kafka clusters. When serializing the message, the producer will get the schema ID from the Schema Registry, serialize the message as per schema, and then produce the binary data to the Kafka cluster. The producer can also get a copy of the entire schema and its ID without the need to have that stored locally (more on that later).

  • Consumer client: A consumer client also communicates with both Schema Registry and Kafka clusters. However, it will first get the binary data from the Kafka cluster (serialized message), extract the schema ID, and based on it, get its corresponding schema from Schema Registry and only then deserialize the message.

Serializers

We previously mentioned serialization and deserialization, but what are they exactly?

Simply put, serialization is the process of writing the state of an object into a byte stream/array, and deserialization is the same, but the other way around. For example:

  • If we encode the string “my car” using the encoder UTF-16 little endian we will get the following byte array (represented in hexadecimal): \x6d\x00\x79\x00\x20\x00\x63\x00\x61\x00\x72\x00

  • The deserialization would have that converted back to the original string

  • As you have already noticed, that only works if the serializer and deserializer use the exact same encoding mechanism

  • By trying to decode that using UTF-16 big endian instead, we get the string 洀礀\u2000挀愀爀 (see the quick Python check after this list)

  • Dude, where’s “my car”?
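You can reproduce this round trip in a few lines of Python (a minimal sketch; bytes.hex with a separator requires Python 3.8+):

data = "my car".encode("utf-16-le")       # serialize the string to bytes
print(data.hex(" "))                      # 6d 00 79 00 20 00 63 00 61 00 72 00
print(data.decode("utf-16-le"))           # my car (same codec: round trip works)
print(data.decode("utf-16-be"))           # 洀礀 挀愀爀 (mismatched codec: garbled)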

Apache Kafka has several native serializers, to name a few of them:

  • org.apache.kafka.common.serialization.StringSerializer

  • org.apache.kafka.common.serialization.IntegerSerializer

  • org.apache.kafka.common.serialization.DoubleSerializer

  • org.apache.kafka.common.serialization.BytesSerializer

You could even build your own serializer (don’t forget to build the corresponding deserializer!).

When using Confluent’s library, you have the following additional serializers and deserializers:

  • io.confluent.kafka.serializers.KafkaAvroSerializer

  • io.confluent.kafka.serializers.KafkaAvroDeserializer

  • io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

  • io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer

  • io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

  • io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer

When using the Confluent serializers, each serialized message will be prefixed with five bytes. Here is how data is mapped to low-level bytes:

  • Byte 0 (magic byte): Confluent serialization format version number. Currently always equal to 0 (zero).

  • Bytes 1 - 4 (schema ID): Schema ID (4-byte unsigned integer, big endian), as defined by the Schema Registry.

  • Bytes 5 - … (data): Serialized message for the specified schema format (for example, binary encoding for JSON, Avro, or Protocol Buffers).

There might be other serializer mechanisms that can have a different low-level bytes mapping. That is to say, serializing with one mechanism and deserializing with another might throw an exception or yield “Dude, where’s my car?”.
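As a quick illustration of this framing, here is a minimal Python sketch that splits a Confluent-framed message into its three parts (split_confluent_framing is a hypothetical helper written for this article, not part of any Confluent library):

import struct

def split_confluent_framing(serialized: bytes):
    # Hypothetical helper: returns (magic byte, schema ID, payload)
    if len(serialized) < 5:
        raise ValueError("message is shorter than the 5-byte Confluent prefix")
    magic_byte = serialized[0]
    schema_id = struct.unpack(">I", serialized[1:5])[0]  # 4-byte big-endian unsigned int
    return magic_byte, schema_id, serialized[5:]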

The reason for adding the schema ID to each message is that we don't want every Kafka message to carry the entire Avro schema; that would be a huge waste of space, and as they say, “space is money,” oh wait, isn't that time? Well, you get the picture. Instead, Kafka messages are prefixed with the corresponding schema ID. The producers writing the messages and the consumers reading them must use the same Schema Registry to get the same mapping between a schema ID and the actual schema. If they use different schema registries, these should be kept in sync using tools such as Schema Linking.

You might be wondering: Hey, but in Confluent’s Schema Registry don’t we also have the schema version? Where does that fit in? Well, the subject version is an important piece of information, as it lets you keep track of the evolution and history of the schema and know the latest version/ID of a given subject. However, when a consumer is deserializing a given message, all that is needed is the schema ID.

Serializing a message using AVRO

Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers. However, we will go through an example using the Avro framework. Schema Registry works in exactly the same way for any schema type. 

For example, let’s say that the following message needs to be Avro serialized:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random": 3122
}

The Avro schema, in that case, was registered as shown below and Schema Registry has, for example, assigned it the schema ID 100114:

{
  "namespace": "pydatagen",
  "name": "demo",
  "type": "record",
  "fields":
    [
        {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "user_id",
            "type": "string"
        },
        {
            "name": "first_name",
            "type": "string"
        },
        {
            "name": "last_name",
            "type": "string"
        },
        {
            "name": "gender",
            "type": "string"
        },
        {
            "name": "random",
            "type": "int"
        }
    ]
}

The serialized message will look like this (in binary):

\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40

Wow! So if we had string serialized the original message we would have 134 bytes, but the Avro serialized one has only 43 bytes? Was my message compressed? Technically no, it was just compacted. To get it back to what it was, as you realized, we will need to know the schema. Without it, we could kind of figure out the values, but not their corresponding keys/structure.
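To see that the remaining bytes really are just the compacted record, here is a sketch that decodes the payload with the fastavro library (a third-party package, pip install fastavro, used here only for illustration) and the schema shown above:

import io
from fastavro import parse_schema, schemaless_reader  # third-party: pip install fastavro

schema = parse_schema({
    "namespace": "pydatagen",
    "name": "demo",
    "type": "record",
    "fields": [
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "user_id", "type": "string"},
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "gender", "type": "string"},
        {"name": "random", "type": "int"},
    ],
})

framed = b"\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40"
payload = framed[5:]  # strip the magic byte and the schema ID
record = schemaless_reader(io.BytesIO(payload), schema)
print(record)  # the original record (fastavro renders the timestamp-millis field as a datetime)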

Oh! By the way, what does a Lancaster Bomber have to do with Avro? I will let you find that out. You will get a bonus point if you answer that one.

Coming back to the serialized message, let’s analyze the first five bytes of it:

  • The first \x00 (magic byte): 0, unfortunately not as magic as my favorite basketball legends Earvin “Magic” Johnson Jr. and Maria Paula Gonçalves da Silva.

  • \x00\x01\x87\x12 (schema ID): That specific one equates to 100114. See below how to get the schema ID using Python:

    >>> int.from_bytes(b"\x00\x01\x87\x12", "big")
    100114
    

The remaining bytes (\xa4 onward) are the actual payload but Avro serialized.

Producer client in action

A producer is a client or application that publishes messages, also known as records, to one or more Kafka topics. These messages are then consumed by one or more consumers that are subscribed to the topic(s).

A producer, when serializing the message using schemas, will communicate with both the Schema Registry cluster (HTTP) and the Kafka cluster (TCP).

That means each producer will have two client instances, one for each cluster. In Python, using Avro, it looks like this (see the complete example at confluent-kafka-python/avro_producer.py; the code extract below only shows the most important parts for the purpose of this exercise).

Kafka client instance

from confluent_kafka import Producer
...
producer_conf = {"bootstrap.servers": "localhost:9092"}
producer = Producer(producer_conf)
Note:
There are several producer configuration options; for the full list, please refer to Kafka Producer Configurations and confluent-kafka-python's producer.
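If you want confirmation that each record actually reached the broker, you can also pass a delivery-report callback; the complete avro_producer.py example uses a similar pattern (the sketch below is illustrative):

def delivery_report(err, msg):
    # Called once per produced message, from producer.poll() or producer.flush()
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Passed later as: producer.produce(..., on_delivery=delivery_report)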

Schema Registry client instance

from confluent_kafka.schema_registry import SchemaRegistryClient
...
schema_registry_conf = {
   "url": "http://localhost:8081"
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
Note:
For the list of Schema Registry configuration options please refer to confluent-kafka-python's SchemaRegistryClient.
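Against a secured Schema Registry, such as on Confluent Cloud, the same client takes the endpoint and credentials in its configuration (a sketch with placeholder values):

schema_registry_conf = {
   "url": "https://<SR_ENDPOINT>",                           # placeholder endpoint
   "basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",   # placeholder credentials
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)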

That will create the client instance; however, no communication with Schema Registry will occur just yet.

Serializer object

The next step is to create the serializer object:

from confluent_kafka.schema_registry.avro import AvroSerializer
...
schema_str = "..."  # as per example above, corresponding to schema ID 100114
avro_serializer = AvroSerializer(
   schema_registry_client,
   schema_str,
   conf={
      "auto.register.schemas": True,
      "normalize.schemas": False,
      "use.latest.version": False
   }
)

The serializer object is used to output the Avro binary encoded data with Confluent Schema Registry framing. The configuration options are listed below:

  • auto.register.schemas (bool): If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.

  • normalize.schemas (bool): Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references. Defaults to False.

  • use.latest.version (bool): Whether to use the latest subject version for serialization. Warning: there is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • subject.name.strategy (callable): Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace (see the sketch after this list). Defaults to topic_subject_name_strategy.
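For instance, to register and look up the schema under the record name rather than the topic name, you could pass one of the standard strategies shipped in the confluent_kafka.schema_registry namespace (a sketch, reusing the schema_registry_client and schema_str from above):

from confluent_kafka.schema_registry import record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer

avro_serializer = AvroSerializer(
   schema_registry_client,
   schema_str,
   conf={
      "auto.register.schemas": True,
      "subject.name.strategy": record_subject_name_strategy,
   }
)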

Now our producer client is ready to serialize the messages and have them produced to the Kafka cluster:

from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
...
topic_name = "demo_users"
string_serializer = StringSerializer("utf_8")
message = {...}  # dict with the fields from the example above (the 134-byte JSON payload)
producer.produce(
   topic=topic_name,
   key=string_serializer("your_key_here"),  # or simply None if you don’t need a key
   value=avro_serializer(
      message,
      SerializationContext(
         topic_name,
         MessageField.VALUE,
      )
   )
)

Now the serializer object (avro_serializer) will send a POST request to Schema Registry to try to register the schema in case it doesn’t exist (as auto.register.schemas was set to True) and get the corresponding schema ID (see example below). That is not a recommended configuration in production environments; ideally, schemas should be managed via a CI/CD pipeline and the producer clients given read-only access to Schema Registry (role DeveloperRead).

POST http://localhost:8081/subjects/demo_users-value/versions?normalize=False
Body {"schema": "{\"namespace\": \"pydatagen\", \"name\": \"demo\", \"type\": \"record\", \"fields\": [{\"name\": \"timestamp\", \"type\":{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": {\"type\": \"string\"}}, {\"name\": \"first_name\", \"type\": {\"type\": \"string\"}}, {\"name\": \"last_name\", \"type\": {\"type\": \"string\"}}, {\"name\": \"gender\", \"type\": {\"type\": \"string\"}}, {\"name\": \"random\", \"type\": {\"type\": \"int\"}}]}"}

The following was the response:

200 OK
Content {"id": 100114}
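For reference, when schemas are managed by a CI/CD pipeline instead, the same registration can be done out of band with the Schema Registry client, so the producer only ever reads (a sketch, reusing the schema_str from above against a local registry):

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

sr = SchemaRegistryClient({"url": "http://localhost:8081"})
schema_id = sr.register_schema("demo_users-value", Schema(schema_str, schema_type="AVRO"))
print(schema_id)  # e.g., 100114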

Having the schema ID, all that is left to do is to serialize the message and prefix it with the magic byte (0) and the schema ID (100114).

So the message:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random": 3122
}

Gets serialized as:

\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40

The next time that same producer client instance needs to serialize another message it will not submit a new POST request to the Schema Registry, as both the schema and schema ID were cached by the client. A new HTTP request would only happen if the producer client instance is restarted, if it serializes a message to another topic and/or subject, or if the schema changes.

One important thing to notice is that the serializer object had the configuration use.latest.version set to False. If we had instead set it to True (auto.register.schemas would need to be set to False), then it would submit a GET instead of a POST to fetch the latest schema available for the subject demo_users-value. By doing so, rather than deriving a schema from the schema string (schema_str) passed when creating the serializer object (avro_serializer), it would use the latest version of the schema registered under that subject, for example:

GET http://localhost:8081/subjects/demo_users-value/versions/latest

Yielding the response:

200 OK
Content {
   "subject": "demo_users-value",
   "version": 1,
   "id": 100114,
   "schema": "{\"type\": \"record\", \"name\": \"demo\", \"namespace\": \"pydatagen\", \"fields\" :[{\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": \"string\"}, {\"name\": \"first_name\", \"type\": \"string\"}, {\"name\": \"last_name\", \"type\": \"string\"}, {\"name\": \"gender\", \"type\": \"string\"}, {\"name\": \"random\", \"type\":\"int\"}]}"
}

The schema and schema ID would also be equally cached by the producer client instance.
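The same lookup is also available programmatically through the Schema Registry client (a sketch, reusing the schema_registry_client from above):

latest = schema_registry_client.get_latest_version("demo_users-value")
print(latest.schema_id, latest.version)  # e.g., 100114 1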

Consumer client in action

A consumer is a component responsible for subscribing to topics and processing the stream of messages published to those topics by producers. Consumers continuously poll Kafka brokers for new messages in the subscribed topics, maintaining an offset to keep track of their progress in message processing. Multiple consumer instances can form a consumer group, enabling parallelization of message consumption.

Just like its sibling producer (that it will never get to meet face to face, unfortunately), a consumer, when deserializing the message using schemas, will communicate with both clusters, but with the Kafka cluster first.

Consumers will also have two client instances, one for each cluster. In Python and using Avro it looks like this (see the complete example on GitHub):

Kafka client instance

from confluent_kafka import Consumer
...
consumer_conf = {
   "bootstrap.servers": "localhost:9092",
   "group.id": "my_group_id",
   "auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_conf)
Note:
There are several consumer configuration options; for the full list, please refer to Kafka Consumer Configurations and confluent-kafka-python's consumer.


Schema Registry client instance

This instance is no different from the one used by the producer:

from confluent_kafka.schema_registry import SchemaRegistryClient
...
schema_registry_conf = {
   "url": "http://localhost:8081"
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

Deserializer object

The next step is to create the deserializer object:

from confluent_kafka.schema_registry.avro import AvroDeserializer
...
avro_deserializer = AvroDeserializer(schema_registry_client)

The last step is to consume the messages from the Kafka broker and have them deserialized accordingly:

from confluent_kafka.serialization import SerializationContext, MessageField
...
topic_name = "demo_users"
consumer.subscribe([topic_name])
while True:
    try:
        msg = consumer.poll(1.0)
        if msg is not None:
            message = avro_deserializer(
                msg.value(),
                SerializationContext(
                    msg.topic(),
                    MessageField.VALUE,
                )
            )
    except KeyboardInterrupt:
        break
consumer.close()

When calling the deserializer object (avro_deserializer) it will submit a GET request to Schema Registry to get the schema based on the schema ID (bytes 1 to 4 of the serialized message), in our example 100114:

\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40

GET http://localhost:8081/schemas/ids/100114

Obtaining the response:

200 OK
Content {"schema": "{\"type\": \"record\", \"name\": \"demo\", \"namespace\": \"pydatagen\", \"fields\" :[{\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": \"string\"}, {\"name\": \"first_name\", \"type\": \"string\"}, {\"name\": \"last_name\", \"type\": \"string\"}, {\"name\": \"gender\", \"type\": \"string\"}, {\"name\": \"random\", \"type\":\"int\"}]}"
}

So, the message gets deserialized as:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random": 3122
}

Boom! That was the original message generated by the producer.
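For completeness, the same schema-by-ID lookup that the deserializer performs over HTTP is also exposed by the Schema Registry client (a sketch, reusing the schema_registry_client from above):

schema = schema_registry_client.get_schema(100114)
print(schema.schema_type)  # AVRO
print(schema.schema_str)   # the Avro schema registered earlier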

So, let me get this right: The consumer didn’t need to have the schema saved locally or hard coded in its application? First of all, let’s join the StHCMfAiP (Stop the Hard Code Movement for Applications in Production!) and second, ideally yes, there is no need to have the schema saved locally with the consumer application as the message can be deserialized just based on the schema ID. However, there might be cases where the consumer wants to use a different version of the schema (if compatible) or just wants to rename a given field of the payload, for example:

from confluent_kafka.schema_registry.avro import AvroDeserializer
...
schema_str = '{"type": "record", "name": "demo", "namespace": "pydatagen", "fields" :[{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}, {"name": "user_id", "type": "string"}, {"name": "first_name", "type": "string"}, {"name": "last_name", "type": "string"}, {"name": "gender", "type": "string"}, {"name": "random_new", "type":"int", "aliases": ["random"]}]}'
avro_deserializer = AvroDeserializer(
   schema_registry_client,
   schema_str=schema_str,
)

Notice the new argument on the deserializer object (schema_str, optional) where the field random is being renamed to random_new using aliases. The deserialized message would look like this:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random_new": 3122
}

Just to avoid angry mobs, no StHCMfAiP rules were broken here as that was just an example, never to be used in production 🙃. On a serious note, to ensure adaptability and maintain loose coupling in system development, it is imperative that any alternative schema utilized by a consumer be stored in Schema Registry. Relying on locally saved schemas within individual consumers risks introducing hard-coded dependencies and undermines the flexibility that a centralized Schema Registry provides.

Data quality rules

Data quality rules on the schema are used to validate the value of a single field based on a boolean predicate. They can trigger actions on failure or success of the rule, thus providing a mechanism to enforce the semantics of the data and further constrain the values of those fields. This helps in catching and fixing issues at the point of ingestion, minimizing the chances of poor quality data working its way downstream.

These rules can be attached to schemas in Confluent Schema Registry, which then acts as a gatekeeper of schemas as well as data quality and controls. This is a significant advancement in Confluent Schema Registry, extending its functionality beyond the grammar-based nature of schema languages such as Avro and Protobuf.

Moreover, data quality rules can be used to create domain validation and Event-Condition-Action (ECA) rules, transformation rules, and complex schema migration rules. These rules help in validating and constraining the values of fields, changing the value of a specific field or an entire message, and evolving a schema in an incompatible manner by applying transformations when consuming from a topic.
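As an illustration only (the exact payload shape belongs to Confluent's data contracts feature and may differ from this sketch), a domain validation rule attached alongside the schema at registration time could look something like this, using a CEL expression to constrain a field of the demo schema:

POST http://localhost:8081/subjects/demo_users-value/versions
Body {
  "schemaType": "AVRO",
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "random_must_be_non_negative",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.random >= 0"
      }
    ]
  }
}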

This article, Connect, Process, and Share Trusted Data Faster Than Ever: Kora Engine, Data Quality Rules, and More, written by David Araujo and Bharath Venkat, is an excellent resource for delving into the subject in more depth.

Schema Registry demo

As previously mentioned, Schema Registry and Apache Kafka clusters are two different and separate systems. Although there is no mandate to use Apache Kafka with Schema Registry, it is a highly recommended best practice. It is essential for maintaining data compatibility and fostering interoperability within distributed systems. This not only facilitates real-time data integration but also enhances system resilience, as changes can be efficiently tracked, versioned, and validated, ultimately promoting agility and scalability in Kafka-based architectures.

To demonstrate that Schema Registry is indeed a separate system, we have a demo that is available at this GitHub repository. Basically, the script avro_ser.py generates random messages, has them Avro serialized, and saves them to local files. No message (other than the Schema Registry metadata) will be stored in Apache Kafka.

usage: avro_ser.py [-h] [--qty RECORDS] [--schema SCHEMA] [--stats] [--print] [--save] [--config CONFIG]
AVRO serialiser
options:
  -h, --help       show this help message and exit
  --qty RECORDS    Quantity of input records to be randomized (based on the Avro schema)
  --config CONFIG  Configuration file to access the Schema Registry cluster (default 'config/test.ini')
  --schema SCHEMA  Avro schema file path
  --stats          Display statistics
  --print          Print messages in the console
  --save           Save serialized data to folder 'data/'

You can then have them deserialized using the script avro_deser.py.

usage: avro_deser.py [-h] [--config CONFIG] [--schema SCHEMA]
AVRO deserialiser
options:
  -h, --help       show this help message and exit
  --config CONFIG  Configuration file to access the Schema Registry cluster (default 'config/example.ini')
  --schema SCHEMA  avro Schema file path

It requires Confluent Schema Registry, but the repository lets you run one locally with Docker Compose (see the file docker-compose.yml).

To run a simple demo, just execute the shell script ./demo.sh. Below is an example of the output you might get:

Avro weather schema:
{
    "name": "Weather",
    "namespace": "com.example",
    "doc": "My weather station",
    "type": "record",
    "fields": [
        {
            "name": "station",
            "type": "string"
        },
        {
            "name": "station_id",
            "type": "long"
        },
        {
            "name": "timestamp",
            "type": "long",
            "logicalType": "timestamp-millis"
        },
        {
            "name": "temp",
            "type": "double"
        },
        {
            "name": "active",
            "type": "boolean"
        }
    ]
}

Press any key to see 1 random input message(s) based on the Avro weather schema and stats...

{"station": "QGgXFAxZ", "station_id": 92918047, "timestamp": 1705311845943, "temp": 749.51, "active": false}

Avro: b'\x00\x00\x00\x00\x01\x10QGgXFAxZ\xbe\xc4\xceX\xee\xf8\x8d\xc8\xa1c\xaeG\xe1z\x14l\x87@\x00'
Protobuf: b'\x00\x00\x00\x00\x02\x00\n\x08QGgXFAxZ\x10\x9f\xa2\xa7,\x18\xb7\xfc\x86\xe4\xd01!\xaeG\xe1z\x14l\x87@'

Record(s) Avro serialized: 1

Input records:
- Total: 108 bytes
- Average per record: 108.00 bytes

Avro serialized/encoded records:
- Total: 33 bytes
  > Compress ratio: 69.44%

Protobuf serialized/encoded records:
- Total: 37 bytes
  > Compress ratio: 65.74%

The serialized messages will be saved into the folder ./data:

  • weather-avro-<EPOCH_time>.bin: Serialized message

  • weather-avro-<EPOCH_time>.json: Original message

To deserialize the messages run the script:

python3 avro_deser.py --config config/example.ini

For example:

File: data/weather.avro-1705312578851590.bin
b'\x00\x00\x00\x00\x01\x10QGgXFAxZ\xbe\xc4\xceX\xee\xf8\x8d\xc8\xa1c\xaeG\xe1z\x14l\x87@\x00'
{"station": "QGgXFAxZ", "station_id": 92918047, "timestamp": 1705311845943, "temp": 749.51, "active": false}

After that, head to Confluent Control Center (http://localhost:9021) and see the messages in an internal topic called _schemas. You will find two messages, one for each schema created (Avro and Protobuf). If you add more schemas or change the existing ones, additional messages will be published to that topic.

Conclusion

Integrating Schema Registry with Apache Kafka offers a powerful advantage to your data streaming ecosystem. This centralized schema management system ensures seamless data evolution, compatibility, and governance across distributed components. By enforcing versioning and facilitating efficient error detection, Schema Registry enhances collaboration, enabling teams to work independently on evolving schemas while maintaining backward and forward compatibility. The result is a robust and agile architecture that optimizes storage, network usage, and data integrity, making Schema Registry an indispensable tool for ensuring the reliability and scalability of your Kafka-based applications.

Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers. It is a key component of Stream Governance, available in the Essentials and Advanced Stream Governance packages in Confluent Cloud, and is integrated with the rest of the Confluent ecosystem.

Schema Registry in Confluent Cloud is a fully managed service with a per-environment, hosted Schema Registry. It allows you to compare schema versions, change subject level compatibility mode of a schema, search for schemas and fields, tag schemas and fields, and manage schemas for an environment. It also provides access control (RBAC) for Confluent Cloud Schema Registry.

This article explained how producer and consumer applications work symbiotically with Schema Registry and Apache Kafka clusters, and how they cache information locally in memory to improve overall efficiency, with each application maintaining a separate client connection to each of these systems. Also, producer and consumer applications don't necessarily need to have the schema saved locally with the application; in fact, they don't even need to know about it. As long as the payload matches the schema, the applications don't need to care, as the Confluent client libraries take care of serialization and deserialization for you.

We also briefly touched upon data quality rules and how they can significantly enhance the quality of data streams that power your business, making them a valuable tool in any data-driven organization.

Have a go yourself, play with the Confluent Schema Registry, and don’t hesitate to reach out to us should you want to take your data streaming applications, and business, to the next level.

Italo Nesi is a Sr. Solutions Engineer at Confluent, bringing a wealth of over 30 years of experience in various roles such as software engineer, solutions engineer/architect, pre-sales engineer, full stack developer, IoT developer/architect, and a passionate home automation hobbyist. He possesses a strong penchant for building innovative solutions rather than starting from scratch, leveraging existing tools and technologies to deliver efficient and effective results for the core business. His expertise lies in combining his technical prowess with a practical approach, ensuring optimal outcomes while avoiding unnecessary reinvention of the wheel. He holds a bachelor’s degree in electronics engineering from the Federal University of Rio Grande do Norte/Brazil, an MBA from the Federal University of Rio de Janeiro/Brazil (COPPEAD), and an executive master’s degree in International Supply Chain Management from Université Catholique de Louvain/Belgium.
