
Best Practices for Confluent Schema Registry

Author: Robert Yokota

In this article, we present some best practices and key concepts for using Confluent Schema Registry.

Understand schema IDs

When using Confluent Schema Registry, producers and consumers interoperate via schema IDs. When sending a message, the producer will look for the schema of the message in Schema Registry. If the schema is found, it is cached for future use. After serializing a message with a schema, the producer will prepend the ID of the schema to the payload.  

When reading from Apache Kafka®, the consumer will look for the schema ID at the beginning of the payload, and then ask Schema Registry for the corresponding schema, which is cached and then used to deserialize the message.

Source: Schema Registry Concepts documentation
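To make the framing concrete, here is a minimal Python sketch of how a schema ID can be prepended to a payload and read back. It assumes the commonly documented Confluent wire format for Avro and JSON Schema payloads (a magic byte of 0 followed by a 4-byte big-endian schema ID); the helper names `frame` and `unframe` are hypothetical, and real serializers handle this for you.

```python
import struct

MAGIC_BYTE = 0  # assumed first byte of the wire format


def frame(schema_id, payload):
    """Prepend the magic byte and the 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message):
    """Split a framed message into (schema_id, serialized payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


framed = frame(123, b"serialized-order")
print(framed[:5].hex())  # 000000007b
print(unframe(framed))   # (123, b'serialized-order')
```

A consumer would use the recovered ID to fetch (and cache) the schema before deserializing the remaining bytes.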

Understand subjects and versions

A subject is a named, ordered history of schema versions. A schema can be identified in two ways: either by schema ID or by a subject and version pair. While a schema will always have a unique ID, it may be associated with one or more subject-version pairs, depending on the subject(s) to which the schema was registered. For example, if you register an Order schema to the order-value subject, it will be assigned an ID, say 123, and a version, say 1. It can then be identified by ID 123 or by subject-version pair (order-value, 1). Later, if you register the exact same schema to a different subject, say order-key, it will retain the same ID 123, but can now be identified by an additional subject-version pair, such as (order-key, 2).

Within a subject, versions increase by 1 whenever a new schema is registered to the subject. The history of subject versions is also referred to as schema evolution. Typically, one subject is used for message keys and another subject for message values.
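The relationship between IDs, subjects, and versions can be illustrated with a toy in-memory model. This is only a sketch of the bookkeeping described above, not how Schema Registry is implemented; the `ToyRegistry` class and the starting ID of 123 are assumptions chosen to mirror the example.

```python
class ToyRegistry:
    """In-memory model: one ID per unique schema, one version list per subject."""

    def __init__(self, first_id=1):
        self._next_id = first_id
        self._ids = {}       # schema text -> schema ID
        self._subjects = {}  # subject -> list of schema IDs (index + 1 = version)

    def register(self, subject, schema):
        """Register a schema and return its (id, version) within the subject."""
        if schema not in self._ids:
            self._ids[schema] = self._next_id
            self._next_id += 1
        schema_id = self._ids[schema]
        versions = self._subjects.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return schema_id, versions.index(schema_id) + 1


registry = ToyRegistry(first_id=123)
order = '{"type": "record", "name": "Order", "fields": []}'
order_id = '{"type": "string"}'

print(registry.register("order-value", order))   # (123, 1)
print(registry.register("order-key", order_id))  # (124, 1)
print(registry.register("order-key", order))     # (123, 2)
```

The last call shows the same schema keeping ID 123 while gaining a second subject-version pair.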

Understand data contracts

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. A schema is only one element of a data contract. Confluent Schema Registry supports the following aspects of a data contract:

  • Structure: This is the part of the contract that is covered by the schema, which defines the fields and their types.

  • Integrity constraints: This includes declarative constraints or data quality rules on the domain values of fields, such as the constraint that age must be a positive integer.

  • Metadata: Metadata is additional information about the schema or its constituent parts, such as whether a field contains sensitive information. Metadata can also include documentation for a data contract, such as who created it.

  • Rules or policies: These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue.

  • Change or evolution: This implies that data contracts are versioned, and can support declarative schema migration rules for how to transform data from one version to another so that even changes that would normally break downstream components can be easily accommodated.

Schema migration rules and data quality rules will be discussed later in this article.

Pre-register schemas

By default, producers will try to auto-register schemas. This is convenient in development environments when first trying to establish a working end-to-end scenario that uses schemas. However, in a production environment, it's best to use a more controlled mechanism for deploying schemas, such as a CI/CD pipeline. The Schema Registry Maven Plugin is a popular option when using a CI/CD pipeline. REST APIs can also be used directly to pre-register schemas.

To disable auto-registration for producers, use the configuration auto.register.schemas=false.
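As a rough sketch of pre-registering through the REST API, the Python snippet below builds (but does not send) a registration request of the kind a CI/CD job might issue. The base URL `http://localhost:8081`, the subject name, and the example schema are assumptions for illustration.

```python
import json
import urllib.request


def build_register_request(base_url, subject, schema, schema_type="AVRO"):
    """Build (but do not send) a request that registers a schema under a subject."""
    body = json.dumps({"schema": json.dumps(schema), "schemaType": schema_type})
    return urllib.request.Request(
        url=f"{base_url}/subjects/{subject}/versions",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Hypothetical schema used only for this example.
order_schema = {
    "type": "record",
    "name": "Order",
    "fields": [{"name": "orderId", "type": "long"}],
}

request = build_register_request("http://localhost:8081", "order-value", order_schema)
print(request.get_method(), request.full_url)
# POST http://localhost:8081/subjects/order-value/versions
```

A pipeline step would send the request with `urllib.request.urlopen(request)` and fail the build on a non-2xx response.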

Use schema normalization

When Confluent Schema Registry receives a schema for either registration or lookup, it can convert the schema to normalized form. If normalization is not in effect, the schema will be reformatted slightly, such as by removing extra whitespace, but otherwise, its syntax will largely remain the same. In general, it is strongly recommended to enable normalization.  Normalization will allow two schemas that are semantically the same, but differ in syntax, to be treated as the same schema. This aids interoperability for clients in different programming languages, especially when the schemas that are presented to Schema Registry have syntactic differences. Some of the syntactic differences that are handled by normalization include the following:

  • The ordering of properties in JSON Schema

  • The ordering of imports and options in Protobuf

  • The ordering of schema references

  • Non-qualified names vs. fully-qualified names

Normalization can be enabled in several ways:

  1. Using normalize.schemas=true in the configuration for producers

  2. At the subject level using PUT /config/{subject} or globally using PUT /config with a payload of { "normalize": true }

  3. Using <normalizeSchemas>true</normalizeSchemas> with the schema-registry:register goal of the Schema Registry Maven Plugin.
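To see why normalization helps, consider two JSON Schema documents that differ only in property order. The Python sketch below uses a toy canonical form (sorted keys, no extra whitespace) to show the idea; it is an illustration only and does not reproduce the normalization rules that Schema Registry actually applies.

```python
import json


def canonical_form(schema):
    """Serialize with sorted keys and compact separators (toy normalization)."""
    return json.dumps(schema, sort_keys=True, separators=(",", ":"))


schema_a = {
    "type": "object",
    "properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
}
schema_b = {
    "properties": {"name": {"type": "string"}, "id": {"type": "integer"}},
    "type": "object",
}

print(json.dumps(schema_a) == json.dumps(schema_b))          # False
print(canonical_form(schema_a) == canonical_form(schema_b))  # True
```

Without a canonical form, the two clients would appear to present different schemas even though they mean the same thing.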

Prefer the topic-based subject name strategy

When registering or looking up the schema ID for a schema, the producer tells Schema Registry which subject to use. By default, the producer will use a topic-based subject name strategy (called TopicNameStrategy), where the subject name is the topic name suffixed with either "-key" or "-value", depending on whether the payload corresponds to the key or value of the message. This means that messages in the topic will all use the same pair of subjects, one for key and one for value, or one subject if only the value has a schema.

The above scenario works well when only storing one type in a topic, such as a customer type, where the schemas reside in a "customer-value" subject. However, sometimes you may want to store more than one type in a topic, especially when you need to maintain the ordering of events for different types.

In the past, one solution was to use a record-based subject name strategy (either RecordNameStrategy or TopicRecordNameStrategy), where the schema to be used resides in a subject named after the message or record type as it appears in the schema, such as "Customer" or "Order" (TopicRecordNameStrategy additionally prefixes the topic name). Unfortunately, this allows an unbounded number of types to be stored in a Kafka topic, which makes it impossible to use with streaming platforms such as ksqlDB and Apache Flink®.

A better alternative, which allows for a bounded and known set of types to be stored in a Kafka topic, is to use the topic-based name strategy with union types, where the union explicitly states which types can be stored in the topic. Avro, JSON Schema, and Protobuf all support the concept of a union. The union type is called oneOf in JSON Schema and oneof in Protobuf. See this article for examples.
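The subject names produced by the three strategies discussed above can be sketched in a few lines of Python. The function names are hypothetical; the naming patterns follow the documented behavior of TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy.

```python
def topic_name_strategy(topic, is_key=False):
    """Subject is the topic name plus a -key or -value suffix."""
    return f"{topic}-{'key' if is_key else 'value'}"


def record_name_strategy(record_name):
    """Subject is the fully qualified record name, independent of the topic."""
    return record_name


def topic_record_name_strategy(topic, record_name):
    """Subject is the topic name followed by the fully qualified record name."""
    return f"{topic}-{record_name}"


record = "io.confluent.examples.avro.Customer"
print(topic_name_strategy("customer"))                   # customer-value
print(topic_name_strategy("customer", is_key=True))      # customer-key
print(record_name_strategy(record))                      # io.confluent.examples.avro.Customer
print(topic_record_name_strategy("customer", record))    # customer-io.confluent.examples.avro.Customer
```

Only the first strategy ties a topic to a fixed pair of subjects, which is what keeps the set of types in the topic bounded.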

Use schema composition

When using union types, it's best to make use of Schema Registry's support for schema composition, which uses a feature called schema references. Rather than having a union where each member of the union specifies its definition directly in the union, the union can specify its members using references to other schemas. For example, below is an Avro union containing three schema reference names. 

[
  "io.confluent.examples.avro.Customer",
  "io.confluent.examples.avro.Product",
  "io.confluent.examples.avro.Order"
]

When registering the above schema to Schema Registry, an array of schema references is also sent, which might look like the following:

[
  { 
    "name": "io.confluent.examples.avro.Customer",
    "subject": "customer-value",
    "version": 1
  },
  {
    "name": "io.confluent.examples.avro.Product",
    "subject": "product-value",
    "version": 1
  },
  {
    "name": "io.confluent.examples.avro.Order",
    "subject": "order-value",
    "version": 1
  }
]

Schema composition using schema references facilitates reuse and the DRY (don't repeat yourself) principle.
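Putting the union and its references together, a registration payload might be assembled as in the Python sketch below. The helper name `build_payload` is hypothetical, and the payload shape (a stringified schema plus a `references` array) is an assumption based on the REST API's registration request.

```python
import json

# The Avro union from above, expressed as a Python list of reference names.
union_schema = [
    "io.confluent.examples.avro.Customer",
    "io.confluent.examples.avro.Product",
    "io.confluent.examples.avro.Order",
]

# One reference per union member, pointing at the subject that holds it.
references = [
    {"name": name, "subject": subject, "version": 1}
    for name, subject in zip(
        union_schema, ["customer-value", "product-value", "order-value"]
    )
]


def build_payload(schema, refs, schema_type="AVRO"):
    """Assemble the body of a schema registration request."""
    return {"schemaType": schema_type, "schema": json.dumps(schema), "references": refs}


payload = build_payload(union_schema, references)
print(json.dumps(payload, indent=2))
```

Because each member is defined once in its own subject, the union can be updated by changing a reference version rather than by copying definitions.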

Use subject aliases

Note that above we specified the subject names using the convention of the topic-based subject name strategy, with "customer-value", "product-value", and "order-value". However, you may have some scenarios where the subject name does not follow the topic-based subject name strategy. This is the case when using auto-registration with the Protobuf serializer, where references may have been registered using the import names. This would also be the case if one had previously used a record-based subject name strategy but now wanted to switch to a topic-based subject name strategy.

Subject aliases can be used to allow subjects that were previously created without the "-key" or "-value" suffix to be used with the topic-based subject name strategy. For example, if schemas had been registered under the subject "io.confluent.examples.avro.Customer", we can create an alias of "customer-value" to that subject using the following REST command:

PUT /config/customer-value
{
  "alias": "io.confluent.examples.avro.Customer"
}
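Conceptually, an alias is a lookup from one subject name to another. The toy Python sketch below models that lookup with a plain dictionary; the `resolve_subject` function is hypothetical and only illustrates the effect of the configuration above.

```python
def resolve_subject(subject, aliases):
    """Return the underlying subject if an alias is configured, else the subject itself."""
    return aliases.get(subject, subject)


# Mirrors the PUT /config/customer-value example above.
aliases = {"customer-value": "io.confluent.examples.avro.Customer"}

print(resolve_subject("customer-value", aliases))  # io.confluent.examples.avro.Customer
print(resolve_subject("order-value", aliases))     # order-value
```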

Understand compatibility levels

Three levels of compatibility are supported by Schema Registry:

  1. Backward compatibility: All messages that conform to the previous version of the schema are also valid according to the new version

  2. Forward compatibility: All messages that conform to the new version are also valid according to the previous version of the schema

  3. Full compatibility: The previous version of the schema and the new version are both backward compatible and forward compatible

A forward compatibility check can also be thought of as a backward compatibility check with the arguments switched. This implies that a full compatibility check is both a backward compatibility check of the new schema against the old schema and a backward compatibility check of the old schema against the new schema.

When registering a new schema version, compatibility checks can be performed against the previous version, or against all versions. In the latter case, the checks are said to be performed transitively.
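The relationships between these checks can be sketched in Python with a deliberately simplified, Avro-like rule: a reader schema can decode data from a writer schema if every reader field either exists in the writer with the same type or has a default. This ignores type promotion, aliases, and unions, and all function names are hypothetical.

```python
def can_read(reader, writer):
    """True if the reader schema can decode data written with the writer schema."""
    for name, spec in reader.items():
        if name in writer:
            if writer[name]["type"] != spec["type"]:
                return False
        elif "default" not in spec:
            return False
    return True


def is_backward(new, old):
    """The new schema can read data written with the old schema."""
    return can_read(reader=new, writer=old)


def is_forward(new, old):
    """A backward check with the arguments switched."""
    return is_backward(new=old, old=new)


def is_full(new, old):
    return is_backward(new, old) and is_forward(new, old)


def is_backward_transitive(new, history):
    """Check the new schema against every earlier version, not just the last."""
    return all(is_backward(new, old) for old in history)


v0 = {"id": {"type": "string"}}
v1 = {"id": {"type": "long"}}
v2 = {"id": {"type": "long"}, "note": {"type": "string", "default": ""}}
v3 = {"id": {"type": "long"}, "state": {"type": "string"}}  # new required field

print(is_backward(v2, v1), is_forward(v2, v1), is_full(v2, v1))   # True True True
print(is_backward(v3, v1), is_forward(v3, v1))                    # False True
print(is_backward(v2, v1), is_backward_transitive(v2, [v0, v1]))  # True False
```

The last line shows a schema that passes against the previous version but fails the transitive check because of an older, incompatible version in the history.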

The guarantee provided by a compatibility level can be thought of as a safety property. It tries to establish that nothing bad will happen to a client application, depending on how the schema (and therefore the data) evolves over time.

Compatibility checks know nothing of what actual data exists in a system, so may appear more strict than necessary. For example, in JSON Schema, a schema that allows additional properties in the payload that are not defined in the schema is referred to as an open content model. Adding a new property definition to an open content model is a backward incompatible change. That's because undefined properties that appear in old data may conflict with the new property definition, such as having a different type. However, you may not have any data with undefined properties. To get around this issue, the compatibility level can be temporarily set to NONE while the new schema is registered to the subject. Alternatively, compatibility groups can be used, which are explained later.

Prefer backward compatibility

Backward compatibility, which is the default setting, should be preferred over the other compatibility levels. Backward compatibility allows a consumer to read old messages. Kafka Streams requires at least backward compatibility, because it may need to read old messages in the changelog topic.

Full compatibility also allows reading older messages but can be overly restrictive, as a full compatibility check is a backward compatibility check of the new schema against the old schema, as well as the old schema against the new schema.

Some argue that full compatibility should be preferred because it allows both new consumers to use old schemas (via backward compatibility) and old consumers to use new schemas (via forward compatibility). With backward compatibility, consumers need to be upgraded before producers, while with forward compatibility, producers need to be upgraded before consumers. Full compatibility allows you to upgrade (or downgrade) consumers and producers independently in any order. However, the actual rules for full compatibility make it restrictive and difficult to use in practice, because it is a backward compatibility check in both directions.

In general, working with backward compatibility is a more natural means of evolving a system.  When making backward compatible changes, we first upgrade consumers to handle the changes, then we upgrade producers. This allows both producers and consumers to evolve over time. When making forward-compatible changes, we first upgrade the producers, but we can choose to never upgrade consumers! This means that the only changes that we can make for forward compatibility are those that won't break existing consumers, which is fairly restrictive.

A better alternative to the use of full compatibility is to use schema migration rules, as described later, which also allows new consumers to use old schemas, and old consumers to use new schemas. Schema migration rules also allow consumers and producers to be upgraded (or downgraded) independently. Furthermore, schema migration rules can handle many more scenarios than a full compatibility setting and were designed for arbitrarily complex schema evolutions, including ones that would normally be breaking changes.

If a schema evolves more than once, you can use transitive backward compatibility to ensure that old messages corresponding to the different schemas can be read by the same consumer.

Understand compatibility groups

Within a subject, typically each schema is compatible with the previous schema. If you want to introduce a breaking change, similar to bumping a major version when using semantic versioning, then you can add a metadata property to your schema as follows:

{
  "schema": "...",
  "metadata": {
    "properties": {
      "major_version": "2"
    }
  },
  "ruleSet": ...
}

The name “major_version” above is arbitrary; you could have called it “application.major.version”, for example.

You can then specify that a consumer uses only the latest schema of a specific major version.

use.latest.with.metadata=major_version=2
latest.cache.ttl.sec=300

The above example also specifies that the client should check for a new latest version every five minutes. This TTL configuration can also be used with the use.latest.version=true configuration.

Finally, we can configure Schema Registry to only perform compatibility checks for schemas that share the same compatibility group, where the schemas are partitioned by "major_version":

PUT /config/{subject}
{
  "compatibilityGroup": "major_version"
}
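The effect of compatibility groups can be modeled with a short Python sketch: versions are partitioned by a metadata property, compatibility is only checked within the matching partition, and a consumer can pick the latest version of a given group. The data layout and function names are assumptions made for illustration.

```python
def same_group(versions, new_metadata, group_key):
    """Return earlier versions whose group property matches the new schema's."""
    target = new_metadata.get(group_key)
    return [v for v in versions if v["metadata"].get(group_key) == target]


def latest_with_metadata(versions, key, value):
    """Return the highest version whose metadata property has the given value."""
    matches = [v for v in versions if v["metadata"].get(key) == value]
    return max(matches, key=lambda v: v["version"]) if matches else None


history = [
    {"version": 1, "metadata": {"major_version": "1"}},
    {"version": 2, "metadata": {"major_version": "1"}},
    {"version": 3, "metadata": {"major_version": "2"}},
    {"version": 4, "metadata": {"major_version": "2"}},
]

# A new schema with major_version=2 is only checked against versions 3 and 4.
checked = same_group(history, {"major_version": "2"}, "major_version")
print([v["version"] for v in checked])                                 # [3, 4]

# A consumer pinned to major_version=1 keeps using version 2.
print(latest_with_metadata(history, "major_version", "1")["version"])  # 2
```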

Use schema migration rules

Once a subject uses compatibility groups to accommodate breaking changes in the version history, we can add schema migration rules so that old consumers can read messages using new schemas, and new consumers can read messages using old schemas. As mentioned, schema migration rules can handle many more scenarios than a full compatibility setting. Below is a set of rules using JSONata to handle changing a "state" field to "status". The UPGRADE rule allows new consumers to transform the "state" field to "status", while the DOWNGRADE rule allows old consumers to transform the "status" field to "state". This means that both old and new producers can continue producing data, and both old and new consumers will see the data in their desired format. Furthermore, producers and consumers can be upgraded or downgraded independently.

{
  "ruleSet": {
    "domainRules": [
      ...
    ],
    "migrationRules": [
      {
        "name": "changeStateToStatus",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'state'}), {'status': $.'state'}])"
      },
      {
        "name": "changeStatusToState",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "DOWNGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'status'}), {'state': $.'status'}])"
      }
    ]
  }
}
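For readers less familiar with JSONata, the following Python sketch performs an analogous rename in both directions. It is only an analogy for what the two rules express, not how the rule executor runs them; unlike the JSONata expressions, it leaves a record unchanged when the source field is absent.

```python
def rename_field(record, old, new):
    """Return a copy of the record with one field renamed, if present."""
    migrated = {k: v for k, v in record.items() if k != old}
    if old in record:
        migrated[new] = record[old]
    return migrated


def upgrade(record):
    """Analog of the UPGRADE rule: 'state' becomes 'status'."""
    return rename_field(record, "state", "status")


def downgrade(record):
    """Analog of the DOWNGRADE rule: 'status' becomes 'state'."""
    return rename_field(record, "status", "state")


old_message = {"orderId": 1, "state": "SHIPPED"}
print(upgrade(old_message))                            # {'orderId': 1, 'status': 'SHIPPED'}
print(downgrade(upgrade(old_message)) == old_message)  # True
```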

The following video shows how an old producer and a new producer can both simultaneously interoperate with an old consumer and a new consumer, allowing producers and consumers to be both upgraded (or downgraded) independently, even with a normally incompatible change.

Use data quality rules

As part of the effort to shift the responsibility for data quality to the producer, an activity often referred to as shift-left, Schema Registry now allows developers to specify data quality rules along with the schema as part of a data contract.

For example, here is a rule using the Google Common Expression Language to ensure that the "totalPrice" field contains a positive value:

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPrice > 0"
      }
    ]
  }
}

The following video shows how to use a data quality rule in conjunction with a dead letter queue to receive invalid messages.
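The interaction between a condition rule and a dead letter queue can be approximated in plain Python as follows. This is a sketch of the behavior only: the `check_total_price` function stands in for the CEL expression, and the `route` helper and in-memory lists are hypothetical stand-ins for the serializer and the dead letter topic.

```python
def check_total_price(message):
    """Python analog of the CEL condition message.totalPrice > 0."""
    return message.get("totalPrice", 0) > 0


def route(messages, condition):
    """Split messages into those that pass the condition and those that do not."""
    accepted, dead_letters = [], []
    for message in messages:
        (accepted if condition(message) else dead_letters).append(message)
    return accepted, dead_letters


orders = [
    {"orderId": 1, "totalPrice": 25.0},
    {"orderId": 2, "totalPrice": -5.0},
]
accepted, dead_letters = route(orders, check_total_price)
print([m["orderId"] for m in accepted])      # [1]
print([m["orderId"] for m in dead_letters])  # [2]
```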

Understand schema deletion

Schemas are intended to be immutable. This is because messages can be stored for an arbitrarily long time, so consumers in the future need to know exactly what schema was used by the producer at the time of serialization. This also explains why a new schema version is created instead of overwriting the previous version, whenever a schema is registered to a subject.

A schema can be soft-deleted, which has the effect of making it unavailable for producers when serializing messages. However, it is still available for consumers to deserialize messages.  

A schema can also be hard-deleted, which removes it completely from Schema Registry, making it unavailable for both producers and consumers. Hard-deleting a schema is an irreversible operation that should only be done if you are absolutely certain that you no longer need the schema to deserialize messages.
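As a sketch of how the two kinds of deletion map onto REST calls, the Python helper below returns the DELETE paths involved. It assumes the REST API's `permanent=true` query parameter for hard deletes and that a hard delete is issued after a soft delete of the same version; the function name is hypothetical and nothing is sent.

```python
def delete_paths(subject, version, hard=False):
    """Return the DELETE request paths for a soft delete, plus a hard delete if asked."""
    base = f"/subjects/{subject}/versions/{version}"
    paths = [base]  # soft delete
    if hard:
        # Hard delete, issued only after the soft delete has succeeded.
        paths.append(f"{base}?permanent=true")
    return paths


print(delete_paths("order-value", 1))
# ['/subjects/order-value/versions/1']
print(delete_paths("order-value", 1, hard=True))
# ['/subjects/order-value/versions/1', '/subjects/order-value/versions/1?permanent=true']
```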

Don't mutate schemas

Even though schemas are intended to be immutable, you can mutate a schema if absolutely necessary. For example, you may want to add a metadata property, such as "major_version", to an existing schema. Or you may want to restore a hard-deleted schema. However, mutating schemas is not recommended because, if done incorrectly, it can easily corrupt your schemas and cause your clients to fail.

If you find that you have no other option, first you would need to hard delete the schema that you want to overwrite. If the schema exists in more than one subject, you'll need to hard delete each subject-version pair. Then you would force a subject into IMPORT mode using the following command:

PUT /mode/{subject}?force=true
{
  "mode": "IMPORT"
}

Next, you would register your new schema to Schema Registry, while specifying the exact schema ID and version to use.  

POST /subjects/{subject}/versions
{
  "id": {id},
  "version": {version},
  "schema": "...",
  "schemaType": "...",
  "references": ["..."]
}

Finally, you may need to restart any clients to pick up the mutated schema, since clients don't expect schemas to change and therefore cache them for reuse.

However, since this article is about best practices, hopefully you won't have a reason to mutate your schemas.

If you’d like to learn more about Schema Registry, check out the course on Confluent Developer.

  • Robert Yokota is a software engineer at Confluent, currently working in the area of data governance. He previously worked at Microsoft.
