Illustration of the message caching model for Kafka producers

Before reading this article, consider the following questions; reading with these questions in mind will yield better results.

  1. If the broker goes down while a message is being sent, can the message body still be written to the message cache?
  2. If the Producer client crashes while messages are still in the cache, are those messages lost?
  3. What happens when the latest ProducerBatch still has spare memory, but the next message is too large to fit into that Batch?
  4. How much memory should be allocated when creating a ProducerBatch?

Source Address: Schematic of the Kafka Producer message caching model

What is the message accumulator RecordAccumulator

To improve the Producer client's send throughput and performance, Kafka temporarily caches messages until certain conditions are met and then sends them in batches. This reduces the number of network requests and improves throughput.

The class that caches these messages is RecordAccumulator.
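How much is cached, and for how long, is governed by a few producer settings. Below is a minimal sketch of the relevant configuration; the property names and default values are from the standard Kafka producer configuration, while the wrapper class itself is just an illustration:

```java
import java.util.Properties;

public class ProducerBufferConfig {
    public static Properties bufferingProps() {
        Properties props = new Properties();
        // Upper bound of one batch per partition (default 16 KB).
        props.put("batch.size", "16384");
        // Total memory the RecordAccumulator may use (default 32 MB).
        props.put("buffer.memory", "33554432");
        // How long to wait for more records before sending a non-full batch.
        props.put("linger.ms", "0");
        // How long send() may block once buffer.memory is exhausted.
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(bufferingProps());
    }
}
```

Increasing `batch.size` or `linger.ms` trades latency for larger batches and fewer network requests.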

Overall model diagram

The figure above shows the cache model for the entire message store; we will explain it piece by piece.

Message caching model


The diagram above shows the message caching model, where produced messages are temporarily stored:

  1. Each message is placed into a Deque&lt;ProducerBatch&gt; queue according to its TopicPartition; messages with the same TopicPartition go into the same Deque&lt;ProducerBatch&gt;.
  2. ProducerBatch: represents a batch of messages. When messages are actually sent to the broker, they are sent batch by batch, and a batch may contain one or more messages.
  3. If no Deque&lt;ProducerBatch&gt; queue exists yet for the message's partition, one is created.
  4. The Batch at the tail of the queue is examined first; if it still has room for this message, the message is appended to that Batch.
  5. If the tail Batch does not have enough remaining memory for this message, a new Batch is created.
  6. When a message has been sent successfully, its Batch is released.
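The steps above can be sketched as a toy model. The class and method names here are illustrative only, not the real Kafka internals, and sizing is simplified to string lengths:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A toy model of RecordAccumulator#append: one Deque of batches per partition.
class ToyAccumulator {
    static final int BATCH_SIZE = 16_384; // stands in for batch.size

    static class Batch {
        final int capacity;
        int used;
        final List<String> records = new ArrayList<>();
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(String record) {
            if (used + record.length() > capacity) return false; // not enough room
            used += record.length();
            records.add(record);
            return true;
        }
    }

    final Map<String, Deque<Batch>> batches = new HashMap<>();

    void append(String topicPartition, String record) {
        // Step 3: create the queue for this partition if it does not exist yet.
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        // Step 4: try the Batch at the tail of the queue first.
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(record)) return;
        // Step 5: tail Batch missing or full -> create a new Batch sized
        // max(batch.size, record size) and append the record there.
        Batch fresh = new Batch(Math.max(BATCH_SIZE, record.length()));
        fresh.tryAppend(record);
        dq.addLast(fresh);
    }
}
```

Appending two small records to the same partition lands them in one Batch; a record larger than the remaining room forces a second Batch onto the tail of the same Deque.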

The memory size of ProducerBatch

So how much memory should be allocated when creating ProducerBatch?

First, the conclusion: when the estimated size of the message is greater than batch.size, the Batch is created with the estimated size; otherwise it is created with batch.size (16 KB by default).

Let's look at the code that estimates the memory size when creating a ProducerBatch:

RecordAccumulator#append

    // Take the larger of batch.size and the total in-batch size of this message
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
            maxUsableMagic, compression, key, value, headers));
    // Allocate the memory
    buffer = free.allocate(size, maxTimeToBlock);


  1. Suppose a message M is produced and no ProducerBatch can hold it (none exists, or the existing one is full); a new ProducerBatch must then be created.
  2. The estimated message size is compared with batch.size (default 16384, i.e. 16 KB), and the larger of the two is used.


So how is the size of this message estimated? Is it purely the size of the message body?

DefaultRecordBatch#estimateBatchSizeUpperBound

Estimating the required Batch size is only an estimate, because it does not take into account the additional overhead of the compression algorithm.

    /**
     * Get an upper bound on the size of a batch with only one record,
     * using the given key and value. This is only an estimate, because it
     * does not account for the overhead of the compression algorithm used.
     **/
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
    }


  1. The estimate is the size of message M plus RECORD_BATCH_OVERHEAD.
  2. RECORD_BATCH_OVERHEAD is the basic metadata of a Batch, occupying 61 B in total.
  3. The size of message M is not just the size of the message body; the total size = size of (key, value, headers) + MAX_RECORD_OVERHEAD.
  4. MAX_RECORD_OVERHEAD: the maximum footprint of a record's header, 21 B.

In other words, creating a ProducerBatch requires at least 83 B.

For example, if I send the message "1", the estimated size is 86 B. This is compared with batch.size (default 16384) and the maximum is taken, so 16384 B is requested when allocating memory.
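The sizing rule itself is just a `Math.max`. A minimal sketch, using the 16384 default and the 86-byte estimate from the example above (the class name is illustrative):

```java
public class BatchSizing {
    static final int BATCH_SIZE = 16_384; // batch.size default

    // Mirrors the Math.max(...) call in RecordAccumulator#append.
    static int allocationSize(int estimatedRecordSize) {
        return Math.max(BATCH_SIZE, estimatedRecordSize);
    }

    public static void main(String[] args) {
        System.out.println(allocationSize(86));     // small message "1" -> 16384
        System.out.println(allocationSize(20_000)); // oversized message -> 20000
    }
}
```

So a small message never shrinks the allocation below batch.size, and an oversized message simply gets a batch of its own estimated size.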

The structure of a Batch and of individual messages will be covered in a separate article.

Memory allocation

We know that the total cache size in RecordAccumulator is fixed up front, controlled by buffer.memory (default 33554432, i.e. 32 MB).

When the production rate exceeds the send rate, the Producer's writes may block.

Moreover, frequently creating and releasing ProducerBatches would cause frequent GC, so Kafka uses a cache pool whose buffers are reused. However, only allocations of the fixed size batch.size can use the cache pool.

PS: The following 16k refers to the default value of batch.size.

Creation and release of Batch

1. The batch is 16 K and the cache pool has free memory

(1). When creating a Batch, a ByteBuffer is taken from the head of the cache pool's queue and used directly.

(2). When the message has been sent and the Batch is released, the ByteBuffer is put back into the cache pool's queue and byteBuffer.clear() is called to wipe the data so it can be reused next time.


2. The batch is 16 K but the cache pool has no free memory

(1). When creating a Batch, memory is taken from the non-pooled memory instead. Note: "taking" memory here simply means decreasing nonPooledAvailableMemory by 16 K; the Batch is then created normally. No actual memory transfer takes place.

(2). When the message has been sent and the Batch is released, the ByteBuffer is put into the cache pool's queue and byteBuffer.clear() is called to wipe the data for reuse next time.



3. The batch is larger than 16 K and non-pooled memory is sufficient

(1). When creating a Batch, memory is taken from nonPooledAvailableMemory. Note: "taking" memory here simply means decreasing nonPooledAvailableMemory by the corresponding amount; the Batch is then created normally. No actual memory transfer takes place.

(2). When the message has been sent and the Batch is released, the Batch's memory size is simply added back to nonPooledAvailableMemory. The Batch itself is then reclaimed by GC.


4. The batch is larger than 16 K and non-pooled memory is not sufficient

(1). First, ByteBuffers in the cache pool are released into the non-pooled memory one by one, until the non-pooled memory is large enough to create the Batch.

(2). When creating the Batch, memory is taken from nonPooledAvailableMemory. Note: "taking" memory here simply means decreasing nonPooledAvailableMemory by the corresponding amount; the Batch is then created normally. No actual memory transfer takes place.

(3). When the message has been sent and the Batch is released, the Batch's memory size is simply added back to nonPooledAvailableMemory. The Batch itself is then reclaimed by GC.

For example, suppose we need to create a 48 K Batch. Since 48 K exceeds 16 K, the memory must come from the non-pooled memory, but the non-pooled memory currently has 0 B available, so the allocation cannot proceed. At this point part of the cache pool is released into the non-pooled memory.

If the first ByteBuffer (16 K) is not enough, the second is released as well, and so on; after three buffers (48 K in total) have been released the memory is sufficient, and the Batch is then created.


Note: the "non-pooled memory allocation" involved here is simply the increase and decrease of a memory counter.
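All four cases above can be modeled with a simplified pool. This is a sketch of the idea behind Kafka's internal BufferPool, not its real code; blocking, locking, and fairness are omitted, and the class name is illustrative:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified BufferPool: poolable (== batch.size) buffers are recycled,
// everything else is plain accounting over nonPooledAvailableMemory.
class ToyBufferPool {
    final int poolableSize;                            // batch.size, e.g. 16 K
    final Deque<ByteBuffer> free = new ArrayDeque<>(); // the 16 K cache pool
    long nonPooledAvailableMemory;                     // just a counter, no real transfer

    ToyBufferPool(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory;
    }

    ByteBuffer allocate(int size) {
        // Case 1: a 16 K request and the pool has a free buffer -> reuse it.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        // Case 4: if the counter is too small, release pooled buffers
        // one by one back into it until the request fits.
        while (nonPooledAvailableMemory < size && !free.isEmpty()) {
            free.pollFirst();
            nonPooledAvailableMemory += poolableSize;
        }
        if (nonPooledAvailableMemory < size)
            throw new IllegalStateException("would block until memory is freed");
        // Cases 2 and 3: take the size out of the non-pooled counter.
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();            // wipe and recycle a 16 K buffer
            free.addLast(buffer);
        } else {
            // Non-16 K buffer: only the counter grows; the buffer itself is GC'd.
            nonPooledAvailableMemory += buffer.capacity();
        }
    }
}
```

Note that a 16 K buffer goes back into the pool on release even if it was originally "taken" from the non-pooled counter, which is exactly case 2 above.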

Questions and answers

  1. If the broker goes down while a message is being sent, can the message body still be written to the message cache?

When the broker goes down, the Producer prints the warning below, but during sending the message body can still be written to the message cache, and only to the cache.


WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available



  3. What happens when the latest ProducerBatch still has spare memory, but the next message is too large to fit into that Batch?

A new ProducerBatch will be created.

  4. How much memory should be allocated when creating a ProducerBatch?

If the estimated size of the message that triggers the creation is larger than batch.size, the Batch is created with the estimated size; otherwise it is created with batch.size.

There is one more question left for you to think about:

  2. If the Producer client crashes while messages are still in the cache, are those messages lost?