In the world of data engineering, data routing decisions are crucial to successful distributed system design. Some organizations choose to route data from within application code. Other teams hand off the routing of data to a different process after application code has transmitted messages to a central store, like an Apache Kafka® cluster.
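Several Confluent customers have requested assistance in routing data internally using ksqlDB. Specifically, they have engaged with Confluent Professional Services to review two patterns: routing messages from a single source topic to different target topics, and fanning out messages from a single source topic to multiple target topics.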
Both of these patterns are easily implemented using the event streaming database ksqlDB, which is available as a fully managed service in Confluent Cloud. If you prefer to self-manage your infrastructure, you can deploy ksqlDB yourself in your own datacenter or private/public cloud through Confluent Platform. So, I’ll walk through how both patterns could be achieved using ksqlDB.
To quickly set up a sample cluster, use one of the two methods below:
Now let’s get started.
Some companies might have a giant topic with messages from a single source. But not all their downstream applications need all the messages from this topic. To help downstream applications efficiently get only the messages they need, they have a few choices:
There are three pieces of information that we need to route from a source topic to a target topic:
Here’s a sample JSON message:
{ "targetTopic": "topic-1", "content": "foo" }
If you would like, you can instead embed the target topic information in the message content itself rather than in a separate field. However, that will require you to use regular expressions later on to extract the information that you need. Beware that introducing regular expressions in an event streaming application can have severe performance implications.
First, create a stream over the source topic. This is needed to get all the messages from the source topic.
If you are using the sample message from earlier, it will look like this:
CREATE STREAM source_stream ( targetTopic varchar, content varchar ) with ( kafka_topic='source-topic', value_format='JSON' );
source_stream is simply any name that you desire for this stream, and source-topic is the topic that you are getting your messages from.
value_format='JSON' means that we are reading the messages as JSON. Thus, we can pick up the field directly inside CREATE STREAM. If your message has no particular format and includes your target topic information, you should use 'KAFKA'.
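As a minimal sketch of the 'KAFKA' alternative mentioned above, the source stream could be declared with a single string column that holds the whole raw value. This assumes each message value is a plain string; the stream and topic names are the same placeholders used earlier.

```sql
-- Sketch only: read each record value as one raw string instead of parsing JSON.
-- The KAFKA format supports a single value column of a primitive type.
CREATE STREAM source_stream (content VARCHAR) WITH (kafka_topic='source-topic', value_format='KAFKA');
```

With this definition there is no separate targetTopic column, so any routing condition has to be derived from content itself.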
After the stream is successfully created, it will receive all the messages being sent to the source topic. You can validate it by using this query:
SELECT * FROM source_stream EMIT CHANGES;
The result will look like this in Confluent Cloud:
Once you have validated that the source stream is working properly, create a stream for each target topic that needs to receive the routed message. It will look like this if you are using the sample JSON message:
-- VALUE_FORMAT='KAFKA' writes the raw value without the JSON wrapper.
-- The WHERE clause is the filtering part.
CREATE STREAM target_stream_1 WITH (kafka_topic='topic-1', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream WHERE targetTopic = 'topic-1';
Here, target_stream_1 is any target stream name that you desire, the topic-1 in kafka_topic is the topic you want the message to go to, source_stream is the stream that you created in step 1, and the 'topic-1' in the WHERE clause is the string that the targetTopic field must match for the message to be routed.
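Because one stream is needed per target topic, a second target could be sketched as follows. The names target_stream_2 and topic-2 are hypothetical and only illustrate the pattern.

```sql
-- Sketch only: a second persistent query routing a different targetTopic value.
-- target_stream_2 and topic-2 are illustrative names, not from the original setup.
CREATE STREAM target_stream_2 WITH (kafka_topic='topic-2', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream WHERE targetTopic = 'topic-2';
```

Each such statement starts its own persistent query, so the number of queries grows with the number of target topics.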
VALUE_FORMAT='KAFKA' will output your message in its raw form. For example, if your content has an XML format, then the new message passed down will be this:
<xml>sample xml</xml>
If you want it to be wrapped in JSON like it was before, then use VALUE_FORMAT='JSON' instead:
{ "content": "<xml>sample xml</xml>" }
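A minimal sketch of the JSON variant of the target stream, assuming the same placeholder names as before, would change only the value format:

```sql
-- Sketch only: same routing filter, but the output value is serialized as a JSON object.
CREATE STREAM target_stream_1 WITH (kafka_topic='topic-1', VALUE_FORMAT='JSON') AS SELECT content FROM source_stream WHERE targetTopic = 'topic-1';
```

ksqlDB treats unquoted column names as case-insensitive, so it is worth checking the casing of the field name in the output topic on your version.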
That’s it! Your incoming messages will be routed by these stream applications, respectively.
Here’s a high-level overview of what’s happening:
If you feel strongly about removing the field "targetTopic": "topic-1" and don’t mind the performance impact, you can put the target topic information within the content field. Then the sample message will look something like this:
{ "content": "target-topic,foo" }
You can change the target stream creation above into something along these lines:
CREATE STREAM target_stream_1 with (kafka_topic='target-topic', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream WHERE REGEXP_EXTRACT('target-topic', content) = 'target-topic';
If you take this approach, confirm what your performance requirements are and test out the performance impact that this will have on your use case.
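One possible refinement, offered as a sketch rather than the recommended form, is to anchor the pattern to the text before the first comma so that a payload that merely mentions the topic name does not match. It assumes content always has the shape `<target topic>,<payload>` and would replace the statement above rather than run alongside it.

```sql
-- Sketch only: capture everything before the first comma and compare it to the topic name.
-- Assumes content always looks like '<target topic>,<payload>'.
CREATE STREAM target_stream_1 WITH (kafka_topic='target-topic', VALUE_FORMAT='KAFKA') AS
  SELECT content
  FROM source_stream
  WHERE REGEXP_EXTRACT('^([^,]+),', content, 1) = 'target-topic';
```

The routed value still carries the `target-topic,` prefix; stripping it is not shown here.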
To test if the topics are receiving the correct messages, simply go to the Confluent UI’s topic section to read the messages. Remember to set offset to -1 so that all messages will be shown. The output will resemble this:
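The same check can also be sketched from the ksqlDB CLI or editor. The example below assumes the JSON source stream with the targetTopic field and the target stream for topic-1 from the earlier steps; the message values are made up for illustration.

```sql
-- Sketch only: produce two test messages through the source stream.
INSERT INTO source_stream (targetTopic, content) VALUES ('topic-1', 'foo');
INSERT INTO source_stream (targetTopic, content) VALUES ('some-other-topic', 'bar');

-- Read the target topic from the earliest offset.
-- Only 'foo' should have been routed to topic-1.
PRINT 'topic-1' FROM BEGINNING LIMIT 5;
```

PRINT keeps running until the limit is reached or the query is interrupted, so it may need to be stopped manually when fewer messages are present.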
At times, companies need exact copies of a topic within a single cluster and need to keep them constantly in sync. To achieve this, either (1) the producer writes the same messages to multiple topics, or (2) you use ksqlDB to create the copies and keep them constantly in sync.
Now that we know how to route messages, fanning them out is simpler: no condition is needed to decide where each message goes, because every message goes to all target topics. Therefore, we only need two details:
Follow the steps below to fan out the message.
As before, we have to create a stream for the source topic to get the messages out:
CREATE STREAM source_stream ( content varchar ) with ( kafka_topic='source-topic', value_format='JSON' );
Note that we no longer need the target topic information here. Also, bear in mind that the stream creation content is based on your message. You can learn more about how to properly create a stream in the documentation.
You’ve probably already guessed what comes next. Because fan-out requires no additional logic, we can create a stream for each of the target topics with this query:
CREATE STREAM target_stream_1 with (kafka_topic='topic-1', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream;
Make sure all of your stream names are unique, and replace topic-1 with your desired topic names.
Although routing and fan-out are possible using ksqlDB, there are a few pros and cons to consider before you take this approach.
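For illustration, fanning out to two more topics could be sketched like this. The names target_stream_2, target_stream_3, topic-2, and topic-3 are hypothetical.

```sql
-- Sketch only: each statement copies every message from the source stream to its own topic.
-- Stream and topic names are illustrative; no WHERE clause is needed for fan-out.
CREATE STREAM target_stream_2 WITH (kafka_topic='topic-2', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream;
CREATE STREAM target_stream_3 WITH (kafka_topic='topic-3', VALUE_FORMAT='KAFKA') AS SELECT content FROM source_stream;
```

Every statement runs as a separate persistent query reading the same source topic.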
This is just the tip of the iceberg of what ksqlDB can do. Ready to check it out for yourself? Head over to ksqlDB.io to get started!