Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Announcing ksqlDB 0.27.1

Written By

We are excited to announce the ksqlDB 0.27.1 release! This iteration brings forth new functionality for dynamic data access against multi-schema topics, a host of improvements relating to working with protocol buffers, support for synchronizing resource creation, and further null-handling improvements. Read below to learn in more detail about each of the changes. As usual, the changelog contains the full list of updates and improvements.

Handling multi-schema Protobuf/Avro topics in ksqlDB

In some scenarios, events defined by different schemas can be published to the same Apache Kafka® topic. These cases are handled smoothly in Kafka and Schema Registry, but until now, processing them with ksqlDB was quite complex. In ksqlDB 0.27.1 we are introducing a new function that simplifies the handling of such schemas.

For instance, look at this SR example taken from Putting Several Event Types in the Same Topic, which uses an Avro schema to handle multiple schema messages in Kafka:

subject: all-types-value
[
	"io.confluent.examples.avro.Customer",
	"io.confluent.examples.avro.Product"
]

When using ksqlDB, the queries executed show the messages as STRUCT types with each schema type field wrapped inside.

For example, let’s create the stream ALL_TYPES that contains different records with different schemas:

ksql> CREATE STREAM ALL_TYPES WITH (kafka_topic='all-types', value_format='AVRO');
ksql> DESCRIBE ALL_TYPES;
Name: ALL_TYPES
Field|Type                                                                
-----------------------------------------------------------------------------------------------------------------------------------------
 Customer | STRUCT<customer_id INTEGER, customer_name VARCHAR(STRING), customer_email VARCHAR(STRING), customer_address VARCHAR(STRING)>
 Product  | STRUCT<product_id INTEGER, product_name VARCHAR(STRING), product_price DOUBLE>                                          	 
-----------------------------------------------------------------------------------------------------------------------------------------
ksql> SELECT * FROM ALL_TYPES;
+---------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+
|Customer                                                                                       	|Product                                                                                	|
+---------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+
|null                                                                                           	|{product_id=1, product_name=rice, product_price=100.0}                                 	|
|{customer_id=100, customer_name=acme, customer_email=acme@google.com, customer_address=1 Main St       |null                                                                                   	|
|{customer_id=101, customer_name=micro, customer_email=micro@google.com, customer_address=2 Main St}    |null                                                                                   	|
|null                                                                                           	|{product_id=2, product_name=beans, product_price=50.0}                                 	|
Query Completed

The above query can be customized to access the Customer events and fields only, such as:

ksql> SELECT `Customer`->`customer_id`, `Customer`->`customer_name`, `Customer->`customer_address` FROM ALL_TYPES WHERE `Customer` is not null

However, dealing with several types of events and possibly hundreds of fields may sound daunting. In ksqlDB 0.27.1, we added a small but powerful improvement to handle these types of cases. In this release, we included support for the star wildcard (*) on STRUCT types which can dynamically access the struct fields without knowing the field names in advance. Similar to the SELECT * statement, you can now run SELECT struct_name->* statements too.

Let’s take the previous example for customer and product events. Instead of selecting the fields one by one and writing a long query, you can execute the following query which will return a table with all fields of the specified struct:

ksql> SELECT `Customer`->* FROM ALL_TYPES WHERE `Customer` IS NOT NULL;
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|customer_id                             	|customer_name                           	|customer_email                          	|customer_address                        	|
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|100                                     	|acme                                    	|acme@google.com                         	|1 Main St                               	|
|10`                                     	|micro                                   	|micro@google.com                        	|2 Main St                               	|
Query Completed

The star character can be used on any STRUCT no matter the topic format. Multiple event types can be deserialized correctly with Avro and Protobuf formats only, though. Go and play with it and wait for more SR improvements in future releases.

Protobufs without Schema Registry

This release introduces a new serialization format that allows working with Protocol Buffer messages without Schema Registry: PROTOBUF_NOSR. This complements the existing PROTOBUF format, which does require Schema Registry. Given an existing topic of Protobuf-serialized records, it’s no longer necessary to reprocess data upstream to allow ksqlDB to process it, because the new format does not rely on a Schema Registry schema ID in the serialized bytes.

When leveraging PROTOBUF_NOSR, the data is simply raw Protobuf messages serialized into bytes, so they can be produced and consumed by clients in any language Protobuf supports.

Consider the following stream:

CREATE STREAM persons (key INT KEY, name STRING, id INT, email STRING, phones ARRAY<STRUCT<number STRING, type INT>>) with (kafka_topic='persons', partitions=1, value_format='PROTOBUF_NOSR');

This DDL corresponds to the following Protobuf message:

syntax = "proto3";

package protobuf; option java_package = "io.confluent.examples.protobuf";

message Person { string name = 1; int32 id = 2; string email = 3;

message PhoneNumber { string number = 1; Int32 type = 2; }

repeated PhoneNumber phones = 4; }

Assume that some records are serialized with a Protobuf library and produced to the persons topic. Then, we can query the records in ksqlDB:

ksql> select * from persons limit 3;
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|KEY                           |NAME                          |ID                            |EMAIL                         |PHONES                        |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|0                             |Name Name0                    |0                             |0me@gmail.com                 |[{NUMBER=123-456-780, TYPE=1},|
|                              |                              |                              |                              | {NUMBER=987-654-320, TYPE=0}]|
|1                             |Name Name1                    |1                             |1me@gmail.com                 |[{NUMBER=123-456-781, TYPE=1},|
|                              |                              |                              |                              | {NUMBER=987-654-321, TYPE=0}]|
|2                             |Name Name2                    |2                             |2me@gmail.com                 |[{NUMBER=123-456-782, TYPE=1},|
|                              |                              |                              |                              | {NUMBER=987-654-322, TYPE=0}]|
Query Completed
Query terminated

Or, by leveraging the Protobuf message definition from above, we can consume from Python:

from kafka import KafkaConsumer
from person_pb2 import Person
from google.protobuf.text_format import MessageToString, Parse

consumer = KafkaConsumer('persons', auto_offset_reset='earliest') for msg in consumer: p = Person() p.ParseFromString(msg.value) print(f"{int.from_bytes(msg.key, 'big')} -> {MessageToString(p, as_one_line=True)}")

✗ python consumer.py 0 -> name: "Name Name0" email: "0me@gmail.com" phones { number: "123-456-780" type: 1 } phones { number: "987-654-320" } 1 -> name: "Name Name1" id: 1 email: "1me@gmail.com" phones { number: "123-456-781" type: 1 } phones { number: "987-654-321" } 2 -> name: "Name Name2" id: 2 email: "2me@gmail.com" phones { number: "123-456-782" type: 1 } phones { number: "987-654-322" }

Protobuf content type

ksqlDB’s REST API now supports a Protobuf content type for Pull queries where the rows are serialized in the Protobuf format. The content type is supported for querying the /query and /query-stream endpoints.

You can specify this serialization format in the Accept header:

Accept: application/vnd.ksql.v1+protobuf

The following example shows a curl command that issues a Pull query on a table called CURRENTLOCATION with the Protobuf content type:

curl -X "POST" "http://localhost:8088/query" \
     -H "Accept: application/vnd.ksql.v1+protobuf" \
     -d $'{
  "ksql": "SELECT * FROM CURRENTLOCATION;",
  "streamsProperties": {}
}'

Response:

[{"header":{"queryId":"query_1655152127973","schema":"`PROFILEID` STRING KEY, `LA` DOUBLE, `LO` DOUBLE","protoSchema":"syntax = \"proto3\";\n\nmessage ConnectDefault1 {\n  string PROFILEID = 1;\n  double LA = 2;\n  double LO = 3;\n}\n"}},
{"row":{"protobufBytes":"CggxOGY0ZWE4NhF90LNZ9bFCQBmASL99HYRewA=="}},
{"row":{"protobufBytes":"Cgg0YTdjN2I0MRFAE2HD07NCQBnM7snDQoVewA=="}},
{"row":{"protobufBytes":"Cgg0YWI1Y2JhZBGKsOHplbJCQBmMSuoENIVewA=="}}]

The protoSchema field in the header corresponds to the content of a .proto file that the proto compiler uses at build time.

Users can use the protoSchema field to deserialize the protobufBytes into Protobuf messages. See more details in the documentation.

Assert the existence of resources

You can now assert the existence of topics and schemas with the ASSERT command. This is useful while running scripts or using the migration tool to avoid timing issues with sequential dependencies. For example, in the following script the CREATE CONNECTOR statement creates a topic foo after the connector is created, and the second statement creates a stream that reads from foo. However, since the creation of the connector is not instantaneous, the second command could fail due to foo not existing yet.

CREATE SOURCE CONNECTOR foo-source WITH (‘topic=’foo’, ‘connector.class’=’sample-source’);
CREATE STREAM foo WITH (kafka_topic=’foo’, value_format=’json’’);

With ASSERT TOPIC, we can ask ksqlDB to wait for foo to exist before running the next statement.

CREATE SOURCE CONNECTOR foo-source WITH (‘topic=’foo’, ‘connector.class’=’sample-source’);
ASSERT TOPIC foo TIMEOUT 10 seconds;
CREATE STREAM foo (name STRING, id INTEGER) WITH (kafka_topic=’foo’, value_format=’json’’);

Now ksqlDB will wait up to 10 seconds for foo to exist before running the CREATE STREAM statement.

We can also assert the existence of schemas. In the following example, we check that a schema with id 1 exists before creating a stream with that schema.

ASSERT SCHEMA ID 1;
CREATE STREAM bar WITH (kafka_topic=’bar’, value_format=’avro’, value_schema_id=1);

Allow aggregations without GROUP BY clauses

A common point of friction for new users to ksqlDB was that they were forced to have a GROUP BY clause for their aggregate queries, even if it logically made sense without. For instance take the following aggregate query, which would return with this error.

ksql> SELECT COUNT(*) FROM twitter WINDOW TUMBLING (size 1 hour)  EMIT CHANGES;
Aggregate query needs GROUP BY clause.

Previously, users had to create a dummy constant column to GROUP BY on, but in 0.27.1 ksqlDB provides syntactic sugar by adding this dummy column for you automatically if you omit a GROUP BY clause in an aggregate push query.

More null handling improvements

Just like in 0.25.1 and 0.26, there are some useful bug fixes around null handling. Notably, if one has a complex field like a Map, Array, or Struct, then referencing into the field when it was null would throw an exception. These exceptions were reported in the ksqlDB processing log.

Consider an example like

CREATE STREAM ExampleStream(id VARCHAR, obj STRUCT<value VARCHAR>) WITH (KAFKA_TOPIC='topic_name', VALUE_FORMAT='JSON',PARTITIONS=1);
SELECT id, obj->VALUE FROM ExampleStream;

When obj is null, processing records for this query would cause a NullPointerException and an error would be logged. Similar issues would happen with Maps and Arrays. With 0.27.1, these situations no longer result in errors in the processing log.

Get started with ksqlDB

Again, thank you for using ksqlDB. Please do not hesitate to contact us with more feedback or comments! For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent, and join the community to ask questions and find new resources.

Get Started

  • Tom Nguyen is an engineer on the ksqlDB team at Confluent. He joined in 2021 after previously building event systems in e-commerce, AI, and banking.

Did you like this blog post? Share it now