Kafka Connect is part of Apache Kafka®, providing streaming integration between data stores and Kafka. For data engineers, it just requires JSON configuration files to use. There are connectors for common (and not-so-common) data stores out there already, including JDBC, Elasticsearch, IBM MQ, S3 and BigQuery, to name but a few.
For developers, Kafka Connect has a rich API with which additional connectors can be developed if required. It also has a REST API for configuring and managing connectors.
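Kafka Connect is modular in nature, providing a very powerful way of handling integration requirements. Its key components include connectors, which handle the integration with the external data store, and converters, which handle the serialization and deserialization of data.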
One of the more frequent sources of mistakes and misunderstanding around Kafka Connect involves the serialization of data, which Kafka Connect handles using converters. Let’s take a good look at how these work, and illustrate some of the common issues encountered.
Kafka messages are organized into topics. Each message is a key/value, but that is all that Kafka requires. Both key and value are just bytes when they are stored in Kafka. This makes Kafka applicable to a wide range of use cases, but it also means that developers have the responsibility of deciding how to serialize the data.
In configuring Kafka Connect, one of the key things to standardize on is the serialization format. You need to make sure that anyone reading from the topic is using the same serialization format as those writing to the topic. Otherwise, confusion and errors will ensue!
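To make the "just bytes" point concrete, here is a minimal Python sketch using only the standard library. The record and the helper names are made up for illustration and nothing here talks to a real Kafka cluster; it only shows what happens when the reader assumes a different format from the writer.

```python
import json


def serialize_json(record: dict) -> bytes:
    """Writer side: turn a record into the bytes that would be stored in Kafka."""
    return json.dumps(record).encode("utf-8")


def deserialize_json(raw: bytes) -> dict:
    """Reader side: only works if the writer really used JSON."""
    return json.loads(raw.decode("utf-8"))


def deserialize_delimited(raw: bytes) -> list:
    """A reader that wrongly assumes comma-delimited text."""
    return raw.decode("utf-8").split(",")


record = {"userid": "User_1", "regionid": "Region_5"}
raw = serialize_json(record)

# Matching formats: the record round-trips unchanged.
assert deserialize_json(raw) == record

# Mismatched formats, case 1: no exception, just silently wrong data.
wrong = deserialize_delimited(raw)
print(wrong)

# Mismatched formats, case 2: the reader fails outright.
try:
    deserialize_json(b"1,Rick Astley,Never Gonna Give You Up")
except json.JSONDecodeError as err:
    print(f"cannot read delimited bytes as JSON: {err}")
```

Both failure modes are worth keeping in mind: a loud error is annoying, but quietly mangled data is worse.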
There are various serialization formats, with common ones including JSON, Avro, Protobuf and delimited text such as CSV.
There are advantages and disadvantages to each of these—well, except delimited, in which case it’s only disadvantages 😉
Some guiding principles for choosing a serialization format include:
Does the format of your source or target data dictate the format you use in Kafka? No, not at all. The format in which you’re reading data from a source, or writing it to an external data store, doesn’t need to have a bearing on the format you use to serialize your messages in Kafka.
A connector in Kafka Connect is responsible for taking the data from the source data store (for example, a database) and passing it as an internal representation of the data to the converter. Kafka Connect’s converters then serialize this source data object onto the topic.
The same happens in reverse when using Kafka Connect as a sink—the converter deserializes the data from the topic into this internal representation, which is passed to the connector to write to the target data store using the appropriate method specific to the target.
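The division of labour described above can be sketched as a small Python model. This is not Kafka Connect code: the method names loosely mirror the `fromConnectData` and `toConnectData` methods of the Java `Converter` interface, the schema argument is left out for brevity, and `JsonLikeConverter`, `source_pipeline` and `sink_pipeline` are hypothetical names used only for this illustration.

```python
import json
from typing import Any, Callable, Iterable, List, Protocol


class Converter(Protocol):
    """Simplified stand-in for the converter role (schema handling omitted)."""

    def from_connect_data(self, topic: str, value: Any) -> bytes: ...

    def to_connect_data(self, topic: str, raw: bytes) -> Any: ...


class JsonLikeConverter:
    """Toy converter that serializes the internal representation as JSON."""

    def from_connect_data(self, topic: str, value: Any) -> bytes:
        return json.dumps(value).encode("utf-8")

    def to_connect_data(self, topic: str, raw: bytes) -> Any:
        return json.loads(raw.decode("utf-8"))


def source_pipeline(rows: Iterable[Any], converter: Converter, topic: str) -> List[bytes]:
    """Source side: the connector supplies rows, the converter produces bytes."""
    return [converter.from_connect_data(topic, row) for row in rows]


def sink_pipeline(messages: Iterable[bytes], converter: Converter, topic: str,
                  write: Callable[[Any], None]) -> None:
    """Sink side: the converter decodes bytes, the connector writes the result."""
    for raw in messages:
        write(converter.to_connect_data(topic, raw))


converter = JsonLikeConverter()
messages = source_pipeline([{"id": 1}, {"id": 2}], converter, "demo-topic")

written = []
sink_pipeline(messages, converter, "demo-topic", written.append)
print(written)
```

Note that neither pipeline function knows anything about JSON. Swapping the converter changes the bytes on the topic without touching the connector logic, which is the point of the design.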
What this means is that you can have data on a topic in Avro (for example), and when you come to write it to HDFS (for example), you simply configure the sink connector to use the converter for that format.
Kafka Connect takes a default converter configuration at the worker level, and it can also be overridden per connector. Since using the same serialization format throughout your pipelines is generally a good idea, you’ll often just set the converter at the worker, and never need to specify it in a connector. But maybe you’re pulling data from someone else’s topic and they’ve decided to use a different serialization format—in that case you’d set this in the connector configuration. Even though you override it in the connector’s configuration, it’s still the converter that performs the work.
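As a rough mental model of that precedence, here is a small Python sketch. It is a simplification, not the worker’s actual logic: the function name is hypothetical, and the sketch assumes that a connector which names its own converter also supplies that converter’s options, so worker and connector settings are never mixed for the same side.

```python
def effective_converter(worker: dict, connector: dict, side: str) -> dict:
    """Return the converter settings that apply to one side ("key" or "value").

    Assumption for this sketch: if the connector sets "<side>.converter",
    all "<side>.converter.*" options come from the connector; otherwise
    everything for that side comes from the worker.
    """
    prefix = f"{side}.converter"
    source = connector if prefix in connector else worker
    return {
        name: value
        for name, value in source.items()
        if name == prefix or name.startswith(prefix + ".")
    }


worker = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
}

# This connector reads someone else's topic, which holds plain JSON values.
connector = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}

print(effective_converter(worker, connector, "key"))    # worker default applies
print(effective_converter(worker, connector, "value"))  # connector override applies
```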
Connectors that are written properly never [de]serialize the messages stored in Kafka, and always let the configured converter do that work.
Remember, Kafka messages are just pairs of key/value bytes, and you need to specify the converter for both keys and values, using the key.converter and value.converter configuration settings. In some situations, you may use different converters for the key and the value.
Here’s an example of using the String converter. Since it’s just a string, there’s no schema to the data, and thus it’s not so useful to use for the value:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
Some converters have additional configuration. For Avro, you need to specify the Schema Registry. For JSON, you need to specify if you want Kafka Connect to embed the schema in the JSON itself. When you specify converter-specific configurations, always use the key.converter. or value.converter. prefix. For example, to use Avro for the message payload, you’d specify the following:
"value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081",
Common converters include:
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
org.apache.kafka.connect.converters.ByteArrayConverter
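To tie the prefix rule to the converter classes listed above, here is a short Python sketch that assembles a connector configuration. The helper `with_converter` is hypothetical, and the topic name and file path are placeholder values; the output is simply the JSON you would otherwise write by hand.

```python
import json
from typing import Optional


def with_converter(config: dict, side: str, converter_class: str,
                   options: Optional[dict] = None) -> dict:
    """Return a copy of config with a converter set for "key" or "value".

    Converter-specific options are written with the matching
    "key.converter." or "value.converter." prefix.
    """
    if side not in ("key", "value"):
        raise ValueError("side must be 'key' or 'value'")
    updated = dict(config)
    updated[f"{side}.converter"] = converter_class
    for name, value in (options or {}).items():
        updated[f"{side}.converter.{name}"] = value
    return updated


# Placeholder sink connector settings, for illustration only.
base = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "users-avro",
    "file": "/tmp/users.txt",
}

config = with_converter(base, "key", "org.apache.kafka.connect.storage.StringConverter")
config = with_converter(
    config,
    "value",
    "io.confluent.connect.avro.AvroConverter",
    {"schema.registry.url": "http://schema-registry:8081"},
)
print(json.dumps(config, indent=2))
```

Here the key uses the String converter while the value uses Avro, which is one of the situations where key and value converters legitimately differ.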
Whilst JSON does not by default support carrying a schema, Kafka Connect supports two ways that you can still have a declared schema and use JSON. The first is to use JSON Schema with the Confluent Schema Registry. If you cannot use the Schema Registry, then your second (less optimal) option is to use Kafka Connect’s support of a particular structure of JSON in which the schema is embedded. The resulting data size can get large, as the schema is included in every single message along with the payload.
If you’re setting up a Kafka Connect source and want Kafka Connect to include the schema in the message it writes to Kafka, you’d set:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
The resulting message to Kafka would look like the example below, with schema and payload top-level elements in the JSON:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "field": "registertime" },
      { "type": "string", "optional": false, "field": "userid" },
      { "type": "string", "optional": false, "field": "regionid" },
      { "type": "string", "optional": false, "field": "gender" }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE"
  }
}
Note the size of the message, as well as the proportion of it that is made up of the payload vs. the schema. Considering that this is repeated in every message, you can see why a serialization format like JSON Schema or Avro makes a lot of sense, as the schema is stored separately and the message holds just the payload (and is compressed at that).
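If you want to put a number on that overhead, a few lines of Python will do. This is a back-of-the-envelope sketch that measures the example message above with compact JSON encoding; the exact byte counts depend on whitespace and field names, so treat the result as indicative only.

```python
import json

schema = {
    "type": "struct",
    "fields": [
        {"type": "int64", "optional": False, "field": "registertime"},
        {"type": "string", "optional": False, "field": "userid"},
        {"type": "string", "optional": False, "field": "regionid"},
        {"type": "string", "optional": False, "field": "gender"},
    ],
    "optional": False,
    "name": "ksql.users",
}
payload = {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE",
}


def encoded_size(obj) -> int:
    """Size in bytes of the compact UTF-8 JSON encoding of obj."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


payload_bytes = encoded_size(payload)
envelope_bytes = encoded_size({"schema": schema, "payload": payload})
overhead = 1 - payload_bytes / envelope_bytes

print(f"payload only:       {payload_bytes} bytes")
print(f"schema and payload: {envelope_bytes} bytes")
print(f"share of each message spent on the schema envelope: {overhead:.0%}")
```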
If you’re consuming JSON data from a Kafka topic into a Kafka Connect sink, you need to understand how the JSON was serialized. If it was written with the JSON Schema serializer, then you need to set Kafka Connect to use the JSON Schema converter (io.confluent.connect.json.JsonSchemaConverter). If the JSON data was written as a plain string, then you need to determine if the data includes a nested schema. If it does (and it’s in the same format as above, not some arbitrary schema-inclusion format), then you’d set:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
However, if you’re consuming JSON data and it doesn’t have the schema/payload construct, such as this sample:
{ "registertime": 1489869013625, "userid": "User_1", "regionid": "Region_2", "gender": "OTHER" }
…you must tell Kafka Connect not to look for a schema by setting schemas.enable=false:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
As before, remember that the converter configuration option (here, schemas.enable) needs the prefix of key.converter or value.converter as appropriate.
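The decision between schemas.enable=true and schemas.enable=false can be sketched as a small check on a sample message. The function names below are hypothetical, and the check only looks at the top-level keys of one message, so it is a quick sanity test rather than proof of what the whole topic contains.

```python
import json
from typing import Optional


def has_connect_envelope(raw: bytes) -> Optional[bool]:
    """True if raw is a JSON object whose top-level keys are exactly
    "schema" and "payload"; False for other JSON; None if it is not JSON."""
    try:
        doc = json.loads(raw)
    except ValueError:  # covers JSON and text-decoding errors
        return None
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}


def suggested_schemas_enable(raw: bytes) -> Optional[str]:
    """Suggest a value for value.converter.schemas.enable, or None if the
    sample is not JSON and the JsonConverter is the wrong choice anyway."""
    result = has_connect_envelope(raw)
    if result is None:
        return None
    return "true" if result else "false"


plain = b'{"registertime": 1489869013625, "userid": "User_1"}'
wrapped = b'{"schema": {"type": "string", "optional": false}, "payload": "hello"}'

print(suggested_schemas_enable(plain))
print(suggested_schemas_enable(wrapped))
print(suggested_schemas_enable(b"1,Rick Astley,Never Gonna Give You Up"))
```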
Here are some of the common errors you can get if you misconfigure the converters in Kafka Connect. These will show up in the sinks you configure for Kafka Connect, as it’s at this point that you’ll be trying to deserialize the messages already stored in Kafka. Converter problems tend not to occur in sources because it’s in the source that the serialization is set. Each of these will cause the connector to fail, with a headline error of:
ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
After this error, you’ll see a further stack trace describing exactly why it erred. Be aware that the above error will be thrown for any fatal error in a connector, so you may well see this for errors unrelated to serialization. To quickly visualize what errors you can expect with which misconfiguration, here’s a quick reference:
If you have non-JSON data on your source topic but try to read it with the JsonConverter, you can expect to see:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)
This could be caused by the source topic being serialized in Avro or another format.
Solution: If the data is actually in Avro, then change your Kafka Connect sink connector to use:
"value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081",
OR if the topic is populated by Kafka Connect, and you can and would rather do so, switch the upstream source to emit JSON data:
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
Reading non-Avro data with the AvroConverter may be the most common error that I see reported again and again on places like the Confluent Community mailing list and Slack group. It happens when you try to use the Avro converter to read data from a topic that is not Avro. This would include data written by an Avro serializer other than the Confluent Schema Registry’s Avro serializer, which has its own wire format.
org.apache.kafka.connect.errors.DataException: my-topic-name
   at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
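The "magic byte" in that message refers to the Schema Registry wire format: a leading byte of 0, then a 4-byte big-endian schema ID, then the serialized data. The Python sketch below mimics that header check so you can see why other data trips it up. It is an illustration only; the function name is hypothetical and it makes no attempt to decode the Avro body.

```python
import struct

MAGIC_BYTE = 0  # first byte of a message framed by the Schema Registry serializers


def parse_wire_header(raw: bytes):
    """Split a framed message into (schema_id, body).

    Raises ValueError if the message is too short or does not start
    with the magic byte.
    """
    if len(raw) < 5:
        raise ValueError("message too short to carry a schema ID header")
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    return schema_id, raw[5:]


# A framed message: magic byte, schema ID 42, then an opaque body.
framed = b"\x00" + (42).to_bytes(4, "big") + b"opaque-body"
print(parse_wire_header(framed))

# Plain JSON starts with "{" (byte value 123), so the check fails immediately.
try:
    parse_wire_header(b'{"userid": "User_1"}')
except ValueError as err:
    print(f"rejected: {err}")
```

The same rejection applies to Avro bytes written without this header, which is why data from a different Avro serializer fails in the same way.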
The solution is to check the source topic’s serialization format, and either switch Kafka Connect’s sink connector to use the correct converter, or switch the upstream format to Avro (which is a good idea). If the upstream topic is populated by Kafka Connect, then you can configure the source connector’s converter as follows:
"value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081",
As described earlier, Kafka Connect supports a special structure of JSON messages containing both payload and schema. If you try to read JSON data that does not contain the data in this structure, you will get this error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
To be clear, the only JSON structure that is valid for schemas.enable=true has schema and payload fields as the top-level elements (shown above).
As the message itself states, if you just have plain JSON data, you should change your connector’s configuration to:
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
If you want to include the schema in the data, you can either switch to using Avro (recommended), or you can configure Kafka Connect upstream to include the schema in the message:
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true",
To find the error log from Kafka Connect, you need to locate the Kafka Connect worker’s output. The location of this depends on how you launched Kafka Connect. There are several ways to install Kafka Connect, including Docker, Confluent CLI, systemd and manually from a downloaded archive. You’ll find the worker log in:
To change configuration properties for Kafka Connect workers (which apply to all connectors run), set the configuration accordingly:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
$ cd confluent-5.5.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Let’s say we’ve hit one of the errors shown above, and want to troubleshoot why our Kafka Connect sink connector can’t read from a topic.
We need to check the data on the topic that is being read, and ensure it’s in the serialization format that we think it is. Also, bear in mind that all the messages need to be in this format, so don’t just assume that because you’re now sending messages in the correct format to the topic there won’t be a problem. Existing messages on the topic are also read by Kafka Connect and other consumers.
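Because every message on the topic matters, not just the latest one, it can help to tally what a sample of messages looks like. The Python sketch below applies a crude heuristic to raw message bytes that you have already fetched by some other means; the function names and category labels are invented for this example, and a leading zero byte is only a hint of Schema Registry framing, not proof.

```python
import json
from collections import Counter
from typing import Iterable


def classify(raw: bytes) -> str:
    """Guess the serialization of a single message value."""
    if len(raw) >= 5 and raw[0] == 0:
        return "schema-registry-framed"
    try:
        doc = json.loads(raw)
    except ValueError:  # covers JSON and text-decoding errors
        return "other"
    return "json" if isinstance(doc, dict) else "other"


def summarize(messages: Iterable[bytes]) -> Counter:
    """Count how many messages fall into each guessed format."""
    return Counter(classify(raw) for raw in messages)


sample = [
    b'{"registertime":1493356576434,"userid":"User_8"}',
    b"\x00\x00\x00\x00\x01\x02opaque",
    b"1,Rick Astley,Never Gonna Give You Up",
]
for fmt, count in summarize(sample).items():
    print(f"{fmt}: {count}")
```

A mixed tally like this one is exactly the situation that causes a sink to fail partway through a topic.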
Below, I describe troubleshooting this from the command line, but there are a few other tools to be aware of:
You can use console tools including kafkacat and kafka-console-consumer. My personal preference is kafkacat:
$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1
{ "registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}
Using the excellent jq, you can also validate and format the JSON:
$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq '.'
{
  "registertime": 1493356576434,
  "userid": "User_8",
  "regionid": "Region_2",
  "gender": "MALE"
}
If you get something like this, with a bunch of “weird” characters, chances are you’re looking at binary data, as would be written by something like Avro or Protobuf:
$ kafkacat -b localhost:9092 -t users-avro -C -c1
ڝ���VUser_Region_MALE
You should use a console tool designed for reading and deserializing Avro data. Here, I’m using kafka-avro-console-consumer. Make sure you specify the correct Schema Registry URL at which the schema is held:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic users-avro \
    --from-beginning --max-messages 1
{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}
As before, you can pipe the resulting output through jq if you want to format it:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic users-avro \
    --from-beginning --max-messages 1 | \
    jq '.'
{
  "registertime": 1505213905022,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "FEMALE"
}
When run in distributed mode, Kafka Connect uses Kafka itself to store metadata about its operations, including connector configuration, offsets and so on.
These Kafka topics themselves can be configured to use different converters through the internal.key.converter/internal.value.converter settings. However, these settings are purely for internal use, and indeed have been deprecated as of Apache Kafka 2.0. You should not change these, and you will get warnings from Apache Kafka as of version 2.0 if you do try to configure them.
A lot of the time, Kafka Connect will bring data in from places where there is already a schema, and retaining that schema is just a matter of using a suitable serialization format such as Avro. All downstream users of that data then benefit from the schema being available to them, with the compatibility guarantees that something like Schema Registry provides. But what if there is no explicit schema?
Maybe you’re reading data from a flat file using the FileSourceConnector (which is not recommended for production, but is often used for PoCs). Or, perhaps you’re pulling data from a REST endpoint using the REST connector. Since both of these, along with others, have no inherent schema, it’s up to you to declare it.
Sometimes you’ll just want to pass the bytes that you’ve read from the source through and put them on a topic. But most of the time, you’ll want to do the Right Thing and apply a schema so that the data can be used. Doing it once as part of the ingestion, instead of pushing the problem onto each consumer (potentially multiple), is a much better pattern to follow.
You can write your own Kafka Streams application to apply a schema to data in a Kafka topic, but you can also use ksqlDB. This post shows how to do it against JSON data pulled from a REST endpoint. Let’s look here at a simple example of applying a schema to some CSV data. Obviously, to be able to do this, we have to know the schema itself!
Let’s say we’ve got a Kafka topic testdata-csv with some CSV data in it. It looks like this:
$ kafkacat -b localhost:9092 -t testdata-csv -C
1,Rick Astley,Never Gonna Give You Up
2,Johnny Cash,Ring of Fire
By eyeballing it, we can guess at there being three fields, maybe something like an ID, an artist and a song title.
If we leave the data in the topic like this, then any application wanting to use the data (whether it’s a Kafka Connect sink, a bespoke Kafka application or whatever) will need to guess this schema each time. Or, just as bad, the developer of each consuming application will need to constantly go back to the team providing the data to check on the schema and any changes to it. Whereas Kafka decouples systems, this kind of schema dependency forces a hard coupling between teams, and it’s not a good thing.
So what we’ll do is simply apply a schema to the data using ksqlDB, and populate a new, derived topic in which the schema is present. From ksqlDB, you can inspect the topic data:
ksql> PRINT 'testdata-csv' FROM BEGINNING;
Format:STRING
11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up
11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire
The first two fields here (11/6/18 2:41:23 PM UTC and NULL) are the timestamp and key of the Kafka message, respectively. The remaining fields are from our CSV file. Let’s register this topic with ksqlDB and declare the schema:
ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

 Message
----------------
 Stream created
----------------
Observe that ksqlDB now has a schema for the stream of data:
ksql> DESCRIBE TESTDATA_CSV;

Name                 : TESTDATA_CSV
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 ID      | INTEGER
 ARTIST  | VARCHAR(STRING)
 SONG    | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Check that the data is as expected by querying the ksqlDB stream. Note that at this point we’re simply acting as a Kafka consumer against the existing Kafka topic—we’ve not changed or duplicated any data yet.
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;
1 | Rick Astley | Never Gonna Give You Up
2 | Johnny Cash | Ring of Fire
Finally, create a new Kafka topic, populated by the reserialized data with schema. ksqlDB queries are continuous, so in addition to sending any existing data from the source topic to the target one, ksqlDB will send any future data to the topic too.
ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS
      SELECT * FROM TESTDATA_CSV;

 Message
----------------------------
 Stream created and running
----------------------------
Verify the data using the Avro console consumer:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic TESTDATA \
    --from-beginning | \
    jq '.'
{
  "ID": {
    "int": 1
  },
  "ARTIST": {
    "string": "Rick Astley"
  },
  "SONG": {
    "string": "Never Gonna Give You Up"
  }
}
[…]
You can even check the Schema Registry for the registered schema:
$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ID",
      "type": [ "null", "int" ],
      "default": null
    },
    {
      "name": "ARTIST",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "SONG",
      "type": [ "null", "string" ],
      "default": null
    }
  ]
}
Any new messages written to the original topic (testdata-csv) are automagically processed by ksqlDB, and written to the new TESTDATA topic in Avro. Now any application or team wanting to use this data can simply work with the TESTDATA topic, and take advantage of the Avro-serialized data with declared schema. You can also use this technique to change the number of partitions in a topic, the partitioning key and replication factor.
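Stripped of the streaming machinery, declaring a schema over delimited data boils down to naming and typing each column once, in one place. The Python sketch below shows that idea against the same two records. It is an analogy, not what ksqlDB does internally; the `SCHEMA` list and `apply_schema` function are names made up for this example.

```python
import csv
import io

# Column names and types, mirroring the ID INT, ARTIST VARCHAR, SONG VARCHAR
# declaration used for the stream above.
SCHEMA = [("ID", int), ("ARTIST", str), ("SONG", str)]


def apply_schema(line: str) -> dict:
    """Parse one delimited line into a record with named, typed fields."""
    values = next(csv.reader(io.StringIO(line)))
    if len(values) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} fields, got {len(values)}")
    return {name: cast(value) for (name, cast), value in zip(SCHEMA, values)}


lines = [
    "1,Rick Astley,Never Gonna Give You Up",
    "2,Johnny Cash,Ring of Fire",
]
for line in lines:
    print(apply_schema(line))
```

Every consumer that reads the raw CSV topic would need its own copy of something like `SCHEMA`. Writing the typed records to a derived topic once removes that duplication.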
Kafka Connect is a very simple yet powerful tool to use for integrating other systems with Kafka. One of the most common sources of misunderstanding is the converters that Kafka Connect offers. We’ve covered that Kafka messages are just key/value pairs, and it’s important to understand which serialization you should use and then standardize on that in your Kafka Connect connectors. Lastly, if you’d like to look at the code samples used in this blog post, they are available on GitHub.