Recap:
This is the fourth and final chapter of the series:
How Producers Work: See what the producer does behind the scenes to help prepare your raw event data for the broker.
Handling the Producer Request: Learn how your data goes from the producing client all the way to disk on the broker.
Preparing the Consumer Fetch: See how consumers issue fetch requests.
Handling the Consumer Fetch: You are here!
It’s been a long time coming, but we’ve finally arrived at the fourth and final installment of our blog series. In this series, we’ve been peeling back the layers of Apache Kafka® to get a deeper understanding of how best to interact with the cluster using producer and consumer clients.
At a high level, a fetch request consists of two parts:
Request metadata: the request configuration, such as fetch.min.bytes and fetch.max.wait.ms.
Fetch request data: the topic partitions and starting offsets the consumer wants to read from.
Let’s dive in.
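These settings live on the consumer client, but their values travel to the broker in the metadata portion of every fetch request. Here's a minimal, illustrative configuration sketch; the broker address, group ID, and values are placeholders rather than recommendations:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
props.put("group.id", "demo-group");               // hypothetical consumer group
props.put("fetch.min.bytes", "1024");              // wait for at least 1 KB of data...
props.put("fetch.max.wait.ms", "500");             // ...or at most 500 ms, whichever comes first
props.put("fetch.max.bytes", "52428800");          // soft cap on the total fetch response (50 MB)
props.put("max.partition.fetch.bytes", "1048576"); // soft cap per partition (1 MB)
```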
Unsurprisingly, much of the fetch request handling process mirrors that of producer requests. In fact, the steps are identical up to the point where I/O threads access data on disk and send it back to the client.
To keep things concise, we won’t rehash the overlapping steps here. If you need a refresher, check out part 2 of this series where we covered the producer request process in detail.
This is the moment we’ve all been waiting for: the hobbit—our hero—finally faces the ultimate task of stealing the treasure... or rather, reading the data from disk.
Of course, the I/O threads handle this part. But before diving in, let’s refresh our memory on the storage fundamentals of Kafka.
Kafka topics are commit logs. On disk, these logs are divided into segments. Each segment is composed of a few files, but the two key ones are:
.log file: This holds the actual event data—the raw bytes being stored.
.index file: This is crucial as it maps record offsets to their positions within the .log file, enabling efficient lookups.
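As an illustration, a partition directory on disk looks something like this (the topic name and segment base offset here are made up for the example):

```
demo-topic-0/
  00000000000000000000.log        # record batches: the raw event bytes
  00000000000000000000.index      # offset -> byte position in the .log file
  00000000000000000000.timeindex  # timestamp -> offset (not covered here)
```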
When the I/O threads parse a fetch request, they start by checking the offset for the target partition(s). Using that offset along with the .index file, they pinpoint the range of records to retrieve. Right away, they check if there's enough data to meet the fetch.min.bytes requirement specified in the request. If so, they calculate the data's location and proceed with fetching it.
Note: You might wonder—where does fetch.max.bytes fit into all this, and why isn't it a hard limit? Good question.
When data is written to disk in the .log file, it's serialized as a single chunk (a blob of bytes). The I/O threads won't break up this chunk, nor are they responsible for deserializing the messages. So, if the requested data exists—even if the blob is larger than fetch.max.bytes—the broker will still return it so the consumer can continue processing. The same logic applies at the partition level with max.partition.fetch.bytes.
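To make the soft-limit behavior concrete, here's a small sketch with made-up sizes; this is an illustration of the idea, not the broker's actual code:

```java
// Suppose the next batch on disk for a partition is 2 MB, while the consumer
// asked for at most 1 MB per partition. (Example values only.)
long maxPartitionFetchBytes = 1_048_576; // 1 MB, requested by the consumer
long nextBatchBytes = 2_097_152;         // 2 MB batch sitting in the .log file

// The broker never splits a batch, so the whole 2 MB batch is returned;
// the cap only prevents further batches from being added to the response.
long bytesReturned = nextBatchBytes;
boolean addMoreBatches = bytesReturned < maxPartitionFetchBytes; // false in this example
```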
At this stage, the I/O threads have located the data to send back to the consumer. Now, one of the following things happens:
If the segment files on disk contain the entire range of data, we're ready to craft a response. ✅
If the data resides in tiered storage, the fetch request is handed off to the tiered fetch threads. 📦🔄
If there’s not enough data to meet the minimum fetch requirements, the unfulfilled request is queued for purgatory. ⏳
Purgatory is a map-like data structure built on a hierarchical timing wheel where fetch requests are temporarily held. If the broker can’t immediately fulfill the request, it waits here until either:
The minimum data threshold (fetch.min.bytes) is met, or
The maximum wait time (fetch.max.wait.ms) expires.
Once one of these conditions is satisfied, the response is sent back to the consumer.
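In pseudocode, the check looks roughly like the sketch below; this is illustrative only, not the broker's actual purgatory implementation:

```java
// A parked fetch request completes as soon as either condition becomes true.
static boolean fetchCanComplete(long availableBytes, long waitedMs,
                                long fetchMinBytes, long fetchMaxWaitMs) {
    return availableBytes >= fetchMinBytes   // fetch.min.bytes satisfied
            || waitedMs >= fetchMaxWaitMs;   // fetch.max.wait.ms expired
}
```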
If a request ends up in purgatory, once either fetch.min.bytes or fetch.max.wait.ms is reached, the broker pulls it out and constructs a response with the appropriate data.
On the other hand, if a request can be fulfilled immediately, it bypasses purgatory entirely and heads straight to the response queue.
Note: If your consumers aren't receiving data during polling, it might be because there’s not enough data available to fulfill the request. Double-check your fetch.min.bytes setting to ensure it’s not set too high.
As noted earlier, there are similarities between handling producer and fetch requests. However, it’s important to highlight some interesting nuances in how data is sent back to the consumer client. Let’s dive into those differences.
So far, we’ve discussed how a fulfilled response is sent back to the consumer. However, even after identifying the data, it’s not directly included in the response object we built. Why? Because it would be highly inefficient for the broker to internally move large data payloads around.
Instead, to optimize performance, the event data is zero-copied—it goes straight from disk to the socket send buffer, minimizing overhead and speeding up data transfer.
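For a feel of what zero copy means in practice, here's a rough sketch using the standard Java NIO call this technique is built on (FileChannel.transferTo, which maps to sendfile on Linux). It's an illustration of the idea, not Kafka's actual transfer code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Transfer a slice of the .log file straight to the client's socket channel.
static long transferLogSlice(FileChannel logFile, long position, long count,
                             WritableByteChannel socket) throws IOException {
    // Bytes flow from the page cache to the socket buffer inside the kernel,
    // without ever being copied into the JVM heap.
    return logFile.transferTo(position, count, socket);
}
```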
Now, we’ve finally made it back to the consumer with a response packed full of data—or at least, that’s the goal. But hold on—we’re not done yet. In reality, the fun is just getting started!
Remember, Kafka brokers deal exclusively in bytes. So, the event data we receive is just raw byte streams.
The first step is to take that payload from the response and deserialize it, along with any associated keys. To make this work, you need to configure two critical settings: key.deserializer and value.deserializer. These configurations convert the raw bytes into usable data.
Once the data is deserialized, it’s finally time to apply the business logic you’ve implemented. You know that poll loop you wrote in your code? Yep, now’s when it kicks into action!
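Putting those pieces together, a minimal consumer might look like the sketch below. The broker address, group ID, and topic name are placeholders, and string deserializers are just one possible choice:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local cluster
        props.put("group.id", "demo-group");               // hypothetical group
        // Deserializers turn the raw bytes in the fetch response back into objects.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // hypothetical topic
            while (true) {
                // poll() returns deserialized records; your business logic goes here.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```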
So far, we’ve focused on the consumer fetch request, but here’s a surprise: not every call to consumer.poll() in your client code triggers a fetch request. 🕵️♂️
The process of fetching data from the broker and the act of polling are actually separate. While the poll loop can trigger a fetch request, once the consumer gets data, it caches those records. The consumer will continue polling from its cache until it’s empty. Only then will it issue a new fetch request to the broker.
Because consumers cache records, it’s useful to control how many are returned with each poll() call. Luckily, there are configurations for that!
The key setting here is max.poll.records, which caps the number of records returned by a single call to poll() (default is 500). But why limit this if there’s more data in the cache?
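Continuing the earlier configuration sketch, the setting is applied like this (the value here is an example, not a recommendation; the documented default is 500):

```java
// Cap how many cached records a single call to poll() hands to your code.
props.put("max.poll.records", "250");
```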
Setting a limit helps you monitor consumer performance and keep track of its health. The poll() loop, together with session timeouts and heartbeats, acts as a proxy for whether the consumer is alive.
To manage this, we not only limit records per poll but also set a time limit for processing with max.poll.interval.ms (default is 5 minutes). If a consumer takes longer, it’s a sign it might be stuck—potentially hung up on some records or errors. If it misses heartbeats or exceeds the session timeout, the consumer group coordinator will revoke its partition assignment and remove it from the group.
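These liveness-related settings sit alongside the fetch settings shown earlier; the values below are illustrative examples, not recommendations:

```java
props.put("max.poll.interval.ms", "300000"); // max gap allowed between poll() calls (5 minutes)
props.put("session.timeout.ms", "45000");    // how long the coordinator waits before declaring the consumer dead
props.put("heartbeat.interval.ms", "3000");  // how often the background thread sends heartbeats
```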
As we process fetched records, it’s crucial for the consumer to track what it has processed so that, if it restarts, it knows where to pick up. This is handled by committing offsets.
By default, offset commits are automatic when enable.auto.commit=true. In this mode, an offsets handler commits processed records every five seconds based on timestamps. You can adjust this interval with auto.commit.interval.ms.
However, keep in mind that this auto-commit happens solely on a time-based schedule. If you need to commit after a specific number of records, you'll need to commit manually with consumer.commitSync() or consumer.commitAsync(). But beware: doing so can impact performance and is usually not recommended unless absolutely necessary.
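As a rough illustration, assuming auto-commit is turned off and reusing the consumer from the earlier sketch (process() here is a hypothetical stand-in for your business logic):

```java
props.put("enable.auto.commit", "false"); // turn off the time-based auto-commit

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical business logic
}
consumer.commitSync(); // blocks until the offsets for this batch are committed
```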
Once offsets are committed, the consumer continues to process records from its cache and triggers new fetch requests as needed.
As long as the consumer continues sending heartbeats to the consumer group coordinator and doesn’t time out, the group will keep running smoothly. And even if a consumer client fails, no worries—the partitions it was handling will automatically be rebalanced to another active instance, ensuring continuous processing.
Wow, that was a lot of information to digest! But instead of feeling overwhelmed, think of it as unlocking a new level of understanding.
Apache Kafka is a fascinating technology, and while it’s intricate, that complexity is what makes it so powerful—and honestly, pretty exciting to learn about! The fact that you’ve navigated the inner workings of producers and consumers means you’re now better equipped than ever.
The best part? You no longer have to treat Kafka like a mysterious black box. The next time something goes awry with your producer or consumer client, you’ll have the knowledge to diagnose and fix it confidently.
If you’re curious to learn more about Apache Kafka and data streaming, head over to Confluent Developer and check out the dozens of free courses and tutorials we have available.