  1. Decoupling
  2. Asynchronous processing. For example, consider a flash sale on an e-commerce platform. The general flow is: 1) risk control, 2) inventory locking, 3) order generation, 4) SMS notification, 5) data update. With a message system, the flash-sale business is split so that work that does not need urgent handling is deferred and processed later. The flow becomes: 1) risk control, 2) inventory locking, 3) message system, 4) order generation, 5) SMS notification, 6) data update.
  3. Flow control. 1) After the gateway receives a request, it puts the request into the message queue. 2) The backend service takes the request from the message queue, completes the rest of the flash-sale processing, and returns the result to the user. Advantage: traffic is controlled. Disadvantage: the overall process becomes slower.

Producer: generates data and sends it to the Kafka cluster.
Consumer: fetches data from Kafka, then processes (consumes) it. Data in Kafka is pulled by consumers.
Topic: data in Kafka is organized into topics.
Partition: by default a topic has one partition. You can configure multiple partitions, which are distributed and stored on different server nodes.

3. Kafka’s cluster architecture

In a Kafka cluster, each Kafka server is a broker.

Topic is only a logical concept; a partition shows up on disk as a directory.

Consumer Group: when consuming data you must specify a group id. If program A and program B specify the same group id, both programs belong to the same consumer group. For example, once program A has consumed data from topic A, program B can no longer consume that data (A and B belong to one consumer group). If program A has already consumed the data in topic A and the data needs to be consumed again, that is possible after specifying a new group id. Different consumer groups do not affect each other. The group id is chosen by you, while the consumer name is generated automatically by the program and is unique.

Controller: the master node among the Kafka brokers. It works together with ZooKeeper.

4. Kafka sequential disk writes ensure write performance

Kafka writes data sequentially: when data is written to disk it is appended, with no random writes. Rule of thumb: given enough disks in a server and a high enough spindle speed, sequential (append) writes to disk are about as fast as writes to memory. A produced message is first written by the Kafka service into the OS cache and is then flushed to disk sequentially.

5. Kafka zero-copy mechanism ensures high performance of reading data

The consumer reads the data process:

  1. The consumer sends a request to the Kafka service
  2. The Kafka service looks for the data in the OS cache (if the data is not in the cache, it is read from disk)
  3. The data is read from disk into the OS cache
  4. The OS cache copies the data into the Kafka application
  5. Kafka sends (copies) the data to the socket cache
  6. The socket cache transmits the data to the consumer via the network card

With Linux sendfile, Kafka reads data with zero copy:

  1. The consumer sends a request to the Kafka service
  2. The Kafka service looks for the data in the OS cache (if the data is not in the cache, it is read from disk)
  3. The data is read from disk into the OS cache
  4. The OS cache sends the data directly to the network card
  5. The data is transmitted to the consumer through the network card
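The zero-copy path above can be sketched in Java with FileChannel.transferTo, which lets the operating system move bytes from a file to a channel without staging them in an application buffer. This is only an illustration under assumptions: the class and method names are hypothetical, and the target here is an in-memory channel so the example runs on its own, which means it shows the API shape rather than an actual sendfile call to a socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, not part of Kafka.
final class ZeroCopySketch {

    /** Transfers the whole file to the target channel and returns the bytes sent. */
    static long sendFile(Path logFile, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(logFile, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done.
                position += source.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    /** Demo helper: writes the payload to a temp file and sends it back via sendFile. */
    static byte[] roundTrip(byte[] payload) {
        try {
            Path tmp = Files.createTempFile("segment", ".log");
            try {
                Files.write(tmp, payload);
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                sendFile(tmp, Channels.newChannel(sink));
                return sink.toByteArray();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] out = roundTrip("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

When the target is a socket channel on Linux, the JVM can hand this call to sendfile, which is the case the steps above describe.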

6. Kafka logs are stored in segments

When a topic is created in Kafka, partitions are generally set. For example, you create topic_a and specify that it has three partitions. In effect, three directories are created across three servers:

Server 1 (kafka1): creates directory topic_a-0
Server 2 (kafka2): creates directory topic_a-1
Server 3 (kafka3): creates directory topic_a-2

Under each directory are the files that store the data. Kafka's data is messages, and messages are stored in log files. These files end in .log; in Kafka the data files are called log files. By default a partition holds many log files (segmented storage), and one log file is 1G by default.

7. Kafka uses binary search to locate data

Each message in Kafka has its own offset (a relative offset) and is stored on the physical disk at a position. In other words, a message has two locations:

offset: relative offset (relative position)
position: physical position on disk

Sparse index: Kafka reads through a sparse index. Every time 4k of log (.log) has been written, Kafka writes one entry to the index. Lookups in the index use binary search.
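A minimal sketch of the lookup just described, assuming the index is held as two parallel arrays (relative offset and file position) sorted by offset. The class and method names are hypothetical, and the real index file format is not modelled. The binary search finds the last index entry at or before the target offset; a reader would then scan the .log file forward from that position.

```java
// Hypothetical demo class: a sparse index as two parallel, offset-sorted arrays.
final class SparseIndexSketch {

    /**
     * Returns the file position of the last index entry whose offset is
     * less than or equal to target, or 0 (start of the segment) if every
     * entry is larger than target.
     */
    static long lookup(long[] offsets, long[] positions, long target) {
        int lo = 0;
        int hi = offsets.length - 1;
        long best = 0L;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= target) {
                best = positions[mid]; // candidate; keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Illustrative entries: one per roughly 4 KB of log written.
        long[] offsets = {0, 35, 71, 110};
        long[] positions = {0, 4096, 8192, 12288};
        System.out.println(lookup(offsets, positions, 80));
    }
}
```

Because the index is sparse, the position returned is only a starting point: the message with the exact offset is found by a short sequential scan from there.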

8. High-concurrency network design (first understand NIO)

The network design is the best-designed part of Kafka, and it is the reason Kafka can sustain high concurrency and high performance.

[Figure: Reactor network design pattern 1]
[Figure: Reactor network design pattern 2]
[Figure: Reactor network design pattern 3]
[Figure: Kafka ultra-high-concurrency network design]

9. Kafka redundant replicas ensure high availability

Partitions in Kafka have replicas. Note: before 0.8 there was no replica mechanism. When you create a topic, you can specify the number of partitions and the number of replicas. Replicas have roles:

leader partition:
  1. Writes and reads are both served by the leader partition.
  2. It maintains an ISR (in-sync replica) list; entries are removed from the ISR list according to certain rules.

The producer sends a message, which is first written to the leader partition. It must then be written to the other partitions in the ISR list, and the message counts as committed only after those writes complete.

follower partition: synchronizes data from the leader partition.

10. Excellent Architecture Thinking – Summary

Kafka: high concurrency, high availability, high performance

High availability: multi-replica mechanism
High concurrency: network architecture design. Three-tier architecture: multi-selector -> multi-threaded -> queues (NIO)
High performance:

Writing data:

  1. Data is first written to the OS cache
  2. Writes to disk are sequential, so performance is very high

Reading data:

  1. Quickly locate the data to be consumed based on sparse indexes
  2. Zero-copy mechanism: fewer copies of the data, and fewer context switches between the application and the operating system

11. Kafka production environment construction

11.1 Requirements scenario analysis

The e-commerce platform needs to send 1 billion requests to the Kafka cluster every day. A rough estimate is good enough here.

The 1 billion requests arrive over 24 hours. Under normal circumstances there is very little data between midnight and 8:00 a.m., so 80% of the requests are handled in the other 16 hours: 800 million requests in 16 hours. Applying the 80/20 rule again, 16 * 0.2 = about 3 hours handle 80% of those 800 million requests, which is roughly 600 million requests in 3 hours.

A simple estimate of the peak: 600 million / 3 hours = about 55,000/s, so QPS = 55,000.

1 billion requests * 50kb = 46T, so 46T of data must be stored per day. In general we keep two copies: 46T * 2 = 92T. Data in Kafka is retained for the last 3 days: 92T * 3 days = 276T.

The 50kb here is not the size of a single message; it is the size of a request in which multiple logs are merged together. Usually a message is only a few bytes, or perhaps a few hundred bytes.
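The estimate above is plain arithmetic, and writing it down as code makes it easy to rerun with your own numbers. The sketch below is illustrative only: the class and method names are made up, and it reuses the figures from the text (600 million requests in a 3-hour peak, 50kb per request, 2 copies, 3 days of retention).

```java
// Hypothetical helper for the back-of-the-envelope numbers in this section.
final class TrafficEstimate {

    /** Average requests per second over the peak window. */
    static long peakQps(long peakRequests, double peakHours) {
        return Math.round(peakRequests / (peakHours * 3600));
    }

    /** Daily volume in T, with 1T = 1024^3 kb. */
    static double dailyStorageTb(long requestsPerDay, int requestKb) {
        return requestsPerDay * (double) requestKb / (1024.0 * 1024 * 1024);
    }

    /** Total retained volume for the given copies and retention days. */
    static long retainedTb(long dailyTb, int copies, int retentionDays) {
        return dailyTb * copies * retentionDays;
    }

    public static void main(String[] args) {
        long qps = peakQps(600_000_000L, 3);
        // The text rounds the daily volume down to whole T before multiplying.
        long dailyTb = (long) dailyStorageTb(1_000_000_000L, 50);
        System.out.println("peak QPS ~ " + qps);
        System.out.println("daily T = " + dailyTb);
        System.out.println("retained T = " + retainedTb(dailyTb, 2, 3));
    }
}
```

The exact quotient for the peak is slightly above 55,000; the text rounds it down, which is fine for sizing because a 4x safety margin is applied in the next step.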

11.2 Evaluation of the number of physical machines

1) First decide whether we need virtual machines or physical machines. When clusters such as Kafka, MySQL, and Hadoop are built, we use physical machines in production.
2) The peak load to handle is 55,000 requests per second. One or two physical machines could in fact absorb that. Under normal circumstances, however, we size the machines for 4 times the peak. At 4 times, the cluster should be able to handle about 200,000 QPS, which gives a much safer cluster. That takes about 5 physical machines, each handling 40,000 requests per second.

Scenario summary: to handle 1 billion requests, 55,000 QPS at peak, and 276T of data, 5 physical machines are required.

11.3 Disk selection

To handle 1 billion requests, 55,000 QPS at peak, and 276T of data, we need 5 physical machines.

1) SSDs or ordinary mechanical hard disks?
SSD: better performance, but expensive.
SAS disk: weaker performance in some respects, but relatively cheap.
An SSD performing better really means its random read and write performance is better, which suits clusters like MySQL. Its sequential write performance, however, is similar to that of SAS disks. Kafka writes sequentially, so ordinary mechanical hard disks are enough.

2) How many disks does each server need?
5 servers for a total of 276T means about 60T of data per server. The server configuration in our company uses 11 hard disks of 7T each: 11 * 7T = 77T, and 77T * 5 servers = 385T.

Scenario summary: to handle 1 billion requests, you need 5 physical machines, each with 11 (SAS) * 7T.

11.4 Memory evaluation

To handle 1 billion requests, you need 5 physical machines, each with 11 (SAS) * 7T.

Reading and writing data in Kafka goes through the OS cache. In other words, if the OS cache were infinite, the whole of Kafka would effectively run in memory, and performance would be very good. But memory is limited.

1) Give as much memory as possible to the OS cache.
2) Kafka's core code is written in Scala and the client code in Java, so everything runs on the JVM, and part of the memory has to go to the JVM. Kafka's design does not keep many data structures in the JVM, so the JVM does not need much memory. As a rule of thumb, 10G is enough.

By comparison, a NameNode keeps its metadata (tens of gigabytes) in the JVM, so its JVM must be given a lot, for example 100G.

Suppose we have 10 business requirements, with 100 topics in total. 100 topics * 5 partitions * 2 replicas = 1000 partitions. A partition is a directory on the physical machine, and the directory holds many .log files. A .log file stores the data, and by default one .log file is 1G. Performance is best if the latest .log file of all 1000 partitions stays in memory: 1000 * 1G = 1000G of memory. In practice it is enough to keep the newest 25% of each latest .log file in memory: 250M * 1000 = 0.25G * 1000 = 250G of memory.

250G of memory / 5 servers = 50G per server; 50G + 10G for the JVM = 60G.

With 64G of memory there is another 4G left, which the operating system itself also needs. Kafka's JVM does not really need the full 10G either. So the evaluation shows that 64G is OK. Of course, a server with 128G of memory is best.
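The memory estimate can be written as a small calculation. The sketch below uses the figures from this section (100 topics, 5 partitions, 2 replicas, 1G segments, 25% of the newest segment kept hot, 5 servers, 10G for the JVM); the class and method names are hypothetical.

```java
// Hypothetical helper mirroring the memory estimate in this section.
final class MemoryEstimate {

    /** Total partition replicas in the cluster. */
    static int totalPartitions(int topics, int partitionsPerTopic, int replicas) {
        return topics * partitionsPerTopic * replicas;
    }

    /** OS cache (in G) each server needs to keep the hot part of every newest segment. */
    static double osCacheGbPerServer(int totalPartitions, double segmentGb,
                                     double hotFraction, int servers) {
        return totalPartitions * segmentGb * hotFraction / servers;
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(100, 5, 2);
        double osCache = osCacheGbPerServer(partitions, 1.0, 0.25, 5);
        double jvmGb = 10.0; // rule of thumb from the text
        System.out.println("partitions = " + partitions);
        System.out.println("per-server memory (G) = " + (osCache + jvmGb));
    }
}
```

Changing the partition count to 10 per topic doubles the OS cache requirement, which is one reason 128G servers are the more comfortable choice.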

The evaluation above used 5 partitions per topic; a topic with a large amount of data may have 10 partitions.

Summary: to handle 1 billion requests, you need 5 physical machines, each with 11 (SAS) * 7T and 64G of memory (128G is better).

11.5 CPU stress assessment

Assess how many CPU cores each server needs (resources are limited).

We evaluate how many CPUs are needed based on how many threads are running in our service. Threads run on the CPU. If we have more threads but fewer CPU cores, then our machine load will be high and the performance will not be good.

Evaluate how many threads a Kafka server will have when it starts?

Acceptor thread: 1
Processor threads: 3 by default, usually tuned to 6~9
Request-handling threads: 8 by default, usually tuned to 32
In addition there are periodic cleanup threads, threads that pull data, threads that periodically check the ISR list, and so on. In total, a running Kafka service has more than a hundred threads.

CPU cores = 4: a few dozen threads will certainly saturate the CPU.
CPU cores = 8: a few dozen threads are handled easily. With more than 100 threads, or close to 200, 8 cores are not enough.
So the suggestion is CPU cores = 16, and 32 cores if you can.

Conclusion: for a Kafka cluster, give each server at least 16 CPU cores; 32 is better.
2 CPUs * 8 cores = 16 CPU cores
4 CPUs * 8 cores = 32 CPU cores

Summary: to handle 1 billion requests, you need 5 physical machines, each with 11 (SAS) * 7T, 64G of memory (128G is better), and 16 CPU cores (32 is better).


11.6 Network requirements assessment

What kind of network card do we need? Generally it is either a gigabit card (1 Gbit/s) or a 10-gigabit card (10 Gbit/s).

At peak there are 55,000 requests per second, and 5.5 / 5 = about 10,000 requests per second per server. As noted earlier, 10,000 * 50kb = 488M, so each server receives 488M of data per second. Data also has replicas, and replica synchronization goes over the network too: 488 * 2 = 976M/s. That is roughly 8 Gbit/s, far beyond what a gigabit card can carry.

Note: at many companies a request is nowhere near 50kb. Ours is that large because the producer side wraps the data and merges multiple records into one request.
Note: a network card normally cannot be driven to its nominal bandwidth. With a gigabit card we can generally use about 700M.
So the best choice is a 10-gigabit card, which handles this load easily.
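As a quick check of the bandwidth figures, the sketch below converts the per-server request rate into M/s and then into Gbit/s. It assumes 50kb means kilobytes (consistent with the storage estimate) and 1M = 1024kb; the class and method names are hypothetical.

```java
// Hypothetical helper for the network estimate in this section.
final class NetworkEstimate {

    /** Inbound data rate in M/s (1M = 1024kb) for one server. */
    static double inboundMbytesPerSecond(long requestsPerSecond, int requestKb) {
        return requestsPerSecond * (double) requestKb / 1024.0;
    }

    /** Converts M/s (mebibytes) to Gbit/s (10^9 bits). */
    static double toGbitPerSecond(double mbytesPerSecond) {
        return mbytesPerSecond * 1024 * 1024 * 8 / 1e9;
    }

    public static void main(String[] args) {
        double inbound = inboundMbytesPerSecond(10_000, 50);
        double withReplication = inbound * 2; // replica sync also crosses the network
        System.out.println("inbound M/s = " + inbound);
        System.out.println("with replication M/s = " + withReplication);
        System.out.println("Gbit/s = " + toGbitPerSecond(withReplication));
    }
}
```

The result lands between 1 and 10 Gbit/s, which is why a gigabit card is ruled out and a 10-gigabit card is comfortable.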

11.7 Cluster planning

Cluster planning follows the steps covered so far:

  1. Request volume
  2. Number of physical machines
  3. Number of disks, and which kind of disk to use
  4. Memory
  5. CPU cores
  6. Network card

The point is that whenever such a requirement comes up at your company, you can evaluate resources and servers along the same lines.

Message size: 50kb -> 1kb, 500 bytes, 1M

ip / hostname: hadoop1 hadoop2 hadoop3

Host planning. Kafka cluster architecture: master-slave. The controller manages the metadata of the entire cluster through the zk cluster.

  1. ZooKeeper cluster: hadoop1 hadoop2 hadoop3
  2. Kafka cluster: in theory Kafka should not be installed on the same machines as the zk service. But our servers are limited, so the Kafka cluster is also installed on hadoop1 hadoop2 hadoop3.

12. Kafka



12.1 Common O&M Tools Introduction

KafkaManager – page management tool

12.2 Common O&M commands

Scenario 1: The topic's data volume has grown too large and the number of partitions must be increased

When the topic was first created the data volume was small, so it was given few partitions.

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6
kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,ha

broker id: hadoop1: 0, hadoop2: 1, hadoop3: 2

Suppose a partition has three replicas: partition0: a, b, c
a: leader partition
b, c: follower partitions
ISR: {a, b, c}

If a follower partition does not pull data from the leader partition for more than 10 seconds, it is removed from the ISR list.

Scenario 2: Increase the replica factor of a core topic

If the replica factor needs to be increased for core business data, run vim test.json and save the following JSON:

{"version": 1, "partitions": [{"topic": "test6", "partition": 0, "replicas": [0, 1, 2]}, {"topic": "test6", "partition": 1, "replicas": [0, 1, 2]}, {"topic": "test6", "partition": 2, "replicas": [0, 1, 2]}]}

Execute the JSON script above:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

Scenario 3: Manually migrate topics with unbalanced load

vi topics-to-move.json

{"topics": [{"topic": "test01"}, {"topic": "test02"}], "version": 1}

List all of your topics here.

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

List all of your broker machines here, including the new ones; all partitions are then spread evenly across every broker, new brokers included. The command generates a migration plan, which can be saved to a file: expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

This kind of data migration must be done at night during off-peak hours, because it moves data between machines and uses a lot of bandwidth.

--generate: generates a migration plan from the given list of topics and brokers. It does not migrate any messages; it only computes the plan for the execute command to use.
--execute: carries out the migration according to the given plan.
--verify: checks whether the migration has finished.

Scenario 4: A broker has too many leader partitions

Normally the leader partitions are load balanced across servers. An unbalanced example: hadoop1 4, hadoop2 1, hadoop3 1.

Each business team can now apply to create topics by itself, and the partitions are allocated automatically and adjusted dynamically afterwards. Kafka itself distributes leader partitions evenly across the machines, which keeps read and write throughput uniform on every machine. There are exceptions, though. If some brokers go down, the leader partitions become concentrated on the few remaining brokers, which puts too much read and write pressure on those brokers. When the brokers that went down restart, they hold only follower partitions and receive very few read and write requests.

One parameter addresses this load imbalance: auto.leader.rebalance.enable, which is true by default. Every 300 seconds (leader.imbalance.check.interval.seconds) the cluster checks whether the leader load is balanced. If the share of unbalanced leaders on a broker exceeds 10% (leader.imbalance.per.broker.percentage), a leader election is triggered for that broker.

Configuration parameters:
auto.leader.rebalance.enable: default true
leader.imbalance.per.broker.percentage: the ratio of unbalanced leaders allowed per broker. If a broker exceeds this value, the controller triggers leader balancing. The value is a percentage; the default is 10%.
leader.imbalance.check.interval.seconds: default 300 seconds


13. Kafka producer

13.1 Producer sending message principle

13.2 The principle of producer sending messages: a basic case demonstration

13.3 How to improve throughput

Parameter 1: buffer.memory: the buffer for messages waiting to be sent. The default is 33554432, that is 32MB.
Parameter 2: compression.type: the default is none (no compression). lz4 compression is also available and is reasonably efficient. Compression reduces the amount of data and improves throughput, but costs extra CPU on the producer side.
Parameter 3: batch.size: the size of a batch. If the batch is too small, network requests become frequent and throughput drops. If it is too large, a message waits a long time before being sent and the memory buffer comes under pressure because too much data sits in memory. The default is 16384, that is 16kb: a batch is sent once it holds 16kb. In production this value is usually raised to improve throughput, at the cost of some delay when the batch is large. It is generally set according to message size.
linger.ms: matters when there are few messages. The default is 0, which means a message is sent immediately, and that is usually not what you want. It is typically set to around 100 milliseconds so that messages are collected into a batch; if the batch fills to 16kb within the 100 milliseconds, it is sent right away.
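A minimal sketch of how these throughput settings might be collected into producer properties. The property keys are the ones named above; the values are illustrative rather than recommendations, the class name is hypothetical, and the connection and serializer settings a real producer also needs are deliberately left out so the example runs with the JDK alone.

```java
import java.util.Properties;

// Hypothetical helper: builds only the throughput-related producer settings.
final class ThroughputProducerConfig {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("buffer.memory", "33554432"); // 32MB, the default quoted in the text
        props.setProperty("compression.type", "lz4");   // trades producer CPU for less data on the wire
        props.setProperty("batch.size", "32768");       // illustrative: double the 16kb default
        props.setProperty("linger.ms", "100");          // wait up to 100 ms to fill a batch
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be merged with the rest of the producer configuration before the producer is created. Raising batch.size and linger.ms together is the usual lever: the first sets how much a batch can hold, the second how long the producer waits for it to fill.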

13.4 How to handle exceptions

  1. LeaderNotAvailableException: if a machine goes down, the leader replica becomes unavailable and writes fail. You have to wait for a follower replica to be promoted to leader before writing can continue; at that point you can retry the send. Restarting a Kafka broker process always causes a leader switch, so it always produces this write error.
  2. NotControllerException: the same idea. If the broker hosting the controller goes down, there is a problem until the controller is re-elected, and the answer is again to retry.
  3. NetworkException: a network exception or timeout. a. Configure the retries parameter and the producer retries automatically. b. If it still fails after several retries, an exception is handed to us to deal with. Once we have the exception, we process the message separately through a backup path: send the failed message to Redis, write it to the file system, or even discard it.

13.5 Retry mechanism

Retries introduce some problems:

  1. Duplicate messages. Problems such as a leader switch call for a retry, and setting retries takes care of that. But retries can send a message more than once: for example, network jitter makes the producer think a send failed and retry it, when in fact the send had succeeded.
  2. Out-of-order messages. A retried message may be written after messages that were sent later. To avoid this, set "max.in.flight.requests.per.connection" to 1, which ensures the producer has only one request in flight at a time. The interval between two retries defaults to 100 milliseconds and is set with "retry.backoff.ms".

Basically, in development, 95% of exception problems can be solved by the retry mechanism.
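The retry settings discussed above could be grouped as follows. This is a sketch with illustrative values: the retry count of 3 is an assumption, not a value from the text, and the class name is hypothetical. Only the three retry-related keys are set.

```java
import java.util.Properties;

// Hypothetical helper: builds only the retry-related producer settings.
final class RetryProducerConfig {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("retries", "3");            // illustrative retry count (assumption)
        props.setProperty("retry.backoff.ms", "100"); // interval between retries, as in the text
        // One in-flight request per connection keeps retried messages in order.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Limiting in-flight requests to 1 preserves ordering at the cost of throughput, so it is worth applying only to topics where order actually matters.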

13.6 ACK Parameters in Detail

The producer side sets:

request.required.acks=0: the request counts as sent as soon as it has gone out, regardless of whether it was written successfully. Performance is very good. For workloads such as log analysis that can tolerate lost data, this setting works well.
request.required.acks=1: a message counts as sent once the leader partition has written it. Data can still be lost this way.
request.required.acks=-1: a message counts as written only after every replica in the ISR list has written it.

ISR example: 1 leader partition and 1 follower partition.
Kafka server side: min.insync.replicas, default 1 if not set. A leader partition maintains an ISR list, and this value sets the minimum number of replicas that list must contain. For example, if it is 2 and only one replica is left in the ISR list, inserting data into the partition reports an error.

A scheme without data loss:
1) partition replicas >= 2
2) acks = -1
3) min.insync.replicas >= 2
A send can still fail with an exception, so the exception must be handled.

13.7 Custom partition

1. No key is set: messages are sent to the partitions in turn.
2. A key is set: Kafka's built-in partitioner computes a hash of the key, and the hash maps to a partition. The same key always yields the same hash, so messages with the same key always go to the same partition.

In some special cases, however, we need a custom partitioner:

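import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class HotDataPartitioner implements Partitioner {
    private Random random;

    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        // Number of partitions, e.g. 3 for partitions 0, 1, 2
        int partitionCount = partitionInfoList.size();
        // The last partition is reserved for hot data
        int hotDataPartition = partitionCount - 1;
        // With a single partition there is no choice to make; this also avoids nextInt(0)
        if (partitionCount <= 1 || (key != null && key.contains("hot_data"))) {
            return hotDataPartition;
        }
        // Everything else is spread randomly over the remaining partitions
        return random.nextInt(partitionCount - 1);
    }

    @Override
    public void close() {
    }
}

How to use: configure this class: props.put("partitioner.class", "com.zhss.HotDataPartitioner");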

13.8 Comprehensive Case Demonstration

14. Kafka consumers

14.1 Consumer group concept

Consumers with the same group id belong to the same consumer group.

1) Every consumer must belong to a consumer group. A partition of a topic is assigned to only one consumer within a consumer group. A consumer may be assigned several partitions, or none at all.
2) To get a broadcast effect, simply consume with different group ids.
topicA: partition0, partition1
groupA: consumer1 consumes partition0; consumer2 consumes partition1; consumer3 gets no data
groupB: consumer3 consumes both partition0 and partition1
3) If a consumer in the group goes down, its partitions are automatically handed over to the other consumers; if it restarts, some partitions are given back to it.

14.2 Basic Case Demonstration

14.3 Offset management


  1. Each consumer keeps an in-memory data structure holding its consumption offset for every partition of every topic, and it commits offsets periodically. Old versions wrote the offsets to zk, but sending such highly concurrent requests to zk is poor architecture: zk exists to coordinate distributed systems and store lightweight metadata, not to serve as a data store for highly concurrent reads and writes.
  2. New versions commit offsets to an internal Kafka topic: __consumer_offsets. On commit, the key is group.id + topic + partition number and the value is the current offset. Every so often Kafka compacts this topic internally, so that only the latest record is kept for each group.id + topic + partition number.
  3. __consumer_offsets may receive highly concurrent requests, so it has 50 partitions by default (50 leader partitions -> 50 Kafka brokers). If Kafka is deployed as a large cluster, say 50 machines, all 50 machines can share the pressure of offset commits.

Consumer -> data on the broker side. Messages are written to disk with sequentially increasing offsets. Where does consumption start? At the consumer's offset.
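The effect of compaction on committed offsets can be pictured with a map keyed by group.id + topic + partition, where a later commit overwrites an earlier one. This is a toy model under assumptions: the class is hypothetical, the key separator is arbitrary, and the real record format of __consumer_offsets is not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of "keep only the latest offset per group + topic + partition".
final class OffsetStoreSketch {

    private final Map<String, Long> latest = new LinkedHashMap<>();

    // The ':' separator is an arbitrary choice for this sketch.
    static String key(String groupId, String topic, int partition) {
        return groupId + ":" + topic + ":" + partition;
    }

    /** Records a commit; an earlier commit for the same key is replaced. */
    void commit(String groupId, String topic, int partition, long offset) {
        latest.put(key(groupId, topic, partition), offset);
    }

    /** Returns the last committed offset, or null if nothing was committed. */
    Long committed(String groupId, String topic, int partition) {
        return latest.get(key(groupId, topic, partition));
    }

    int size() {
        return latest.size();
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("groupA", "topicA", 0, 1000L);
        store.commit("groupA", "topicA", 0, 1500L); // supersedes the first commit
        store.commit("groupA", "topicA", 1, 2000L);
        System.out.println(store.committed("groupA", "topicA", 0));
        System.out.println(store.size());
    }
}
```

A restarted consumer only needs the latest value per key, which is why discarding older commits loses nothing.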

14.4 Introduction to the offset monitoring tool

  1. A web page management tool (kafka manager). Modify the bin/kafka-run-class.sh script by adding JMX_PORT=9988 as the first line, then restart the Kafka process.
  2. Another tool that mainly monitors consumer offsets. It is a jar package:
     java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk hadoop1:2181 --port 9004 --refresh 15.seconds --retain 2.days
     For --offsetStorage, fill in according to the version: kafka if offsets are stored in Kafka, zookeeper if they are stored in ZooKeeper.

14.5 Abnormal Perception of Consumption

heartbeat.interval.ms: the consumer heartbeat interval. A consumer must keep a heartbeat going with the coordinator so that the coordinator knows whether the consumer has failed. If a failure occurs, the coordinator uses the heartbeat responses to tell the other consumers to rebalance.
session.timeout.ms: how long Kafka goes without hearing from a consumer before considering it failed. The default is 10 seconds.
max.poll.interval.ms: if the time between two poll operations exceeds this value, the consumer is considered too slow, is kicked out of the consumer group, and its partitions are assigned to other consumers. Set it according to the performance of your business processing.
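A sketch of how the three liveness settings relate, with a small sanity check. The values are illustrative, and the rule that the heartbeat interval should be no more than about a third of the session timeout is a common guideline rather than something stated above. The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: consumer liveness settings plus a simple consistency check.
final class ConsumerLivenessConfig {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", "3000");  // illustrative
        props.setProperty("session.timeout.ms", "10000");    // 10 seconds, as in the text
        props.setProperty("max.poll.interval.ms", "300000"); // illustrative: tune to processing time
        return props;
    }

    /** True when several heartbeats fit inside one session timeout. */
    static boolean heartbeatFitsSession(Properties props) {
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("heartbeat fits session: " + heartbeatFitsSession(props));
    }
}
```

If processing one batch of records can take longer than max.poll.interval.ms, either raise that value or reduce the number of records fetched per poll.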

14.6 Core parameter explanation

fetch.max.bytes: the maximum number of bytes fetched for a message. It is generally recommended to set it larger; the default quoted here is 1M (in recent Java clients the 1M default belongs to max.partition.fetch.bytes, while fetch.max.bytes defaults to 50M). We have seen similar parameters in several places before; they all answer the question: how big can a message be?

  1. Producer: the maximum size of a message it sends -> 10M
  2. Broker: the maximum size of a message it stores -> 10M
  3. Consumer: the fetch limit above

max.poll.records: the maximum number of messages returned by one poll. The default is 500.
connection.max.idle.ms: if the socket connection between consumer and broker is idle longer than this, the connection is reclaimed automatically, and the socket has to be re-established on the next consumption. The recommendation is to set it to -1 so that connections are not reclaimed.
enable.auto.commit: turns on automatic offset commits.
auto.commit.interval.ms: how often offsets are committed (to __consumer_offsets). The default is 5000 milliseconds.
auto.offset.reset:
earliest: if a partition has a committed offset, consumption starts from that offset; if not, consumption starts from the beginning. Example: topicA -> partition0: 1000, partition1: 2000
latest: if a partition has a committed offset, consumption starts from that offset; if not, only data newly produced to the partition is consumed.
none: if every partition of the topic has a committed offset, consumption starts after that offset; if any partition has no committed offset, an exception is thrown.

14.7 Comprehensive case demonstration

Case introduction: on a second-hand e-commerce platform (happy sending), users accumulate stars according to how much they spend. Order system (producer) -> sends a message to the Kafka cluster. Membership system (consumer) -> consumes the message from the Kafka cluster and processes it.

14.8 Group Coordinator principle

Interview question: how do consumers achieve rebalance? Answer: through the coordinator.

  1. What is a coordinator? Each consumer group chooses a broker as its coordinator. The coordinator monitors the heartbeat of every consumer in the group, determines whether a consumer is down, and then starts a rebalance.

  2. How is the coordinator machine chosen? First hash the groupId to a number, then take it modulo the number of partitions of __consumer_offsets (50 by default; configurable through offsets.topic.num.partitions). The broker where the resulting partition is located is the coordinator machine. For example: groupId "myconsumer_group" -> hash value (a number) -> modulo 50 -> 8. Whichever broker holds partition 8 of __consumer_offsets is the coordinator, and this also tells us which partition all consumers in the group commit their offsets to.

  3. How it runs. 1) Each consumer sends a JoinGroup request to the coordinator. 2) The coordinator selects one consumer in the group as the leader. 3) The coordinator sends the consumer group information to the leader. 4) The leader draws up the consumption plan. 5) The leader sends the plan to the coordinator through SyncGroup. 6) The coordinator sends the plan to each consumer, which then opens socket connections to the leader brokers of its assigned partitions and starts consuming messages.
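Step 2 above, hashing the group id and taking it modulo the number of __consumer_offsets partitions, can be sketched as below. The hash here is Java's String.hashCode with the sign bit masked off, which is an assumption made for illustration, so the partition number it prints should not be read as what a real cluster would pick. The class and method names are hypothetical.

```java
// Hypothetical sketch of mapping a group id to a __consumer_offsets partition.
final class CoordinatorSketch {

    /** Maps a group id to a partition index in [0, offsetsTopicPartitions). */
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        // Mask the sign bit so the result of the modulo is never negative.
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = offsetsPartitionFor("myconsumer_group", 50);
        System.out.println("offsets partition = " + partition);
    }
}
```

Because the mapping depends only on the group id and the partition count, every consumer in a group arrives at the same partition and therefore the same coordinator.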

14.9 Rebalance strategies

The consumer group relies on the coordinator to implement rebalance.

There are three rebalance strategies: range, round-robin, and sticky

For example, we consume a topic with 12 partitions: p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11. Suppose we have three consumers in our consumer group.

  1. Range strategy: partitions are assigned by ranges of consecutive partition numbers: p0~3 to consumer1, p4~7 to consumer2, p8~11 to consumer3. This is the default strategy.

  2. Round-robin strategy: partitions are dealt out in turn: consumer1: 0, 3, 6, 9; consumer2: 1, 4, 7, 10; consumer3: 2, 5, 8, 11.

But both of these schemes have a problem. If one consumer fails, the 12 partitions are redistributed over the 2 remaining consumers, and each of them consumes 6 partitions. Suppose consumer1 goes down: with the range strategy, p0-5 are assigned to consumer2 and p6-11 to consumer3. In this case partitions p6 and p7, which were originally on consumer2, are moved to consumer3.
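The two strategies and the reshuffling problem can be reproduced with a few lines. This is a simplified sketch for a single topic, not the assignor code that Kafka ships; the function names are chosen for illustration.

```python
def range_assign(partitions, consumers):
    """Give each consumer one contiguous range of partitions."""
    consumers = sorted(consumers)
    base, extra = divmod(len(partitions), len(consumers))
    plan, start = {}, 0
    for i, consumer in enumerate(consumers):
        size = base + (1 if i < extra else 0)  # first consumers take the remainder
        plan[consumer] = partitions[start:start + size]
        start += size
    return plan

def round_robin_assign(partitions, consumers):
    """Deal partitions out one at a time, like cards."""
    consumers = sorted(consumers)
    plan = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        plan[consumers[i % len(consumers)]].append(partition)
    return plan

partitions = list(range(12))
group = ["consumer1", "consumer2", "consumer3"]

before = range_assign(partitions, group)
after = range_assign(partitions, ["consumer2", "consumer3"])  # consumer1 is down
moved = [p for p in before["consumer2"] if p not in after["consumer2"]]

print(before)
print(round_robin_assign(partitions, group))
print(after, "moved away from consumer2:", moved)
```

The `moved` list shows the cost of recomputing the plan from scratch: partitions change owners even though their original consumer is still alive.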

  3. Sticky strategy: this is the newest strategy. During a rebalance it ensures that partitions that already belong to a consumer stay with that consumer, and only the leftover partitions are distributed evenly, so the original partition assignment is preserved as much as possible.
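The sticky idea can be sketched as below, using a group of three consumers in which consumer3 fails. This is a deliberately simplified sketch: it assumes the surviving consumers were evenly loaded and hands the orphaned partitions out in contiguous chunks, whereas the real sticky assignor handles many more cases.

```python
def sticky_reassign(current, failed):
    """Keep every surviving consumer's partitions and split the orphans evenly.

    current: consumer name -> list of partitions it owns
    failed:  name of the consumer that left the group
    """
    survivors = sorted(c for c in current if c != failed)
    plan = {c: list(current[c]) for c in survivors}  # nothing is taken away
    orphans = sorted(current[failed])
    base, extra = divmod(len(orphans), len(survivors))
    start = 0
    for i, consumer in enumerate(survivors):
        size = base + (1 if i < extra else 0)
        plan[consumer].extend(orphans[start:start + size])
        start += size
    return plan

current = {
    "consumer1": [0, 1, 2, 3],
    "consumer2": [4, 5, 6, 7],
    "consumer3": [8, 9, 10, 11],
}
plan = sticky_reassign(current, "consumer3")
print(plan)
```

No partition that was already owned by a live consumer changes hands, which avoids the extra movement seen with the range and round-robin strategies.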

Before: consumer1: 0-3, consumer2: 4-7, consumer3: 8-11.

Suppose consumer3 goes down. After: consumer1: 0-3, +8, 9; consumer2: 4-7, +10, 11.

15、Broker management

15.1 Meaning of LEO and HW

  1. Kafka's core principles
  2. How to evaluate cluster resources
  3. Setting up a Kafka cluster, with an introduction to some simple operation and maintenance tasks
  4. Producer (usage, core parameters)
  5. Consumer (principles, usage, core parameters)
  6. Some internal principles of the broker

The core concepts: LEO and HW.

In Kafka, both leader partitions and follower partitions are called replicas.

LEO (log end offset): this is related to the offset. Every time a replica receives a message it updates its LEO, which is the latest offset + 1.

HW (high watermark): a very important function of the LEO is to update the HW. When the followers' LEOs have caught up with the leader's, the HW can advance. Data before the HW is visible to consumers, and those messages are in the committed state. Data after the HW cannot be consumed.
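The relationship between the two values can be illustrated with a small model. This is a sketch that assumes the HW is taken as the smallest LEO among the in-sync replicas; the replica names and offsets are invented for the example.

```python
def log_end_offset(last_offset):
    """LEO is the offset the next message will receive: latest offset + 1."""
    return last_offset + 1

def high_watermark(replica_leos):
    """Assumption: HW is the minimum LEO over the in-sync replicas."""
    return min(replica_leos.values())

def visible_to_consumers(replica_leos):
    """Offsets strictly below the HW are committed and can be consumed."""
    return range(0, high_watermark(replica_leos))

# The leader has written offsets 0..9; the followers are still catching up.
replica_leos = {"leader": log_end_offset(9), "follower1": 8, "follower2": 9}
hw_before = high_watermark(replica_leos)

# Both followers fetch the remaining messages and advance their LEO.
replica_leos["follower1"] = 10
replica_leos["follower2"] = 10
hw_after = high_watermark(replica_leos)

print(hw_before, hw_after)
```

Until the slowest follower catches up, the messages between the HW and the leader's LEO exist on the leader but are not yet served to consumers.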

15.2 LEO update

15.3 HW update

15.4 How the controller manages the entire cluster

1: Brokers compete for the controller role through the /controller node in ZooKeeper.

2: Directories the controller service listens to:

/brokers/ids/ is used to sense brokers going online and offline.

/brokers/topics/ is used to sense topic creation. When we created a topic with the command earlier, one of the parameters we provided was the ZK address.

/admin/reassign_partitions is used for partition reassignment.

….

15.5 Delayed Tasks

Kafka's delayed scheduling mechanism (extended knowledge)

Let's first look at where Kafka has tasks that need delayed scheduling.

The first type of delayed task: with the producer's acks=-1, the response can only be returned after both the leader and the followers have written the data. There is a timeout, which defaults to 30 seconds (request.timeout.ms). So after a piece of data is written to the leader's disk, a delayed task with an expiration time of 30 seconds is created and placed in the DelayedOperationPurgatory (the delay manager). If all followers write the data to their local disks within those 30 seconds, the task is woken up automatically and the response is returned to the client. Otherwise the delayed task expires after at most 30 seconds, and a timeout exception is returned directly.

The second type of delayed task: when a follower pulls messages from the leader and finds that there is nothing to read, a delayed pull task is created. When the delay time is up (for example 100 ms), an empty response is returned to the follower, and the follower then sends another request to read messages. But if the leader writes a message during the delay (before the 100 ms have passed), the task is woken up automatically and the pull is executed.

A massive number of delayed tasks need to be scheduled.
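The first type of delayed task can be modeled as a small state machine. The class below is a hypothetical sketch, not Kafka's own DelayedProduce class: time is passed in explicitly so the behaviour is easy to follow, and the follower names are invented.

```python
class DelayedProduce:
    """A pending acks=-1 write: done when all followers ack, or on timeout."""

    def __init__(self, followers, created_ms, timeout_ms=30_000):
        self.pending = set(followers)            # followers that have not acked yet
        self.deadline_ms = created_ms + timeout_ms
        self.result = None                       # None while still waiting

    def on_follower_ack(self, follower, now_ms):
        """Wake-up path: a follower reports that it has written the data."""
        if self.result is None and now_ms < self.deadline_ms:
            self.pending.discard(follower)
            if not self.pending:
                self.result = "ok"
        return self.result

    def on_tick(self, now_ms):
        """Expiry path: called by the scheduler as time advances."""
        if self.result is None and now_ms >= self.deadline_ms:
            self.result = "timeout"
        return self.result

fast = DelayedProduce({"follower1", "follower2"}, created_ms=0)
fast.on_follower_ack("follower1", now_ms=10)
print(fast.on_follower_ack("follower2", now_ms=20))

slow = DelayedProduce({"follower1"}, created_ms=0)
print(slow.on_tick(now_ms=30_000))
```

Each such task needs both an early wake-up path and an expiry path, and with many requests in flight the expiry side is what has to be scheduled efficiently.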

15.6 Time wheel mechanism

  1. Why was the time wheel designed? Kafka has many delayed tasks internally. They are not implemented on top of the JDK Timer, where inserting and deleting a task costs O(log n). Instead Kafka uses its own time wheel, with which inserting and deleting a delayed task costs O(1).

  2. What is the time wheel? The time wheel is essentially an array.
tickMs: the time wheel interval, 1 ms
wheelSize: the time wheel size, 20
interval: tickMs * wheelSize, the total time span of one time wheel, 20 ms
currentTime: the pointer to the current time
a: Because the time wheel is an array, data is fetched by index, and the time complexity is O(1).
b: The tasks at a given position in the array are stored in a doubly linked list. Inserting a task into or deleting a task from the doubly linked list is also O(1).
Examples: insert a task to be executed after 8 ms; insert a task to be executed after 19 ms.

  3. Multi-level time wheel. For example, suppose we want to insert a task that runs after 110 milliseconds. The parameters tickMs, wheelSize, interval (tickMs * wheelSize) and currentTime have the same meaning on every layer, and the interval of one layer becomes the tickMs of the next:
First-layer time wheel: 1 ms * 20
Second-layer time wheel: 20 ms * 20
Third-layer time wheel: 400 ms * 20
110 ms is beyond the 20 ms span of the first layer but within the 400 ms span of the second layer, so the task is placed in the second layer.
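The slot lookup for the single-level and multi-level cases can be sketched as follows. This is a minimal model, not Kafka's implementation: it only shows insertion, it uses a plain Python list per slot where the text describes a doubly linked list, and it assumes the wheel starts at time 0.

```python
class TimingWheel:
    """One layer of a hierarchical time wheel; higher layers are created on demand."""

    def __init__(self, tick_ms=1, wheel_size=20, current_ms=0, level=1):
        self.tick_ms = tick_ms
        self.wheel_size = wheel_size
        self.interval_ms = tick_ms * wheel_size          # total span of this layer
        self.current_ms = current_ms - (current_ms % tick_ms)
        self.level = level
        # A list per slot stands in for the doubly linked list (assumption).
        self.buckets = [[] for _ in range(wheel_size)]
        self.overflow = None                             # next layer up

    def add(self, task, expire_ms):
        """Place a task; return (level, slot), or None if it is already due."""
        if expire_ms < self.current_ms + self.tick_ms:
            return None
        if expire_ms < self.current_ms + self.interval_ms:
            slot = (expire_ms // self.tick_ms) % self.wheel_size  # O(1) index
            self.buckets[slot].append((expire_ms, task))
            return (self.level, slot)
        if self.overflow is None:
            # The span of this layer becomes the tick of the next layer.
            self.overflow = TimingWheel(self.interval_ms, self.wheel_size,
                                        self.current_ms, self.level + 1)
        return self.overflow.add(task, expire_ms)

wheel = TimingWheel()
for delay_ms in (8, 19, 110, 500):
    print(delay_ms, wheel.add(f"task-{delay_ms}", delay_ms))
```

Insertion only computes an index and appends to a slot on each layer it visits, so the cost does not grow with the number of pending tasks.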