
Deep Dive into Handling Consumer Fetch Requests: Kafka Producer and Consumer Internals, Part 4


Recap:

This is the last of the four parts in this series.

It’s been a long time coming, but we’ve finally arrived at the fourth and final installment of our blog series. In this series, we’ve been peeling back the layers of Apache Kafka® to get a deeper understanding of how best to interact with the cluster using producer and consumer clients.

At a high level, a fetch request is composed of two parts:

  1. Request metadata—this is the request configuration.

  2. Fetch request data—the actual data in the message.
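To make the two parts concrete, here is a minimal sketch of how a fetch request could be modeled. The class and field names are hypothetical and this is not the wire format; it assumes the "request data" is the list of partitions and offsets the consumer wants to read. The defaults shown are the Java client defaults for the corresponding settings.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FetchMetadata:
    """Request-level settings, taken from the consumer configuration."""
    min_bytes: int = 1             # fetch.min.bytes
    max_bytes: int = 52_428_800    # fetch.max.bytes (50 MiB)
    max_wait_ms: int = 500         # fetch.max.wait.ms


@dataclass
class PartitionFetch:
    """What to read: one entry per partition (hypothetical shape)."""
    topic: str
    partition: int
    fetch_offset: int              # first offset the consumer wants
    max_bytes: int = 1_048_576     # max.partition.fetch.bytes (1 MiB)


@dataclass
class FetchRequest:
    metadata: FetchMetadata
    partitions: List[PartitionFetch] = field(default_factory=list)
```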

Let’s dive in.

Handling the consumer fetch

On the broker

General request handling

Unsurprisingly, much of the fetch request handling process mirrors that of producer requests. In fact, the steps are identical up to the point where I/O threads access data on disk and send it back to the client.

To keep things concise, we won’t rehash the overlapping steps here. If you need a refresher, check out part 2 of this series where we covered the producer request process in detail.

Page cache and disk

This is the moment we’ve all been waiting for: the hobbit—our hero—finally faces the ultimate task of stealing the treasure... or rather, reading the data from disk.

Of course, the I/O threads handle this part. But before diving in, let’s refresh our memory on the storage fundamentals of Kafka.

Kafka topics are commit logs. On disk, these logs are divided into segments. Each segment is composed of a few files, but the two key ones are:

  1. .log file: This holds the actual event data—the raw bytes being stored.

  2. .index file: This is crucial as it maps record offsets to their positions within the .log file, enabling efficient lookups.

When the I/O threads parse a fetch request, they start by checking the offset for the target partition(s). Using that offset along with the .index file, they pinpoint the range of records to retrieve. Right away, they check if there's enough data to meet the fetch.min.bytes requirement specified in the request. If so, they calculate the data’s location and proceed with fetching it.
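The index lookup can be sketched with a binary search. This is a toy model, not the broker's code: the in-memory `INDEX` list and the `find_start_position` helper are made up for illustration. It assumes a sparse index (not every offset has an entry), so the lookup returns the closest entry at or before the target offset, and a reader would scan forward in the .log file from there.

```python
import bisect

# Hypothetical sparse index: (record offset, byte position in the .log file),
# sorted by offset. Real index files use a compact binary format.
INDEX = [(0, 0), (50, 4096), (100, 8192), (150, 12288)]


def find_start_position(index, target_offset):
    """Return the byte position of the last index entry at or before target_offset."""
    offsets = [offset for offset, _ in index]
    # bisect_right gives the insertion point after any equal entries,
    # so stepping back by one lands on the entry <= target_offset.
    i = bisect.bisect_right(offsets, target_offset) - 1
    if i < 0:
        raise ValueError("offset is before the start of this segment")
    return index[i][1]
```

For example, a fetch starting at offset 120 would begin reading at the position recorded for offset 100 and skip ahead from there.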

  • Note: You might wonder—where does fetch.max.bytes fit into all this, and why isn’t it a hard limit? Good question. When data is written to disk in the .log file, it’s serialized as a single chunk (a blob of bytes). The I/O threads won’t break up this chunk, nor are they responsible for deserializing the messages. So, if the requested data exists—even if the blob is larger than fetch.max.bytes—the broker will still return it so the consumer can continue processing. The same logic applies at the partition level with max.partition.fetch.bytes.
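To see why the limit is soft, here is a simplified model of the effect described in the note. It is not the broker's actual read path; `select_batches` is a hypothetical helper that treats each stored chunk as an indivisible unit and always returns the first one, even when it exceeds the byte budget.

```python
def select_batches(batch_sizes, max_bytes):
    """Pick whole batches, in order, until the byte budget is used up.

    The first batch is always included, even if it alone is larger than
    max_bytes, so the consumer can keep making progress.
    """
    selected = []
    total = 0
    for size in batch_sizes:
        # Only enforce the budget once at least one batch has been selected.
        if selected and total + size > max_bytes:
            break
        selected.append(size)
        total += size
    return selected
```

With a budget of 1,000 bytes, three 400-byte chunks yield the first two, while a single 5,000-byte chunk is still returned whole.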

At this stage, the I/O threads have located the data to send back to the consumer. Now, one of the following things happens:

  1. If the segment files on disk contain the entire range of data, we're ready to craft a response. ✅

  2. If the data resides in tiered storage, the fetch request is handed off to the tiered fetch threads. 📦🔄

  3. If there’s not enough data to meet the minimum fetch requirements, the unfulfilled request is queued for... ⏳

Mordor... or rather, Purgatory

Purgatory is a map-like data structure built on a hierarchical timing wheel where fetch requests are temporarily held. If the broker can’t immediately fulfill the request, it waits here until either:

  • The minimum data threshold (fetch.min.bytes) is met, or

  • The maximum wait time (fetch.max.wait.ms) expires.

Once either condition is satisfied, the broker pulls the request out of purgatory, constructs a response with the appropriate data, and sends it back to the consumer.

On the other hand, if a request can be fulfilled immediately, it bypasses purgatory entirely and heads straight to the response queue.
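The two exit conditions can be sketched as a small check. This is only an illustration: the `DelayedFetch` class and `route_fetch` function are hypothetical names, and the real purgatory relies on a timing wheel rather than polling a clock. Times are passed in explicitly so the logic is easy to follow.

```python
from dataclasses import dataclass


@dataclass
class DelayedFetch:
    """A fetch request parked in purgatory (toy model)."""
    min_bytes: int        # fetch.min.bytes
    max_wait_ms: int      # fetch.max.wait.ms
    enqueued_at_ms: int   # when the request entered purgatory

    def is_ready(self, available_bytes, now_ms):
        """True once enough data has arrived or the wait time has expired."""
        enough_data = available_bytes >= self.min_bytes
        timed_out = now_ms - self.enqueued_at_ms >= self.max_wait_ms
        return enough_data or timed_out


def route_fetch(available_bytes, min_bytes):
    """Decide where a new request goes first: straight to a response, or purgatory."""
    return "respond" if available_bytes >= min_bytes else "purgatory"
```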

  • Note: If your consumers aren't receiving data during polling, it might be because there’s not enough data available to fulfill the request. Double-check your fetch.min.bytes setting to ensure it’s not set too high.

Socket send buffer

As noted earlier, there are similarities between handling producer and fetch requests. However, it’s important to highlight some interesting nuances in how data is sent back to the consumer client. Let’s dive into those differences.

So far, we’ve discussed how a fulfilled response is sent back to the consumer. However, even after identifying the data, it’s not directly included in the response object we built. Why? Because it would be highly inefficient for the broker to internally move large data payloads around.

Instead, to optimize performance, the event data is zero-copied—it goes straight from disk to the socket send buffer, minimizing overhead and speeding up data transfer.
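Kafka itself is written in Java and Scala, but the same idea can be illustrated with Python's standard library. In this sketch, `send_log_slice` is a hypothetical helper; `socket.sendfile()` hands the file-to-socket transfer to the operating system where that is supported and falls back to ordinary sends otherwise, so the bytes are not first assembled into a response object in application code.

```python
import socket


def send_log_slice(sock, log_path, position, length):
    """Send `length` bytes of a log file, starting at `position`, over `sock`.

    The payload is never loaded into a Python object here; the transfer is
    delegated to the OS when the platform supports it.
    """
    with open(log_path, "rb") as log_file:
        return sock.sendfile(log_file, offset=position, count=length)
```

The position and length would come from the index lookup and size checks described earlier; the function returns the number of bytes sent.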

Back to the consumer

Now, we’ve finally made it back to the consumer with a response packed full of data—or at least, that’s the goal. But hold on—we’re not done yet. In reality, the fun is just getting started! 

Deserialization

Remember, Kafka brokers deal exclusively in bytes. So, the event data we receive is just raw byte streams. 

The first step is to take that payload from the response and deserialize it, along with any associated keys. To make this work, you need to configure two critical settings: key.deserializer and value.deserializer. These configurations convert the raw bytes into usable data.
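As a rough illustration, here is what a pair of deserializers might do. The function names are hypothetical stand-ins for whatever you configure as key.deserializer and value.deserializer, and the sketch assumes UTF-8 string keys and JSON values; your topics may well use something else.

```python
import json


def deserialize_key(raw):
    """Bytes -> str, assuming UTF-8 keys. Records may have no key at all."""
    return None if raw is None else raw.decode("utf-8")


def deserialize_value(raw):
    """Bytes -> Python object, assuming the producer wrote JSON."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


def deserialize_record(raw_key, raw_value):
    """Apply both deserializers to one record from the fetch response."""
    return deserialize_key(raw_key), deserialize_value(raw_value)
```

The deserializers have to mirror the serializers used by the producer; if they don't match, this is the step where things break.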

Poll and process

Once the data is deserialized, it’s finally time to apply the business logic you’ve implemented. You know that poll loop you wrote in your code? Yep, now’s when it kicks into action!

Fetching ≠ polling

So far, we’ve focused on the consumer fetch request, but here’s a surprise: not every call to consumer.poll() in your client code triggers a fetch request. 🕵️‍♂️

The process of fetching data from the broker and the act of polling are actually separate. While the poll loop can trigger a fetch request, once the consumer gets data, it caches those records. The consumer will continue polling from its cache until it’s empty. Only then will it issue a new fetch request to the broker. 

Configuring polling

Because consumers cache records, it’s useful to control how many are returned with each poll() call. Luckily, there are configurations for that!

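The key setting here is max.poll.records, which caps the number of records returned by a single poll() call (default is 500). The cap applies to what the consumer hands over from its cache, not to how much data a fetch request pulls from the broker. But why limit this if there’s more data in the cache?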
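The relationship between polling, the cache, and max.poll.records can be sketched with a toy consumer. `ToyConsumer` and `fetch_fn` are hypothetical names; the real client is considerably more sophisticated, but the basic shape is the same: poll() drains a local buffer and only goes back to the broker when that buffer is empty.

```python
from collections import deque


class ToyConsumer:
    """Toy model: poll() serves records from a local cache, fetching only when empty."""

    def __init__(self, fetch_fn, max_poll_records=500):
        self._fetch_fn = fetch_fn              # stands in for a fetch request to the broker
        self._max_poll_records = max_poll_records
        self._buffer = deque()
        self.fetch_count = 0                   # how many times we went to the "broker"

    def poll(self):
        if not self._buffer:
            # Cache is empty: only now do we issue a new fetch.
            self._buffer.extend(self._fetch_fn())
            self.fetch_count += 1
        # Hand back at most max_poll_records; the rest stays cached.
        n = min(self._max_poll_records, len(self._buffer))
        return [self._buffer.popleft() for _ in range(n)]
```

If one fetch returns five records and max_poll_records is 2, three poll() calls are served from that single fetch before the consumer asks the broker for more.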

Setting a limit helps monitor consumer performance and keep track of its health. The poll() loop provides a proxy for whether the consumer is alive, along with session timeouts and heartbeats.

To manage this, we not only limit records per poll but also set a time limit for processing with max.poll.interval.ms (default is 5 minutes). This is the maximum time allowed between consecutive poll() calls. If a consumer takes longer, it’s a sign it might be stuck, potentially hung up on some records or errors. A consumer that exceeds this interval is considered failed and leaves the group; likewise, if it misses heartbeats for longer than the session timeout, the consumer group coordinator removes it from the group. Either way, its partitions are reassigned to other members.
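The poll-interval check boils down to comparing timestamps. The sketch below uses a hypothetical `PollWatchdog` class with times passed in explicitly; it only models the max.poll.interval.ms rule and leaves heartbeats and session timeouts out.

```python
class PollWatchdog:
    """Toy liveness check based on the time elapsed since the last poll()."""

    def __init__(self, max_poll_interval_ms=300_000):   # default: 5 minutes
        self.max_poll_interval_ms = max_poll_interval_ms
        self.last_poll_ms = None

    def record_poll(self, now_ms):
        """Call this every time the application calls poll()."""
        self.last_poll_ms = now_ms

    def is_considered_stuck(self, now_ms):
        """True if more than max.poll.interval.ms has passed since the last poll()."""
        if self.last_poll_ms is None:
            return False
        return now_ms - self.last_poll_ms > self.max_poll_interval_ms
```

This is also why max.poll.records matters: fewer records per poll means less work between polls, which makes it easier to stay inside the interval.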

Committing records

As we process fetched records, it’s crucial for the consumer to track what it has processed so that, if it restarts, it knows where to pick up. This is handled by committing offsets.

By default, offset commits are automatic (enable.auto.commit=true). In this mode, an offsets handler commits the offsets of processed records on a timer, every five seconds. You can adjust this interval with auto.commit.interval.ms.
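Here is a minimal sketch of time-based committing. `AutoCommitter` is a hypothetical class, and the sketch assumes the interval check runs as part of each poll; the point is only that the commit is driven by the clock, not by how many records were processed.

```python
class AutoCommitter:
    """Toy model of time-based offset commits (auto.commit.interval.ms)."""

    def __init__(self, start_ms, interval_ms=5000):     # default: 5 seconds
        self.interval_ms = interval_ms
        self.next_deadline_ms = start_ms + interval_ms
        self.committed = None                            # last committed position

    def on_poll(self, now_ms, position):
        """Commit `position` if the interval has elapsed; return True if a commit happened."""
        if now_ms >= self.next_deadline_ms:
            self.committed = position
            self.next_deadline_ms = now_ms + self.interval_ms
            return True
        return False
```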

However, keep in mind that this auto-commit happens solely on a time-based schedule. If you need to commit after a specific number of records, you’ll need to commit manually (for example, with consumer.commit() in the Python client, or commitSync() and commitAsync() in the Java client). But beware: doing so can impact performance and is usually not recommended unless absolutely necessary.
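If you do need count-based commits, the loop might look roughly like this. The `handle` and `commit` callables are hypothetical stand-ins for your business logic and for the client's commit call, and records are modeled as simple (offset, value) pairs. Note that the committed offset is the offset of the next record to read, which is why the sketch commits `offset + 1`.

```python
def process_with_periodic_commit(records, handle, commit, every_n):
    """Process (offset, value) pairs, committing after every `every_n` records."""
    uncommitted = 0
    next_offset = None
    for offset, value in records:
        handle(value)
        next_offset = offset + 1      # committed offset = next record to read
        uncommitted += 1
        if uncommitted >= every_n:
            commit(next_offset)
            uncommitted = 0
    # Commit whatever is left over at the end of the batch.
    if uncommitted:
        commit(next_offset)
```

A smaller `every_n` narrows the window of records that could be reprocessed after a restart, at the cost of more commit round trips.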

Once offsets are committed, the consumer continues to process records from its cache and triggers new fetch requests as needed.

Wash, rinse, and repeat

As long as the consumer continues sending heartbeats to the consumer group coordinator and doesn’t time out, the group will keep running smoothly. And even if a consumer client fails, no worries—the partitions it was handling will automatically be rebalanced to another active instance, ensuring continuous processing.

The end

Wow, that was a lot of information to digest! But instead of feeling overwhelmed, think of it as unlocking a new level of understanding. 

Apache Kafka is a fascinating technology, and while it’s intricate, that complexity is what makes it so powerful—and honestly, pretty exciting to learn about! The fact that you’ve navigated the inner workings of producers and consumers means you’re now better equipped than ever.

The best part? You no longer have to treat Kafka like a mysterious black box. The next time something goes awry with your producer or consumer client, you’ll have the knowledge to diagnose and fix it confidently. 

If you’re curious to learn more about Apache Kafka and data streaming, head over to Confluent Developer and check out the dozens of free courses and tutorials we have available.

  • This blog was a collaborative effort between multiple Confluent employees.
