Author | Liu Bin
Curated |

Founded in 2012, Kingsoft Cloud is one of the top three Internet cloud service providers in China. It was listed on NASDAQ in May 2020 and operates in many countries and regions around the world. Since its establishment eight years ago, Kingsoft Cloud has adhered to a customer-centric service philosophy, providing safe, reliable, stable, and high-quality cloud computing services.

Kingsoft Cloud Log Service is a one-stop system for log data processing, providing services ranging from log collection and storage to log retrieval and analysis, real-time consumption, and log delivery. It supports log query and monitoring for multiple business lines, improves the O&M and operations efficiency of Kingsoft Cloud's product lines, and currently handles about 200 TB of data per day.

As a one-stop service system for log data processing, Kingsoft Cloud Log Service needs to provide the following features:

  • Data collection: customized development based on Logstash and Filebeat to support more data collection forms.

  • Data query: both SQL and Elasticsearch Query String syntax are supported.

  • Data consumption: external interfaces are encapsulated on top of Pulsar. Product lines that want to show scrolling logs in the console can use the WebSocket protocol of the log service, and all log data can also be queried (i.e., consumed as a queue) through an exposed REST API.

  • Exception alerting: after retrieving data in the console, the data and retrieval syntax can be saved as alert items, with configurable alert policies and notification methods. When an exception is detected, the background starts the corresponding task to raise an alert in real time.

  • Chart display: statements and query results retrieved in the console can be saved as charts (bar charts, line charts, etc.); on the next visit to the console, clicking the dashboard shows all previously saved query statements and result data.

  • Data heterogeneity: logs can optionally be delivered to other cloud product lines, for example sending certain logs to object storage for further processing (such as loading the data into a Hive data warehouse for analysis).

1. Why Pulsar

During the survey, we compared RocketMQ, Kafka, and Pulsar in terms of basic functionality and reliability, and summarized the advantages and disadvantages of the three (see the table below for comparison results).

We found Pulsar to be well suited for log stream processing. At the storage layer, BookKeeper serves as the log stream store. Pulsar adopts a cloud-native architecture, and our log service likewise adopts a cloud-native, serverless model, with all services implemented on the cloud. Pulsar offers many flexible enterprise-grade features, such as multi-tenancy, per-tenant storage quotas, data ETL, and an overall load balancing strategy; it supports transferring large volumes of data, and its message-queue monitoring is relatively complete. Let me take a closer look at some of the features that led us to choose Pulsar.

Separation of Compute and Storage

Pulsar's producers and consumers connect to brokers. The broker, as a stateless service, can be scaled horizontally, and scaling does not affect overall data production and consumption. The broker does not store data; data is stored in the layer below the broker (i.e., the bookies), which realizes the separation of compute and storage.

Elastic Horizontal Scaling

For a cloud-based product, Pulsar can scale brokers in and out without rebalancing data. In contrast, Kafka must rebalance partitions when scaling, which can put the cluster under high load and affect the overall service.

Secondly, Pulsar topic partitions can also be scaled almost without limit; after expansion, the shards and traffic are automatically balanced across the cluster by the load balancing policy.

Multi-Tenancy

Pulsar natively supports multi-tenancy. Log Service also has the concept of tenants: each product line (that is, each project) belongs to a tenant, realizing data isolation between product lines. A Pulsar cluster supports millions of topics (as already practiced at Yahoo!), and topics are isolated by tenant. At the tenant level, Pulsar implements useful features such as storage quotas, message expiration and deletion, and isolation policies, and supports separate authentication and authorization mechanisms.
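As a rough illustration of tenant-level isolation and storage quotas, here is a minimal sketch; the class and names below are invented for the sketch and are not the Pulsar admin API:

```python
# Sketch: per-tenant storage quotas, as described above.
# All names here are illustrative -- this is not the Pulsar admin API.

class TenantQuota:
    def __init__(self, limit_bytes):
        self.limit_bytes = limit_bytes
        self.used_bytes = 0

    def try_write(self, size):
        """Accept a write only if it stays within the tenant's quota."""
        if self.used_bytes + size > self.limit_bytes:
            return False  # over quota (a real policy might instead evict old data)
        self.used_bytes += size
        return True

# Each product line (project) maps to its own tenant, isolating its data.
quotas = {"project-a": TenantQuota(100), "project-b": TenantQuota(100)}

assert quotas["project-a"].try_write(80)       # within quota
assert not quotas["project-a"].try_write(30)   # would exceed 100 bytes
assert quotas["project-b"].try_write(30)       # project-b is unaffected
```

One tenant hitting its quota does not affect writes in another tenant, which is the isolation property the article relies on.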

Load Balancing Strategy

Pulsar has the concept of a bundle at the namespace level. If the load on a broker is high, a bundle can be split (bundle split) according to the topic partition management policy, automatically rebalancing the child bundles onto brokers with lower load. When a topic is created, Pulsar also automatically assigns it to the broker with the lowest current load.
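The bundle mechanism can be pictured as hash ranges over a namespace's topics. Below is a simplified, illustrative simulation of a bundle split; the real broker uses a 32-bit consistent-hash space and its own assignment policy:

```python
# Simplified simulation of namespace bundles and bundle split.
# A bundle owns a hash range; topics hash into a bundle; splitting a hot
# bundle halves its range so the halves can move to less-loaded brokers.
import zlib

RANGE = 2 ** 16  # toy hash space (Pulsar actually uses a 32-bit space)

def topic_hash(name):
    return zlib.crc32(name.encode()) % RANGE

def owning_bundle(bundles, topic):
    h = topic_hash(topic)
    return next(b for b in bundles if b[0] <= h < b[1])

bundles = [(0, RANGE)]                 # one bundle covering the whole namespace
lo, hi = bundles.pop(0)                # the "hot" bundle is split at its midpoint
mid = (lo + hi) // 2
bundles += [(lo, mid), (mid, hi)]

# Every topic still lands in exactly one bundle after the split.
topics = [f"logs-shard-{i}" for i in range(100)]
assert all(owning_bundle(bundles, t) in bundles for t in topics)
# The two child bundles can now be assigned to different brokers.
assert len(bundles) == 2
```

After the split, each child bundle can be unloaded to a different broker without moving any stored data, since the data lives in the bookies.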

Pulsar IO Model

During a write operation, the broker writes data to BookKeeper concurrently; once the bookies acknowledge the write, the broker keeps the recent data in a single in-memory queue. If the current consumption pattern is real-time (tailing) read, data can be served directly from the broker without querying the bookies, improving message throughput. In a catch-up read scenario, historical data must be read from the bookies. Catch-up read also supports data offloading, which moves data to other storage media, such as object storage or HDFS, for cold storage of historical data.
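The read path above can be sketched as a toy model. Assuming a broker-side tail cache over durable bookie storage (sizes and names are invented for illustration), tailing reads hit the broker while catch-up reads fall through to the bookies:

```python
# Toy model of the read path described above: a broker keeps recently
# acknowledged entries in an in-memory tail cache; tailing reads are served
# from the broker, while catch-up reads fall back to the bookies.
from collections import deque

class Bookie:
    def __init__(self):
        self.entries = []            # durable storage
    def read(self, from_id):
        return self.entries[from_id:]

class Broker:
    CACHE_SIZE = 4
    def __init__(self, bookie):
        self.bookie = bookie
        self.cache = deque(maxlen=self.CACHE_SIZE)  # recent entries only
    def write(self, entry):
        self.bookie.entries.append(entry)           # persist first
        self.cache.append((len(self.bookie.entries) - 1, entry))
    def read(self, from_id):
        cached_ids = [i for i, _ in self.cache]
        if cached_ids and from_id >= cached_ids[0]:
            return [e for i, e in self.cache if i >= from_id], "broker-cache"
        return self.bookie.read(from_id), "bookie"   # catch-up read

broker = Broker(Bookie())
for i in range(10):
    broker.write(f"log-{i}")

_, source = broker.read(9)   # tailing consumer: close to the head
assert source == "broker-cache"
_, source = broker.read(0)   # catch-up consumer: must go to the bookies
assert source == "bookie"
```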

Topic Creation, Production, and Consumption

After a topic is created in the console, the topic and tenant information is recorded in etcd and MySQL. The two types of services on the right side of the figure then watch etcd. One is the producer service, which listens for topic create and delete operations. The other is the consumer service: when it observes a topic creation, the corresponding service connects to the Pulsar topic and consumes its data in real time. The producer then starts receiving data and decides which topic to write it to; the consumer consumes the data and, after processing, writes it out or dumps it to ES or other storage.

Figure 1. Topic creation, production, and consumption process

Topic Logical Abstraction

Pulsar has three levels: topic, namespace, and tenant. Because Pulsar does not currently support regex-based consumption at the namespace level, we shifted each concept up one level to reduce the number of back-end Flink tasks and achieve project-level consumption. That is, a Log Service shard corresponds to a Pulsar topic, and a Log Service topic (log pool) corresponds to a Pulsar namespace. With this change we implemented two capabilities: dynamically increasing and decreasing the number of shards, and letting background Flink tasks consume data at the level of a single project.

Figure 2. Pulsar topic logical abstraction
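The mapping above can be sketched as follows, assuming the `persistent://tenant/namespace/topic` naming scheme Pulsar uses for full topic names; the helper functions and example names are our own:

```python
# Sketch of the mapping described above: a Log Service project is a Pulsar
# tenant, a Log Service topic (log pool) is a Pulsar namespace, and each
# Log Service shard is one Pulsar topic.

def pulsar_topic(project, log_pool, shard):
    # persistent://<tenant>/<namespace>/<topic> is Pulsar's full topic name.
    return f"persistent://{project}/{log_pool}/shard-{shard}"

def shard_topics(project, log_pool, num_shards):
    """All Pulsar topics backing one log pool; growing a log pool just
    means creating more topics in the same namespace."""
    return [pulsar_topic(project, log_pool, s) for s in range(num_shards)]

topics = shard_topics("cdn-project", "access-logs", 3)
assert topics[0] == "persistent://cdn-project/access-logs/shard-0"
assert len(shard_topics("cdn-project", "access-logs", 5)) == 5  # dynamic scale-up
```

A Flink task that consumes one project's data then only needs to subscribe to the topics of one namespace, rather than one task per shard.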

Message Subscription Model

Pulsar provides four message subscription models:

  1. Exclusive mode: when multiple consumers subscribe to a topic with the same subscription name, only one consumer can consume data.

  2. Failover mode: when multiple consumers subscribe to a Pulsar topic with the same subscription name, if one consumer fails or its connection drops, Pulsar automatically switches to the next consumer, achieving single-point consumption with failover.

  3. Shared mode: a widely used model. If multiple consumers are started under one subscription, Pulsar distributes data across them in round-robin fashion; if one consumer goes down or loses its connection, its messages are redistributed to the remaining consumers. LogHub uses the shared subscription model: the entire Hub runs in containers, and the consumer side can scale in and out dynamically according to overall load or other metrics.

  4. Key_Shared mode: messages with the same key are delivered to the same consumer via key hashing, maintaining per-key consumption consistency.
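The difference between Shared and Key_Shared dispatch can be shown with a toy dispatcher (round-robin vs. key hashing; the hash function here is illustrative, not Pulsar's exact algorithm):

```python
# Toy dispatcher contrasting the Shared and Key_Shared models described above.
import itertools
import zlib

def dispatch_shared(messages, consumers):
    """Shared: round-robin messages across consumers under one subscription."""
    rr = itertools.cycle(consumers)
    return [(next(rr), msg) for msg in messages]

def dispatch_key_shared(messages, consumers):
    """Key_Shared: hash the key so one key always goes to the same consumer."""
    def pick(key):
        return consumers[zlib.crc32(key.encode()) % len(consumers)]
    return [(pick(key), msg) for key, msg in messages]

consumers = ["c1", "c2"]
shared = dispatch_shared(["m1", "m2", "m3", "m4"], consumers)
assert [c for c, _ in shared] == ["c1", "c2", "c1", "c2"]

keyed = dispatch_key_shared([("user-1", "a"), ("user-2", "b"), ("user-1", "c")],
                            consumers)
owners = {c for c, m in keyed if m in ("a", "c")}  # both user-1 messages
assert len(owners) == 1  # same key -> same consumer, preserving per-key order
```

Shared mode maximizes throughput by spreading load evenly; Key_Shared trades some of that for per-key ordering.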

Broker Failure Recovery

Because the broker is stateless, the failure of any single broker has no impact on overall production and consumption. Another broker takes over: it obtains the topic metadata from ZooKeeper and automatically becomes the new owner, while the data at the storage layer does not change. In addition, the data in the topic does not need to be copied, avoiding data redundancy.

Bookie Failure Recovery

The bookie layer stores information in segments. Thanks to BookKeeper's multi-replica mechanism, when a bookie fails, the system reads the corresponding segments from other bookies and rebalances the replicas, so writes to the bookies are not affected and the availability of the topic is ensured.
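A minimal simulation of this multi-replica behavior follows; the replica-placement rule is invented for illustration, whereas real BookKeeper uses ensembles with write and ack quorums:

```python
# Toy model of bookie failure recovery: each entry is written to multiple
# bookies, so reads survive the loss of any single bookie.
class BookieNode:
    def __init__(self, name):
        self.name, self.up, self.store = name, True, {}

def write(bookies, entry_id, data, replicas=2):
    # Spread replicas across the ensemble (real BookKeeper picks a quorum).
    targets = [bookies[(entry_id + i) % len(bookies)] for i in range(replicas)]
    for b in targets:
        b.store[entry_id] = data

def read(bookies, entry_id):
    for b in bookies:
        if b.up and entry_id in b.store:
            return b.store[entry_id]    # first live replica wins
    raise IOError("all replicas unavailable")

ensemble = [BookieNode(f"bookie-{i}") for i in range(3)]
for eid in range(6):
    write(ensemble, eid, f"entry-{eid}")

ensemble[0].up = False                  # one bookie goes down
assert all(read(ensemble, eid) == f"entry-{eid}" for eid in range(6))
```

With two replicas per entry, every entry remains readable after a single failure, which is why the article says topic availability is preserved.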

2. How Pulsar Is Applied in Log Service

The lowest layer of the Log Service system is the data collection tier, which we customized based on open-source collection tools (such as Logstash, Flume, and Beats). The log pool in the data store is a logical concept that corresponds to a topic in Pulsar. The upper layer of the system is query analysis and business applications: query analysis refers to retrieval and analysis on the Log Service workbench or queries through SQL syntax, while business applications refer to customizing dashboards and charts in the console to provide real-time alerts. Both support data dumping, i.e., transferring log data to storage media or lower-priced storage such as KS3-based object storage, Elasticsearch clusters, or Kafka. The following figure shows the product features of Log Service.

Figure 3. The product functions of Log Service

Log Service Architecture Design

We designed the hierarchical architecture of Log Service based on its product functions (as shown in the figure below). The lowest layer is the data collection tier, responsible for collecting log text data, TCP protocol data, log data from MySQL, and so on; development of our self-built collection agent is still in progress. The collected data is sent to the corresponding Pulsar topic through the Log Service data portal. We apply the Pulsar cluster in three major areas. The first is multi-dimensional statistical analysis through a Flink-on-Pulsar framework: some business lines need to do multi-dimensional aggregation over log data, generate metric results, and deliver them back to the business line. The second is LogHub (a microservice-based service), which mainly consumes data from Pulsar topics and writes it directly to ES; the data of the entire log stream can then be queried, retrieved, and analyzed through the console. The third is Pulsar Functions: operators or ETL logic can be configured in the console, and data ETL is performed by the Pulsar Functions module in the background. We use an Elasticsearch cluster to store retrieval and analysis results; KS3, KMR, and KES correspond to some of our internal cloud product lines for storage and computation. The data output layer at the top can be divided into two modules: the Search API module provides external APIs, through which the console performs actions tightly coupled with logs; the Control API module supports management operations in the console, such as creating topics, adjusting the number of topic partitions, and configuring retrieval alerts.

Figure 4. The layered architecture of Log Service

Communication Design of Log Service

From the perspective of the product architecture, the entire service runs in a stateless mode, so the various services (especially the producer and consumer services) share data through etcd. That is, after the console performs create, update, or delete operations, the producers and consumers sense these actions and adjust accordingly. In addition, because the producers and consumers run entirely in containers and the services themselves are stateless, they can be scaled dynamically. The following diagram describes the communication design of Log Service.

Figure 5. Communication design of Log Service
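The etcd-based coordination can be sketched with a tiny in-memory stand-in for an etcd watch; this mimics only the notification pattern, not the etcd API:

```python
# Minimal stand-in for the etcd-based coordination described above: stateless
# producer and consumer services register callbacks, and console operations
# (create/delete) notify them. This mimics an etcd watch, not etcd itself.
class Registry:
    def __init__(self):
        self.watchers = []
    def watch(self, callback):
        self.watchers.append(callback)
    def put(self, op, topic):
        for cb in self.watchers:        # every service sees every change
            cb(op, topic)

registry = Registry()
producer_topics, consumer_topics = set(), set()

registry.watch(lambda op, t: producer_topics.add(t) if op == "create"
               else producer_topics.discard(t))
registry.watch(lambda op, t: consumer_topics.add(t) if op == "create"
               else consumer_topics.discard(t))

registry.put("create", "persistent://proj/logs/shard-0")  # console creates a topic
assert "persistent://proj/logs/shard-0" in producer_topics
assert "persistent://proj/logs/shard-0" in consumer_topics
registry.put("delete", "persistent://proj/logs/shard-0")  # console deletes it
assert not producer_topics and not consumer_topics
```

Because no service holds state of its own, any replica that starts up can rebuild its view by reading the registry, which is what makes dynamic scaling safe.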

Log Stream Processing Architecture

Based on the requirements of log stream processing, we designed the architecture shown below. On the left is the data collection tier; the collectors send data to the receiving service (PutDatas), which writes it to the corresponding Pulsar topic. We apply Pulsar mainly in three scenarios.

  1. Run Flink jobs on Pulsar to implement customized ETL, multi-dimensional analysis, statistics, aggregation, and other operations.

  2. Use Pulsar in LogHub to consume and store data: after consuming from Pulsar, the collected log data is written to the Elasticsearch cluster.

  3. Use Pulsar behind the WebSocket and REST APIs. WebSocket enables real-time scrolling logs in the console, and the REST API supports querying data in specific queues. We also implemented some simple ETL with Pulsar Functions and delivered the processed data to business-line storage media (such as a data warehouse, Kafka, or an Elasticsearch cluster).

Figure 6. Log stream processing architecture diagram
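Pulsar's Python runtime can run a native function whose single argument is the message payload and whose return value is published to the output topic. A hedged sketch of such an ETL step follows; the log line format is invented for illustration:

```python
# Sketch of a simple ETL step in the style of a native Python Pulsar Function:
# the runtime calls process() with each message value and publishes the return
# value to the output topic. The log format below is illustrative only.
import json

def process(input):
    # Raw line assumed to look like: "<level> <service> <message>"
    level, service, message = input.split(" ", 2)
    return json.dumps({"level": level, "service": service, "msg": message})

out = json.loads(process("ERROR billing payment timed out"))
assert out == {"level": "ERROR", "service": "billing", "msg": "payment timed out"}
```

A function like this would be deployed against an input and output topic, so raw lines flow in and structured JSON flows out without a dedicated service.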

3. Future Plans

With the support of Pulsar, Kingsoft Cloud Log Service has been running well. We expect Log Service to support more functions and meet more requirements. Our plans for Log Service are as follows:

  • Add ordered consumption capability (billing and audit scenarios may require ordered consumption).

  • Support merging and splitting partitions.

  • Enable fully containerized deployment. At present, the internal services of Log Service have been containerized; our next step focuses on containerized deployment of all Pulsar clusters.

Currently, Log Service supports about 15 Kingsoft Cloud internal product lines (as shown in the figure below), transmitting about 200 TB of data per day online across more than 30,000 topics. When AI services are connected to Pulsar, the overall data volume and number of topics will grow substantially.

Figure 7. Product lines using Log Service

During the testing and use of Log Service, we have gained a more comprehensive understanding of Pulsar, and we expect Pulsar to support more features, such as:

  • Remove the dependency on ZooKeeper. ZooKeeper currently maintains all of Pulsar's metadata and is under pressure, and the number of topics Pulsar can support is limited by the ZooKeeper cluster. If Pulsar's metadata were stored in bookies, the number of topics could grow without that limit.

  • Automatically scale partitions. Log data has peaks and valleys: at peak times, the number of partitions of a topic would be expanded automatically to improve overall concurrency, and during troughs it would be scaled down to reduce pressure on cluster resources.

  • Provide namespace-level regex subscription. Background Flink tasks could then subscribe at the namespace level instead of per topic, reducing the number of Flink background tasks.
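The partition auto-scaling wish above might look like the following policy sketch; all thresholds are invented, and note that Pulsar today only supports increasing partition counts:

```python
# Sketch of the partition auto-scaling policy we would like: grow partitions
# at peak load and shrink them in troughs. Thresholds here are invented for
# illustration; Pulsar currently can only increase partition counts.
def target_partitions(current, msgs_per_sec, per_partition_capacity=1000,
                      min_parts=1, max_parts=64):
    needed = max(min_parts, -(-msgs_per_sec // per_partition_capacity))  # ceil
    if needed > current:
        return min(needed, max_parts)        # peak: scale out
    if needed < current:
        return max(needed, min_parts)        # trough: scale in
    return current

assert target_partitions(4, 8000) == 8       # peak traffic doubles partitions
assert target_partitions(8, 500) == 1        # trough shrinks back
assert target_partitions(4, 4000) == 4       # steady state unchanged
```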

4. Conclusion

As a next-generation cloud-native distributed messaging and streaming platform, Apache Pulsar has unique advantages and is very well suited to our log stream processing scenarios. The Pulsar community is active and helpful. During our preliminary research, follow-up testing, and official launch, the folks at StreamNative gave us great support and helped us move our business online quickly.

Currently, Kingsoft Cloud Log Service has more than 30,000 Pulsar topics, processing about 200 TB of data per day and supporting 15 product lines. Since its launch, Pulsar has been running steadily and has greatly reduced our development and O&M costs. We look forward to deploying Pulsar clusters in containers as soon as possible, and we also hope Pulsar will remove its dependency on ZooKeeper and support automatic scaling of partitions. We are willing to work with the Pulsar community to develop new features and further accelerate Pulsar's development.

About the Author

Liu Bin is a senior development engineer on the Kingsoft Cloud Big Data team.
