[Webinar] Bringing Flink to On-Prem and Private Clouds. Register Now
Co-partitioning is an essential concept of Kafka Streams. It ensures that the behavior of two joined streams is what you’d expect. Say you have a stream of customer addresses and a stream of customer purchases, and you’d like to join them for a customer order stream. You need to ensure the two streams are co-partitioned before executing the join.
In fact, Kafka Streams does not permit joining streams that are not co-partitioned. Co-partitioning is partially verified by the Stream’s client library, so if you try to join with an uneven number of partitions, you’ll get an error, for example: Topics not co-partitioned: [topic1, topic2]
.
Partially verified means that it only verifies that the number of partitions is the same, and as we’ll see, that is just one aspect of co-partitioning. Stream joins are based on the keys of the records. This ensures that records with the same join column are co-located on the same stream task. If Kafka Streams allowed the records not to be co-located, you’d end up with mismatched joins.
Since ksqlDB is built on Kafka Streams, it also requires co-partitioning before joins. We’ll talk about both ksqlDB and Kafka Streams in this post.
There are three criteria for co-partitioning. We’ll look at each in depth below. Still, at a high level they are as follows:
The input records for the join must have the same keying strategy
The source topics must have the same number of partitions on each side
Both sides of the join must have the same partitioning strategy in terms of hashing
If a pair of two streams or tables in either Kafka Streams or ksqlDB have all these elements, they are co-partitioned. Let’s take a look at each element in more detail.
Note: Foreign-key table-table joins in Kafka Streams and ksqlDB do not require co-partitioning. The result table inherits the number of partitions of the left input table.
The keying strategy used for each stream or table must be the same—that is, the type of the keys that a join is made by must be the same, like INT id
and INT customer_id
. It’s also very important that the serialization method for the keys is the same. Otherwise, the underlying bytes won’t align.
The input topics must have the same number of partitions on both sides. For example, say you had one topic with three partitions and another with two. Because key assignment works sequentially through each partition, there’s no guarantee that the events in partition 1 of the first topic would have the same key as the events in the same partition of the second topic.
As you can see in the image above, there are different keys on the corresponding partitions. In topic 1, partition A’s shapes are pink triangles, and in topic 2, partition A’s shape is a black circle. They have different keys; consequently, when it comes to joining between streams built from these tables, we’re going to get problems.
But, if we make the number of partitions the same, the hash function guarantees that messages with the same key will be in the same respective partitions across the two topics, and thus the joins will succeed.
Both sides must also have the same partitioning strategy with respect to the hashing method the producer uses. Otherwise, the correspondence of the events in each topic partition cannot be guaranteed, and you might have mismatches. Be aware of this if you’re using different producer clients. For example, Java-based clients use murmur2, while librdkafka uses consistent_random hash by default, so you want to override the hashing algorithm in the non-Java client (otherwise, you need to write your own partitioner to override the algorithm in the Java producer client).
Whether you’re using Kafka Streams or ksqlDB, you need to double-check that your topics have the same number of partitions, which you can do with the Kafka topics utility: kafka-topics --describe
.
Implementing co-partitioning at the Stream client library level is a matter of manually reading your events to make sure you’ve got the same keys and ensuring that the same serialization method is used for the keys as well.
Let’s set up two topics in Confluent Cloud, one serialized in JSON, called orders_in_json
, and the other serialized in Protobuf, called purchases_in_protobuf
. We’ll consume them using the kcat tool and output the bytes using hexdump, which displays the contents of binary files on Linux and Mac.
When a topic is serialized with Protobuf, it’s prefaced by 0000000 0000 8601
, but when it’s serialized in JSON, it’s prefaced by 0000000
.
So, even though you’ve got the same number of partitions and the same partition strategy, you cannot make a join to create an enriched topic because the keys are not the same when they are serialized differently.
A “key” fact to remember: keys must be serialized using the same method. Values, on the other hand, can be serialized differently.
As mentioned above, you must also check that your producer clients are using the same hashing method for the keys. How you do this can vary depending on your choice of producer client.
You need to have the same number of partitions in each topic. If you need to re-partition, think it through carefully. You may not have to re-partition since you can provide an intermediate topic with Kafka Streams and ksqlDB. If you must re-partition, remember that the ordering guarantees that you previously had will be invalidated since ordering is only guaranteed within, not across, partitions, and this may affect the results of what logic you apply to these partitions.
Kafka Streams provides the KStream.repartition method, which allows you to re-partition a stream. For example, consider you have two streams based on input topics that are widely used across your organization, so changing the number of partitions isn’t possible. But you can use Stream.repartition(Repartitioned.numberOfPartitions())
, and the returned KStream
will have the same number of partitions as the left side of your proposed join, making it eligible for a join. It’s important to note the re-partition method doesn’t affect the source topic.
Let’s say you have a stream of customer addresses and a table with customer orders. You want to join these topics to create an enriched topic, but there’s a slight problem: the customer address stream has a key of VARCHAR customer name
while the customer order table has a key of INT cust_id
. As in Kafka Streams, the serialization format needs to be the same. When keys are different, it’s a case for re-keying, that is, creating a new stream partitioned by the key you want to create the join with. In this case, if the topic you want to make has six partitions, it might look like this:
After starting that operation, you can create a join between customers_rekeyed
and the customer order table.
Again, check that the producer clients feeding the underlying base streams use the same partitioning strategy regarding their hashing method.
If your partition number is imbalanced, you can re-partition using ksqlDB. This sample command creates a new stream based on the one with the wrong number of partitions.
Confluent Developer offers a full tutorial.
So far, you’ve reviewed the criteria for co-partitioning and learned how to ensure your streams are co-partitioned in both Kafka Streams and ksqlDB. To learn more, you can check out these resources:
Code samples for joining streams/tables:
The Kafka Streams documentation also has some useful instructions for ensuring co-partitioning.
Apache Kafka® is at the core of a large ecosystem that includes powerful components, such as Kafka Connect and Kafka Streams. This ecosystem also includes many tools and utilities that […]
Learn what windowing is in Kafka Streams and get comfortable with the differences between the main types.