Apache Pulsar

Table of contents:

- 1. Kafka Overview
  - 1.1 Existing problems
  - 1.3 Advantages
  - 1.4 Disadvantages
- 2. Pulsar Architecture
  - 2.1 Pulsar vs Kafka
    - 2.1.1 Data collection
    - 2.1.2 Storage node
    - 2.1.3 Consistency guarantee
  - 2.2 Pulsar architecture
  - 2.3 Multi-tenancy and Topic Lookup
    - 2.3.1 Multi-tenancy
    - 2.3.2 Topic Assign: Broker LoadManager thread, Bundle and ownership, Topic allocation process, design advantages
    - 2.3.3 Topic Lookup
  - 2.4 Produce / Consume Strategy
    - 2.4.1 Three write routing strategies
    - 2.4.2 Four read delivery strategies
    - 2.4.3 Pull & Push optional request mode
    - 2.4.4 Consume ACK and unACK
    - 2.4.5 Data Retention
- 3. Bookkeeper architecture
  - 3.1 Concept
    - 3.1.1 Features
    - 3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment
    - 3.1.3 Conclusion
  - 3.2 Architecture
  - 3.3 Write flow
    - 3.3.1 Three types of files
    - 3.3.2 The ADD operation
    - 3.3.3 Conclusion
  - 3.4 Read flow
    - 3.4.1 Reads are evenly shared
    - 3.4.2 Reads are speculative
    - 3.4.3 Read results are out of order
- 4. Horizontal expansion
  - 4.1 Horizontally scaling Broker
  - 4.2 Horizontally scaling Bookie
- 5. Pulsar Consistency
  - 5.1 Consistency mechanism
    - 5.1.1 Redundant copies
    - 5.1.2 Consistency mechanism
  - 5.2 Bookie Auto Recovery: Ensemble Change
    - 5.2.1 Scenario
    - 5.2.2 Process
    - 5.2.3 Conclusion: Bookie failure recovery in seconds
  - 5.3 Broker Recovery: Fencing
    - 5.3.1 Scenario
    - 5.3.2 Process
    - 5.3.3 Conclusion: Broker failure recovery in seconds
- 6. Distributed Log and Raft
- 7. Summary
  - 7.1 Advantages of Pulsar
  - 7.2 Disadvantages of Pulsar
The following briefly summarizes my research on Apache Pulsar. Contents:

- Kafka: advantages and disadvantages.
- Pulsar: multi-tenancy, Topic Lookup, produce / consume model.
- Bookkeeper: component concepts and the read and write flows.
- Horizontal scaling: scaling Broker and Bookie horizontally.
- Consistency: how Broker and Bookie ensure log consistency after a crash.
- Distributed Log and the Raft algorithm.
- Summary.
1. Kafka Overview

1.1 Existing problems

Main problems:

- Load balancing requires manual intervention: assignment execution plans have to be generated by hand to match heterogeneously configured brokers.
- Uncontrollable failure recovery: after a broker restarts, the new data on its partitions has to be copied over and indexes rebuilt, and the read and write requests on it are transferred to other brokers, which may trigger a cluster avalanche under a traffic surge.

Other issues:

- Cross-data-center backup requires maintaining additional components: the official MirrorMaker is admittedly limited, and cross-DC redundant replication relies on third-party components such as Uber's uReplicator.

Note: details desensitized.
1.3 Advantages

- Mature ecosystem, easy to integrate with existing components such as Flink.
- Complete official documentation and books, with plenty of references.
- The model is easy to understand: a partition has replicas and is stored as segments plus indexes.
1.4 Disadvantages

- Compute and storage are coupled.
- Storage nodes are stateful: reads and writes can only go to the partition leader, and a broker restart in a high-load cluster is prone to single points of failure and even avalanches.
- Manual load balancing: when scaling cluster capacity, partitions must be manually reassigned to the new broker to spread the read and write load.

Comic comparison: https://jack-vanlightly.com/sketches/2018/10/2/kafka-vs-pulsar-rebalancing-sketch
2. Pulsar Architecture

2.1 Pulsar vs Kafka

| | Pulsar | Kafka |
|---|---|---|
| Data collection | Topic, Partition | Topic, Partition |
| Storage node and read/write component | Bookkeeper Bookie + Pulsar Broker | Kafka Broker |
| Client | SDK | SDK |
| Data storage unit | Ledger -> Fragments | Partition -> Segments |
| Data consistency guarantees | Ensemble Size / Write Quorum Size (QW) / Ack Quorum Size (QA) | metadata.broker.list / Replication Factor / request.required.acks |

Note: (QW+1)/2 <= QA <= QW <= Ensemble Size <= Bookies Count
2.1.1 Data collection

- Kafka: a topic is divided into partitions, and each partition is persisted as a directory on the leader broker and its replica brokers.
- Pulsar: a topic also has multiple partitions, but a partition is owned by only one broker, and its data is evenly distributed across multiple bookie nodes for persistent storage.
2.1.2 Storage node

- Kafka: data is persisted directly on the broker and read and written directly by the client SDK.
- Pulsar: data is persisted in a decentralized way on bookies and read and written by the bookkeeper client embedded in the broker.
2.1.3 Consistency guarantee

- Kafka: a multi-broker cluster with multiple replicas per partition; the producer specifies the send acknowledgment mechanism.
- Pulsar: a multi-broker cluster; the broker performs a Quorum Write to bookies and waits for the Quorum ACK to return.
2.2 Pulsar architecture
2.3 Multi-tenancy and Topic Lookup

2.3.1 Multi-tenancy

- A topic name has three levels: persistent://tenant/namespace/topic, corresponding to the division department -> app -> topics. Expiration time and ACL access control are set at the namespace level.
- Advantage: topic resources are isolated by tenant yet mixed in the same cluster, improving cluster utilization.
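To make the three-level naming concrete, here is a minimal sketch using the Pulsar Java client; the tenant, namespace, topic name, and service URL are placeholder assumptions:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class TenantTopicExample {
    public static void main(String[] args) throws Exception {
        // Connect to a broker (service URL assumes a local cluster).
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Three-level topic name: tenant (department) / namespace (app) / topic.
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/my-topic")
                .create();

        producer.send("hello pulsar");

        producer.close();
        client.close();
    }
}
```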
2.3.2 Topic Assign

The Broker's LoadManager thread:

- Worker: reports its broker's load and waits to be assigned topic partitions.
- Leader: the Broker Leader, similar to the Kafka Controller; it aggregates the load of all brokers and distributes topic partitions reasonably.

Bundle and ownership:

- Each namespace maintains a bundle ring in ZK (2~3 times the number of brokers); a topic partition falls into a bundle according to hash(topic_partition) % N (see the sketch after this list).
- A bundle is uniquely bound to one broker, which thereby holds ownership of all topic partitions in the bundle; this ownership is what gets transferred during broker recovery to ensure high availability.
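As referenced in the list above, bundle selection boils down to hashing the topic partition onto the bundle ring. Below is a simplified conceptual sketch; the real broker hashes topic names into the bundles' fixed hash ranges, so this modulo version is only an approximation:

```java
import java.util.List;

public class BundleHashSketch {
    // Conceptual approximation of hash(topic_partition) % N bundle selection.
    static String pickBundle(String topicPartition, List<String> bundles) {
        int n = bundles.size(); // e.g. 2~3 times the number of brokers
        int idx = Math.floorMod(topicPartition.hashCode(), n);
        return bundles.get(idx);
    }

    public static void main(String[] args) {
        List<String> bundles = List.of(
                "0x00000000_0x40000000",
                "0x40000000_0x80000000",
                "0x80000000_0xc0000000",
                "0xc0000000_0xffffffff");
        String bundle = pickBundle(
                "persistent://public/default/test-topic-1-partition-0", bundles);
        System.out.println("owned by bundle " + bundle);
    }
}
```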
Topic allocation process:

- Report load: the LoadManager Worker reports load metrics to ZK.

zk> get /loadbalance/brokers/localhost:8080
{ "pulsarServiceUrl": "pulsar://localhost:6650", "cpu": { "usage": 23, "limit": 50 }, "memory": { "usage": 1, "limit": 10 }, "msgThroughputIn": 100, "msgThroughputOut": 100 }

- Bundle distribution: the LoadManager Leader aggregates the load of the other brokers and distributes bundles based on load.

zk> get /loadbalance/leader
{"serviceUrl":"http://localhost:8080","leaderReady":false}

- Allocation result:

zk> ls /namespace/public/default
[0x00000000_0x40000000, 0x40000000_0x80000000, 0x80000000_0xc0000000, 0xc0000000_0xffffffff]
zk> get /namespace/public/default/0x80000000_0xc0000000
{"nativeUrl":"pulsar://localhost:6650","httpUrl":"http://localhost:8080","disabled":false}
The design advantage: unlike Kafka, which logs all metadata such as topic ISRs to ZK, Pulsar only records the number of partitions of a topic and does not record the topic-to-broker mapping. The amount of ZK metadata is therefore very small, which is what makes millions of topics feasible.

zk> get /admin/partitioned-topics/public/default/persistent/partitioned-topic-1
{"partitions":2}
2.3.3 Topic Lookup

- The Client initiates a Lookup request to any broker (BrokerA), e.g. for persistent://public/default/test-topic-1.
- BrokerA computes hash(topic_partition) % N under the default namespace to get the bundle the topic partition falls into, and from it looks up the ownership.
- BrokerA returns the address of the owner BrokerX, and the Client connects to BrokerX.
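The same lookup can be observed from the admin API. A minimal sketch using the Pulsar Java admin client, with a placeholder admin URL and topic:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class LookupExample {
    public static void main(String[] args) throws Exception {
        // Admin HTTP URL assumes a local broker.
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Returns the owner broker's address, e.g. pulsar://localhost:6650
        String owner = admin.lookups()
                .lookupTopic("persistent://public/default/test-topic-1");
        System.out.println("owner broker: " + owner);

        admin.close();
    }
}
```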
2.4 Produce / Consume Strategy

2.4.1 Three write routing strategies

- SinglePartition:
  - With a KEY, messages are written to a fixed partition, similar to hash(key) mod len(partitions).
  - Without a KEY, a partition is chosen at random and all of the producer's messages are written to it.
- CustomPartition: users can customize a per-message partitioning strategy, e.g. by implementing the MessageRouter interface in Java.
- RoundRobinPartition (default): messages are spread evenly across partitions by polling in units of batches, for maximum throughput.
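A hedged sketch of choosing routing modes with the Pulsar Java client; the topic name and service URL are placeholders, and the custom router is only an illustrative key-hash example:

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;

public class RoutingExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .build();

        // SinglePartition: keyed messages hash to a fixed partition,
        // keyless messages all go to one randomly chosen partition.
        Producer<String> single = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/routed-topic")
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // CustomPartition: user-defined routing via the MessageRouter interface.
        Producer<String> custom = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/routed-topic")
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter(new MessageRouter() {
                    @Override
                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                        // Illustrative: route by key hash, keyless messages to partition 0.
                        String key = msg.getKey();
                        return key == null ? 0
                                : Math.floorMod(key.hashCode(), metadata.numPartitions());
                    }
                })
                .create();

        single.close();
        custom.close();
        client.close();
    }
}
```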
2.4.2 Four read delivery strategies

- Failover: failover consumption, one-to-one with multiple standbys; guarantees ordered consumption, allows cumulative (batch) ACK, and provides high consumer availability.
- Shared: shared consumption, many-to-many.
  - Messages are distributed round-robin; similar to a Consumer Group, but ordered consumption is not guaranteed.
  - Only per-message ACK, so message redelivery can be precisely controlled when a consumer crashes.
  - Consumers can be scaled horizontally to directly increase fetch throughput; unlike Kafka, there is no need to expand the partitions before adding consumers.
- Key_Shared: a compromise between exclusive and shared, sharing consumption by KEY, many-to-many.
  - Messages whose KEYs hash to the same value are consumed by the same consumer, guaranteeing ordered consumption per key.
  - Only per-message ACK.
  - Consumers can be scaled horizontally to improve read throughput.
- Exclusive (default): exclusive consumption, one-to-one; guarantees ordered consumption and allows cumulative ACK. It is a special case of Failover and does not guarantee high availability.
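A minimal sketch of selecting subscription types with the Pulsar Java client (placeholder topic, subscription names, and service URL):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class SubscriptionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .build();

        // Key_Shared: messages with the same key always go to the same consumer.
        Consumer<String> keyShared = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/test-topic-1")
                .subscriptionName("key-shared-sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();

        // Failover: one active consumer, the others act as standbys.
        Consumer<String> failover = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/test-topic-1")
                .subscriptionName("failover-sub")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        keyShared.close();
        failover.close();
        client.close();
    }
}
```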
2.4.3 Pull & Push optional request mode

- Push: the Consumer can register a MessageListener interface locally and wait for the Broker to push messages.
- Pull: the Consumer can also receive messages synchronously or asynchronously on its own.
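A small sketch contrasting the two modes with the Pulsar Java client; the listener body and the one-second receive timeout are illustrative choices:

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PushPullExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder
                .build();

        // "Push": register a MessageListener; the client invokes it
        // as the broker delivers messages.
        Consumer<String> pushConsumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/test-topic-1")
                .subscriptionName("push-sub")
                .messageListener((consumer, msg) -> {
                    System.out.println("pushed: " + msg.getValue());
                    consumer.acknowledgeAsync(msg);
                })
                .subscribe();

        // "Pull": call receive() (blocking or with a timeout) whenever
        // the application wants the next message.
        Consumer<String> pullConsumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/test-topic-1")
                .subscriptionName("pull-sub")
                .subscribe();
        Message<String> msg = pullConsumer.receive(1, TimeUnit.SECONDS);
        if (msg != null) {
            System.out.println("pulled: " + msg.getValue());
            pullConsumer.acknowledge(msg);
        }

        pushConsumer.close();
        pullConsumer.close();
        client.close();
    }
}
```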
2.4.4 Consume ACK and unACK

- ACK: per-message or cumulative (batch).
- unACK: if the consumer hits an error while consuming, it can request re-consumption; after the ACK is cancelled, the broker resends the message.
  - exclusive, failover: only the last submitted ACK can be cancelled, giving a single consumer controllable rollback.
  - shared, key_shared: analogous to ACK, a consumer can only cancel the ACKs it previously issued.
- Similar to Kafka's __consumer_offsets mechanism, after the broker receives each consumer's ACK it updates that consumer's consumption progress cursor and persists it to a dedicated ledger.
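A minimal sketch of per-message ACK, cumulative ACK, and negative acknowledgment (redelivery) with the Pulsar Java client; the process method and error handling are illustrative:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AckExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/test-topic-1")
                .subscriptionName("ack-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        Message<String> msg = consumer.receive();
        try {
            process(msg.getValue());
            consumer.acknowledge(msg);             // ACK a single message
            // consumer.acknowledgeCumulative(msg); // cumulative (batch) ACK,
            //                                      // exclusive / failover only
        } catch (Exception e) {
            // Negative ACK: ask the broker to redeliver this message later.
            consumer.negativeAcknowledge(msg);
        }

        consumer.close();
        client.close();
    }

    static void process(String value) {
        System.out.println("processing " + value);
    }
}
```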
2.4.5 Data Retention

- Messages are actively retained by default: messages backlogged by the slowest subscription cannot be deleted. In the worst case, after a subscription goes offline its cursor still stays in the message stream, which defeats the message expiration mechanism.
- Message expiration: limits can be set in two dimensions, time or size, but they only apply to messages that were actively retained.
- TTL: forcibly moves an old, slow cursor forward to the TTL point in time; if TTL == Retention, messages are force-expired just like in Kafka.

Two indicators:

- Topic Backlog: the number of messages between the cursor of the slowest subscription and the latest message.
- Storage Size: the total space used by the topic.

- Deletion is done at segment granularity, using whether the Last Modify Time is earlier than the retention threshold as the expiration criterion, consistent with Kafka.
- Note: bookies do not expire data synchronously; space is freed by a background process that cleans up periodically.
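Retention and TTL are configured per namespace. A sketch using the Pulsar Java admin client; the 7-day / 512 MB retention and 1-hour TTL values are arbitrary examples:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class RetentionExample {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build();

        String namespace = "public/default";

        // Retention: keep acknowledged messages for up to 7 days or 512 MB.
        admin.namespaces().setRetention(namespace,
                new RetentionPolicies(7 * 24 * 60 /* minutes */, 512 /* MB */));

        // TTL: unacknowledged messages older than 1 hour are force-expired,
        // which moves slow cursors forward.
        admin.namespaces().setNamespaceMessageTTL(namespace, 3600 /* seconds */);

        admin.close();
    }
}
```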
3. Bookkeeper architecture

An append-only distributed KV logging system: K is the (Ledger_id, Entry_id) pair, V is the (MetaData, RawData) binary data.
3.1 Concept

3.1.1 Features

- Efficient writes: append-only sequential disk writes.
- High fault tolerance: logs are redundantly replicated across the Bookie Ensemble.
- High throughput: read/write throughput is improved by directly scaling Bookies horizontally.
3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment

- Ensemble Size: the number of bookies a log is spread across.
- Ensembles: the target set of bookies a log is written to.
- Write Quorum: the number of bookies each log entry is written to.
- ACK Quorum: the number of bookies that must acknowledge that a log entry has been written.
- Segment / Ledger: a piece of the log being written.
- Fragment: a piece of the log that has been written.
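A minimal sketch showing how these parameters surface in the BookKeeper client API; the ZooKeeper address and password are placeholders, and Ensemble = 3 / WQ = 2 / AQ = 2 are example values:

```java
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;

public class LedgerWriteExample {
    public static void main(String[] args) throws Exception {
        // ZooKeeper address assumes a local BookKeeper cluster.
        BookKeeper bk = new BookKeeper("localhost:2181");

        // Ensemble Size = 3, Write Quorum = 2, Ack Quorum = 2.
        LedgerHandle ledger = bk.createLedger(3, 2, 2,
                DigestType.CRC32, "password".getBytes());

        for (int i = 0; i < 10; i++) {
            long entryId = ledger.addEntry(("entry-" + i).getBytes());
            System.out.println("wrote entry " + entryId);
        }

        ledger.close();
        bk.close();
    }
}
```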
3.1.3 Conclusion

- The client picks bookies with a round-robin strategy and writes entries to them in turn.
- The client only waits for ACK Quorum bookies to return an Append ACK before the write is considered successful.
- A segment/ledger contains multiple fragments.
- The entries within a fragment are striped continuously across the Ensemble bookies.
- Within one striping cycle, the entries stored on a single bookie are discontinuous, with gaps of (EnsembleSize – WriteQuorum) entries.

3.2 Architecture
- zk/etcd: strongly consistent store.
  - Metadata store: ledger metadata.
  - Service discovery: the bookie registry; bookies discover each other through it, and the client reads all of the cluster's bookie addresses from it.
- Bookie: storage node; allows only the two operations ADD / READ, guarantees neither consistency nor availability, and is functionally simple.
- Client: implements the redundant replication logic and guarantees data consistency; it is the most complex and most important component.
3.3 Write flow

3.3.1 Three types of files

- Journal
  - Concept: the WAL, used to persist the transaction log of the bookie's ledger operations. Entries of different ledgers arriving from multiple clients are appended directly to memory and then fsync'ed to disk sequentially with low latency.
  - Cleanup: automatically deleted once the Write Cache has finished flushing to disk.
- Entry Logs
  - Concept: the real on-disk log files, storing the entries of different ledgers in an aggregated, ordered manner; a Write Cache is maintained to speed up lookups of hot entries.
  - Cleanup: the bookie's background GC thread periodically checks whether the associated ledgers have been deleted in zk, and cleans up the file if so.
- Index Files
  - Concept: a side effect of efficient sequential writes is that an index mapping (ledger_id, entry_id) to a position in the entry log must be maintained separately. To keep reads efficient, index files are written out separately when the Write Cache is flushed.
  - Implementation: an optional RocksDB or file-based index store.
3.3.2 The ADD operation

- Clients send logs of different ledgers to the Bookie in an interleaved fashion.
- The Bookie appends them to the Journal and writes them into the Write Cache in order (the Write Cache uses a SkipList internally to stay dynamically sorted while keeping reads and writes efficient; see the sketch after this list).
- When the Write Cache is full, a Flush writes separate index files and entry log files to disk.
- The old Journal is deleted, a new Journal is created and appending continues, and so on.
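A highly simplified sketch of this write path, not BookKeeper's actual implementation: it only illustrates the idea of appending to a journal first and keeping a sorted (ledger_id, entry_id) write cache:

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentSkipListMap;

public class BookieWriteSketch {
    // Composite key: entries are identified by (ledger_id, entry_id).
    record EntryKey(long ledgerId, long entryId) implements Comparable<EntryKey> {
        public int compareTo(EntryKey o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Write Cache: a skip list keeps entries dynamically sorted, so a later
    // flush can produce ordered entry logs and index files.
    private final ConcurrentSkipListMap<EntryKey, byte[]> writeCache = new ConcurrentSkipListMap<>();

    // Journal: the WAL that is appended and flushed before acknowledging.
    private final BufferedOutputStream journal;

    BookieWriteSketch(String journalPath) throws IOException {
        this.journal = new BufferedOutputStream(new FileOutputStream(journalPath, true));
    }

    // ADD: append to the journal first, then insert into the write cache.
    synchronized void add(long ledgerId, long entryId, byte[] payload) throws IOException {
        journal.write(payload);
        journal.flush(); // stands in for the fsync of the real journal
        writeCache.put(new EntryKey(ledgerId, entryId), payload);
    }

    // Flush: when the cache is full, ordered entry logs + index files would be
    // written out and the old journal deleted (omitted in this sketch).
    void flush() {
        writeCache.clear();
    }
}
```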
3.3.3 Conclusion

The bookie persists the entry logs of each ledger it stores and builds indexes over them to improve read efficiency.
3.4 Read flow

The client sends the KEY (ledger_id, entry_id) to the bookie:

- Hot KEY: returned directly from the Write Cache.
- Cold KEY: read the index file for the ledger_id, locate the entry in the entry log according to the index and the entry_id, and return it.
3.4.1 Reads are evenly shared

Just as writes are round-robin, the client also polls the Ensembles so that reads are shared evenly, and there is no leader-read bottleneck.
3.4.2 Reads are speculative

If one bookie's read response is too slow, the client initiates a read request to another replica bookie and waits on both, ensuring low read latency.
3.4.3 Read results are out of order

The client's writes to bookies are polled, so the messages read back from the Ensembles arrive out of order, and the client has to re-sort them by entry_id itself to assemble an ordered response.
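For completeness, reading a ledger back with the BookKeeper client; the ZooKeeper address, ledger id, and password are placeholders and must match how the ledger was created:

```java
import java.util.Enumeration;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;

public class LedgerReadExample {
    public static void main(String[] args) throws Exception {
        BookKeeper bk = new BookKeeper("localhost:2181");  // placeholder ZK address

        long ledgerId = 16L;                               // placeholder ledger id
        LedgerHandle ledger = bk.openLedger(ledgerId, DigestType.CRC32, "password".getBytes());

        // Entries are handed back ordered by entry_id even though the
        // underlying reads are spread round-robin over the ensemble bookies.
        Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());
        while (entries.hasMoreElements()) {
            LedgerEntry e = entries.nextElement();
            System.out.println(e.getEntryId() + " -> " + new String(e.getEntry()));
        }

        ledger.close();
        bk.close();
    }
}
```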
4. Horizontal expansion

4.1 Horizontally scaling Broker

After a new broker joins the cluster, the broker leader transfers part of the topic ownership from high-load brokers to the new broker to spread the read and write pressure.
4.2 Horizontally scaling Bookie

After a new bookie joins the cluster, brokers sense it through ZK and write new ledger entry logs to the new bookie, improving the read/write throughput and storage capacity of the storage layer.
5. Pulsar Consistency

5.1 Consistency mechanism

Redundant replication and consistency guarantees for logs are implemented by the Bookkeeper Client.

5.1.1 Redundant copies

Multi-replica writes via the Ensembles' QW and QA, as described above, ensure that each log entry is really persisted to bookies.
5.1.2 Consistency mechanism

Sliding window: [0, ..., READABLE ..., LAC], [LAC+1, ..., WAIT_QUORUM ..., LAP]

- LAP (Last Add Pushed): the entry_id of the last entry issued by the client.
- LAC (Last Add Confirmed): the entry_id of the last entry the client has received an ACK for; it is the consistency boundary.

Preconditions for consistency:

- A ledger can only be appended to (Append-Only) and becomes Read-Only once it is full.
- The LAC must be acknowledged sequentially in LAP order: this guarantees the LAC is the consistency boundary, with the log before it readable and the log after it still waiting for multi-replica replication.
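A toy sketch of the LAC/LAP sliding-window logic (not Pulsar or BookKeeper code): the LAC only advances once the next entry in LAP order has collected its ACK quorum:

```java
import java.util.HashMap;
import java.util.Map;

public class LacWindowSketch {
    private long lastAddPushed = -1;      // LAP: last entry_id sent out
    private long lastAddConfirmed = -1;   // LAC: consistency boundary
    private final int ackQuorum;
    private final Map<Long, Integer> acks = new HashMap<>();

    LacWindowSketch(int ackQuorum) {
        this.ackQuorum = ackQuorum;
    }

    // Client sends a new entry: the LAP moves forward.
    long push() {
        return ++lastAddPushed;
    }

    // A bookie ACKed entryId. The LAC only advances in entry_id order, so
    // everything <= LAC is readable and (LAC, LAP] is still waiting for its quorum.
    void onAck(long entryId) {
        acks.merge(entryId, 1, Integer::sum);
        while (acks.getOrDefault(lastAddConfirmed + 1, 0) >= ackQuorum) {
            lastAddConfirmed++;
            acks.remove(lastAddConfirmed);
        }
    }

    long lac() { return lastAddConfirmed; }
    long lap() { return lastAddPushed; }
}
```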
5.2 Bookie Auto Recovery: Ensemble Change

5.2.1 Scenario

After a bookie crashes and goes offline, the number of replicas needs to be restored.
5.2.2 Process

- Leader Bookie 5 acts as the daemon Auditor and keeps sending heartbeats to the other bookies to check liveness.
- The Auditor finds that Bookie 4's heartbeat has timed out; reading ZK, it finds that the entry_id interval [0, 7) of ledger x needs to be transferred from Bookie 4 to a new bookie.
- It picks the less-loaded Bookie 6 and, from the Ensembles, finds the redundant data distributed on {B1, B2, B3, B5}.
- Polling the replicas to spread the read pressure, it copies the entry logs to Bookie 6 one by one and modifies the ZK metadata, replacing replica 4 of LAC0 with 6.
5.2.3 Conclusion: Bookie failure recovery in seconds

- Write requests are transferred quickly: after Bookie 6 joins the Ensembles, it directly replaces Bookie 4 and continues appending logs. Because replica recovery is asynchronous replication done by the Auditor threads inside each Ensemble, it does not interrupt the client's writes, and the whole recovery process is almost transparent to the client.
- The LAC dividing line records the Ensemble Change history: the ledger metadata in ZK records every ensemble update caused by recovery, i.e. the distribution of every entry-log interval of the ledger. The metadata below shows that for ledger 16, Bookie 3183 went offline at LAC 46, and Bookie 3182 took over from LAC 47 to continue processing requests:

zk> get /ledgers/00/0000/L0016
ensembleSize: 3
quorumSize: 2
ackQuorumSize: 2
lastEntryId: -1
state: OPEN
segment {
  ensembleMember: "10.13.48.57:3185"
  ensembleMember: "10.13.48.57:3184"
  ensembleMember: "10.13.48.57:3183"
  firstEntryId: 0
}
segment {
  ensembleMember: "10.13.48.57:3185"
  ensembleMember: "10.13.48.57:3184"
  ensembleMember: "10.13.48.57:3182"
  firstEntryId: 47
}

Note: as can be seen above, the ledger metadata in ZK hardcodes the bookies' IPs. If a bookie's IP changes after a restart in a container deployment, the replicas of old ledgers become invalid, so a DaemonSet or StatefulSet should be chosen when deploying on k8s.
5.3 Broker Recovery: Fencing

5.3.1 Scenario

A broker crashes, or a network partition between the broker and ZK causes split brain; partition ownership needs to be transferred.
5.3.2 Process

- After Broker1's heartbeat times out, ZK transfers the ownership of the topic partition to Broker2.
- Broker2 sends a Fencing request for ledger_X to the Ensemble, and the bookies put ledger_X into the Fencing, non-writable state.
- Broker1 fails to write data and receives a FenceException, indicating that the partition has been taken over by another broker, so it actively gives up ownership.
- After receiving the exception, the Client disconnects from Broker1 and, via Topic Lookup, establishes a persistent connection with Broker2.
- Meanwhile, Broker2 performs Forwarding Recovery on the entry logs after ledger_X's LAC1 one by one (if the replica count of an entry in the UNKNOWN state has actually reached WQ, the entry is considered written successfully and LAC1 is advanced to LAC2).
- Broker2 updates ledger_X's metadata, sets it to the CLOSED state, creates a new ledger, and continues processing the client's write requests.
5.3.3 Conclusion: Broker failure recovery in seconds

- Not reusing the old ledger reduces complexity: if the old ledger_X were reused, the LAC of all ensemble members would have to be made consistent, which involves strongly consistent replication of the tail entries and is logically complex. Closing it directly ensures the old ledger is never written again.
- The recovery logic is simple and the downtime short: from the client's point of view, it only has to wait for the two steps below, after which it writes data directly to the new ledger on the new broker. The broker does not participate in any data replication, so it is stateless and can be scaled horizontally to improve throughput.
  - ZK transfers partition ownership.
  - The new broker performs Forwarding Recovery on the tail entries in UNKNOWN status.
6. Distributed Log and Raft

6.1 Concept comparison

| | Raft | DL |
|---|---|---|
| Role | Leader and Followers | Writer (broker) and Bookies |
| Failover | term | ledger_id |
| Replication | Majority AppendEntries RPC | Quorum Write |
| Consistency | Last Committed Index | Last Add Confirmed (LAC) |
| Split brain | Majority Vote | Broker Fencing |
6.2 Process comparison
6.3 Summary

DL differs from Raft in that the data on each bookie node is not asynchronously replicated from a single leader node; instead, the client itself distributes entries round-robin. Thanks to the existence of LAC and LAP, entries can be evenly distributed and stored on each bookie with their ordering metadata embedded.

- To ensure that bookies can append logs quickly, bookkeeper designed the append-only sequential Journal write mechanism.
- To ensure that bookies can quickly read messages by (lid, eid), bookkeeper designed the Ledger Store.

Therefore, every bookie storage node has equal status; there is no leader/follower concept as in traditional consensus algorithms, which neatly avoids the problem of reads and writes having to go through a leader that easily becomes a single-point bottleneck. At the same time, new bookies can be added directly to improve read/write throughput and reduce the load on the old bookies.
7. Summary

7.1 Advantages of Pulsar

Pulsar directly solves the existing problems of manual scaling and slow failure recovery that Kafka has on the container platform:

- Stability and high availability: fast Broker / Bookie failure recovery in seconds.
- Horizontal linear scaling: storage and compute are separated; brokers can be added to increase read/write throughput and reduce cluster load, and bookies can be added to increase storage capacity.
- Load balancing on expansion: after a new Bookie is added, new ledgers are created on it and the load is automatically shared evenly.
7.2 Disadvantages of Pulsar

- Many concepts and a complex system; the barrier to locating and fixing hidden bugs is high.
- Few production endorsements; in China only Tencent Finance and Zhaopin are known to use it.
The address of this document is https://yinzige.com/2020/04/24/pulsar-survey/