Announcing ksqlDB 0.25.1

We are thrilled to announce ksqlDB 0.25! It comes with a slew of improvements and new features. In particular, we improved how UDAFs work with complex types like Structs and Maps to enhance popular functions like latest_by_offset and collect_list. This release also contains several other improvements; check out the highlights below and the changelog for a complete list of updates.

Improvements to Aggregate Functions

Aggregate functions allow for powerful ways to transform a column into a new value. Previously, aggregate functions like latest_by_offset or collect_list only worked with columns of primitive types such as strings, booleans, and numbers. Those restrictions have been lifted, and now you can use collect_list, collect_set, earliest_by_offset, and latest_by_offset with Structs, Arrays, and Maps.

As an example, let us see collect_list in action with a Struct type.

CREATE STREAM assignments (department STRING, person STRUCT<name STRING, age INT>) WITH (kafka_topic='assignments', value_format='json', partitions=1);

INSERT INTO assignments (department, person) VALUES ('engineering', Struct(Name := 'Karen', Age := 55));
INSERT INTO assignments (department, person) VALUES ('engineering', Struct(Name := 'Juliet', Age := 39));
INSERT INTO assignments (department, person) VALUES ('sales', Struct(Name := 'Bellamy', Age := 27));
INSERT INTO assignments (department, person) VALUES ('sales', Struct(Name := 'Will', Age := 39));

SELECT * FROM ASSIGNMENTS;

+-------------+------------------------+
|DEPARTMENT   |PERSON                  |
+-------------+------------------------+
|engineering  |{NAME=Karen, AGE=55}    |
|engineering  |{NAME=Juliet, AGE=39}   |
|sales        |{NAME=Bellamy, AGE=27}  |
|sales        |{NAME=Will, AGE=39}     |

We can group by department and then collect the list of staff in each department.

SELECT department, collect_list(person) AS staffByDepartment
FROM assignments
GROUP BY department
EMIT CHANGES;
+-------------+-----------------------------------------------+
|DEPARTMENT   |STAFFBYDEPARTMENT                              |
+-------------+-----------------------------------------------+
|engineering  |[{NAME=Karen, AGE=55}, {NAME=Juliet, AGE=39}]  |
|sales        |[{NAME=Bellamy, AGE=27}, {NAME=Will, AGE=39}]  |

Behind the scenes, these changes required updating how User Defined Aggregate Functions (UDAFs) work. Specifically, one can now create UDAFs where the aggregate and return types vary depending on the SQL type of the input column. For those experienced with implementing UDFs, this should sound similar to the SchemaProvider interface. However, a UDAF's aggregate or map function may itself need to know the input type, so the solution here is different.

Users building UDAFs that operate on general data types will implement three new methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType(). Concrete examples are in the ksqlDB codebase: see the implementations of latest_by_offset and collect_list.
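To make the idea concrete, here is a minimal, dependency-free sketch of a collect_list-style aggregate whose aggregate and return types follow the input column's type. It only mirrors the three method names mentioned above: the TypedUdaf interface, the CollectList class, and the use of plain strings for SQL types are all stand-ins invented for this illustration, not the real ksqlDB interfaces, whose exact signatures should be checked in the ksqlDB codebase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class DynamicUdafSketch {

    // Stand-in for the UDAF contract; NOT the real ksqlDB interface.
    // SQL types are modelled as plain strings (an assumption) to avoid dependencies.
    interface TypedUdaf<I, A, O> {
        void initializeTypeArguments(List<String> argTypeList);
        Optional<String> getAggregateSqlType();
        Optional<String> getReturnSqlType();
        A initialize();
        A aggregate(I current, A aggregate);
        O map(A aggregate);
    }

    // collect_list-like aggregate whose SQL types follow the input column's type.
    static class CollectList<T> implements TypedUdaf<T, List<T>, List<T>> {
        private String inputType; // unknown until initializeTypeArguments is called

        @Override
        public void initializeTypeArguments(List<String> argTypeList) {
            // The single argument is the column being aggregated.
            inputType = argTypeList.get(0);
        }

        @Override
        public Optional<String> getAggregateSqlType() {
            // The intermediate state is an array of whatever the input type is.
            return Optional.ofNullable(inputType).map(t -> "ARRAY<" + t + ">");
        }

        @Override
        public Optional<String> getReturnSqlType() {
            // The result is the collected array itself.
            return getAggregateSqlType();
        }

        @Override
        public List<T> initialize() {
            return new ArrayList<>();
        }

        @Override
        public List<T> aggregate(T current, List<T> aggregate) {
            aggregate.add(current);
            return aggregate;
        }

        @Override
        public List<T> map(List<T> aggregate) {
            return aggregate;
        }
    }

    public static void main(String[] args) {
        CollectList<String> udaf = new CollectList<>();
        udaf.initializeTypeArguments(List.of("STRUCT<NAME STRING, AGE INT>"));
        System.out.println(udaf.getReturnSqlType().orElse("unknown"));
        // prints ARRAY<STRUCT<NAME STRING, AGE INT>>
    }
}
```

The point of the sketch is the ordering: the type arguments arrive first, and only then can the aggregate and return types be reported, which is why a static schema declaration is not enough for this kind of function.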

You can find further documentation in the new “Dynamic UDAFs” section of the ksqlDB documentation page.

Other highlights

This release contains a multitude of other improvements, including the following.

Improved nested exception handling

Previously, if a nested function threw an exception, the column would always be null.

As an example, one may want to parse a text column as a date and, if the value is not a full date, fall back to treating it as a year by appending -01-01.

SELECT
  parse_date(date_string, 'yyyy-MM-dd') AS parse_as_date,
  parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd') AS parse_as_year,
  coalesce(
    parse_date(date_string, 'yyyy-MM-dd'),
    parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd')
  ) AS parse_as_either
FROM inputs
EMIT CHANGES;

Starting in this release, a function that fails returns null to the calling function. This allows the coalesce and ifnull functions to have more intuitive behavior.
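The new behavior can be modelled in a few lines of plain Java. This is only an illustration of the semantics described above, not ksqlDB's implementation: parseDateOrNull, coalesce, and parseAsEither are hypothetical helpers written for this sketch, and java.time stands in for parse_date.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class NestedNullSketch {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Models an inner function call: a failure becomes null for the caller
    // instead of aborting the whole expression.
    static LocalDate parseDateOrNull(String text, DateTimeFormatter format) {
        if (text == null) {
            return null;
        }
        try {
            return LocalDate.parse(text, format);
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    // Returns the first non-null candidate, or null if there is none.
    @SafeVarargs
    static <T> T coalesce(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // Mirrors the parse_as_either column of the query above.
    static LocalDate parseAsEither(String dateString) {
        return coalesce(
            parseDateOrNull(dateString, DAY),
            parseDateOrNull(dateString + "-01-01", DAY));
    }

    public static void main(String[] args) {
        System.out.println(parseAsEither("2022-03-15")); // prints 2022-03-15
        System.out.println(parseAsEither("2022"));       // prints 2022-01-01
        System.out.println(parseAsEither("not a date")); // prints null
    }
}
```

Because each inner call reports its own failure as null, the outer coalesce can still pick the branch that succeeded, which is the behavior the release now provides for nested functions.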

Custom request headers in the Java API

We now allow users to specify custom request headers for requests issued by the Java client and when applying migrations with the ksql-migrations tool. For example, in advanced networking topologies, custom headers can be leveraged by a proxy layer to control the flow of requests to the ksqlDB cluster.

Support for at-least-once semantics in the Java client push query API

This release adds support for push query continuation tokens in the Java client. This allows push queries to also have at-least-once semantics (ALOS) in the client. Users can resume streaming results from a StreamedQueryResult from the last saved continuation token by using the method continueFromLastContinuationToken(). With ALOS in the client, push queries are more robust against network failures, since the client can pick up where it left off.
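What at-least-once means for a consumer can be shown with a toy model. The sketch below does not use the ksqlDB client at all: the consume method, the integer "token", and the idea that a token is saved every few rows are assumptions made purely for illustration. It simulates one dropped connection and a resume from the last saved token.

```java
import java.util.ArrayList;
import java.util.List;

class AtLeastOnceSketch {

    // Delivers rows in order, saving a token (here: the index of the next row)
    // every `tokenEvery` rows (must be positive). The connection drops once,
    // after `failAfter` deliveries, and the consumer resumes from the last saved token.
    static List<String> consume(List<String> rows, int tokenEvery, int failAfter) {
        List<String> delivered = new ArrayList<>();
        int lastToken = 0;      // last saved continuation token
        int position = 0;       // next row to deliver
        boolean failed = false; // the simulated failure happens only once

        while (position < rows.size()) {
            if (!failed && delivered.size() == failAfter) {
                failed = true;
                position = lastToken; // resume from the last saved token
                continue;
            }
            delivered.add(rows.get(position));
            position++;
            if (position % tokenEvery == 0) {
                lastToken = position;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        System.out.println(consume(rows, 2, 3));
        // prints [r0, r1, r2, r2, r3, r4, r5]
    }
}
```

No row is lost, but r2 is delivered twice because it arrived after the last saved token. Consumers relying on at-least-once delivery should therefore be prepared to handle duplicates.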

Get started with ksqlDB

We’re excited to bring these features out and improve the product. For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent Cloud, and join the community to ask questions and find new resources.
