Kafka Series II: Kafka Producers
Preview
This article has been long coming, I started it in July, but never say never. I have seen many online articles showcasing Kafka usage, but I decided a different approach, in taking a look at the internal code that drives Kafka. Please take a look at this to get a high-level overview of Kafka here, this article will technically deeper into the actual Kafka Producer client code.
My medium search for Kafka internals in vain. One of my reason to look into the code itself to understand what’s going on.
Kafka Producers
As a refresher, in a criminally small overview, Kafka has 3 roles (Producers, Brokers, and Consumers), where they share messages with each other, similar to a Pub-sub.
Kafka Producers → Kafka Brokers → Kafka Consumers
Since we got the basics out of the way, let’s jump into how Kafka Producers send messages.
If you want to parallely look through the code and read the document. Some details are below.
Targeted Kafka Package name - org.apache.kafka.clients.producer;
Check here for github of the package.
Producer Usage
Before, we start taking a look at Kafka producer client code. Let us take a look at how a producer uses the Kafka client library
Below is a demo of how we are producing a message to a Kafka topic. In this case, the message with key "hello_key1" and value "Hello, world! value" is sent to the topic "my-topic".
// This class provides a practical demonstration of how to send a single message to Kafka.
public class KafkaProducerDemo {
public static void main(String[] args) {
// 2. Create a Kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
// Create a producer record
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "hello_key1", "Hello, world! value");
// Send the record to Kafka
producer.send(record);
// Close the producer
producer.close();
}
// WIll talk about these properties and how they determine the message flow later in the document :)
private static Properties getProducerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put(ACKS_CONFIG, "-1");
properties.put(RETRIES_CONFIG, "3");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
}
Kafka Producer Thread Model
In the process of sending the above message, the Kafka library internally has two threads working together. Main thread and Sender thread. Main Thread is responsible for processing the data, validating the location and then temporarily storing it (RecordAccumlator). The Sender thread's only responsibility is to transmit the temporarily stored data
Main Thread appending to RecordAccumulator
Sender Thread
Main Thread Workflow
What is RecordAccumulator?
The Main thread creates a container, temporarily referred to as the RecordAccumulator. Its default size is 32MB and it serves as a buffer.
Inside the buffer is a container, batches designed to hold the data to be sent. The Key corresponds to the targeted partition, while the Value stores the data to be sent. ProducerBatch is a double-ended queue as shown below.
Data Handling
Before data is stored in the buffer, it must go through the processes of an interceptor, a serializer, and a partitioner:
There is an interceptor also here before Serializer, but according to research, its being used rarely in production.
The serializer is self-explanatory. We can specify how to serialize data by setting the key.serializer, value.serializer (above Usage example)
The partitioner determines the partition to which the data needs to be sent by parsing parameters and sends it to the corresponding ProducerBatch.
A workflow chart for the Main thread can be seen below:
Sender Thread Workflow
Now, the Sender thread pulls data from the ProducerBatch and then sends the data to the Kafka Broker via HTTP request. This brings up several questions:
When should the data be fetched?
How is successful message transmission confirmed?
How many requests can be made simultaneously?
1. When Data is pulled from Record Accumulator
In the producer parameters, there is a batch.size
item, which by default is set to 16KB. This configuration controls the size of the ProducerBatch double-ended queue. Once data accumulates to the configured size, the Sender thread pulls the data.
However, what if the data never reaches the configured size? It's not feasible to never fetch data, because, from the user's perspective, it would seem that consumers can't receive produced data, which is unreasonable. Therefore, another configuration, linger.ms comes into play. When the data doesn't reach batch.size
after a certain amount of time, if the Sender thread has waited more than the time set in linger.ms, it will also fetch the data. The default value for linger.ms
is 0ms, meaning data will be immediately pulled once it is available
How is successful message transmission confirmed?
In a production environment, message transmission often encounters difficulties, such as network fluctuations, Kafka Broker downtime, etc., all of which can potentially cause message persistence to fail. This leads us to the question: under what conditions will a Producer consider a message successfully sent? This is where the acks
parameter comes into play, which has three configurable values:
acks=0
: The producer will not wait for any confirmation from the server. The record will be immediately added to the buffer and considered as sent.acks=1
(default value): The Leader will write the record to its local log, but it will not wait for full confirmation from all replica servers before responding. In this case, if the Leader fails immediately after confirming the record, the record will be lost.acks=all
: This is equivalent toacks=-1
. The Leader will wait for the full set of synchronized replicas to confirm the record. This ensures that as long as at least one synchronized replica server is still alive, the record will not be lost. This is the most powerful guarantee.
Below are some important pieces related the acks param
if you expect a response, then acks > 0 in the callback function.
Showing a snippet of the massive code to maintain succinctness, if you directly want to sneakpeak into the code - here
If there's a fear of message transmission failure, configuring the parameter retries can activate a retry mechanism. If sending fails, the Sender thread will automatically retry
Check more here for retries.
How many requests can be made concurrently?
The Sender thread at the producer's end will cache a request queue. By default, a maximum of 5 requests can be cached for each Broker, and this can be changed by configuring the max.in.flight.requests.per.connection value.
Since Kafka 2.X? (don’t quote me on this), the Kafka server can cache the most recent five request metadata sent by the producer, so within five requests, data order can be ensured.
Flow chart for the Sender thread:
If you want to read more on producer parameters used, read here
Different ways to use KafkaProducer API
Now, that you know how Kafka producer client library accumulates and sends data.
Here are some ways to send messages through Kafka Producer. More here
Synchronously sending Kafka Producer Messages
Asynchronously sending Kafka producer messages with a callback function
Conclusion
Hope now, you can understand how Kafka’s Producer internally sends messages. It segregates the duties between these two threads to help achieve the promised low latency with high throughput moto. Feel free to like it or share it, Also wait out for the next articles where I talk how brokers and consumers. Adios.