This is the second of a series of posts (part 1 | part 3) that dive deep into key improvements made to ksqlDB to prepare for production availability in Confluent Cloud.
This post assumes familiarity with ksqlDB and ksqlDB’s interactive deployment mode. Familiarity with transactions in Apache Kafka® is also helpful. If you need some background, check out the following:
This post talks about some of the work that’s gone into preserving the integrity of ksqlDB’s internal state in interactive mode. This is important for ksqlDB in the cloud since all servers are deployed in interactive mode.
In interactive mode, ksqlDB servers that belong to the same cluster share the same ksql.service.id and command topic. The command topic is a Kafka topic containing all the statements that modify the ksqlDB metastore, which is the set of streams, tables, and queries present on a server. All data definition language (DDL) statements, such as CREATE STREAM, CREATE TABLE, and query-creating statements, are enqueued onto the command topic before the server executes them. Users constantly add and delete streams and tables and start and stop queries, and each of these actions requires writing a statement to the command topic. A user submits each statement to only one server, so the command topic is what ensures that all servers execute the same DDL statements and remain in sync.
Each ksqlDB server runs a Kafka consumer that polls the command topic for new records. When a new statement is written to the command topic, each server eventually consumes the record with the statement, and then the statement is validated and executed on each server. By executing the same set of DDL statements, every server in the cluster is able to maintain a consistent set of streams, tables, and queries.
The name for this Kafka topic is derived from the ksql.service.id. A newly deployed ksqlDB server or a restarted server checks to see if a command topic associated with its ksql.service.id exists during startup. If it exists, the server processes the statements in the command topic in order to recreate the metastore. This mechanism allows servers to be provisioned and restarted while keeping all DDL statements intact during the downtime.
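To make the replay idea concrete, here is a minimal sketch in Python. It is a toy model, not ksqlDB code: the ToyServer class, the (action, name, definition) record shape, and the sample definitions are all assumptions made for illustration. It shows a long-running server and a freshly provisioned one ending up with the same metastore because both apply the same records in the same order.

```python
from dataclasses import dataclass, field


@dataclass
class ToyServer:
    """Hypothetical stand-in for a ksqlDB server; not ksqlDB's real classes."""
    metastore: dict = field(default_factory=dict)  # source name -> definition
    next_offset: int = 0                           # next command topic offset to read

    def process_command_topic(self, command_topic):
        # Apply every record not yet seen, strictly in log order.
        for action, name, definition in command_topic[self.next_offset:]:
            if action == "CREATE":
                self.metastore[name] = definition
            elif action == "DROP":
                self.metastore.pop(name, None)
        self.next_offset = len(command_topic)


# The command topic modeled as an append-only list of (action, name, definition).
command_topic = [
    ("CREATE", "CLICKSTREAM", "stream over topic CLICKSTREAM"),
    ("CREATE", "CLICKS_COPY", "query reading from CLICKSTREAM"),
]

long_running = ToyServer()
long_running.process_command_topic(command_topic)

# A later statement arrives while the first server is already running.
command_topic.append(("DROP", "CLICKS_COPY", None))
long_running.process_command_topic(command_topic)

# A freshly provisioned server replays the whole topic from offset 0.
new_server = ToyServer()
new_server.process_command_topic(command_topic)

print(long_running.metastore == new_server.metastore)
```

The sketch only holds together because every record is applied. If a replay skipped a record that an earlier run had applied, the two metastores would diverge.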
It’s critical that every node in the cluster has the exact same view of the current streams, tables, and queries. This means that every node has to execute, in the same order, the exact same commands on every run of the command topic. To ensure this always holds true, a protocol has been implemented between the server endpoints and command topic consumers based on Kafka transactions.
Let’s explore the flaws in the old protocol by issuing the following DDL statement:
CREATE STREAM CLICKSTREAM (USERID BIGINT, PAGEID STRING) WITH (KAFKA_TOPIC='CLICKSTREAM', VALUE_FORMAT='JSON');
The interactive deployment mode design treats the command topic as both a request queue and a log.
When a statement from the command topic fails validation or execution, the error is categorized as either a statement validation error or a non-statement-validation error. In the previous design, non-statement-validation errors were considered transient (e.g., something timed out) and the statement was retried, while statements that hit statement validation errors were skipped. There are two kinds of statement validation errors that can occur:
CREATE STREAM PAGEVIEWS_STREAM (USERID BIGINT, PAGEID STRING) WITH (KAFKA_TOPIC='PAGEVIEWS', VALUE_FORMAT='JSON');
CREATE STREAM PAGEVIEWS_COPY AS SELECT * FROM PAGEVIEWS_STREAM;
During an execution of the command topic, the streams PAGEVIEWS_STREAM and PAGEVIEWS_COPY are both successfully created. Later, the server is restarted, but the Kafka topic PAGEVIEWS was deleted before the restart. As a result, PAGEVIEWS_STREAM fails to be created when the command topic is replayed. In addition, PAGEVIEWS_COPY isn’t created since it depends on PAGEVIEWS_STREAM existing.
CREATE STREAM CLICKSTREAM (USERID BIGINT, PAGEID STRING) WITH (KAFKA_TOPIC='CLICKSTREAM', VALUE_FORMAT='JSON');
While at the same time, server B receives:
CREATE STREAM CLICKSTREAM (ACCOUNTID STRING) WITH (KAFKA_TOPIC='CLICKSTREAM', VALUE_FORMAT='AVRO');
Since the stream CLICKSTREAM doesn’t exist yet on server A or server B, they both validate and produce the statement they received to the command topic. Assuming that server A’s statement was produced first, server B’s statement would fail validation when executing the command topic since stream CLICKSTREAM already exists.
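The race described above can be reproduced with a small Python simulation. This is only a sketch under simplifying assumptions: the is_valid check, the dictionary-based statements, and the execute helper are hypothetical and stand in for ksqlDB's real validation and execution logic.

```python
def is_valid(metastore, statement):
    # Toy validation: a CREATE is valid only if the name is not already taken.
    return statement["name"] not in metastore


statement_a = {"name": "CLICKSTREAM", "columns": "USERID BIGINT, PAGEID STRING", "from": "A"}
statement_b = {"name": "CLICKSTREAM", "columns": "ACCOUNTID STRING", "from": "B"}

command_topic = []
metastore_a, metastore_b = {}, {}

# Step 1: each server validates against its own metastore, then enqueues.
# Neither has consumed the other's statement yet, so both checks pass.
for metastore, statement in ((metastore_a, statement_a), (metastore_b, statement_b)):
    if is_valid(metastore, statement):
        command_topic.append(statement)


def execute(command_topic):
    # Step 2: old-protocol execution; statements that fail validation are skipped.
    metastore, skipped = {}, []
    for statement in command_topic:
        if is_valid(metastore, statement):
            metastore[statement["name"]] = statement["columns"]
        else:
            skipped.append(statement["from"])
    return metastore, skipped


metastore_a, skipped_on_a = execute(command_topic)
metastore_b, skipped_on_b = execute(command_topic)

print(len(command_topic), skipped_on_a, skipped_on_b)
```

Both statements end up in the command topic even though only one can ever succeed, which is why the old protocol had to tolerate skipped statements.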
For external errors, it doesn’t make sense to corrupt ksqlDB’s state by skipping a statement due to a change in an external system. However, internal errors can’t be reconciled and need to be skipped. Unfortunately, skipping all statements that encounter statement validation errors was necessary since it was difficult to programmatically categorize the statement validation error as internal or external.
In the new protocol, we treat the command topic as a log rather than a request queue by never skipping any statements. This requires two main changes.
With these two changes, we are able to treat the command topic like a log by never skipping a statement during execution and therefore are always able to rebuild the same state.
We realized that we could coordinate between ksqlDB servers by integrating Kafka’s transaction mechanism into the protocol that validates and produces statements to the command topic. One key feature of Kafka transactions is the use of the transactional.id for the Kafka producer. The main purpose of setting a transactional.id on a producer is to fence off leftover “zombie” producers: once a newer producer with the same transactional.id initializes transactions, older ones can no longer produce to the topic.
This functionality could also be leveraged to prevent multiple ksqlDB servers from writing to the command topic concurrently. We could make the existing producer transactional and use the ksql.service.id as its transactional.id. Since all the servers in the cluster have the same ksql.service.id, every server would have a producer with the same transactional.id, allowing the servers to fence each other off when starting a transaction. This is a big win for us since we don’t need to implement our own protocol for coordinating between the servers when validating and producing statements to the command topic.
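The fencing behavior can be illustrated with a toy model in Python. It does not use the Kafka client API: ToyBroker, ProducerFenced, and the example service ID are hypothetical names, and the epoch counter is a simplified stand-in for the producer epoch that Kafka tracks per transactional.id.

```python
class ProducerFenced(Exception):
    """Raised in this toy model when a stale producer tries to write."""


class ToyBroker:
    def __init__(self):
        self.current_epoch = {}  # transactional.id -> latest epoch handed out
        self.log = []            # records accepted into the command topic

    def init_transactions(self, transactional_id):
        # Each initialization bumps the epoch, which fences older producers.
        epoch = self.current_epoch.get(transactional_id, -1) + 1
        self.current_epoch[transactional_id] = epoch
        return epoch

    def append(self, transactional_id, epoch, record):
        # Writes from a producer holding an older epoch are rejected.
        if epoch < self.current_epoch[transactional_id]:
            raise ProducerFenced(f"epoch {epoch} is stale for {transactional_id}")
        self.log.append(record)


broker = ToyBroker()
service_id = "example_service_"  # hypothetical ksql.service.id shared by A and B

epoch_a = broker.init_transactions(service_id)  # server A starts first
epoch_b = broker.init_transactions(service_id)  # server B starts later and fences A

try:
    broker.append(service_id, epoch_a, "statement from A")
    a_was_fenced = False
except ProducerFenced:
    a_was_fenced = True

broker.append(service_id, epoch_b, "statement from B")

print(a_was_fenced, broker.log)
```

In this model the server that initialized most recently is the only one allowed to write, so a fenced server has to initialize again and revalidate before it can retry.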
The following is the new workflow for producing a statement to the command topic:
The command topic consumer’s isolation.level config can be set to read_committed, so that the consumer only returns statements that were part of a successfully committed transaction. Using Kafka transactions in this way guarantees that there’s only a single writer to the command topic at a given time, preventing conflicting statements from being written to the command topic.
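As a rough illustration of what read_committed means for the command topic consumer, the Python sketch below filters a toy log. The (kind, transaction, value) record shape and the read_committed helper are assumptions for illustration. Real Kafka uses control markers and the last stable offset, which this model does not reproduce.

```python
def read_committed(log):
    # Collect the transactions that have a COMMIT marker in the log.
    committed = {txn for kind, txn, _ in log if kind == "COMMIT"}
    # Return only data records that belong to a committed transaction.
    return [value for kind, txn, value in log if kind == "DATA" and txn in committed]


# Toy command topic: (kind, transaction number, value).
log = [
    ("DATA", 1, "statement written in a committed transaction"),
    ("COMMIT", 1, None),
    ("DATA", 2, "statement written in an aborted transaction"),
    ("ABORT", 2, None),
    ("DATA", 3, "statement written in a still-open transaction"),
]

visible = read_committed(log)
print(visible)
```

Statements from the aborted and still-open transactions never reach the servers, so a statement that failed validation partway through a transaction is never executed.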
By incorporating Kafka transactions into validation, no conflicting statements are written to the command topic, and servers never need to skip a statement during execution. ksqlDB servers are now guaranteed to deterministically recreate the same metastore every time the command topic is processed. Users can be confident that their ksqlDB cluster can reliably stay in sync across restarts and scale-ups.
It’s exciting to see how many improvements have been made to ksqlDB in the cloud since its original launch, and the team is always eager to provide new first-class features and ongoing improvements for our users. You can get a sneak peek of what the team is working on in our GitHub repository.
Future improvements that we’re looking to include:
We’re always open to community contributions, and if working on a hosted event streaming database sounds exciting to you, we’re hiring!
For further reading, check out: