Initially, BIGO’s messaging platform mainly used the open source Kafka as a data backing. With the increasing data scale and continuous iteration of products, the data scale carried by the BIGO message flow platform has increased exponentially, and downstream online model training, online recommendation, real-time data analysis, real-time data warehouse, and other businesses have put forward higher requirements for the real-time and stability of the message flow platform. Open source Kafka clusters are difficult to support massive data processing scenarios, and we need to invest more manpower to maintain multiple Kafka clusters, which will become higher and higher, mainly reflected in the following aspects:
1. Data storage and message queuing service binding, cluster scaling/partition balancing require a large amount of copy data, resulting in cluster performance degradation 。
2. When the partition replica is not in the ISR (synchronous) state, once a broker fails, data loss may occur or the partition cannot provide read and write services.
3. When the Kafka broker disk fails/occupies too much space, manual intervention is required.
4. The group uses KMM (Kafka Mirror Maker) synchronously across regions, and the performance and stability are difficult to meet expectations.
5. In the catch-up read scenario, PageCache pollution is prone to occur, resulting in poor read and write performance.
6. The number of topic partitions stored on the Kafka broker is limited, and the more partitions, the worse the disk read and write orderliness and the lower the read and write performance.
7. The growth of the scale of Kafka clusters has led to a sharp increase in O&M costs, requiring a lot of manpower to be invested in daily O&M. In BIGO, scaling a machine to a Kafka cluster and balancing partitions requires 0.5 people/day. Scaling down one machine takes 1 person/day.
If you continue to
use Kafka, the costs will continue to rise: scale machines up and down, increase O&M manpower. At the same time, with the growth of business scale, we have higher requirements for the messaging system: the system should be more stable and reliable, easy to scale horizontally, and low latency. In order to improve the real-time, stability, and reliability of message queues and reduce O&M costs, we began to consider whether to do localized secondary development based on open source Kafka, or see if there is a better solution in the community to solve the problems we encountered when maintaining Kafka clusters.
Why Pulsar In
November 2019, we began researching message queues to compare the advantages and disadvantages of current mainstream message flow platforms and align with our needs. In the process of research, we found that Apache Pulsar is a next-generation cloud-native distributed message flow platform that integrates messaging, storage, and lightweight functional computing. Pulsar scales seamlessly, with low latency, high throughput, and supports multi-tenancy and cross-region replication. Most importantly, Pulsar’s storage-compute separation architecture can perfectly solve the problem of Kafka scaling and scaling. The Pulsar producer sends the message to the broker, which writes to the second-tier storage BookKeeper through the bookie client.
Pulsar adopts a tiered architecture design of storage and computing separation, supports multi-tenant, persistent storage, and cross-region data replication in multiple data centers, and has the characteristics of strong consistency, high throughput, and low latency and highly scalable streaming data storage.
1. Horizontal expansion: It can seamlessly expand to hundreds or thousands of nodes.
2. High throughput: has been tested in Yahoo! production environment, supporting publish-subscribe (Pub-Sub) of millions of messages per second.
3. Low latency: It can still maintain low latency (less than 5 ms) under large-scale message volume.
4. Persistence mechanism: Pulsar’s persistence mechanism is built on Apache BookKeeper, which realizes read/write splitting.
5. Read/write splitting: BookKeeper’s read/write splitting IO model greatly exerts the sequential write performance of disks, which is relatively friendly to mechanical hard disks, and the number of topics supported by a single bookie node is not limited.
In order to further deepen our understanding of Apache Pulsar and gauge whether Pulsar can truly meet the needs of our production large-scale messaging Pub-Sub, we began a series of stress tests in December 2019. Since we were using an HDD and did not have an SSD, we encountered some performance issues during the stress test, with the assistance of StreamNative, we respectively sum a series was carried out tuning, Pulsar has improved throughput and stability.
After 3~4 months of stress testing and tuning, we believe that Pulsar is fully capable of solving the various problems we encountered when using Kafka, and launched Pulsar in the test environment in April 2020.
Apache Pulsar at BIGO: Pub-Sub consumption model
In May 2020, we officially launched the Pulsar cluster in production. Pulsar’s scenario in BIGO is mainly the classic production and consumption model of Pub-Sub, with Baina services (data receiving services implemented in C++) on the front end, Mirror Maker and Flink in Kafka, and producers of clients in other languages such as Java, Python, C++, etc. to write data to the topic. The backend is consumed by Flink and Flink SQL, as well as consumer consumers of other languages.
In the downstream, we connect business scenarios such as real-time data warehouse, real-time ETL (Extract-Transform-Load, the process of extracting, transforming, and loading data from the source to the destination), real-time data analysis, and real-time recommendation. Most business scenarios use Flink to consume data in Pulsar topics and process business logic. The client languages used in other business scenarios are mainly distributed in C++, Go, Python, etc. After the data is processed by its own business logic, it is eventually written to Hive, Pulsar topic, and third-party storage services such as ClickHouse, HDFS, and Redis.
Pulsar + Flink live streaming platform
At BIGO, we built a live streaming platform with Flink and Pulsar. Before introducing this platform, let’s take a look at the inner workings of Pulsar Flink Connector. In the Pulsar Flink Source/Sink API, there is a Pulsar topic upstream, a Flink job in the middle, and a Pulsar topic downstream. How do we consume this topic, and how do we process data and write it to the Pulsar topic?
Follow the code example on the left side of the figure above to initialize a StreamExecutionEnvironment and configure it, such as modifying the property and topic values. Then create a FlinkPulsarSource object, fill in the serviceUrl (brokerlist), adminUrl (admin address), and serialization of the topic data, and finally pass the property in, so that the data in the Pulsar topic can be read. The use of Sink is very simple, first create a FlinkPulsarSink, specify the target topic in the sink, then specify TopicKeyExtractor as the key, and call addsink to write data to the sink. This production and consumption model is very simple, very similar to Kafka.
How does the Pulsar topic and Flink’s consumption link? As shown in the following figure, when you create a new FlinkPulsarSource, a new reader object is created for each partition of the topic. It should be noted that the Pulsar Flink Connector uses the reader API to consume, and a reader will be created first, which uses the Pulsar Non-Durable Cursor. The feature of reader consumption is to read a piece of data and commit it immediately, so you may see that the subscription corresponding to the reader does not have backlog information on the monitoring.
In Pulsar version 2.4.2, when the topic subscribed by the non-durable cursor receives the data written by the producer, the data will not be saved in the broker’s cache, resulting in a large number of data read requests falling into BookKeeper, reducing the data read efficiency. BIGO fixed this issue in Pulsar version 2.5.1.
After Reader subscribes to the Pulsar topic and consumes the data in the Pulsar topic, how does Flink guarantee exactly-once? The Pulsar Flink Connector uses a separate subscription, this one using the Durable Cursor. When Flink triggers a checkpoint, the Pulsar Flink Connector will checkpoint the reader’s state (including the consumption location of each Pulsar Topic Partition) to a file, memory, or RocksDB, and when the checkpoint is complete, it will post a Notify Checkpoint Complete notification. After the Pulsar Flink Connector receives the checkpoint completion notification, it submits the consumption offset, that is, message ID, to the Pulsar broker with a separate SubscriptionName, at which point the consumption offset information is actually recorded.
After the Offset Commit is completed, the Pulsar
broker stores the Offset information (represented as Cursor in Pulsar) in the underlying distributed storage system, BookKeeper, which has the advantage of two layers of recovery guarantees when the Flink task is restarted. The first case is to recover from the checkpoint: you can get the last consumed message ID directly from the checkpoint, get the data through this message ID, and the data stream can continue to be consumed. If it is not restored from the checkpoint, after the Flink task is restarted, it will start consuming the offset position corresponding to the previous commit from Pulsar according to the SubscriptionName. This can effectively prevent the problem of checkpoint corruption causing the entire Flink task to fail to start successfully.
The Checkpoint process is shown in the following figure.
Do checkpoint N first, publish a notify Checkpoint Complete after completion, wait for a certain time interval, then do checkpoint N+1, and then perform a notify Checkpoint Complete operation after completion, at this time perform a Durable Cursor Commit, and finally commit to the Pulsar topic On the server side, this can ensure that the checkpoint is exactly-once, and can also ensure the message “keep alive” according to the subscription you set.
What problem does Topic/Partition Discovery solve? When a Flink task consumes a topic, if the topic increases the partition, the Flink task needs to be able to automatically discover the partition. How does the Pulsar Flink Connector do this? Readers who subscribe to topic partitions are independent of each other, each task manager contains multiple reader threads, according to the hash function to map the topic partitions contained in a single task manager, when a new partition is added to the topic, the newly added partition will be mapped to a task manager, and the task manager will create a reader and consume new data after discovering the new partition. The user can set the detection frequency by setting the ‘partition.discovery.interval-millis’ parameter.
In order to lower the threshold for Flink to consume Pulsar topics and make Pulsar Flink Connector support richer new Flink features, the BIGO Message Queuing team added Pulsar Flink SQL DDL (Data Definition Language) and Flink 1.11 support to the Pulsar Flink Connector. Previously, the official Pulsar Flink SQL only supported Catalog, and it was inconvenient to consume and process the data in the Pulsar topic in the form of DDL. In the BIGO scenario, most of the topic data is stored in JSON format, and the JSON schema is not registered in advance, so it can only be consumed after specifying the DDL of the topic in Flink SQL. In response to this scenario, BIGO has done secondary development based on the Pulsar Flink Connector, providing a code framework for consuming, parsing, and processing Pulsar topic data in the form of Pulsar Flink SQL DDL (as shown in the figure below).
In the code on the left, the first step is to configure the consumption of the Pulsar topic,
first specify the DDL form of the topic, such as rip, rtime, uid, etc., and the following is the basic configuration of consuming the Pulsar topic, such as topic name, service-url, admin-url, etc. After the underlying reader reads the message, it will parse the message according to DDL and store the data in a test_flink_sql table. The second step is general logical processing (such as field extraction of the table, join, etc.), and after obtaining relevant statistics or other relevant results, these results are returned and written to HDFS or other systems. The third step is to extract the fields and insert them into a hive table. Since Flink 1.11 has better write support for hive than 1.9.1, BIGO has made another API compatibility and version upgrade to make the Pulsar Flink Connector support Flink 1.11.
BIGO’s real-time streaming platform built on Pulsar and Flink is mainly used for real-time ETL processing scenarios and AB-test scenarios.
Real-time ETL processing
scenarios Real-time ETL processing scenarios mainly use Pulsar Flink Source and Pulsar Flink Sink. In this scenario, Pulsar topics implement hundreds or even thousands of topics, each with its own schema. We need to perform routine processing on hundreds or thousands of topics, such as field conversion, fault tolerance, writing to HDFS, etc. Each topic corresponds to a table on HDFS, and hundreds of topics will be mapped to hundreds of tables on HDFS, and the fields of each table are different, which is the real-time ETL scenario we encountered.
The difficulty of this scenario is the large number of topics. If you maintain one Flink task per topic, the maintenance cost is too high. Previously, we wanted to sink the data in the Pulsar topic directly to HDFS through the HDFS Sink Connector, but the logic inside was very troublesome. In the end, we decided to use one or more Flink tasks to consume hundreds or thousands of topics, each topic with its own schema, directly use readers to subscribe to all topics, perform schema parsing post-processing, and write the processed data to HDFS.
As the program runs, we find that there is also a problem with this scheme: the pressure is unbalanced between operators. Because some topics have a large traffic and some have a small flow, if it is completely mapped to the corresponding task manager by random hashing, some task managers will process high traffic, while some task managers will process very low traffic, resulting in very serious congestion on some task machines, slowing down the processing of Flink streams. So we introduced the concept of slot group, grouped according to the traffic of each topic, the traffic will be mapped to the number of partitions of the topic, and it is also based on the traffic when creating topic partitions, if the traffic is high, create more partitions for the topic, and vice versa. When grouping, topics with small traffic are divided into a group, and topics with large traffic are placed in a separate group, which isolates resources well and ensures that the task manager has a balanced traffic as a whole.
Real-time data warehouse needs to provide hourly or daily tables to provide data query services for data analysts and recommendation algorithm engineers, simply put, there will be many dots in the app application, and various types of dots will be reported to the server. If the original dots are directly exposed to the business side, different business users need to access various original tables to extract data from different dimensions and correlate calculations between the tables. Frequent data extraction and association operations on the underlying basic table will seriously waste computing resources, so we extract the dimensions that users care about from the basic table in advance, and merge multiple dots together to form one or more wide tables, covering 80% ~ 90% of the scenario tasks recommended above or data analysis.
In the real-time data warehouse scenario, real-time intermediate tables are also required, and our solution is that for topic A to topic K, we use Pulsar Flink SQL to parse the consumed data into the corresponding tables. Under normal circumstances, the common practice of aggregating multiple tables into one table is to use join, such as joining tables A to K according to uid to form a very wide wide table; However, joining multiple wide tables in Flink SQL is inefficient. Therefore, BIGO uses union instead of join, making a wide view, returning the view in hours, writing to ClickHouse, and providing real-time queries to downstream business parties. Using union instead of the aggregation of join acceleration tables can control the output of intermediate tables at the hourly level to the minute level.
The output day table may also require joining the offline table stored on the hive or other storage media, that is, the problem of joining between the flow table and the offline table. If you join directly, the intermediate state that needs to be stored in the checkpoint will be relatively large, so we have optimized it in another dimension.
The left part is similar to the hourly table, each topic is consumed using Pulsar Flink SQL and converted into a corresponding table, and the union operation between tables is carried out, and the resulting table is entered into HBase in days (HBase was introduced here to replace its join).
The right side needs to join offline data, use Spark to aggregate offline Hive tables (such as tables a1, a2, a3), and the aggregated data will be written to HBase through a well-designed row-key. The state of the data after aggregation is as follows: assuming that the key of the data on the left fills in the first 80 columns of the wide table, the data calculated by the later Spark task corresponds to the same key, fill in the last 20 columns of the wide table, form a large wide table in HBase, and extract the final data from HBase again and write it to ClickHouse for upper users to query, which is the main architecture of AB-test.
Since its launch in May 2020, Pulsar has been running stably, processing tens of billions of messages per day and incarnating 2~3 GB/s bytes into traffic. The high throughput, low latency, and high reliability provided by Apache Pulsar greatly improve the BIGO message processing capabilities, reduce the O&M cost of message queues, and save nearly 50% of hardware costs. At present, we have deployed hundreds of Pulsar brokers and bookie processes on dozens of physical hosts, adopted the mixed mode of bookie and broker on the same node, migrated ETL from Kafka to Pulsar, and gradually migrated the business of consuming Kafka clusters in the production environment (such as Flink, Flink SQL, ClickHouse, etc.) to Pulsar. As more businesses migrate, traffic on Pulsar continues to rise.
Our ETL job has more than 10,000 topics, each topic has an average of 3 partitions, using a 3-replica storage strategy. Previously, with Kafka, as the number of partitions increased, the disk gradually degraded from sequential reads and writes to random reads and writes, and the read and write performance degraded severely. Apache Pulsar’s storage tiering design easily supports millions of topics, providing elegant support for our ETL scenarios.
Looking forward to
BIGO’s performance in Pulsar broker
load balancing, broker cache hit rate optimization, broker related monitoring, BookKeeper read and write performance, BookKeeper disk IO performance optimization, Pulsar and Flink, Pulsar and Flink SQL A lot of work has been done in terms of combination, which has improved the stability and throughput of Pulsar, and lowered the threshold for the combination of Flink and Pulsar, laying a solid foundation for the promotion of Pulsar.
In the future, we will increase the application of Pulsar in BIGO scenarios to help the community further optimize and improve Pulsar functions, as follows:
1. Develop new features for Apache Pulsar, such as supporting topic policy related features.
2. Migrate more tasks to Pulsar. This work involves two aspects, one is to migrate the previous tasks that used Kafka to Pulsar. Second, new services are directly connected to Pulsar.
3. BIGO is ready to use KoP to ensure a smooth transition of data migration. Because BIGO has a large number of Flink tasks that consume Kafka clusters, we hope to be able to do a layer of KoP directly in Pulsar to simplify the migration process.
4. Continuous performance optimization of Pulsar and BookKeeper. Due to the high traffic rate in the production environment, BIGO has high requirements for the reliability and stability of the system.
5. Continue to optimize BookKeeper’s IO protocol stack. The underlying storage of Pulsar itself is an IO-intensive system, and ensuring high IO throughput at the bottom can improve the throughput of the upper layer and ensure stable performance.
public number (zhisheng) reply to Face, ClickHouse, ES, Flink, Spring, Java, Kafka, Monitoring < keywords such as span class="js_darkmode__148"> to view more articles corresponding to keywords.
like + Looking, less bugs 👇