
Guide to Consumer Offsets: Manual Control, Challenges, and the Innovations of KIP-1094

Written by Alieh Saeedi

Consumer offsets are at the heart of Apache Kafka®'s robust data handling capabilities, as they determine how data is consumed, reprocessed, or skipped across topics and partitions. In this comprehensive guide, we delve into the intricacies of Kafka offsets, covering everything from the necessity of manual offset control to the nuanced challenges posed by offset management in distributed environments. We further explore the solutions and enhancements introduced by KIP-1094 (available in Kafka 4.0.0), offering a closer look at how it addresses these challenges by enabling more accurate and reliable offset and leader epoch information retrieval.

PART I: Key Principles and Foundations

Offsets

In Kafka, an offset represents the position of a message within a partition of a topic. It’s essentially a sequential ID number assigned to each message. A consumer uses offsets to track its progress when reading through a partition. When the consumer reads a message, it receives the message along with its offset and can then commit the next offset to indicate that every previous record with a smaller offset was successfully processed. For instance, if the message at offset 25 is read, offset 26 is then committed. It's important to note that the committed offset refers to the next offset the consumer intends to read, not the offset of the last successfully processed message. This committed offset is then used to resume consumption from the correct position in case the consumer restarts or a rebalance occurs within the consumer group. The committed offsets are stored in an internal topic called __consumer_offsets.

By default, the consumer does not commit every single offset, because doing so can significantly reduce throughput. Each commit operation involves a request to the broker, and if done for every message, it can lead to a high number of requests, causing increased latency and reduced performance. Therefore, an automatic commit policy is enabled by default, which commits offsets periodically at a set interval. This interval is controlled by the auto.commit.interval.ms property, which defaults to 5,000 milliseconds (5 seconds). However, this automatic commit can be disabled by setting the enable.auto.commit property to false. When auto-commit is disabled, the application code must manually commit offsets using the consumer’s commit API. This can be done synchronously or asynchronously, depending on the application's requirements.
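
For reference, here is a minimal configuration sketch; the broker address, group id, and topic name are placeholders, and the values shown for the two auto-commit properties are simply the defaults:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitConfigExample {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
      props.setProperty("group.id", "auto-commit-demo");        // placeholder group id

      // Auto-commit is enabled by default; the interval below is also the default (5 s).
      props.setProperty("enable.auto.commit", "true");
      props.setProperty("auto.commit.interval.ms", "5000");
      // To take manual control instead, set enable.auto.commit to "false" and use
      // commitSync/commitAsync explicitly (see the examples later in this post).

      KafkaConsumer<Integer, String> consumer =
         new KafkaConsumer<>(props, new IntegerDeserializer(), new StringDeserializer());
      consumer.subscribe(List.of("demo-topic"));                 // placeholder topic

      // With auto-commit enabled, the consumer periodically commits the offsets of the
      // records it has returned from poll, in the background.
      ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
      System.out.println("Fetched " + records.count() + " records");
      consumer.close();
   }
}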

Control Records vs Data Records

In addition to the normal read-process-write cycle, Kafka transactions provide a way to ensure exactly-once semantics (EOS). This means that data is processed without loss or duplication, maintaining consistency and accuracy. Kafka transactions allow atomic writes to multiple partitions, ensuring that either all messages in a transaction are successfully written or none are. To support this functionality, Kafka uses control records to mark the end of transactions. These records help ensure that all messages within a transaction are either fully committed or aborted. Control records also advance the offset, just like data records, although they’re not directly visible to applications. These control records are written to the same data logs as regular records, ensuring a seamless integration of transactional metadata with the message stream. 
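
On the consumer side, visibility of transactional data is governed by the isolation.level setting: with read_committed, only data from committed transactions is returned, while the default read_uncommitted returns all data records. A brief, hedged configuration sketch (broker address, group id, and class name are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerConfig {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
      props.setProperty("group.id", "eos-readers");             // placeholder group id
      // Only return records from committed transactions; aborted data is filtered out.
      // Control records are never delivered to the application in either mode, but they
      // still occupy offsets in the log.
      props.setProperty("isolation.level", "read_committed");

      KafkaConsumer<String, String> consumer =
         new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
      // ... subscribe and poll as usual
      consumer.close();
   }
}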

Leader Epoch

Each Kafka partition has a designated leader, which is the broker responsible for handling all reads [1] and writes for that partition. When a producer sends a message to a topic, it's sent to the leader of the relevant partition. Similarly, when a consumer reads from a topic, it reads from the leader of the partition. To maintain replication and ensure data durability, partitions can also have multiple followers. Followers replicate the leader's data by fetching updates and writing them to their own logs, staying synchronized with the leader. This replication protocol ensures high availability and fault tolerance in Kafka's distributed architecture. If a leader fails, one of the followers is chosen as the new leader. This process ensures that there is always a leader available to handle client requests, maintaining the availability of the partition.
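
As a side note, the current leader and replica assignment of each partition can be inspected programmatically with the Admin client. The sketch below assumes a local broker and a topic named demo-topic, both placeholders:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

public class LeaderInspector {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

      try (Admin admin = Admin.create(props)) {
         TopicDescription description = admin.describeTopics(List.of("demo-topic"))
            .allTopicNames().get().get("demo-topic");
         // Print the current leader, replica set, and in-sync replicas of every partition.
         description.partitions().forEach(p ->
            System.out.println("Partition " + p.partition()
               + " leader=" + p.leader()
               + " replicas=" + p.replicas()
               + " isr=" + p.isr()));
      }
   }
}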

The leader epoch is a monotonically increasing number that represents a continuous period of leadership for a single partition. This helps maintain a consistent view of the data across different replicas. When a new leader is elected, it increments the leader epoch, ensuring that all followers can identify the latest leader and synchronize their logs accordingly. Figure 1 illustrates that while Broker 1 hosts the leader for partition Pi, the leader epoch is 1. After Broker 1 fails, the replica of Pi on Broker 2 is elected as the new leader, and the leader epoch is incremented to 2.

Figure 1: Leader epoch

PART II: In-Depth Analysis and Insights

Committing Leader Epoch 

The consumer can optionally commit the leader epoch along with the offset, and it should, because doing so ensures consistency and accuracy in offset management. The leader epoch is stored in the __consumer_offsets topic as well. It’s a crucial piece of metadata that helps identify the correct leader of a partition at the time the offset was committed. This is particularly important during leader changes and consumer group rebalances.
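
To make this concrete, the consumer API already exposes what is needed: ConsumerRecord provides the leader epoch the record was fetched with via leaderEpoch(), and OffsetAndMetadata has a constructor that accepts an optional leader epoch. Below is a minimal sketch under those assumptions; the helper class and method are ours, for illustration only:

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LeaderEpochCommit {

   // Commits record.offset() + 1 together with the leader epoch the record was read with.
   static void commitWithLeaderEpoch(KafkaConsumer<Integer, String> consumer,
                                     ConsumerRecord<Integer, String> record) {
      TopicPartition tp = new TopicPartition(record.topic(), record.partition());
      Optional<Integer> leaderEpoch = record.leaderEpoch(); // may be empty on older brokers
      OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1, leaderEpoch, "");
      consumer.commitSync(Map.of(tp, next));
   }
}

With KIP-1094 (covered in Part III), the same information is available more directly via ConsumerRecords.nextOffsets().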

When a partition's leader changes (due to broker failure, broker restart, or manual reassignment), the Kafka broker updates the metadata to reflect the new leader for the affected partitions. Consumers in a consumer group rely on this metadata to know which broker to contact for fetching data. Upon detecting a leader change, consumers will receive an error (either LEADER_NOT_AVAILABLE or NOT_LEADER_FOR_PARTITION) when attempting to fetch data from the old leader (which might have recovered in the meantime and rejoined as a follower). This triggers the consumer to refresh its metadata by contacting the Kafka brokers, ensuring that it retrieves the updated information about the new leader. This process helps maintain data availability and ensures that consumers redirect their requests to the correct leader partition.

Figure 2A: Phase 1

However, things don’t always go so smoothly. In a complex, multi-layered distributed system where applications are stacked on top of each other, unexpected surprises are inevitable. In some cases, a consumer might not realize that it needs to refresh its metadata. To illustrate this more clearly, let’s look at the following example scenario, also shown in Figure 2A, Figure 2B, and Figure 2C.

  • Phase 1: In a consumer group consisting of Consumer A and Consumer B, partition Pi is assigned to Consumer A. As shown in Figure 2A, the partition leader is located on Broker 1, while a replica (follower) resides on Broker 2.

    Figure 2B: Phase 2

  • Phase 2:

    • As illustrated in Figure 2B, Broker 1 fails, triggering a leader change for partition Pi. Consequently, the partition on Broker 2 gets the leader role, and the leader epoch is bumped to 2. 

    • When Consumer A attempts to fetch data, it may initially receive a LEADER_NOT_AVAILABLE error, prompting a metadata refresh. After this, Consumer A updates its metadata, including the latest leader epoch for partition Pi.

    • After updating its metadata, Consumer A continues fetching messages from partition Pi starting from offset Xi with leader epoch 2. It then commits the next offset Xj but without including the leader epoch, since committing the leader epoch is optional for the consumer, as previously mentioned.

      Figure 2C: Phase 3

  • Phase 3: 

    • As shown in Figure 2C, Broker 1 recovers. When it tries to resume the leader role for partition Pi, it becomes a zombie leader [2].

    • After a consumer group rebalance, partition Pi is reassigned to Consumer B. However, due to network delays or metadata propagation issues, Consumer B retains a stale leader epoch for Pi, meaning it never refreshed its metadata after the rebalance.

    • When Consumer B attempts to fetch the message at offset Xj from the former leader (using the stale leader epoch), issues arise. The zombie leader accepts Consumer B's fetch request and responds with an OFFSET_OUT_OF_RANGE error, as it has not received any new messages from the producer (Xi+1 < Xj).

While this may seem like a rare edge case, it has been observed in real-world scenarios, making it a genuine issue that needs to be addressed. (See KAFKA-16248; it’s the law of distributed systems that if it can happen, it’s only a question of when it will happen.) If, during Phase 1 and particularly Phase 2, Consumer A had committed the leader epoch along with the next offset (as shown in Figure 3), then when partition Pi was assigned to Consumer B, it would have been forced to refresh its metadata. This is because the leader epoch of the committed offset Xj would be greater than the zombie leader’s epoch (2 > 1). This metadata refresh would ensure that Broker 2 is recognized as the current leader, and Consumer B would send the fetch request to the correct broker, avoiding the OFFSET_OUT_OF_RANGE error.

Figure 3: Committing the next offset along with the corresponding leader epoch

This highlights that if a consumer is aware of the current leader epoch along with the offset it needs to fetch, it can easily detect when its metadata is outdated and refresh it accordingly. This makes committing the leader epoch alongside the next offset both necessary and beneficial. In other words, when a consumer commits an offset along with its leader epoch, it helps other consumers in the group recognize and ignore stale metadata. If all consumers in the group follow this practice, a consumer with outdated metadata can use the committed leader epoch as a reference to trigger a metadata refresh, preventing it from fetching from an outdated leader.

Committing Specific Offsets

Manual offset committing in Kafka allows users to take finer control over when offsets are committed, as opposed to automatic offset committing, which happens periodically. To commit offsets manually, the Kafka consumer API provides the commitSync and commitAsync methods. The commitSync method blocks until the commit request is acknowledged by the broker, ensuring that the commit has succeeded before proceeding. This can be useful for reliability, but it may reduce overall throughput since the consumer might sit idle while waiting for the commit to complete. On the other hand, the commitAsync method allows the consumer to continue processing records while the commit request is handled in the background. This can improve throughput but introduces the risk of commit failures, which need to be handled appropriately. If a commit fails, one can use a callback to retry the commit or handle the failure in another way. For example, one valid strategy is to ignore a single failed commit and rely on the next commit to succeed. If a failure occurs in between, only a few additional records would need to be reprocessed, which remains acceptable under at-least-once semantics (ALOS), where each record is processed at least once. However, completely disregarding all commit errors is not recommended: there should be a threshold for the number of consecutive commit errors the application can safely ignore before treating them as a real problem, as sketched below.
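
The following is a hedged sketch of such a threshold strategy; the limit of three consecutive failures, the class structure, and the flag-based hand-off to the poll loop are all illustrative choices, not a prescribed pattern:

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class CommitFailureTracker {

   private static final int MAX_CONSECUTIVE_FAILURES = 3; // arbitrary threshold for illustration

   private final AtomicInteger consecutiveFailures = new AtomicInteger();
   private volatile boolean commitProblem = false;

   // Callback for commitAsync: isolated failures are tolerated, because the next successful
   // commit also covers them under at-least-once semantics, but repeated failures raise a
   // flag that the poll loop can act on (e.g., by falling back to commitSync or shutting down).
   public OffsetCommitCallback callback() {
      return (Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
         if (exception == null) {
            consecutiveFailures.set(0);
         } else if (consecutiveFailures.incrementAndGet() >= MAX_CONSECUTIVE_FAILURES) {
            commitProblem = true;
         }
      };
   }

   public boolean hasCommitProblem() {
      return commitProblem;
   }

   public void reset() {
      consecutiveFailures.set(0);
      commitProblem = false;
   }
}

A poll loop could then call consumer.commitAsync(offsets, tracker.callback()) and, whenever tracker.hasCommitProblem() returns true, fall back to a blocking commitSync before calling tracker.reset().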

Both commitSync() and commitAsync() commit the offset based on the last poll call. The offsets committed using these APIs will be used on the first fetch after every rebalance and also on startup, although developers may occasionally want greater control by committing more or less frequently, i.e., in smaller or larger “batches” than the intervals provided by poll. In such cases, a map of partitions to offset metadata can be passed to the commitSync and commitAsync APIs to manage offsets explicitly. Below is a sample code snippet in which we commit after processing every message. This is merely a demonstration of the commitSync API. However, in practice, committing every single offset is not advisable, as it can significantly reduce consumer throughput, increase latency, and degrade performance.

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitExample {

   public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");
      props.setProperty("group.id", "test");
      // Disable auto-commit so offsets are committed only through the commit API.
      props.setProperty("enable.auto.commit", "false");

      KafkaConsumer<Integer, String> consumer =
         new KafkaConsumer<>(props, new IntegerDeserializer(), new StringDeserializer());
      consumer.subscribe(List.of("manual-topic"));
      Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();

      while (true) {
         ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<Integer, String> record : records) {
            long offset = record.offset();
            System.out.println("Received message: ("
               + record.key() + ", " + record.value()
               + ") at offset " + offset);

            // Commit the offset of the *next* record to be consumed, i.e., offset + 1.
            offsetAndMetadataMap.put(new TopicPartition(record.topic(),
               record.partition()), new OffsetAndMetadata(offset + 1));

            // Blocking commit after every record (for demonstration only; see the note above).
            consumer.commitSync(offsetAndMetadataMap);
         }
      }
   }
}

Besides commitSync and commitAsync, precise offset management is essential in applications requiring EOS. The application needs to commit offsets corresponding to successfully processed transactions using the KafkaProducer to ensure that each record is processed exactly once. As illustrated in the following code snippet, after processing the records, the offset of the next record for each topic partition is sent to the transaction, and the producer commits them within the ongoing transaction. If an error occurs during processing, the transaction is aborted to ensure that there is no partial processing. When a transaction is aborted, the corresponding offsets are not committed either.

public static void main(String[] args) {
   Properties props = new Properties();
   props.setProperty("bootstrap.servers", "localhost:9092");
   // A transactional.id is required for transactions; the value here is just an example.
   props.setProperty("transactional.id", "my-transactional-id");
   props.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
   props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

   KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
   producer.initTransactions(); // must be called once before the first beginTransaction()

   // Placeholders: in a real application these come from the consumed records.
   TopicPartition tp = null;
   long offset = 0;
   try {
      // ... begin transaction
      producer.beginTransaction();
      // ... read from input topic
      // ... transform
      // ... write to output topic
      // ... commit offsets within the transaction
      producer.sendOffsetsToTransaction(
         Collections.singletonMap(tp,
         new OffsetAndMetadata(offset + 1)), "consumer-group-id");
      // ... commit transaction
      producer.commitTransaction();
   } catch (Exception e) {
      // ... abort transaction (the offsets sent above are discarded as well)
      producer.abortTransaction();
   }
}

How to Know Which Offset to Commit

In all these cases, the desired offset to be committed is in fact the offset of the next record to be processed, ideally combined with the leader epoch for enhanced accuracy and consistency. There are several methods to retrieve the next record offset:

1) Consumer poll

Following common wisdom, the offset of the last record returned by poll + 1 (like the above examples) can be considered the next offset to be processed. However, this isn't always accurate since poll returns data records, but control records also advance the offset. Therefore, relying solely on the last record's offset from poll might not reflect the true next offset. 

Figure 4 illustrates that if the last message returned by the poll is followed by a control record, then the offset of that message + 1 would refer to a control record. This is not the accurate offset to be committed, since by definition the committed offset must be the offset of the next record to be consumed.

Figure 4: Control records in partition

An issue arises when the upstream producer stops writing new records or has very low throughput: the reported lag [3] does not go to zero. If the last message or messages in the partition are control records, as shown in Figure 5, the consumer's position steps over these markers, but no new data is returned after them and no further commits happen. As a result, the lag remains stuck at the count of these control records, which can be confusing to users.

Figure 5: Non-zero lag due to control messages at the end of partition

2) Consumer position 

By definition, position retrieves the offset of the next record to be fetched for a given TopicPartition (if a record with that offset exists). Its primary limitation is that it returns only the next offset, without additional metadata such as the leader epoch. As we’ve explained, the leader epoch is essential for offset commits to ensure that all consumers in the consumer group share coherent and consistent metadata before fetching; we discussed this scenario in greater detail earlier in this post.

By default, the Kafka consumer maintains an in-memory position and returns it when requested. In certain scenarios, the position may not exist. Even when the consumer does have a position, after detecting new metadata it should validate that all partitions have a valid position. This ensures that there has been no log truncation, meaning the position should be less than the end offset (position < end offset). If the position is invalid or not present due to some issue, the consumer must fetch the position from the broker and validate it. If the consumer cannot validate the existing position with the new leader, or cannot retrieve and validate a new position, a TimeoutException may occur. This sequence of events can lead to significant problems, especially if the application does not handle the TimeoutException gracefully.
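
As a hedged sketch of defensive handling of position (the five-second timeout and the fallback value are illustrative choices, and the helper method is ours):

import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class PositionLookup {

   // Returns the next offset to fetch for the partition, or -1 if it cannot be
   // determined within the timeout. Note that position() exposes only the offset;
   // it carries no leader epoch, which is why it is not sufficient on its own.
   static long nextOffsetOrUnknown(KafkaConsumer<Integer, String> consumer, TopicPartition tp) {
      try {
         return consumer.position(tp, Duration.ofSeconds(5)); // illustrative timeout
      } catch (TimeoutException e) {
         // The position could not be fetched or validated (e.g., during a leader change):
         // back off and retry on the next loop iteration instead of failing the application.
         return -1L;
      }
   }
}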

Conclusion:

  • The Consumer poll returns only data records. Therefore, the last fetched record’s offset + 1 is not necessarily the next offset, since it may refer to a control record. Committing the offset of a control record may result in non-zero lag in some scenarios.

  • The consumer position can’t fully handle the case, since it returns only the next offset and no other metadata such as the leader epoch. Moreover, when using position, a TimeoutException must be considered and handled gracefully. Additionally, an OFFSET_OUT_OF_RANGE error may occur if a leader election happens in parallel with a rebalance and there is a zombie leader.

  • Dealing with distributed system edge cases is tricky, messy, and full of surprises. That’s why we need a rock-solid solution to commit the correct offset of the next data record along with its trusty leader epoch metadata. Say hello to KIP-1094!

PART III: KIP-1094: “Add a new constructor method with nextOffsets to ConsumerRecords”

KIP-1094 addresses the need for a more accurate and reliable way to manage consumer offsets in Kafka. The primary motivation behind this KIP is to expose the next offset to be fetched after an invocation of Consumer poll. This is achieved by introducing a new constructor method with nextOffsets to the ConsumerRecords class. The new constructor initializes the next offsets as well as the leader epoch wrapped in an OffsetAndMetadata object.

// the current constructor, now deprecated
@Deprecated
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records)

// the new constructor, which also takes the next offsets to be fetched (with leader epochs)
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, Map<TopicPartition, OffsetAndMetadata> nextOffsets) {}

/**
* Get the next offsets that the consumer will consume.
*/
public Map<TopicPartition, OffsetAndMetadata> nextOffsets();

This enhancement ensures that the correct next offset, along with the correct leader epoch, is provided directly by the records returned from Consumer poll. KIP-1094 also deprecates the existing ConsumerRecords constructor, which does not include the next offsets. This change aims to prevent potential bugs in the future and improve the overall functionality of the consumer API.

The first rejected alternative was caching the leader epoch offset ranges inside the consumer. This approach was deemed insufficient because it’s the producer that commits the offset in the case of EOS. Therefore, relying on the consumer to cache these ranges would not provide the necessary accuracy and reliability for offset management.

The second rejected alternative was adding the new method Consumer.positionWithMetadata, which would complicate the consumer interface. Additionally, the method would suffer from a race condition in which the position might change by the time the application wants to commit the offset, leading to incorrect offset management. This complexity and potential for errors led to the rejection of this approach.

Overall, KIP-1094 significantly improves how offsets are managed in Kafka. By exposing the next offset and leader epoch, it allows for more precise and reliable offset commits, reducing the likelihood of errors and improving the consistency of consumer operations. This enhances the robustness of the Kafka consumer API and ensures better handling of offsets in various scenarios.

As illustrated in the following code snippet, with KIP-1094 we can retrieve the correct offset of the next data record, along with its leader epoch, from the ConsumerRecords returned by poll and commit it through commitSync. The same approach can be used with any other method that requires manually committing specific offsets.

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NextOffsetsCommitExample {

   public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");
      props.setProperty("group.id", "test");
      props.setProperty("enable.auto.commit", "false");

      KafkaConsumer<Integer, String> consumer =
         new KafkaConsumer<>(props, new IntegerDeserializer(), new StringDeserializer());

      consumer.subscribe(List.of("manual-topic"));
      Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();

      while (true) {
         ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));

         for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("Received message: ("
               + record.key() + ", " + record.value() + ") at offset " + record.offset());

            TopicPartition recordTopicPartition =
               new TopicPartition(record.topic(), record.partition());

            // nextOffsets() returns the offset of the next record to be fetched for each
            // partition, together with its leader epoch (KIP-1094).
            OffsetAndMetadata recordOffsetAndMetadata =
               records.nextOffsets().get(recordTopicPartition);

            offsetAndMetadataMap.put(recordTopicPartition, recordOffsetAndMetadata);
            consumer.commitSync(offsetAndMetadataMap);
         }
      }
   }
}

Conclusion

Managing consumer offsets in Kafka is a critical aspect of ensuring data consistency, reliability, and performance—and it’s surprisingly complex. In this guide, we’ve explored the fundamental principles of offsets, the challenges associated with manual offset control, and the innovations introduced by KIP-1094. By understanding the importance of committing offsets along with leader epochs, we can avoid issues related to stale metadata and ensure accurate data processing. The enhancements provided by KIP-1094, such as the new constructor method with nextOffsets in the ConsumerRecords class, offer a more precise and reliable way to manage offsets, ultimately improving the robustness of Kafka's consumer API. These advancements are essential for maintaining the integrity of data streams and ensuring seamless operation in distributed environments.


References:

  1. "Fetch from Follower" (FFF) is a feature in Kafka 2.3+ (KIP-392) that allows consumers to fetch data from the nearest replica instead of the leader. This reduces cross-availability zone (AZ) data transfer costs. To enable FFF, configure the client.rack property on the client to match the broker.rack property of the broker in the same AZ.

  2. A "zombie leader" in the context of Kafka refers to a leader that continues to operate and attempt to perform its duties despite having lost its leadership status due to network partitions or other issues.

  3. In the context of Kafka, "lag" refers to the difference between the latest offset of a partition and the current offset of the consumer group. Essentially, it measures how far behind the consumer is in processing the messages produced to the topic.


Apache®, Apache Kafka®, and Kafka® are registered trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by the use of these marks. All other trademarks are the property of their respective owners.

  • Alieh Saeedi is a Senior Software Engineer on the Cloud Applications team at Confluent. She joined Confluent after earning her doctorate in informatics with a focus on data integration. Currently, she works on building new systems for improving Kafka Streams.
