We are excited to announce ksqlDB 0.24! It comes with a slew of improvements and new features. Access to Apache Kafka® record headers will enable a whole host of new use cases involving analytics and processing existing Kafka topics. JSON functions make working with unstructured data much easier. Finally, the LIMIT clause allows you to restrict the number of records emitted from a pull query, allowing you to control how much data you handle client side. Check out the changelog for a complete list of updates.
A Kafka record consists of a key, value, metadata, and optional headers. ksqlDB allows users to access record keys, values, and various metadata. Starting in 0.24, users can also access headers by declaring a column to be populated by the record’s headers. Headers typically contain metadata about records, which ksqlDB could use for routing or processing.
With the HEADERS syntax, users can access either the full header or a particular key in the header using SQL, as shown in the example below.
CREATE STREAM orders (
  name STRING,
  price DECIMAL(4,2),
  headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
  kafka_topic='orders',
  value_format='json',
  partitions=6
);

SELECT * FROM orders EMIT CHANGES;

+--------+--------+---------------------------------------+
| NAME   | PRICE  | HEADERS                               |
+--------+--------+---------------------------------------+
| chips  | 2.30   | [{'currency','VVNE'}]                 |
+--------+--------+---------------------------------------+
| coffee | 3.05   | [{'currency','Q0FE'},{'id','MjQ2..'}] |
+--------+--------+---------------------------------------+
| burger | 8.72   | []                                    |
+--------+--------+---------------------------------------+
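The header values in the sample output look like Base64 renderings of the raw BYTES. The following Python sketch decodes them on the client side, assuming that rendering and assuming the values are UTF-8 text; `decode_headers` is a hypothetical helper, not part of ksqlDB. The truncated `id` value is left out because its full content is not shown.

```python
import base64

# Header values copied from the sample output above.
# The truncated 'id' header ('MjQ2..') is omitted because it is incomplete.
rows = {
    "chips": [("currency", "VVNE")],
    "coffee": [("currency", "Q0FE")],
    "burger": [],
}

def decode_headers(headers):
    """Decode Base64 header values to text.

    Assumes each value is Base64-encoded UTF-8. A list of pairs is returned
    rather than a dict because Kafka allows repeated header keys.
    """
    return [(key, base64.b64decode(value).decode("utf-8")) for key, value in headers]

for name, headers in rows.items():
    print(name, decode_headers(headers))
```

Under those assumptions, the two `currency` headers decode to `USD` and `CAD`.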
Access a specific key in a topic’s headers:
CREATE STREAM uploads (
  name STRING,
  photo BYTES,
  image_format BYTES HEADER('format')
) WITH (
  kafka_topic='uploads',
  value_format='json',
  partitions=6
);

SELECT name, photo, from_bytes(image_format, 'utf8') as format FROM uploads EMIT CHANGES;

+--------+---------+--------+
| NAME   | PHOTO   | FORMAT |
+--------+---------+--------+
| stars  | JKwg..  | jpeg   |
+--------+---------+--------+
| horse  | Eqwq..  | png    |
+--------+---------+--------+
| ocean  | MxKs..  | NULL   |
+--------+---------+--------+
ksqlDB 0.24 comes with new functions for processing JSON-formatted strings. JSON is the de facto format for web applications and many other scenarios such as IoT. As a result, many of our customers send JSON messages to Kafka.
With the newly added functions, ksqlDB users can process JSON structures encoded in string fields and serialize any ksqlDB data type to a JSON string.
IS_JSON_STRING checks if a given string contains valid JSON. This function is helpful to ensure that downstream results will be parsed successfully. Consider a stream of messages imported from an external system. As there is no control over the quality of the imported messages, it might be a good idea to filter out invalid records to prevent duplicated error messages downstream:
CREATE STREAM raw_messages (message STRING)
  WITH (kafka_topic='messages', VALUE_FORMAT='JSON');

CREATE STREAM json_messages AS
  SELECT message as json_message
  FROM raw_messages
  WHERE IS_JSON_STRING(message);
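To illustrate the kind of check this filter performs, here is a rough Python model of a JSON validity test. It is only a sketch: `is_json_string` below is a hypothetical stand-in, and Python's parser may differ from ksqlDB's in edge cases.

```python
import json

def _reject_constant(name):
    # Python's json module accepts NaN and Infinity by default,
    # but they are not valid JSON, so this sketch rejects them.
    raise ValueError(f"{name} is not valid JSON")

def is_json_string(text):
    """Return True if text parses as JSON, False otherwise (including None)."""
    if text is None:
        return False
    try:
        json.loads(text, parse_constant=_reject_constant)
    except (TypeError, ValueError):
        return False
    return True

messages = ['{"type": "standard", "amount": 10}', "[1, 2, 3]", "abc", ""]
valid = [m for m in messages if is_json_string(m)]
print(valid)
```

Only the first two sample messages survive the filter, which mirrors how the WHERE clause keeps invalid records out of the derived stream.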
JSON_CONCAT merges two or more JSON structures. Consider a similar example where a stream contains multiple columns containing JSON objects that are only useful together in the application context. With ksqlDB we can merge them together:
CREATE STREAM raw_messages (message1 STRING, message2 STRING)
  WITH (kafka_topic='messages', VALUE_FORMAT='JSON');

CREATE STREAM json_messages AS
  SELECT message1, message2, JSON_CONCAT(message1, message2)
  FROM raw_messages
  WHERE IS_JSON_STRING(message1) AND IS_JSON_STRING(message2);
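As a rough picture of what merging two JSON objects means, the Python sketch below combines the top-level keys of each object. It covers only the object case, and it assumes that a key appearing in a later argument replaces the same key from an earlier one; `json_concat_objects` is a hypothetical name, and the sample inputs are invented for illustration.

```python
import json

def json_concat_objects(*documents):
    """Merge the top-level keys of several JSON objects into one JSON string.

    Assumption for this sketch: on duplicate keys, the later document wins.
    Non-object inputs are rejected here to keep the example small.
    """
    merged = {}
    for document in documents:
        value = json.loads(document)
        if not isinstance(value, dict):
            raise ValueError("this sketch only handles JSON objects")
        merged.update(value)
    return json.dumps(merged, separators=(",", ":"))

# Hypothetical sample values for message1 and message2.
print(json_concat_objects('{"id": 1, "type": "standard"}', '{"amount": 10}'))
```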
JSON_RECORDS and JSON_KEYS extract key-value pairs and keys, respectively, from JSON-formatted strings:
SELECT JSON_RECORDS(json_message) as keys FROM json_messages;

+----------------------------------------+
|KEYS                                    |
+----------------------------------------+
|{type="deprecated", amount=123.5, id=1} |
|{type="standard", amount=10, id=null}   |
|{type="extra", amount=5, id=3}          |

SELECT JSON_KEYS(json_message) as keys FROM json_messages;

+--------------------+
|KEYS                |
+--------------------+
|[type, amount, id]  |
|[type, amount, id]  |
|[type, amount, id]  |
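The two functions can be modeled in Python as follows. This is a sketch only: the input message is reconstructed from the first output row above, the helper names are hypothetical, and returning `None` for non-object input is an assumption. The record values are kept as JSON text, which matches the quoted `"deprecated"` in the sample output.

```python
import json

def json_keys(text):
    """Return the top-level keys of a JSON object, or None for other JSON values."""
    value = json.loads(text)
    return list(value) if isinstance(value, dict) else None

def json_records(text):
    """Return a key -> JSON-text mapping for a JSON object, or None otherwise."""
    value = json.loads(text)
    if not isinstance(value, dict):
        return None
    # Each value is re-serialized, so strings keep their quotes and null stays "null".
    return {key: json.dumps(item) for key, item in value.items()}

# Input reconstructed from the first row of the sample output.
message = '{"type": "deprecated", "amount": 123.5, "id": 1}'
print(json_keys(message))
print(json_records(message))
```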
JSON_ARRAY_LENGTH computes the length of an array encoded as a JSON string:
SELECT json_message as phones, JSON_ARRAY_LENGTH(json_message) as phones_length
FROM json_messages;

+--------------------------------------------------------+-------------+
|PHONES                                                  |PHONES_LENGTH|
+--------------------------------------------------------+-------------+
|["(984) 459-0666", "(253) 803-6544"]                    |2            |
|["(232) 891-6803"]                                      |1            |
|["(342) 603-4952", "(891) 987-2476", "(792) 932-4901"]  |3            |
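For comparison, here is a small Python sketch that counts the elements of the same sample arrays. `json_array_length` is a hypothetical helper, and returning `None` for input that is not a JSON array is an assumption of this sketch.

```python
import json

def json_array_length(text):
    """Return the number of elements in a JSON array, or None for other JSON values."""
    value = json.loads(text)
    return len(value) if isinstance(value, list) else None

# Phone lists copied from the sample rows above.
phones = [
    '["(984) 459-0666", "(253) 803-6544"]',
    '["(232) 891-6803"]',
    '["(342) 603-4952", "(891) 987-2476", "(792) 932-4901"]',
]
for row in phones:
    print(json_array_length(row), row)
```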
Finally, TO_JSON_STRING converts any ksqlDB data type to a JSON string:
INSERT INTO raw_messages (message) VALUES (TO_JSON_STRING(STRUCT(
  c := ARRAY[5, 10, 15],
  d := MAP(
    'x' := STRUCT(e := 'v1', f := true),
    'y' := STRUCT(e := 'v2', f := false)
  ))));

INSERT INTO raw_messages (message) VALUES (TO_JSON_STRING(STRUCT(
  c := ARRAY[2, 4, 8],
  d := MAP(
    'x' := STRUCT(e := 'v3', f := true),
    'y' := STRUCT(e := 'v4', f := true)
  ))));

INSERT INTO raw_messages (message) VALUES (TO_JSON_STRING(STRUCT(
  c := ARRAY[3, 6, 9],
  d := MAP(
    'x' := STRUCT(e := 'v5', f := false),
    'y' := STRUCT(e := 'v6', f := false)
  ))));
SELECT * FROM raw_messages;

+------------------------------------------------------------------------+
|MESSAGE                                                                 |
+------------------------------------------------------------------------+
|{"C":[5,10,15],"D":{"x":{"E":"v1","F":true},"y":{"E":"v2","F":false}}}  |
|{"C":[2,4,8],"D":{"x":{"E":"v3","F":true},"y":{"E":"v4","F":true}}}     |
|{"C":[3,6,9],"D":{"x":{"E":"v5","F":false},"y":{"E":"v6","F":false}}}   |
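The first output row can be reproduced in Python with compact JSON serialization, as a way to see the shape of the result. This is a sketch, not ksqlDB's implementation, and `to_json_string` is a hypothetical helper. The struct field names are written in upper case here to match the output, presumably because ksqlDB upper-cases unquoted identifiers, while the map keys `x` and `y` were string literals and keep their case.

```python
import json

def to_json_string(value):
    """Serialize a nested Python value to compact JSON (no spaces after separators)."""
    return json.dumps(value, separators=(",", ":"))

# Python equivalent of the first INSERT's STRUCT, using the field names from the output.
first_row = {
    "C": [5, 10, 15],
    "D": {
        "x": {"E": "v1", "F": True},
        "y": {"E": "v2", "F": False},
    },
}
print(to_json_string(first_row))
```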
Pull queries now support the LIMIT clause that you might already be familiar with. Users can now restrict the number of rows returned by executing a pull query over a STREAM or a TABLE by using the LIMIT clause. The syntax of pull queries is:
SELECT select_expr [, ...] FROM table/stream [ WHERE where_condition ] [ AND window_bounds ] [ LIMIT num_records ];
Execute a pull query with LIMIT clause over a STREAM:
CREATE STREAM STUDENTS (ID STRING KEY, SCORE INT)
  WITH (kafka_topic='students_topic', value_format='JSON', partitions=4);

SELECT * FROM STUDENTS LIMIT 3;

+---------------+---------------+
|ID             |SCORE          |
+---------------+---------------+
|ayala          |97             |
|spock          |48             |
|janice         |87             |
Limit Reached
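Because LIMIT is mainly about controlling how much data reaches the client, here is a hedged Python sketch of how a client might prepare such a pull query for ksqlDB's REST `/query` endpoint. The server address `http://localhost:8088` and the helper `build_pull_query_request` are assumptions for illustration, and the request is only built, not sent.

```python
import json
import urllib.request

def build_pull_query_request(source, limit, server="http://localhost:8088"):
    """Build (but do not send) a POST request for a pull query with a LIMIT.

    `source` is interpolated into the statement, so it must be a trusted name.
    Rejecting negative limits is a choice made by this sketch.
    """
    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
        raise ValueError("limit must be a non-negative integer")
    statement = f"SELECT * FROM {source} LIMIT {limit};"
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        f"{server}/query",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        method="POST",
    )

request = build_pull_query_request("STUDENTS", 3)
print(request.full_url)
print(request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would submit it to a running server; the response should then contain at most three rows.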
Similarly, users can execute a pull query with a LIMIT clause over a TABLE as well.
You can learn more about pull queries here.
Starting from 0.24, users can specify a key_schema_id or value_schema_id in stream and table creation commands. The schema specified by the ID will be looked up in Schema Registry and used to create the logical schema as well as to serialize and deserialize data. You can learn more in the documentation and the KLIP.
Examples:
Create a stream that uses an explicit key schema and value schema in Schema Registry. ksqlDB will infer the key and value columns according to the respective Avro schemas.
CREATE STREAM pageviews WITH (
  KAFKA_TOPIC='pageviews-input',
  FORMAT='AVRO',
  KEY_SCHEMA_ID=1,
  VALUE_SCHEMA_ID=2
);
Create a persistent query that writes to a table and applies an explicit value schema. Record values will be written to the output topic according to this schema.
CREATE TABLE pageviews_count WITH (
  VALUE_FORMAT='AVRO',
  VALUE_SCHEMA_ID=3,
  KAFKA_TOPIC='pageviews-count-output'
) AS
  SELECT pageId, count(*)
  FROM pageviews
  GROUP BY pageId;
We’re excited to bring these features out and improve the product. For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent, and join the community to ask questions and find new resources.