This post introduces the Kafka producer configuration parameters. It covers only some of the commonly used ones; I will add more when time allows. For the full list and more detail, refer to the official Kafka website. The explanations here are drawn mainly from the book “Apache Kafka Practice” and from the official website.
bootstrap.servers
This parameter specifies a set of host:port pairs used to create connections to the Kafka brokers, for example: k1:9092,k2:9092,k3:9092.
If your Kafka cluster has a large number of machines, you only need to specify a few brokers instead of listing them all, because no matter how many are specified, the producer discovers all brokers in the cluster through this parameter. Specifying more than one broker is for failover only: even if one broker is down, the producer can still connect to the Kafka cluster after restarting through the other brokers listed in this parameter.
Also, if the broker side does not explicitly configure listeners with IP addresses, it is better to set this parameter to host names rather than IP addresses, because Kafka uses the FQDN (Fully Qualified Domain Name) internally.
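As a minimal sketch of this setting, the snippet below builds the configuration with nothing but java.util.Properties. The host names follow the example above; the class and method names are hypothetical. In a real program the resulting Properties would be passed to the KafkaProducer constructor from the kafka-clients library, which is left out here so the sketch stays self-contained.

```java
import java.util.Properties;

class BootstrapServersSketch {
    // Hypothetical helper: lists a few brokers for failover, not the whole cluster.
    static Properties bootstrapConfig() {
        Properties props = new Properties();
        // Host names rather than IP addresses, as recommended above.
        props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(bootstrapConfig().getProperty("bootstrap.servers"));
    }
}
```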
key.serializer
Any message sent to the broker must be in the form of a byte array, so the individual components of a message must be serialized before they can be sent to the broker.
This parameter is used to serialize the message key. It specifies the fully qualified name of a class that implements the org.apache.kafka.common.serialization.Serializer interface.
Kafka provides ready-made serializers for most primitive types. Even if the producer program does not specify a key when sending a message, this parameter must still be set; otherwise the program throws a ConfigException indicating that the “key.serializer” parameter has no default value and must be configured.
value.serializer
This parameter is similar to key.serializer, except that it serializes the message body (i.e., the message value) and converts it into a byte array.
value.serializer and key.serializer can be set to the same class or to different classes, as long as the deserializers on the consumer side match the data being consumed.
Note that both parameters must be fully qualified names of classes that implement org.apache.kafka.common.serialization.Serializer, for example org.apache.kafka.common.serialization.StringSerializer.
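The two serializer settings might look like the sketch below. StringSerializer is one of the serializers that ships with Kafka; the class and method names of the sketch itself are hypothetical. The toBytes helper is only an illustration of what "serialized into a byte array" means for a string, not Kafka's own code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class SerializerConfigSketch {
    static Properties serializerConfig() {
        Properties props = new Properties();
        // Both values are fully qualified class names of Serializer implementations.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Illustration only: roughly what a string serializer does with a value.
    static byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(serializerConfig().getProperty("key.serializer"));
        System.out.println(toBytes("kafka").length + " bytes");
    }
}
```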
acks
The acks parameter controls the durability of the messages the producer sends. For the producer, Kafka cares about the durability of “committed” messages: once a message is successfully committed, it is considered “not lost” as long as any replica holding it is “alive”.
It is common to come across complaints that Kafka’s producer loses messages, but these confuse one point: the supposedly “lost” messages were never successfully written to Kafka. In other words, they were never committed, so Kafka makes no durability guarantee for them.
Of course, the producer API does provide a callback mechanism for the user to handle send failures. Specifically, when the producer sends a message to the Kafka cluster, the message goes to the broker that hosts the leader of the target topic partition, and the producer waits for the leader broker to return the write result (not indefinitely; there is a timeout) to determine whether the message was successfully committed. Once this is done, the producer can continue sending new messages.
What Kafka can guarantee is that a consumer will never read a message that has not yet been committed. Clearly, the moment at which the leader broker sends the write result back to the producer needs careful consideration, because it directly affects message durability and even producer throughput (the sooner the producer receives the leader broker’s response, the sooner it can send the next message). The producer-side acks parameter controls exactly this.
acks specifies the number of replicas that must have successfully written the message before the leader broker sends a response to the producer. Currently, acks has three values: 0, 1, and all.
acks = 0: The producer completely ignores the leader broker’s processing result. After sending a message, the producer immediately starts sending the next one and does not wait for the leader broker to return a result at all.
Since no send result is received, the callback of producer.send is useless in this case: the user cannot detect any failure in the sending process through the callback mechanism, so a producer with acks=0 does not guarantee that messages are sent successfully.
But everything has pros and cons: since there is no need to wait for a response, producer throughput is usually the highest in this setting.
acks = all or -1: After a message is sent, the leader broker not only writes the message to its local log but also waits for all other replicas in the ISR to write it to their own local logs before sending the response to the producer.
Clearly, with acks=all, the message will not be lost as long as at least one replica in the ISR is “alive”, so this setting achieves the highest message durability, but producer throughput is usually also the lowest.
acks = 1: A compromise between 0 and all, and the default value in the Kafka versions this article covers (since Kafka 3.0 the default is all).
After the producer sends a message, the leader broker only writes the message to its local log and then sends the response to the producer, without waiting for the other replicas in the ISR to write it. So as long as the leader broker stays alive, Kafka can ensure that the message is not lost. This is a compromise that achieves reasonable message durability while preserving producer throughput.
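To summarize: the acks parameter controls the degree of message durability the producer achieves. It has three values, whose trade-offs and use scenarios are as follows:
acks = 0: highest throughput, lowest durability; suits scenarios that can tolerate message loss.
acks = 1: moderate throughput, moderate durability; suits general scenarios.
acks = all (or -1): lowest throughput, highest durability; suits scenarios that cannot tolerate message loss.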
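The choice among the three values can be sketched as a small mapping from the durability you need to the acks setting. The enum, class, and method names are hypothetical and exist only for illustration; the string values "0", "1", and "all" are the ones the acks parameter accepts.

```java
import java.util.Properties;

class AcksSketch {
    // Hypothetical labels for the three durability levels discussed above.
    enum Durability { LOWEST, MODERATE, HIGHEST }

    static String acksFor(Durability durability) {
        switch (durability) {
            case LOWEST:
                return "0";   // no waiting for the leader: highest throughput
            case HIGHEST:
                return "all"; // wait for every replica in the ISR: lowest throughput
            default:
                return "1";   // wait for the leader's local write only
        }
    }

    static Properties acksConfig(Durability durability) {
        Properties props = new Properties();
        props.put("acks", acksFor(durability));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(acksConfig(Durability.HIGHEST).getProperty("acks"));
    }
}
```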

buffer.memory
This parameter specifies the size, in bytes, of the buffer that the producer uses to cache messages. The default value is 33554432, which is 32MB.
Because message sending is asynchronous by design, the Java producer first creates a memory buffer at startup to hold the messages to be sent, and a separate dedicated thread reads messages from the buffer and performs the actual send. The size of this memory space is specified by the buffer.memory parameter.
If the producer writes messages to the buffer faster than the dedicated I/O thread can send them, the buffer will inevitably fill up. The producer then stops what it is doing and waits for the I/O thread to catch up; if the I/O thread still has not caught up after a period of time, the producer throws an exception. If the producer program needs to send messages to many partitions, this parameter must be set carefully so that a buffer that is too small does not reduce the overall throughput of the producer program.
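To get a feel for the many-partitions warning, here is a rough back-of-the-envelope calculation. It assumes the producer's default batch size of 16384 bytes (the batch.size parameter) and that each partition being written to holds one batch in the buffer; the class and method names are hypothetical, and real memory use also depends on compression and in-flight requests.

```java
class BufferMemorySketch {
    // Rough estimate: how many full batches fit into the buffer at once.
    static long batchesThatFit(long bufferMemoryBytes, int batchSizeBytes) {
        return bufferMemoryBytes / batchSizeBytes;
    }

    public static void main(String[] args) {
        // Default buffer.memory (32MB) divided by the default batch.size (16KB).
        System.out.println(batchesThatFit(33_554_432L, 16_384)); // prints 2048
    }
}
```

Under these assumptions, a producer writing to a few thousand partitions with the default settings could already exhaust the buffer, which is why the value deserves attention in that case.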
compression.type
This parameter sets whether the producer compresses messages. The default value is none, meaning messages are not compressed.
Compression on the producer side can significantly reduce network I/O overhead and improve overall throughput, but it also increases CPU usage on the producer machine. In addition, if the broker-side compression setting differs from the producer’s, the broker spends extra CPU decompressing and recompressing messages when writing them.
In the versions this article covers, Kafka supports three compression algorithms: GZIP, Snappy, and LZ4 (Zstandard was added in Kafka 2.1.0). In practice, the producer performs best with LZ4; the ranking is LZ4 > Snappy > GZIP.
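The sketch below shows the setting and gives a feel for the size reduction compression can bring. It uses the JDK's own GZIP stream as a stand-in for the producer's internal batch compression, so the numbers only illustrate the idea; the sample payload and all class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;

class CompressionSketch {
    static Properties compressionConfig() {
        Properties props = new Properties();
        props.put("compression.type", "lz4"); // other accepted values include none, gzip, snappy
        return props;
    }

    // Illustration only: JDK GZIP, not the code path Kafka itself uses.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Builds a repetitive payload, similar in spirit to a batch of look-alike messages.
    static byte[] samplePayload(int messages) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < messages; i++) {
            sb.append("{\"user\":\"u1\",\"action\":\"click\"}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = samplePayload(1000);
        System.out.println(raw.length + " bytes raw, " + gzip(raw).length + " bytes compressed");
    }
}
```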
retries
When processing write requests, a Kafka broker may fail to handle a message because of transient failures (such as a brief leader election or network jitter). Such failures usually recover on their own. If these errors were wrapped in an exception and returned to the producer in the callback, the producer program could do little except resend the message from the callback. It is better for the producer to retry automatically, so the Java producer implements retries internally, provided that the retries parameter is set.
This parameter is the number of retries to attempt. In the Kafka versions this article covers, the default value is 0, which means no retries (since Kafka 2.1 the default is 2147483647).
In practice, retries cope well with transient errors, so it is recommended to set this parameter to a value greater than 0.
However, there are two points to note when setting retries.
1. Retries may cause messages to be sent more than once.
For example, because of transient network jitter, the broker may have written a message successfully but failed to send the response to the producer, so the producer considers the send failed and starts retrying. To guard against this risk, Kafka requires users to deduplicate on the consumer side. Happily, the community added support for “exactly-once” processing semantics in version 0.11.0.0, which avoids this kind of problem by design.
2. Retries may cause messages to arrive out of order.
Currently, the producer keeps multiple in-flight send requests (5 by default) in memory; if a send is retried for some reason, messages can end up out of order. To avoid this, the Java producer provides the max.in.flight.requests.per.connection parameter. Once it is set to 1, the producer ensures that only one request is sent at a time.
In addition, the producer pauses between retries so that frequent retries do not hammer the system. This pause is configurable through the retry.backoff.ms parameter, and the default is 100 milliseconds. Since leader election is the most common transient error, it is recommended to measure the average leader election time through testing and set retries and retry.backoff.ms based on it.
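One simple way to turn a measured leader election time into settings is sketched below: pick enough retries that the pauses between them add up to at least the election time. The 500 ms election time used in main is a hypothetical measurement, the class and method names are made up for illustration, and the estimate ignores the time each request itself takes.

```java
import java.util.Properties;

class RetrySketch {
    // Smallest number of retries whose backoff pauses cover the election time.
    static int retriesToCover(long electionMs, long backoffMs) {
        return (int) ((electionMs + backoffMs - 1) / backoffMs); // ceiling division
    }

    static Properties retryConfig(long electionMs, long backoffMs) {
        Properties props = new Properties();
        props.put("retries", String.valueOf(retriesToCover(electionMs, backoffMs)));
        props.put("retry.backoff.ms", String.valueOf(backoffMs));
        // Only one in-flight request, so a retry cannot reorder messages.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical measurement: leader election averages 500 ms.
        System.out.println(retryConfig(500, 100).getProperty("retries")); // prints 5
    }
}
```

Setting max.in.flight.requests.per.connection to 1 trades some throughput for ordering, so it is worth doing only when message order matters.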
batch.size
batch.size is one of the most important producer parameters! It plays an important role in tuning producer throughput and latency.
The producer packs multiple messages destined for the same partition into a batch, and when the batch is full, the producer sends all the messages in it. However, the producer does not always wait for a batch to fill before sending; it may well send a batch that still has a lot of free space. Clearly, the size of the batch matters a great deal.
Generally speaking, a small batch holds few messages, so few messages can be written per send request and producer throughput is low. A very large batch puts a lot of pressure on memory, because the producer allocates a fixed amount of memory for each batch whether or not it is filled.
Therefore, setting batch.size is really a trade-off between time and space. The default value of batch.size is 16384, which is 16KB. This is a very conservative number; if you increase it sensibly in actual use, you will usually find that producer throughput increases accordingly.
linger.ms
This parameter controls the delay before a message is sent. Its default value is 0, which means a message is sent immediately, regardless of whether the batch is full.
Most of the time this is reasonable; after all, we want messages sent as quickly as possible. However, it reduces producer throughput: the more messages each request carries, the more messages the producer can spread the per-request overhead across, which improves throughput. If you set this parameter, tune it together with batch.size above; it is a trade-off between latency and throughput.
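The effect of a larger batch on the number of send requests can be estimated with simple arithmetic. The sketch below assumes every batch fills completely and ignores per-record overhead, so it is an upper bound on the benefit rather than a prediction; the message count, message size, chosen settings, and all names are hypothetical.

```java
import java.util.Properties;

class BatchingSketch {
    // Rough estimate of send requests needed when every batch is completely full.
    static long requestsNeeded(long messageCount, int messageBytes, int batchSizeBytes) {
        long perBatch = Math.max(1, batchSizeBytes / messageBytes);
        return (messageCount + perBatch - 1) / perBatch; // ceiling division
    }

    static Properties batchingConfig() {
        Properties props = new Properties();
        props.put("batch.size", "65536"); // hypothetical value, four times the 16384 default
        props.put("linger.ms", "10");     // hypothetical value: wait up to 10 ms for a batch to fill
        return props;
    }

    public static void main(String[] args) {
        // 10,000 messages of 1024 bytes each.
        System.out.println(requestsNeeded(10_000, 1024, 16_384)); // prints 625
        System.out.println(requestsNeeded(10_000, 1024, 65_536)); // prints 157
    }
}
```

A non-zero linger.ms is what gives batches the time to fill; with linger.ms at 0 the producer may send batches long before they reach batch.size.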
max.request.size
The explanation on the official website is that this parameter controls the size of the request sent by the producer.
In effect, this parameter controls the maximum message size that the producer can send.
Because a request carries some header data structures, a request containing one message is larger than the message itself. Still, it is safe to treat the value as the maximum request size. If the producer needs to send very large messages, this parameter must be raised. The default is 1048576 bytes (1MB).
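A quick pre-flight check along these lines can tell you whether a message is likely to exceed the limit. This is only a rough sketch: it compares the raw key and value sizes with the limit and ignores the header overhead mentioned above, so a message just under the limit may still be rejected. The class, constant, and method names are hypothetical.

```java
class MaxRequestSizeSketch {
    // The default max.request.size stated above: 1MB.
    static final int DEFAULT_MAX_REQUEST_SIZE = 1_048_576;

    // Rough check only: ignores record and request header overhead.
    static boolean fits(byte[] key, byte[] value, int maxRequestSize) {
        return (long) key.length + value.length <= maxRequestSize;
    }

    public static void main(String[] args) {
        byte[] key = new byte[16];
        byte[] value = new byte[2 * 1024 * 1024]; // a 2MB message body
        System.out.println(fits(key, value, DEFAULT_MAX_REQUEST_SIZE)); // prints false
    }
}
```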
request.timeout.ms
When the producer sends a request to the broker, the broker must return the processing result to the producer within the specified time. The default is 30 seconds.
If the broker does not respond to the producer within 30 seconds, the request is considered timed out and a TimeoutException is thrown explicitly in the callback. The default of 30 seconds is enough for normal cases, but if the producer sends a heavy load, timeouts are easy to hit and the value should be raised accordingly.
Author: “Badger Under the Sunflower”
Original link: https://blog.csdn.net/qq_28410283/article/details/88570141