
Co-Partitioning with Apache Kafka


What is co-partitioning?

Co-partitioning is an essential concept of Kafka Streams. It ensures that the behavior of two joined streams is what you’d expect. Say you have a stream of customer addresses and a stream of customer purchases, and you’d like to join them for a customer order stream. You need to ensure the two streams are co-partitioned before executing the join.

In fact, Kafka Streams does not permit joining streams that are not co-partitioned. Co-partitioning is partially verified by the Kafka Streams client library, so if you try to join topics with different numbers of partitions, you’ll get an error, for example: Topics not co-partitioned: [topic1, topic2]

Partially verified means that it only verifies that the number of partitions is the same, and as we’ll see, that is just one aspect of co-partitioning. Stream joins are based on the keys of the records. This ensures that records with the same join column are co-located on the same stream task. If Kafka Streams allowed the records not to be co-located, you’d end up with mismatched joins. 

Since ksqlDB is built on Kafka Streams, it also requires co-partitioning before joins. We’ll talk about both ksqlDB and Kafka Streams in this post. 

Criteria for co-partitioning

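There are three criteria for co-partitioning, and we’ll look at each in depth below. At a high level, both sides of the join must have the same keying strategy (key type and serialization), the same number of partitions, and the same partitioning strategy.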

If two streams or tables in either Kafka Streams or ksqlDB meet all three criteria, they are co-partitioned. Let’s take a look at each one in more detail.

Note: Foreign-key table-table joins in Kafka Streams and ksqlDB do not require co-partitioning. The result table inherits the number of partitions of the left input table.

Keying strategies

The keying strategy used for each stream or table must be the same: the keys used for the join must have the same type, like INT id and INT customer_id. It’s also very important that the keys are serialized the same way. Otherwise, the underlying bytes won’t match, even when the logical values are equal.
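To make the byte-level point concrete, here is a minimal sketch that encodes the same logical ID two ways. It assumes an integer key is written as four big-endian bytes and a string key as UTF-8, which is how typical integer and string serializers behave; the class and method names are hypothetical and no Kafka library is involved.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyBytesDemo {
    // Assumption: an integer key is encoded as 4 big-endian bytes.
    static byte[] intKey(int id) {
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    // Assumption: a string key is encoded as its UTF-8 bytes.
    static byte[] stringKey(String id) {
        return id.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] asInt = intKey(42);
        byte[] asString = stringKey("42");
        System.out.println(Arrays.toString(asInt));         // [0, 0, 0, 42]
        System.out.println(Arrays.toString(asString));      // [52, 50]
        System.out.println(Arrays.equals(asInt, asString)); // false
    }
}
```

Because partitioning and joining operate on the serialized bytes, customer 42 keyed as an integer and customer "42" keyed as a string are different keys as far as the join is concerned.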

Partitions

The input topics must have the same number of partitions on both sides. For example, say you had one topic with three partitions and another with two. The default partitioner chooses a partition by hashing the key and taking the result modulo the number of partitions, so the same key can map to different partition numbers in the two topics. There’s no guarantee that the events in partition 1 of the first topic would have the same keys as the events in partition 1 of the second topic.

As you can see in the image above, there are different keys on the corresponding partitions. In topic 1, partition A’s shapes are pink triangles, and in topic 2, partition A’s shape is a black circle. They have different keys; consequently, when it comes to joining streams built from these topics, we’re going to get problems.

But if we make the number of partitions the same, and both topics are written with the same partitioner, the hash function guarantees that messages with the same key will land in the same partition number in both topics, and thus the joins will succeed.
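The effect of the partition count can be sketched with plain modulo arithmetic. The example below uses a stand-in hash (Arrays.hashCode, not the murmur2 hash a real producer uses) purely to illustrate the idea; the class name, method names, and customer-N keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionCountDemo {
    // Stand-in hash, NOT Kafka's murmur2: any deterministic hash shows the effect.
    static int hash(String key) {
        return Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff;
    }

    // Partition = non-negative hash modulo the topic's partition count.
    static int partitionFor(String key, int numPartitions) {
        return hash(key) % numPartitions;
    }

    // Counts keys that land on different partition numbers in the two topics.
    static int mismatches(String[] keys, int partitionsA, int partitionsB) {
        int count = 0;
        for (String key : keys) {
            if (partitionFor(key, partitionsA) != partitionFor(key, partitionsB)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String[] keys = new String[10];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "customer-" + i;
        }
        System.out.println("3 vs 2 partitions: " + mismatches(keys, 3, 2) + " mismatched keys");
        System.out.println("3 vs 3 partitions: " + mismatches(keys, 3, 3) + " mismatched keys");
    }
}
```

With equal partition counts every key maps to the same partition number on both sides; with three versus two partitions, some keys end up on different partition numbers, so the stream task handling one side never sees the matching record from the other.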

Partitioning strategy

Both sides must also have the same partitioning strategy, meaning the hashing method the producer uses to map keys to partitions. Otherwise, the correspondence of the events in each topic partition cannot be guaranteed, and you might have mismatches. Be aware of this if you’re using different producer clients. For example, Java-based clients hash keys with murmur2, while librdkafka defaults to the consistent_random partitioner, which uses a CRC32 hash. To align them, either override the partitioner in the non-Java client (librdkafka offers murmur2-based options such as murmur2_random) or write your own partitioner to override the algorithm in the Java producer client.
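The sketch below shows how two hashing methods can disagree even when the keys and partition counts are identical. It is an illustration under assumptions, not either client’s actual code: murmur2 here is a hand-written version of the 32-bit murmur2 variant the Java client is understood to use, and the CRC32 path assumes a CRC32-of-key-modulo-partitions scheme like librdkafka’s consistent partitioner. All class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class HashStrategyDemo {
    // Sketch of a 32-bit murmur2 hash (assumed to match the Java client's variant).
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes; the cases fall through intentionally.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int murmur2Partition(String key, int numPartitions) {
        return (murmur2(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff) % numPartitions;
    }

    // Assumption: CRC32 of the key bytes, modulo the partition count.
    static int crc32Partition(String key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % numPartitions);
    }

    // Counts keys "customer-0".."customer-(n-1)" that the two strategies place differently.
    static int disagreements(int numKeys, int numPartitions) {
        int count = 0;
        for (int i = 0; i < numKeys; i++) {
            String key = "customer-" + i;
            if (murmur2Partition(key, numPartitions) != crc32Partition(key, numPartitions)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(disagreements(100, 6) + " of 100 keys land on different partitions");
    }
}
```

Both topics here have six partitions and identical key bytes, yet the two hash functions generally pick different partitions for the same key, which is why the producers on both sides need to agree on the hashing method.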

Practical considerations for co-partitioning

Whether you’re using Kafka Streams or ksqlDB, you need to double-check that your topics have the same number of partitions, which you can do with the Kafka topics utility, for example: kafka-topics --bootstrap-server <broker> --describe --topic <topic>

Double-checking co-partitioning in Kafka Streams 

Keying strategy

Verifying co-partitioning at the Kafka Streams client library level is a matter of manually reading your events to make sure both sides use the same keys, and checking that the keys are serialized with the same method as well.

Let’s set up two topics in Confluent Cloud, one serialized in JSON, called orders_in_json, and the other serialized in Protobuf, called purchases_in_protobuf. We’ll consume them using the kcat tool and output the bytes using hexdump, which displays the contents of binary files on Linux and Mac. 

❯ kcat -Ct orders_in_json -c1 | hexdump
0000000 227b 726f 6564 7472 6d69 2265 313a 3135
0000010 3232 3533 3532 3838 3231 222c 726f 6564
❯ kcat -Ct purchases_in_protobuf -c1 | hexdump
0000000 0000 8601 00a1 0308 0612 7567 7469 7261
0000010 0818 0822 020a 4811 0410 0218 000a     
000001d
❯ kcat -Ct orders_in_protobuf -c1 | hexdump
0000000 0000 8601 7ba1 6f22 6472 7265 6974 656d
0000010 3a22 3431 3139 3236 3733 3636 3433 2c31

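In hexdump’s default output, the leading 0000000 is just the byte offset, and each group is a 16-bit word printed in little-endian order, so the bytes in each pair appear swapped. The Protobuf-serialized message begins with the bytes 00 00 01 86 a1: a zero magic byte followed by a four-byte schema ID, the framing that Schema Registry serializers add. The plain JSON message has no such prefix and begins directly with 7b 22, the characters {".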

So, even though you’ve got the same number of partitions and the same partition strategy, you cannot make a join to create an enriched topic because the keys are not the same when they are serialized differently. 

A “key” fact to remember: keys must be serialized using the same method. Values, on the other hand, can be serialized differently. 
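As a rough way to check which framing a set of serialized bytes uses, the sketch below looks for the Schema Registry prefix (a zero magic byte followed by a four-byte big-endian schema ID). The sample bytes are transcribed from the hexdump output above; the class and method names are hypothetical, and the check is only a heuristic, since unframed data could also happen to start with a zero byte.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class WireFormatDemo {
    // Returns the schema ID if the bytes start with magic byte 0 plus a
    // 4-byte big-endian schema ID, or -1 if the prefix is absent.
    static int schemaIdOrMinusOne(byte[] bytes) {
        if (bytes.length < 5 || bytes[0] != 0) {
            return -1;
        }
        return ByteBuffer.wrap(bytes, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // First bytes of the purchases_in_protobuf record, un-swapped from hexdump.
        byte[] framed = {0x00, 0x00, 0x01, (byte) 0x86, (byte) 0xa1, 0x00, 0x08, 0x03};
        // First bytes of the orders_in_json record.
        byte[] plainJson = "{\"ordertime\":1".getBytes(StandardCharsets.UTF_8);

        System.out.println(schemaIdOrMinusOne(framed));    // 100001
        System.out.println(schemaIdOrMinusOne(plainJson)); // -1
    }
}
```

Running the same check against keys from both topics is one way to spot a serialization mismatch before attempting the join.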

Hashing method

As mentioned above, you must also check that your producer clients are using the same hashing method for the keys. How you do this can vary depending on your choice of producer client. 

Re-partitioning

You need to have the same number of partitions in each topic. If you need to re-partition, think it through carefully. You may not have to re-partition the source topics themselves, since both Kafka Streams and ksqlDB can write the data to an intermediate topic with the right number of partitions. If you must re-partition, remember that ordering is only guaranteed within a partition, not across partitions, so the ordering guarantees you previously had will no longer hold, and this may affect the results of whatever logic you apply to the data.

Kafka Streams provides the KStream.repartition method, which allows you to re-partition a stream. For example, suppose you have two streams based on input topics that are widely used across your organization, so changing the number of partitions on the topics themselves isn’t possible. You can instead call KStream.repartition(Repartitioned.numberOfPartitions(n)) on one stream, with n set to the partition count of the other side of your proposed join. The returned KStream is backed by an internal topic with that number of partitions, making it eligible for the join. It’s important to note that the repartition method doesn’t affect the source topic.

Ensuring co-partitioning in ksqlDB

Keying strategy

Let’s say you have a stream of customer addresses and a table with customer orders. You want to join these topics to create an enriched topic, but there’s a slight problem: the customer address stream has a key of VARCHAR customer name while the customer order table has a key of INT cust_id. As in Kafka Streams, the key type and serialization format need to be the same on both sides. When the keys differ, you need to re-key, that is, create a new stream partitioned by the key you want to join on. In this case, if the topic you want to create should have six partitions, it might look like this:

CREATE STREAM customers_rekeyed 
  WITH (PARTITIONS=6) AS 
  SELECT * 
  FROM   customer_addresses 
  PARTITION BY cust_id;

After starting that operation, you can create a join between customers_rekeyed and the customer order table. 

Hashing method

Again, check that the producer clients feeding the underlying base streams use the same partitioning strategy regarding their hashing method. 

Number of partitions

If the number of partitions differs between the two sides, you can re-partition using ksqlDB. This sample command creates a new stream, with the desired partition count, based on the one with the wrong number of partitions.

CREATE STREAM repartitioned_stream
  WITH (KAFKA_TOPIC = 'topic', VALUE_FORMAT = 'JSON', PARTITIONS = 2, REPLICAS = 2) AS
  SELECT *
  FROM   original_stream;

Confluent Developer offers a full tutorial

Where to go from here

So far, you’ve reviewed the criteria for co-partitioning and learned how to ensure your streams are co-partitioned in both Kafka Streams and ksqlDB.
