In a previous blog post (How To Survive an Apache Kafka® Outage) I outlined the effects on applications during partial or total Kafka cluster outages and proposed some architectural strategies to handle these types of service interruptions. The applications most heavily impacted by this type of outage are external interfaces that receive data, do not control request flow, and possibly perform some form of business transaction with the outside world before producing to Kafka. These applications are most commonly found in finance and written in languages other than Java—mostly C and C++.
librdkafka is the main underlying client library used in non-JVM environments and has wrapper libraries for Python, .NET, Go, and an ever-expanding list of other languages. It has not been written about to the same extent as the Java client, and it is worth examining because its interface and underlying mechanics are fundamentally different.
In this blog post, I will go into the details of message production with librdkafka to help you build a mental model of how it works and how to interact with it correctly. We’ll look at the code used to produce messages and process replies, the data structures used, and the workings of the three main thread types involved in sending a message to Kafka. After reading this, you will understand how to write code that produces messages via this library, how it will behave during error situations, and how your application should detect and respond to them.
Let’s look at librdkafka from the point of view of its underlying C implementation. A C++ layer (rdkafkacpp.h) wraps this base library in object-oriented constructs; this is easy to understand once you have reviewed the C code. We will use the producer.c example from the library’s repository as a starting point and drill into the details. You can compare and contrast with the C++ equivalent by looking at the producer.cpp example.
The C library has been written in a style that should be generally readable to someone from an object-oriented background. The first parameter of most functions is the data structure being operated on. So the function doSomething(thing, paramA, paramB) can typically be read as thing.doSomething(paramA, paramB).
To send a message to Kafka from an application using the librdkafka C library, the following high-level steps need to occur within the code:
Create an instance of a config struct:
Set a delivery report callback on the config struct that will be called when a message is successfully sent or if sending fails (we will look at this in detail later):
Create an instance of a producer:
Create a message, and subsequently set it with a payload:
Send a message:
Poll the delivery reply queue:
The relationship between the data structures used in the above code is as follows:
An rd_kafka_t struct is your main entry point to the library. Unlike in Java, which has classes dedicated to production and consumption, this structure can be configured to do either.
The rd_kafka_conf_t struct contains the configuration for how the library should behave. It contains a pointer to a callback function (dr_msg_cb).
The callback function is invoked after each message has been processed. It takes the following arguments:
The rd_kafka_t struct used to send the message.
The message that was sent.
A pointer to an arbitrary application data structure known as the “opaque”. This pointer is set by your application on the config struct with rd_kafka_conf_set_opaque(). A separate per-message opaque, passed when you invoke the produce function, is returned in the delivered message’s _private field. Either pointer allows you to modify the state of the surrounding application when a message is successfully sent or fails to send.
There are three different thread types involved in producing a message to Kafka. Their responsibilities are as follows:
The application thread: This thread performs partitioning where applicable and enqueues sent messages onto librdkafka’s internal queues for processing. From within the application logic, it periodically polls for responses from the library and executes the callback handler for each message response. This polling is critical for librdkafka operations and is covered in detail later in the memory management section.
The main thread: This internal thread performs three tasks:
Periodically checks the Kafka brokers it knows about for metadata around which partition leaders reside on which brokers.
Creates a broker thread to handle communications for each broker.
Partitions messages that were sent but not assigned a partition and enqueues them for processing.
The broker thread: One of these exists for each broker the library is responsible for communicating with. This thread is responsible for consuming enqueued messages corresponding to its broker and handling all socket-level communications. It communicates success or failure to the application thread by writing to a response queue.
Let’s examine the work of these threads in closer detail.
When you send a message via rd_kafka_producev() or the older rd_kafka_produce() API, the application thread executes the following logic.
Each topic partition you produce to has a dedicated queue data structure associated with the rd_kafka_t instance, known as a partition queue. Librdkafka will attempt to place your sent message onto a partition queue either directly, if you explicitly named a partition, or after it runs the partitioner to work out which partition the message should be sent to. If the library determines at any point that a partition queue does not exist for this partition, then the message will be placed onto a UA (unassigned) partition queue. The main thread will read the message and create the necessary partition queue to re-queue it for processing.
The UA and partition queues are a boundary between the application and internal threads communicating with the Kafka cluster. A mutex controls access to each queue. The competing threads must gain exclusive access to the mutex to read or write to these queues. This is the only contention point between threads, and the library’s design ensures that the mutexes are held as briefly as possible.
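To make the queueing rule concrete, here is a simplified single-file model of it. This is not librdkafka's code: every type and function name below is hypothetical, the queues are fixed-size arrays, and the existence check is done without a lock purely to keep the sketch short.

```c
#include <pthread.h>
#include <stddef.h>

#define MAX_PARTITIONS 4
#define QUEUE_CAPACITY 16
#define PARTITION_UA   (-1) /* "unassigned" marker in this model */
#define ENQUEUE_FULL   (-2)

/* One mutex-protected queue; all names here are hypothetical. */
typedef struct {
    pthread_mutex_t lock;
    const char *msgs[QUEUE_CAPACITY];
    size_t count;
    int exists; /* set once the "main thread" has created the queue */
} msg_queue_t;

typedef struct {
    msg_queue_t partitions[MAX_PARTITIONS];
    msg_queue_t ua; /* unassigned partition queue */
} producer_model_t;

static void model_init(producer_model_t *p) {
    for (int i = 0; i < MAX_PARTITIONS; i++) {
        pthread_mutex_init(&p->partitions[i].lock, NULL);
        p->partitions[i].count = 0;
        p->partitions[i].exists = 0;
    }
    pthread_mutex_init(&p->ua.lock, NULL);
    p->ua.count = 0;
    p->ua.exists = 1;
}

/* Hold the mutex only for the append itself. */
static int queue_push(msg_queue_t *q, const char *msg) {
    int rc = -1;
    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_CAPACITY) {
        q->msgs[q->count++] = msg;
        rc = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return rc;
}

/* Application-thread side: use the partition queue if it exists,
 * otherwise fall back to the UA queue. Returns the queue used. */
static int model_enqueue(producer_model_t *p, int partition, const char *msg) {
    if (partition >= 0 && partition < MAX_PARTITIONS &&
        p->partitions[partition].exists)
        return queue_push(&p->partitions[partition], msg) == 0 ? partition
                                                               : ENQUEUE_FULL;
    return queue_push(&p->ua, msg) == 0 ? PARTITION_UA : ENQUEUE_FULL;
}
```

In this model, a message for a partition whose queue has not been created yet lands on the UA queue, which mirrors the hand-off to the main thread described above.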
librdkafka’s main thread processes messages from the UA partition queue in a loop. This thread performs the following tasks:
Gathers metadata about the Kafka cluster and the location of partitions to which the messages are sent. The thread updates an in-memory map of which partitions have leaders on which brokers. Broker threads use this map to determine the partition queues from which they should read messages.
Creates partition queues for each topic partition onto which messages are enqueued and from which broker threads read.
Runs the partitioner for unassigned messages and places them in the appropriate partition queue.
The main thread creates a broker thread for each broker in the cluster. This newly created thread is responsible for sending messages to only that broker. It continually polls the partition queues for which its broker is the current leader via information it obtains from the broker-partition map.
Once the broker thread extracts messages from these partition queues, it batches and sends them to the broker. Replies, errors, and timeouts for the message are placed in a reply queue. The mechanics are as follows:
Each broker thread performs the following steps in a loop:
Determines which partitions the thread is responsible for polling. This is done by checking the broker-partition mapping managed by the main thread. Over time, the partitions for which the broker is the leader will change as leadership rebalances through the Kafka cluster.
Extracts a batch of messages out of the corresponding partition queues. The thread will hold the corresponding mutexes until the extract operation is complete or the period defined by linger.ms has passed.
Writes the extracted messages to per-partition transmit queues (xmit_msgq), and immediately polls the same queues to retrieve a batch of messages to send. Reading back from the transmit queues means the batch also captures failed messages from previous sends. These internal queues have no mutexes on them, as only this thread has access to them; they are an area of working memory on which slower operations are performed.
Serializes the messages into a MessageSet and ProduceRequest. These are data structures of the Kafka protocol.
Sends the request via the network socket to the broker. The serialized request is placed onto an Outbuf queue, from which the socket abstraction reads it.
Places the message batch onto a wait response queue.
Polls the socket for broker responses. The response that it retrieves may be for messages sent in previous sends.
Correlates the broker response to the message batch in the wait response queue.
If a retryable error occurs in processing, places the message batch back onto the transmit queues. Otherwise, enqueues the broker response and the message batch onto the reply queue for the application thread to process.
It is essential to note the following:
A mutex controls access to each queue that is read from or written to by multiple threads (application thread, main thread, and broker threads).
Messages are extracted from the partition queues and written to per-partition transmit queues. These are buffers local to the broker thread for the subsequent slower operations. This act of copying is done to hold the mutex for as short an amount of time as possible.
All other queues are accessed by one broker thread only and are not locked.
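The copy-then-process pattern can be illustrated with a small model. This is a sketch under simplifying assumptions, not the library's implementation: messages are plain integers, the queues are fixed-size arrays, and all names (partition_queue_t, xmit_queue_t, drain_to_xmit, take_batch) are hypothetical.

```c
#include <pthread.h>
#include <stddef.h>

#define PARTQ_CAPACITY 16
#define XMIT_CAPACITY  32

/* Shared with the application thread, so it carries a mutex. */
typedef struct {
    pthread_mutex_t lock;
    int msgs[PARTQ_CAPACITY];
    size_t count;
} partition_queue_t;

/* Private to one broker thread, so it needs no lock. */
typedef struct {
    int msgs[XMIT_CAPACITY];
    size_t count;
} xmit_queue_t;

/* Remove the first n entries, shifting the rest to the front. */
static void drop_front(int *msgs, size_t *count, size_t n) {
    for (size_t i = n; i < *count; i++)
        msgs[i - n] = msgs[i];
    *count -= n;
}

/* Move messages from the shared queue to the tail of the local transmit
 * queue. The mutex is held only for this copy. Returns the number moved. */
static size_t drain_to_xmit(partition_queue_t *partq, xmit_queue_t *xmit) {
    size_t moved = 0;
    pthread_mutex_lock(&partq->lock);
    while (moved < partq->count && xmit->count < XMIT_CAPACITY)
        xmit->msgs[xmit->count++] = partq->msgs[moved++];
    drop_front(partq->msgs, &partq->count, moved);
    pthread_mutex_unlock(&partq->lock);
    return moved;
}

/* Take up to max_batch messages from the head of the transmit queue.
 * Anything left over from an earlier failed send is at the head,
 * so it goes out first. */
static size_t take_batch(xmit_queue_t *xmit, int *batch, size_t max_batch) {
    size_t n = xmit->count < max_batch ? xmit->count : max_batch;
    for (size_t i = 0; i < n; i++)
        batch[i] = xmit->msgs[i];
    drop_front(xmit->msgs, &xmit->count, n);
    return n;
}
```

If the transmit queue already holds a retried message when new messages are drained into it, the next batch starts with the retried message, which is the behavior the steps above describe.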
librdkafka provides statistics (librdkafka’s equivalent of JMX) around each internal queue, so it is possible to see what happens within a producer instance at a fine-grained level. The library has several Python tools available to help you understand the statistics.
The application thread is responsible for continuously polling the reply queue for events through the rd_kafka_poll() function. This poll must be explicitly performed, or callbacks will not be executed and the library’s internal queues will fill up.
Each call to rd_kafka_poll() triggers the following logic:
Fetch a set of replies from the reply queue, which is where all responses from the broker threads are sent. The reply queue contains multiple different types of replies, only one of which is a message reply.
Loop through each reply and invoke the associated callback handler. Each reply has an operation type associated with it, which results in a different callback handler being triggered:
A delivery report message (RD_KAFKA_OP_DR) will trigger the dr_msg_cb callback on the rd_kafka_conf_t struct.
This handler is registered by calling rd_kafka_conf_set_dr_msg_cb().
A statistics message (RD_KAFKA_OP_STATS) will trigger the stats_cb callback on the rd_kafka_conf_t struct.
This handler is registered by calling rd_kafka_conf_set_stats_cb() (see rdkafka_performance.c for example usage). It is only invoked if statistics.interval.ms is configured to a non-zero value.
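The dispatch loop behind a poll call can be pictured with the following model. It is only an illustration of the idea: OP_DR and OP_STATS stand in for the library's internal operation types, and reply_t, callbacks_t, and model_poll() are hypothetical names rather than anything librdkafka exposes.

```c
#include <stddef.h>

/* Stand-ins for the internal operation types named above. */
typedef enum { OP_DR, OP_STATS } op_type_t;

typedef struct {
    op_type_t type;
    const char *data; /* message payload or statistics JSON */
} reply_t;

/* Callbacks registered on the (model) configuration. */
typedef struct {
    void (*dr_cb)(const char *msg, void *opaque);
    void (*stats_cb)(const char *json, void *opaque);
    void *opaque;
} callbacks_t;

/* Walk the fetched replies and invoke the handler that matches each
 * operation type. Returns the number of callbacks actually served. */
static int model_poll(const reply_t *replies, size_t n,
                      const callbacks_t *cbs) {
    int served = 0;
    for (size_t i = 0; i < n; i++) {
        switch (replies[i].type) {
        case OP_DR:
            if (cbs->dr_cb) {
                cbs->dr_cb(replies[i].data, cbs->opaque);
                served++;
            }
            break;
        case OP_STATS:
            if (cbs->stats_cb) {
                cbs->stats_cb(replies[i].data, cbs->opaque);
                served++;
            }
            break;
        }
    }
    return served;
}

/* Example handlers that only count invocations. */
typedef struct {
    int deliveries;
    int stats;
} counters_t;

static void count_delivery(const char *msg, void *opaque) {
    (void)msg;
    ((counters_t *)opaque)->deliveries++;
}

static void count_stats(const char *json, void *opaque) {
    (void)json;
    ((counters_t *)opaque)->stats++;
}
```

The point of the model is that nothing runs until the application thread calls poll: the replies simply sit in the queue, and the callbacks execute on the caller's thread.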
The delivery report callback handler has the following C function signature: void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
The struct rd_kafka_message_t defines the following fields that may be used when processing replies:
rd_kafka_resp_err_t err: Non-zero implies that there has been a permanent error in processing the message.
void *payload: This is the original payload. In the case of a consumer, if there was an error then this pointer corresponds to the error string, else the original message.
size_t len: If error, this is the length of the error string, else the length of the original payload.
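The per-message opaque is a pointer passed to the rd_kafka_producev() call (via RD_KAFKA_V_OPAQUE) and returned in the _private field of the delivered message; the callback’s own opaque argument is the pointer set on the config struct with rd_kafka_conf_set_opaque(). The per-message opaque allows an application to attach a hook back to an arbitrary data structure, which can then be retrieved from the delivery report (e.g., the original application object correlating to the sent message).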
Librdkafka defines several error codes that represent various timeouts:
ERR_REQUEST_TIMED_OUT: The broker failed to write the message within the allotted time.
ERR__TIMED_OUT_QUEUE: The message timed out in the producer queue (typically due to the leader not being available).
ERR__MSG_TIMED_OUT: The message timed out while in-flight to the broker.
Error codes are for informative purposes only, and applications should not decide how to proceed based on them. You can read more details on error codes on GitHub.
You can print out the details of an error by invoking rd_kafka_err2str().
To work out how a message reply ought to be treated, the message callback should invoke the function rd_kafka_message_status() on the delivered message.
The rd_kafka_msg_status_t enum defines the following constants:
RD_KAFKA_MSG_STATUS_NOT_PERSISTED: The message was never transmitted to the broker, or failed with an error indicating it was not written to the log.
An application retry risks breaking message ordering, but not duplication.
This is the message status that will be returned for timed-out messages.
RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED: The message was transmitted to the broker, but no acknowledgment was received.
An application retry risks both breaking message ordering and duplicating the payload on the Kafka topic.
RD_KAFKA_MSG_STATUS_PERSISTED: The message was written to the log and acknowledged by the broker. There is no reason for the application to retry.
Note: this value should only be trusted with acks=all, since on its own it indicates only that an acknowledgment was received from the leader broker. Where acks=1, it is entirely possible to have received this response and to lose the message should the broker terminate before the operating system can flush the page cache to disk or the broker has replicated the message to the ISR set.
Retrying a send from the application when sending has already failed is a bad idea. librdkafka contains retry logic that preserves message ordering and prevents duplication through idempotent production. My previous post gives you some design options for handling this situation.
Librdkafka holds on to a message in memory until the reply related to its processing is fetched from the reply queue and the delivery report callback is triggered. To not overrun the system's memory, a pair of configuration settings prevent an application thread from sending in too much data. These are:
queue.buffering.max.messages: Default 100,000
queue.buffering.max.kbytes: Default 1,048,576 (1GB); this is the higher priority setting of the two
If either limit is reached, calls to rd_kafka_producev() will return RD_KAFKA_RESP_ERR__QUEUE_FULL, a value of the rd_kafka_resp_err_t enum (Kafka broker errors are represented by positive numbers; internal errors are negative). The application should invoke rd_kafka_poll() and then attempt to resend the message.
Since messages are not evicted from memory unless their reply is processed, the application thread must regularly poll the reply queue.
In this post, I discussed the details of message production with librdkafka. We looked at the code used to produce messages and process replies, the data structures used, and the workings of the three main thread types involved in sending a message to Kafka. You should now have a mental model of how sends work in non-Java language environments. You should also be better able to reason about behavior during error situations and how your application should detect and respond to them.
If you want to discuss reliability with people who deal with it daily, please contact Confluent Professional Services.