Speaker: Lin Tanghui, Senior Development Engineer, Bilibili

Edited by: Zhang Zhenfei, Shenzhou New Bridge

Published by: DataFunTalk

Introduction: As the business grows rapidly, how did Bilibili's storage system evolve to support exponentially growing traffic peaks? As traffic keeps increasing, how do we design a stable, reliable, and easily scalable system that can meet future business growth? And, facing stricter availability requirements, how can KV storage give applications stronger availability guarantees through multi-active deployment across data centers? The article also introduces some typical business applications of KV storage.

This article covers the following four topics:

Storage evolution

Design implementation

Scenario & Problem

Summary and thoughts


Storage evolution

First, let's look at the early storage evolution at Bilibili.

For different scenarios, early KV storage included Redis/Memcache, Redis+MySQL, and HBase.

However, as Bilibili's data volume grew rapidly, these storage choices ran into several problems:

First, MySQL is single-machine storage. In some scenarios the data volume had exceeded 10 TB, more than a single machine can hold. TiDB was also considered at the time, but it is a relational database and is not a good fit for data without strong relational structure, such as playback history.

Second, Redis Cluster hits a scale bottleneck. Because Redis Cluster uses the Gossip protocol to exchange state between nodes, the larger the cluster, the higher the communication overhead between nodes and the longer state inconsistencies between nodes persist, which makes it hard to scale out further.

In addition, HBase suffers from severe long-tail latency and high cache memory costs.

Based on these problems, we put forward the following requirements for KV storage:

Easy to scale: 100x horizontal scaling;

High performance: low latency, high QPS;

High availability: stable long-tail latency, self-healing from faults;

Low cost: cheaper than a cache;

Highly reliable: no data loss.


Design implementation

Next, let’s look at how we implemented it based on the above requirements.

1. Overall Architecture

The overall architecture is divided into three parts: Client, Node, and Metaserver. The Client is the user access layer and determines how users access the system; users can access it through an SDK. When reading and writing, users only need the put and get methods and do not need to care about the technical details of the distributed implementation. The Metaserver mainly stores table metadata: which shards a table is divided into and which nodes those shards are located on. The core of a Node is the Replica: each table has multiple shards, and each shard has multiple replicas, which are kept in sync through Raft replication to ensure high availability.
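To make the division of labor concrete, here is a minimal sketch of how a client might resolve a key through the Metaserver and then call put/get on the owning node. All class and field names, the CRC32 hash, and the in-memory dictionaries standing in for nodes are assumptions for illustration; the talk does not describe the actual SDK, and Raft replication is left out entirely.

```python
import zlib
from dataclasses import dataclass, field


@dataclass
class Metaserver:
    """Hypothetical metadata service: table name -> node holding each shard."""
    # routes["user_profile"][shard_id] = name of the node serving that shard
    routes: dict = field(default_factory=dict)

    def locate(self, table: str, key: str) -> tuple[int, str]:
        shards = self.routes[table]
        # Hash sharding is assumed here; the real hash function is not given.
        shard_id = zlib.crc32(key.encode()) % len(shards)
        return shard_id, shards[shard_id]


class Client:
    """SDK-style client: callers only ever see put and get."""

    def __init__(self, meta: Metaserver, nodes: dict):
        self.meta = meta
        self.nodes = nodes  # node name -> dict standing in for a storage node

    def put(self, table: str, key: str, value: bytes) -> None:
        shard_id, node = self.meta.locate(table, key)
        self.nodes[node][(table, shard_id, key)] = value

    def get(self, table: str, key: str):
        shard_id, node = self.meta.locate(table, key)
        return self.nodes[node].get((table, shard_id, key))


meta = Metaserver(routes={"user_profile": ["node-a", "node-b", "node-c"]})
nodes = {"node-a": {}, "node-b": {}, "node-c": {}}
client = Client(meta, nodes)
client.put("user_profile", "uid:42", b"likes-anime")
value = client.get("user_profile", "uid:42")
```

The caller never names a shard or a node; that lookup is the only part the Metaserver contributes in this sketch.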

2. Cluster topology

Pool: A resource pool. Pools are divided by business type into online resource pools and offline resource pools.

Zone: An Availability Zone. It is mainly used for fault isolation, ensuring that the replicas of each shard are distributed across different zones.

Node: A storage node, which can contain multiple disks and stores replicas.

Shard: When a table's data volume is too large, it can be split into multiple shards. The split strategy is either Range or Hash.
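The zone rule above (replicas of a shard live in different zones) can be sketched as a small placement function. This is only an illustration under assumed names: `place_replicas`, the node and zone labels, and the "first node in each zone" choice are hypothetical, and a real scheduler would also weigh load and capacity.

```python
from collections import defaultdict


def place_replicas(nodes: dict[str, str], replicas: int = 3) -> list[str]:
    """Pick one node per zone so that no two replicas share a zone.

    nodes maps a node name to the zone it lives in.
    """
    by_zone = defaultdict(list)
    for node, zone in sorted(nodes.items()):
        by_zone[zone].append(node)
    if len(by_zone) < replicas:
        raise ValueError("not enough zones for zone-disjoint placement")
    chosen_zones = sorted(by_zone)[:replicas]
    # Simplification: take the first node of each zone.
    return [by_zone[zone][0] for zone in chosen_zones]


cluster = {"n1": "zone-a", "n2": "zone-a", "n3": "zone-b", "n4": "zone-c"}
placement = place_replicas(cluster)
```

With this layout, losing any single zone removes at most one replica of a shard.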

3. Metaserver

Resource management: Records the cluster's resource information, including which resource pools and Availability Zones exist and how many nodes there are. When a table is created, each shard records its mapping to these resources.

Metadata distribution: Records which node each shard is on.

Health check: All nodes register with the Metaserver, which checks whether each node is healthy and whether any disk is damaged. This information makes fault self-healing possible.

Load detection: Records disk, CPU, and memory usage.

Load balancing: Thresholds are set, and data is redistributed when they are reached.

Split management: Scales out when the data volume grows.

Raft Host: When a Metaserver goes down, the fault heals automatically.

RocksDB: Persists the metadata.

4. Node

A Node mainly contains three parts: background threads, RPC access, and an abstract engine layer.

(1) Background threads

Binlog management: Each user write records a binlog entry, so data can be restored after a failure. Because local storage space is limited, Binlog management keeps hot data locally and moves cold data to S3.

Data recovery: This mainly protects against accidental deletion. When a user deletes data, it is not removed immediately; a retention period is usually set, for example one day, after which the data is reclaimed. If data was deleted by mistake, the data recovery module can restore it.

Health check: Checks the health of the node, such as disk information and memory anomalies, and reports it to the Metaserver.

Compaction: Mainly manages data reclamation. The storage engine is RocksDB, which is implemented as an LSM tree and is append-only on write.

(2) RPC access

When the cluster reaches a certain scale, manual O&M becomes very expensive without automation. The RPC module therefore exports monitoring metrics, including QPS, throughput, and latency, which makes troubleshooting easier when something goes wrong. Different services have different throughput, so how do we isolate multiple tenants? Through quota management: a business applies for a quota when it onboards, for example 10K QPS for a table, and once traffic exceeds that value the user is rate limited. Businesses of different priority levels are managed differently.
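One common way to enforce a per-table QPS quota is a token bucket. The talk does not say which algorithm is used, so the following is a sketch of the general technique rather than the actual quota module; the `TokenBucket` name and the burst size are assumptions. Time is passed in explicitly to keep the example deterministic.

```python
class TokenBucket:
    """Per-table rate limiter: refills `qps` tokens per second up to `burst`."""

    def __init__(self, qps: float, burst: float):
        self.qps = qps
        self.capacity = burst
        self.tokens = burst
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Refill according to the time elapsed since the previous request.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.qps)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # over quota: the request is rate limited


# A table that applied for 10K QPS receives 12,000 requests in the same instant.
bucket = TokenBucket(qps=10_000, burst=10_000)
accepted = sum(bucket.allow(now=0.0) for _ in range(12_000))
allowed_later = bucket.allow(now=0.5)  # half a second later tokens have refilled
```

Keeping one bucket per table is what gives the isolation: a table that exceeds its quota is throttled without affecting the others.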

(3) Abstract engine layer

This layer mainly exists to handle different business scenarios. Take the large-value engine as an example: because LSM suffers from write amplification, if values are particularly large, frequent writes make the effective write throughput very low. The different engines are transparent to the upper layers, and the engine is selected at runtime through parameters.

5. Split – Metadata Update

For KV storage, a table is divided into shards according to the business scale at creation time; by default a single shard is 24 GB. As business data grows, a single shard can no longer hold its data and must be split. There are two split methods, range and hash. Here we take hash as an example:

Suppose a table starts with 3 shards. When data 4 arrives, it is stored in shard 1 according to its hash. As data grows and 3 shards can no longer hold it, the table is split from 3 shards into 6. Now when data 4 is accessed, the hash maps it to shard 4. If shard 4 is still in the splitting state, the Metaserver redirects the access to the original shard 1. Once the split is complete and the shard's state changes to normal, shard 4 serves the access directly. The user is not aware of this process.
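The redirect logic in this example can be sketched as follows. Modulo hashing is assumed because it reproduces the numbers in the text (4 mod 3 = 1, 4 mod 6 = 4); the `HashTable` and `State` names are hypothetical and do not come from the talk.

```python
from enum import Enum


class State(Enum):
    NORMAL = "normal"
    SPLITTING = "splitting"


class HashTable:
    """Routing metadata for one hash-sharded table (illustrative only)."""

    def __init__(self, shard_count: int):
        self.old_count = shard_count
        self.count = shard_count
        self.state = {i: State.NORMAL for i in range(shard_count)}

    def begin_split(self) -> None:
        # Double the shard count; the new shards start in the splitting state.
        self.old_count = self.count
        self.count *= 2
        for shard_id in range(self.old_count, self.count):
            self.state[shard_id] = State.SPLITTING

    def finish_split(self, shard_id: int) -> None:
        self.state[shard_id] = State.NORMAL

    def route(self, key: int) -> int:
        target = key % self.count
        if self.state[target] is State.SPLITTING:
            # Redirect to the shard that owned the key before the split.
            return key % self.old_count
        return target


table = HashTable(3)
before = table.route(4)   # 4 % 3 -> shard 1
table.begin_split()
during = table.route(4)   # shard 4 is still splitting, so redirect to shard 1
table.finish_split(4)
after = table.route(4)    # 4 % 6 -> shard 4
```

Doubling the shard count means every key in new shard i+3 previously lived in shard i, so the redirect target is always the shard that still holds the data.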

6. Split – Data Balancing and Reclamation

First, the data is split locally, which can be understood as taking a checkpoint. A RocksDB checkpoint is essentially a set of hard links, so the split usually completes in about 1 ms. After the split, the Metaserver synchronously updates the metadata. For example, if shard 1 originally held data 0-100, then after the split its data in 50-100 is no longer needed and can be reclaimed through a Compaction Filter. Finally, the split data is distributed to different nodes. Because the whole process operates on a batch of data at once, rather than copying records one by one as Redis master-replica replication does, the entire split completes at the millisecond level.
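The 0-100 example can be modeled in a few lines. In RocksDB, checkpoints and compaction filters are native (C++) features; the Python below only imitates their effect on a dictionary, and the function names and half-open key ranges are assumptions.

```python
def split_range(lo: int, hi: int) -> tuple[tuple[int, int], tuple[int, int]]:
    """Split a key range [lo, hi) at its midpoint."""
    mid = (lo + hi) // 2
    return (lo, mid), (mid, hi)


def compaction_filter(entries: dict, owned: tuple[int, int]) -> dict:
    """Keep only the keys a shard still owns; everything else is reclaimed."""
    lo, hi = owned
    return {key: value for key, value in entries.items() if lo <= key < hi}


# Before the split, one shard holds keys in [0, 100).
data = {key: f"v{key}" for key in range(0, 100, 10)}

# The checkpoint gives both children the same files (hard links), so each
# child initially sees all of the data.
left_range, right_range = split_range(0, 100)
left_view, right_view = dict(data), dict(data)

# A later compaction drops whatever falls outside each child's range.
left_view = compaction_filter(left_view, left_range)
right_view = compaction_filter(right_view, right_range)
```

The split itself touches only metadata and links; the actual data rewrite is deferred to compaction, which is why the split can finish so quickly.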

7. Multi-active disaster recovery

The splitting and Metaserver mechanisms described above guarantee high availability, but they are still not sufficient for some scenarios. For example, when the cluster of an entire data center goes down, the industry mostly relies on multi-active deployment. The multi-active capability of our KV storage is also built on Binlog: data written in the Cloud Cube data center is synchronized to the Jiading data center through Binlog. If the storage in the Jiading data center goes down, the proxy module automatically switches traffic to the Cloud Cube data center for reads and writes. In the most extreme case, where an entire data center goes down, all user access is concentrated in the remaining data center to preserve availability.
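A rough sketch of binlog-based synchronization plus proxy failover is shown below. The class names, the health flag, and the one-way replay are assumptions for illustration; conflict resolution for concurrent writes in both data centers is not covered by the talk and is omitted here.

```python
class DataCenter:
    """One data center's KV storage with an append-only binlog."""

    def __init__(self, name: str):
        self.name = name
        self.healthy = True
        self.store = {}
        self.binlog = []

    def write(self, key, value) -> None:
        self.store[key] = value
        self.binlog.append((key, value))


def replicate(src: DataCenter, dst: DataCenter, from_offset: int = 0) -> int:
    """Replay src's binlog onto dst and return the next offset to resume from."""
    for key, value in src.binlog[from_offset:]:
        dst.store[key] = value
    return len(src.binlog)


class Proxy:
    """Routes to the local data center, falling back to the remote one."""

    def __init__(self, local: DataCenter, remote: DataCenter):
        self.local, self.remote = local, remote

    def _pick(self) -> DataCenter:
        for dc in (self.local, self.remote):
            if dc.healthy:
                return dc
        raise RuntimeError("no healthy data center")

    def get(self, key):
        return self._pick().store.get(key)

    def put(self, key, value) -> None:
        self._pick().write(key, value)


cloud_cube, jiading = DataCenter("cloud-cube"), DataCenter("jiading")
cloud_cube.write("k", "v1")
offset = replicate(cloud_cube, jiading)      # binlog sync to Jiading
proxy = Proxy(local=jiading, remote=cloud_cube)
read_local = proxy.get("k")                  # served from Jiading

jiading.healthy = False                      # Jiading storage goes down
proxy.put("k", "v2")                         # traffic switches to Cloud Cube
read_after_failover = proxy.get("k")
```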


Scenario & Problem

Next, we introduce typical scenarios of KV storage at Bilibili and the problems we encountered.

The most typical scenario is user profiles; recommendation, for example, is driven by user profiles. Other data, such as dynamic feeds, followed series, object storage, and bullet comments (danmaku), is also stored in KV.

1. Customized optimization

Thanks to the abstract engine layer, it is easy to support different business scenarios and to optimize for specific ones.

The Bulkload full-import scenario is mainly used for dynamic feed recommendation and user profiles. User profiles are mainly T+1 data. Before Bulkload, the data was written row by row from Hive; the data pipeline was very long, and importing 1 billion records per day took about 6 to 7 hours. With Bulkload, the data only needs to be built in RocksDB format on the Hive offline platform and then uploaded from there to object storage. After the upload completes, KV is notified to pull the data, and once the pull completes it performs a local Bulkload. The time is reduced to under 10 minutes.

Another scenario is the fixed-length list. You may have noticed that your playback history holds only 3,000 entries, and so does your dynamic feed history. Because history data is very large, it cannot be stored without limit. To solve this, we developed a customized engine that stores a fixed-length list: the user only needs to write to it, and when the fixed length is exceeded the engine automatically deletes the excess entries.
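The behavior of a fixed-length list can be illustrated with a bounded deque. This is not the custom engine itself; the `FixedLengthList` name is hypothetical, and the sketch assumes that the oldest entries are the ones evicted, which the talk does not state explicitly.

```python
from collections import deque


class FixedLengthList:
    """Append-only list that keeps at most `max_len` entries."""

    def __init__(self, max_len: int = 3000):
        # A deque with maxlen silently drops the oldest item on overflow.
        self.items = deque(maxlen=max_len)

    def append(self, item) -> None:
        self.items.append(item)

    def latest(self, n: int) -> list:
        """Return up to n entries, newest first."""
        start = max(0, len(self.items) - n)
        return list(self.items)[start:][::-1]


history = FixedLengthList(max_len=3000)
for video_id in range(3500):
    history.append(video_id)
newest = history.latest(3)
```

The caller only appends; trimming is the engine's job, which is the point of the customized engine described above.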

2. Problems encountered – storage engine

In actual use we also encountered some problems, mainly in the storage engine and in Raft. On the storage engine side, the problems are mainly with RocksDB. The first is data expiration. As data is written, it is pushed down level by level through Compactions. Our playback history sets an expiration time. Suppose the expired data now sits in the L3 level: as long as L3 is not full, no Compaction is triggered and the data is not deleted. To solve this, we run a periodic Compaction that checks whether each key has expired and deletes the ones that have.
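The effect of the periodic Compaction on expired keys can be modeled as a filter over one level. The dictionary layout and the `periodic_compaction` name are assumptions; in RocksDB this kind of check would live in native compaction code rather than in Python.

```python
def periodic_compaction(level: dict, now: float) -> dict:
    """Rewrite one level, dropping entries whose expiration time has passed.

    level maps key -> (value, expire_at), with expire_at in seconds.
    """
    return {
        key: (value, expire_at)
        for key, (value, expire_at) in level.items()
        if expire_at > now
    }


# L3 is not full, so a size-triggered compaction would never touch it.
l3 = {
    "uid:1": ("video-9", 100.0),   # already expired at now=150
    "uid:2": ("video-7", 200.0),   # still valid
}
l3 = periodic_compaction(l3, now=150.0)
```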

Another problem is that DEL causes slow SCAN queries. When an LSM deletes data, it does not physically remove it but writes a delete marker. For example, suppose the keys between 20 and 40 are deleted. A later SCAN has to read through all of these deleted entries one by one and filter them out, so when there are many deletes SCAN becomes very slow. We use two main approaches to solve this. The first is a delete threshold: when the number of deletes exceeds the threshold, a Compaction is forced to remove the entries marked as deleted. But this in turn causes write amplification. For example, if data in the L1 level is deleted and the delete triggers a Compaction, the L1 file drags a whole layer of L2 files into the Compaction, which causes very large write amplification. To address the write amplification, we added delayed deletion: during SCAN we collect a metric recording the proportion of deleted data among all data, and trigger the Compaction based on this feedback value.

The third is write amplification for large values. The current solution in the industry is key-value separation, and that is how we solved it as well.
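The scan-feedback idea can be sketched with a toy shard that counts delete markers during a scan and compacts only when their share is high. The `Shard` class, the 0.5 ratio, and the single flat dictionary (instead of LSM levels) are simplifying assumptions, not the production design.

```python
TOMBSTONE = object()  # stands in for an LSM delete marker


class Shard:
    def __init__(self, compact_ratio: float = 0.5):
        self.data = {}
        self.compact_ratio = compact_ratio  # assumed threshold
        self.compactions = 0

    def put(self, key: int, value) -> None:
        self.data[key] = value

    def delete(self, key: int) -> None:
        self.data[key] = TOMBSTONE  # logical delete only

    def compact(self) -> None:
        self.data = {k: v for k, v in self.data.items() if v is not TOMBSTONE}
        self.compactions += 1

    def scan(self, lo: int, hi: int) -> list:
        live, scanned, dead = [], 0, 0
        for key in sorted(self.data):
            if lo <= key < hi:
                scanned += 1
                if self.data[key] is TOMBSTONE:
                    dead += 1  # wasted work: read and then filtered out
                else:
                    live.append((key, self.data[key]))
        # Feedback: compact only when scans really pay for the tombstones.
        if scanned and dead / scanned > self.compact_ratio:
            self.compact()
        return live


shard = Shard(compact_ratio=0.5)
for key in range(100):
    shard.put(key, f"v{key}")
for key in range(20, 90):
    shard.delete(key)
rows = shard.scan(0, 100)        # 70% of scanned entries are tombstones
rows_again = shard.scan(0, 100)  # tombstones are gone, no second compaction
```

Tying the trigger to what scans observe avoids compacting ranges that are deleted but never read.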

3. Problems encountered – Raft

There are two problems at the Raft level:

First, our Raft setup has three replicas; if one replica goes down, the other two can still provide service. In extreme cases, however, more than half of the replicas go down. Although the probability is very low, we still take measures to shorten recovery time when it happens. Our approach is to reduce the replica count: for example, if two of the three replicas are down, a background script automatically degrades the cluster to single-replica mode so that service can continue. At the same time, a background process restores the lost replicas, and once recovery completes the cluster is switched back to multi-replica mode. This greatly shortens failure recovery time.

The other is the log flushing problem. In scenarios such as likes and dynamic feeds, the values are very small but throughput is very high, which causes severe write amplification. Our disks are written in 4 KB units by default, so if each value is only a few dozen bytes, most of each disk write is wasted. To address this we aggregate flushes: we set a threshold, and once a certain number of writes or a certain number of kilobytes has accumulated, the batch is flushed together. Batch flushing increases throughput by 2 to 3 times.
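Aggregated flushing can be sketched as a buffer that is written out when either a count threshold or a size threshold is reached. The thresholds (64 entries, 4096 bytes) and the `BatchedLog` name are assumptions chosen for the example; the talk does not give the real values, and a production version would also flush on a timer.

```python
class BatchedLog:
    """Buffers small log records and flushes them to disk in batches."""

    def __init__(self, max_entries: int = 64, max_bytes: int = 4096):
        self.max_entries = max_entries
        self.max_bytes = max_bytes
        self.pending = []
        self.pending_bytes = 0
        self.flushes = 0  # number of disk writes issued

    def append(self, record: bytes) -> None:
        self.pending.append(record)
        self.pending_bytes += len(record)
        if (len(self.pending) >= self.max_entries
                or self.pending_bytes >= self.max_bytes):
            self.flush()

    def flush(self) -> None:
        if not self.pending:
            return
        self.flushes += 1  # one disk write covers the whole batch
        self.pending.clear()
        self.pending_bytes = 0


log = BatchedLog(max_entries=64, max_bytes=4096)
for _ in range(640):
    log.append(b"x" * 50)  # a 50-byte record, e.g. a "like"
```

Here 640 small records result in 10 disk writes instead of 640, which is the kind of saving the batch flush is after.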


Summary and thoughts

1. Application

On the application side, we will merge KV and cache. Business developers are not very familiar with KV and cache resources, and after the merge they no longer need to decide whether to use KV or cache.

Another application-side improvement is support for Sentinel mode, which further reduces the cost of replicas.

2. O&M

On the operations side, one problem is slow-node detection. We can already detect faulty nodes, but detecting slow nodes is still a hard problem in the industry, and it is a direction we will keep working on.

Another problem is automatic disk eviction and rebalancing. Today, after a disk fails, someone notices the alert the next day and handles it manually. We want to turn this into an automated mechanism.

3. System

At the system level, the focus is performance optimization with SPDK and DPDK, to further improve the throughput of the KV process.

That concludes today's talk. Thank you.


01/ Speaker

Lin Tanghui

Senior Development Engineer

He joined Bilibili in 2016. As a core developer, he took part in Bilibili's architectural transformation from a monolith to microservices, and was subsequently responsible for developing microservice middleware such as message queues, service discovery, and data transmission. He is currently responsible for NoSQL storage and has built a distributed KV storage system from zero to one, providing high-performance, stable, and reliable storage services for the whole site.
