We are thrilled to announce ksqlDB 0.25! It comes with a slew of improvements and new features. In particular, we improved how user-defined aggregate functions (UDAFs) work with complex types like Structs and Maps to enhance popular functions like latest_by_offset and collect_list. This release also contains multiple other improvements to the product. Check out the highlights below and the changelog for a complete list of updates.
Aggregate functions combine the values of a column across many rows into a single new value. Previously, aggregate functions like latest_by_offset or collect_list only worked with primitive types such as strings, booleans, and numbers. Those restrictions have been lifted, and now you can use collect_list, collect_set, earliest_by_offset, and latest_by_offset with Structs, Arrays, and Maps.
As an example, let us see collect_list in action with a Struct type.
CREATE STREAM assignments (department STRING, person STRUCT<Name STRING, Age INT>)
  WITH (kafka_topic='assignments', value_format='json', partitions=1);

INSERT INTO assignments (department, person) VALUES ('engineering', STRUCT(Name := 'Karen', Age := 55));
INSERT INTO assignments (department, person) VALUES ('engineering', STRUCT(Name := 'Juliet', Age := 39));
INSERT INTO assignments (department, person) VALUES ('sales', STRUCT(Name := 'Bellamy', Age := 27));
INSERT INTO assignments (department, person) VALUES ('sales', STRUCT(Name := 'Will', Age := 39));
SELECT * FROM ASSIGNMENTS;
+-------------+------------------------+
|DEPARTMENT   |PERSON                  |
+-------------+------------------------+
|engineering  |{NAME=Karen, AGE=55}    |
|engineering  |{NAME=Juliet, AGE=39}   |
|sales        |{NAME=Bellamy, AGE=27}  |
|sales        |{NAME=Will, AGE=39}     |
We can group by department and then collect the list of staff in each department.
SELECT department, collect_list(person) AS staffByDepartment
  FROM assignments
  GROUP BY department
  EMIT CHANGES;

+-------------+-----------------------------------------------+
|DEPARTMENT   |STAFFBYDEPARTMENT                              |
+-------------+-----------------------------------------------+
|engineering  |[{NAME=Karen, AGE=55}, {NAME=Juliet, AGE=39}]  |
|sales        |[{NAME=Bellamy, AGE=27}, {NAME=Will, AGE=39}]  |
Behind the scenes, these changes required updating how user-defined aggregate functions (UDAFs) work. Specifically, one can now create UDAFs where the aggregate and return types vary depending on the SQL type of the input column. For those experienced with implementing UDFs, this should sound similar to the SchemaProvider interface. However, a UDAF's aggregate or map function may itself need to know the input type, not just the declared return schema, so the solution here is different.
Users building UDAFs that operate on general data types will implement three new methods: initializeTypeArguments(List argTypeList), getAggregateSqlType(), and getReturnSqlType(). For concrete examples, see the latest_by_offset and collect_list implementations in the ksqlDB codebase.
You can find further documentation in the new “Dynamic UDAFs” section of the ksqlDB documentation page.
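To make the shape of such a UDAF more concrete, here is a minimal, self-contained sketch. It does not use the ksqlDB libraries: DynamicUdafSketch and CollectListSketch are hypothetical stand-ins, SQL types are represented as plain strings, and only the three method names (initializeTypeArguments, getAggregateSqlType, getReturnSqlType) come from this post. The real interface and its argument and return types may differ, so treat this as an illustration of the pattern rather than the actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for a UDAF contract. SQL types are
// modelled as strings here purely for illustration.
interface DynamicUdafSketch<I, A, O> {
    // Called once with the SQL types of the input columns.
    void initializeTypeArguments(List<String> argTypeList);

    // SQL type of the intermediate aggregate, derived from the input type.
    Optional<String> getAggregateSqlType();

    // SQL type of the final result, derived from the input type.
    Optional<String> getReturnSqlType();

    A initialize();

    A aggregate(I current, A aggregate);

    O map(A aggregate);
}

// A collect_list-like aggregate: for an input column of type T it
// produces ARRAY<T>, whatever T turns out to be.
final class CollectListSketch<T> implements DynamicUdafSketch<T, List<T>, List<T>> {
    private String inputType; // stays null until initializeTypeArguments runs

    @Override
    public void initializeTypeArguments(List<String> argTypeList) {
        inputType = argTypeList.get(0);
    }

    @Override
    public Optional<String> getAggregateSqlType() {
        return Optional.ofNullable(inputType).map(t -> "ARRAY<" + t + ">");
    }

    @Override
    public Optional<String> getReturnSqlType() {
        return getAggregateSqlType();
    }

    @Override
    public List<T> initialize() {
        return new ArrayList<>();
    }

    @Override
    public List<T> aggregate(T current, List<T> aggregate) {
        aggregate.add(current);
        return aggregate;
    }

    @Override
    public List<T> map(List<T> aggregate) {
        return aggregate;
    }
}
```

The point of the pattern is that the type information arrives before any rows do, so both the declared types and the aggregation logic can adapt to the input column.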
This release contains a multitude of other improvements, including the following.
Previously, if a nested function threw an exception, the column would always be null.
As an example, one may want to parse a text column as a date and, if that fails (say, because the value contains only a year), fall back to parsing it with -01-01 appended.
SELECT
  parse_date(date_string, 'yyyy-MM-dd') AS parse_as_date,
  parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd') AS parse_as_year,
  coalesce(
    parse_date(date_string, 'yyyy-MM-dd'),
    parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd')
  ) AS parse_as_either
FROM inputs
EMIT CHANGES;
Starting in this release, a function that fails returns null to the calling function. This allows the coalesce and ifnull functions to have more intuitive behavior.
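The new behavior can be modelled in a few lines of plain Java. This is only a sketch of the semantics described above, not ksqlDB's implementation: NullOnFailureSketch, evalOrNull, and the local coalesce and parseDate helpers are hypothetical names, and java.time.LocalDate.parse stands in for parse_date with the 'yyyy-MM-dd' pattern.

```java
import java.time.LocalDate;
import java.util.function.Supplier;

final class NullOnFailureSketch {
    // Evaluates a nested call; a failure yields null for that call only,
    // instead of nulling out the whole column.
    static <T> T evalOrNull(Supplier<T> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            return null;
        }
    }

    // Returns the first non-null argument, or null if there is none.
    @SafeVarargs
    static <T> T coalesce(T... values) {
        for (T value : values) {
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Parses ISO dates (yyyy-MM-dd); throws DateTimeParseException,
    // a RuntimeException, on anything else.
    static LocalDate parseDate(String text) {
        return LocalDate.parse(text);
    }

    // Mirrors the parse_as_either column from the query above.
    static LocalDate parseAsEither(String dateString) {
        return coalesce(
            evalOrNull(() -> parseDate(dateString)),
            evalOrNull(() -> parseDate(dateString + "-01-01")));
    }
}
```

With this model, a full date parses through the first branch, a bare year falls through to the second, and a value that fails both branches yields null.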
We now allow users to specify custom request headers for requests issued by the Java client and when applying migrations with the ksql-migrations tool. For example, in advanced networking topologies, custom headers can be leveraged by a proxy layer to control the flow of requests to the ksqlDB cluster.
This release adds support for push query continuation tokens in the Java client. This allows push queries to also have at-least-once semantics (ALOS) in the client. Users can resume streaming results from a StreamedQueryResult from the last saved continuation token by using the method continueFromLastContinuationToken(). With ALOS in the client, push queries are more robust against network failures, since the client can pick up where it left off.
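The following sketch illustrates why resuming from a saved token gives at-least-once rather than exactly-once delivery. It is an in-memory model only and does not use the ksqlDB Java client: ContinuationSketch, streamUntilFailure, and the integer token are hypothetical, and the real client exposes this through continueFromLastContinuationToken() on StreamedQueryResult.

```java
import java.util.ArrayList;
import java.util.List;

final class ContinuationSketch {
    private final List<String> rows;  // stand-in for the rows a push query would emit
    private int lastSavedToken = 0;   // index of the next row to resume from
    final List<String> delivered = new ArrayList<>();

    ContinuationSketch(List<String> rows) {
        this.rows = rows;
    }

    // Streams rows starting at the last saved token. If failAtIndex matches a
    // row, the connection "drops" after that row is delivered but before its
    // token is saved. Pass -1 to stream without a failure.
    void streamUntilFailure(int failAtIndex) {
        for (int i = lastSavedToken; i < rows.size(); i++) {
            delivered.add(rows.get(i));
            if (i == failAtIndex) {
                return; // token for this row was never saved
            }
            lastSavedToken = i + 1;
        }
    }
}
```

For rows a, b, c with a failure at index 1, the first call delivers a and b, and a second call with -1 resumes at b and delivers b and c. Nothing is lost, but b is seen twice, so consumers relying on this pattern should tolerate duplicates.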
We’re excited to bring these features out and improve the product. For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent Cloud, and join the community to ask questions and find new resources.