At present, product requirements and internal decision-making at major companies increasingly demand real-time data, and real-time data warehouse capabilities are needed to support them. A traditional offline data warehouse offers T+1 timeliness, with scheduling measured in days, which cannot support real-time scenarios. Even if the scheduling frequency is raised to hours, that only covers scenarios with loose timeliness requirements and still does not elegantly support scenarios with strict ones. The problem of using data in real time therefore has to be solved effectively.
2. Real-time technology is maturing
Real-time computing frameworks have gone through three generations of development, namely Storm, Spark Streaming, and Flink, and they are increasingly mature. On the one hand, real-time tasks can now be developed by writing SQL, which carries over the architectural design ideas of offline data warehouses at the technical level; on the other hand, online data development platforms increasingly support real-time task development, debugging, and operation and maintenance, so development costs keep falling, which makes real-time data warehouse construction practical.
Second, the purpose of real-time data warehouse construction
1. Solving the problems of the traditional data warehouse
Judged against the current state of data warehouse construction, "real-time data warehouse" is an easily confused concept. By traditional experience, an important function of a data warehouse is to record history: a warehouse usually wants data from the first day the business went live, recorded continuously up to the present. Real-time stream processing, by contrast, emphasizes the current state of processing. Combining the construction experience of leading companies with Didi's own situation in this field, we position the purpose of building a real-time data warehouse in the company as: using data warehouse construction theory plus real-time technology to solve the problems that the offline data warehouse cannot solve because of its low data timeliness.
At this stage, the main reasons for building a real-time data warehouse are:
- The company's business has an increasingly urgent need for real-time data, and real-time data is needed to assist decision-making;
- Existing real-time data construction is not standardized, data availability is poor, it cannot form a data warehouse system, and a large amount of resources is wasted;
- Data platform tooling for real-time development is maturing, and development costs are falling.
2. Application scenarios of real-time data warehouses
Third, real-time data warehouse construction plans
Next, we analyze several current real-time data warehouse construction cases, hoping that these cases bring you some inspiration.
1. Didi Hitch real-time data warehouse case
The real-time data warehouse built by the Didi data team basically meets the various real-time needs of the Hitch (carpooling) business side. It preliminarily establishes the Hitch real-time data warehouse, completes the overall data layering covering both detail and summary data, and unifies the DWD layer, which reduces big data resource consumption, improves data reusability, and supports rich downstream data services.
The specific architecture of the data warehouse is shown in the following figure:
From the data architecture diagram, the real-time data warehouse has many similarities with the corresponding offline data warehouse: a layered structure with an ODS layer, a detail layer, a summary layer, and even an application layer, whose naming may follow the same pattern. But it is not hard to see that the two also differ in many ways:
- From current experience building offline data warehouses, the detail layer of the warehouse is very rich, detail-data processing usually also includes the concept of a light summary layer, and the application-layer data sits inside the warehouse itself. In the real-time data warehouse, however, application-layer data has already landed in the application systems' storage media, so that layer can be separated from the warehouse's own tables.
- The benefit of building less of the application layer: in real-time processing, every layer that is built inevitably adds some delay to the data.
- The benefit of building less of the summary layer: when producing summary statistics, delays are often introduced deliberately in order to tolerate late data and guarantee accuracy. For example, when counting order events that span days, you may wait until 00:00:05 or 00:00:10 to make sure all data before 00:00 has arrived. The more summary layers there are, the more of this artificial delay accumulates.
- Storage differs. Didi's entire offline data warehouse is currently built on Hive tables, but in the real-time data warehouse the same table may be stored in different ways: typically, detail and summary data are stored in Kafka, while dimension information such as cities and channels needs databases such as HBase, MySQL, or other KV storage.
Next, following the Hitch real-time data warehouse architecture diagram, we expand on the construction of each layer:
1. ODS layer construction
Based on the specific scenarios of Hitch, the current data sources mainly include order-related binlogs, bubble- and safety-related public logs, and traffic-related tracking logs. Some of this data has already been collected and written into data channels such as Kafka or DDMQ; other data needs to be collected with internal self-developed synchronization tools. In the end, all of it is written into Kafka following the ODS layer construction specifications of the Hitch data warehouse.
Naming convention: ODS layer real-time data sources fall into two main types.
- One is a DDMQ or Kafka topic automatically produced during offline collection; its name is generated by the collection system as cn-binlog-{database name}-{database name}, e.g. cn-binlog-ihap_fangyuan-ihap_fangyuan.
- The other is a Kafka topic collected and synchronized by ourselves; its naming follows a convention similar to offline: the ODS layer uses realtime_ods_binlog_{source system database/table name} or ods_log_{log name}, e.g. realtime_ods_binlog_ihap_fangyuan.
2. DWD detail layer construction
Modeling at the DWD layer is driven by Hitch's business processes: the most fine-grained detail-layer fact tables are built around the characteristics of each specific business process. Combined with how Hitch analysts use data on the offline side, some important dimension attributes are redundantly added to the detail fact tables to produce wide tables. Based on the business side's current demand for real-time data, construction focuses on several major modules: transactions, finance, experience, safety, and traffic. The data of this layer comes from the ODS layer, and ETL is done through the Stream SQL provided by the big data platform. For binlogs, the processing is mainly simple data cleaning and handling of data drift and out-of-order data, possibly with Stream Joins across multiple ODS tables; for traffic logs, it is mainly general ETL plus Hitch-specific filtering, turning unstructured data into structured data and distributing it. Besides being stored in Kafka, the data of this layer is usually also written to Druid in real time, for querying detail data and as a source for simple summary metrics.
Naming convention: DWD table names use lowercase letters separated by underscores, are at most 40 characters long, and follow: realtime_dwd_{business/pub}_{data domain abbreviation}_[{business process abbreviation}]_[{custom table naming tag abbreviation}]
- {Business/pub}: reference the business naming;
- {Data domain abbreviation}: reference the data domain division;
- {Custom table naming tag abbreviation}: the entity name; it can be the name of a business abstraction formed after data warehouse transformation and integration, and it should accurately express the business meaning represented by the entity;
- Example: realtime_dwd_trip_trd_order_base
3. DIM layer construction
The DIM layer is the public dimension layer, used to store dimension data.
Naming convention: DIM table names use lowercase letters separated by underscores, are at most 30 characters long, and follow: dim_{business/pub}_{dimension definition}[_{custom naming tag}]
- {Business/pub}: reference the business naming;
- {Dimension definition}: reference the dimension naming;
- {Custom table naming tag abbreviation}: the entity name; it can be the name of a business abstraction formed after data warehouse transformation and integration, and it should accurately express the business meaning represented by the entity;
- Example: dim_trip_dri_base
4. DWM summary layer construction
When building the summary layer of the Hitch real-time data warehouse, there are many similarities with the Hitch offline data warehouse, but the concrete technical implementation is very different.
First, for common indicators such as PV, UV, and order business process indicators, we compute them uniformly at the summary layer so that indicator calibers are unified within a fixed model. For personalized indicators, from the perspective of reusability, we determine a unique time field and align it as far as possible with other indicators in the time dimension; for example, the number of abnormal orders online needs to be aligned, on event time, with the indicators of the transaction domain.
Second, building the Hitch summary layer requires multi-dimensional, theme-oriented aggregation. Since the real-time data warehouse itself is theme-oriented, and each theme cares about different dimensions, data is summarized per theme along the dimensions that theme cares about, finally producing the summary indicators the business side needs. Concretely, for PV-type indicators we use Stream SQL to compute 1-minute summaries as the minimum summary unit, and accumulate indicators over longer time ranges on top of that. For UV-type indicators we use Druid directly as the aggregation container, implementing exact or approximate deduplication according to the business side's timeliness and accuracy requirements.
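A minimal sketch of that minimum summary unit, again in Flink-style SQL with assumed table and field names (an order event stream `dwd_trip_order` with `city_id` and an event-time attribute `event_time`):

```sql
-- 1-minute order PV per city as the minimum summary unit; coarser
-- indicators (10-minute, hourly, daily) are accumulated downstream.
SELECT
  city_id,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  COUNT(1) AS order_pv
FROM dwd_trip_order
GROUP BY city_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```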
Third, building the summary layer also involves handling derived dimensions. For Hitch summary indicators, we use HBase's version mechanism to build zipper tables for derived dimensions, and obtain the accurate dimension values of the real-time data at that moment by joining the event stream with the HBase dimension table.
Naming convention: DWM table names use lowercase letters separated by underscores, are at most 40 characters long, and follow: realtime_dwm_{business/pub}_{data domain abbreviation}_{data primary granularity abbreviation}_[{custom table naming tag abbreviation}]_{statistical time period range abbreviation}
- {Business/pub}: reference the business naming;
- {Data domain abbreviation}: reference the data domain division;
- {Data primary granularity abbreviation}: the main granularity or data domain abbreviation of the data, which is also the main dimension in the composite primary key;
- {Custom table naming tag abbreviation}: the entity name; it can be the name of a business abstraction formed after data warehouse transformation and integration, and it should accurately express the business meaning represented by the entity;
- {Statistical time period range abbreviation}: 1d: daily increment; td: daily cumulative (to date); 1h: hourly increment; th: hourly cumulative (to date); 1min: minute increment; tmin: minute cumulative (to date);
- Example: realtime_dwm_trip_trd_pas_bus_accum_1min
5. APP application layer construction
The main work of this layer is to write real-time summary data into the databases of the application systems: Druid for large-screen display and real-time OLAP (besides application data, detail data can also be written into it to compute summary indicators), HBase for real-time data interface services, and MySQL or Redis for real-time data products.
Naming convention: given the particularities of real-time data warehouses, there are no mandatory requirements at this layer.
2. Kuaishou real-time data warehouse scenario-based case
1) Goals and difficulties
First, the goals. Because it is a data warehouse, we hope that every real-time indicator has a corresponding offline indicator, and we require the overall difference between real-time and offline indicators to be within 1%; this is the minimum standard.
The second is data latency. The SLA is that, for all core reporting scenarios during an activity, data delay must not exceed 5 minutes, including the time after a job fails and the recovery time; exceeding that means the SLA is not met.
Finally, stability: for scenarios such as a job restart, the curve should remain normal and indicators should not show obvious anomalies caused by the restart.
The first difficulty is the sheer volume of data: overall ingress traffic is on the order of trillions of records per day, and during events such as the Spring Festival Gala the peak can reach 100 million records per second.
The second difficulty is complex component dependencies. Some parts of the link depend on Kafka, some on Flink, and some on KV storage, RPC interfaces, OLAP engines, and so on; we need to think about how to distribute work across the link so that all of these components can run normally.
The third difficulty is link complexity. We currently have 200+ core business jobs, 50+ core data sources, and more than 1,000 jobs overall.
2) Real-time data warehouse: the layered model
Based on the above three difficulties, let's look at the data warehouse architecture, as shown above.
There are three different data sources at the bottom: client logs, server-side logs, and binlogs. The common base layer is divided into two levels: the DWD layer for detail data and the DWS layer for public aggregate data; DIM is what we usually call dimensions. On top of the offline data warehouse we have a thematic pre-layering that may include traffic, users, devices, video production and consumption, risk control, social, and so on. The core work of the DWD layer is standardized cleaning; the DWS layer joins dimension data onto the DWD layer and produces aggregates at general granularities. Further up is the application layer, including dashboard data, multi-dimensional analysis models, and business thematic data; at the top are the scenarios. The overall process can be divided into three steps:
The first step is business data integration, which is equivalent to connecting the business data; the second step is data assetization, which means heavily cleaning the data to form regular, orderly data; the third step is putting data to business use, meaning that data can feed back into the business in real time and empower the construction of business data value.
3) Real-time data warehouse: safeguard measures
Based on the layered model above, let's look at the overall safeguard measures:
Safeguards are divided into three parts: quality assurance, timeliness assurance, and stability assurance.
Let's look first at quality assurance, the blue part. At the data-source stage we monitor sources for out-of-order data, based on our own collection SDK, and we check consistency between the data sources and offline data. The computation side has three stages: development, go-live, and service. The development stage provides a standardized model with benchmarks, and offline comparison and verification keep quality consistent; the go-live stage focuses more on service monitoring and indicator monitoring; in the service stage, if anything abnormal occurs we first restore the Flink job from its state, and if results still do not meet expectations we repair the data offline as a whole.
The second is timeliness assurance. For data sources, we also include data-source latency in monitoring. In the development stage there are two things: the first is stress testing, where a regular task takes the peak traffic of the last 7 or 14 days to see whether it falls behind; the second, after the stress test passes, is an online restart performance evaluation, which measures restart performance when recovering from a checkpoint.
The last is stability assurance, which matters most in large activities, for example switchover drills and tiered guarantees. Based on the earlier stress test results we apply rate limiting, to ensure the job stays stable when traffic exceeds the limit, without widespread instability or checkpoint failures. Beyond that we have two different standards: cold-standby dual data centers and hot-standby dual data centers. Cold standby means that when one data center goes down, the job is pulled up in the other; hot standby means the same logic is deployed once in each of the two data centers. These are our overall safeguard measures.
4) Kuaishou scenario problems and solutions
1. PV/UV standardization
1.1 Scenario
The first problem is PV/UV standardization; here are three screenshots:
The first picture is the Spring Festival Gala warm-up scene, which is essentially an interactive activity; the second and third are screenshots of the red envelope activity and the live broadcast room on the day of the Gala.
During the activity, we found that 60-70% of the demand is to compute information on pages, such as:
- how many people came to this page, or how many people clicked into it;
- how many people came to the activity;
- how many clicks and how much exposure a certain widget on the page generated.
1.2 Solution
This scenario can be abstracted as the following SQL: simply put, filter from a table, aggregate by dimension, and produce some Count or Sum results.
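A minimal sketch of that abstraction in Flink SQL, with hypothetical table and field names (`app_event_log`, `did`, `page_id`, `event_time` as an event-time attribute); the Early Fire behaviour discussed below is typically switched on through Flink's experimental `table.exec.emit.early-fire.*` options rather than in the SQL text itself:

```sql
-- Daily PV/UV per page; with the experimental options
-- table.exec.emit.early-fire.enabled=true and
-- table.exec.emit.early-fire.delay=60 s, partial results are emitted
-- every minute instead of only when the day-level window closes.
SELECT
  page_id,
  TUMBLE_START(event_time, INTERVAL '1' DAY) AS stat_date,
  COUNT(1)            AS pv,
  COUNT(DISTINCT did) AS uv
FROM app_event_log
WHERE event_type = 'page_view'           -- filter step
GROUP BY
  page_id,                               -- dimension
  TUMBLE(event_time, INTERVAL '1' DAY);  -- day-level window
```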
Based on this scenario, our initial solution is shown on the right side of the figure above.
We used Flink SQL's Early Fire mechanism to read from the source and then bucket by DID. The purple part at the beginning is bucketed this way; bucketing first prevents hot spots on a particular DID. After bucketing comes Local Window Agg, which adds up data of the same type within each bucket; the Local Window Agg results are then merged by dimension, which computes the final result per dimension. The Early Fire mechanism opens a day-level window in the window aggregation and emits results once a minute.
We encountered some problems in this process, as shown in the lower left corner of the figure above.
The code works fine under normal operation, but when the overall data is delayed or historical data is being backfilled, problems appear. With Early Fire set to once a minute, the data volume during a backfill is large, so a backfill running at 14:00 may read the 14:02 data directly and the 14:01 point is simply lost. What happens after it is lost?
In this scenario, the upper curve in the graph is the result of backfilling historical data with Early Fire. The x-axis is the minute and the y-axis is the page UV up to that moment. We found that some points are flat, meaning there is no data result, then there is a steep jump, then another flat stretch, then another steep jump; the expected result is actually the smooth curve at the bottom of the graph.
To solve this problem, we used the Cumulate Window solution, which is also covered in Flink 1.13; the principle is the same.
The data opens a large day-level window, with small minute-level windows under it, and each record falls into a minute-level window according to its own Row Time.
The watermark advances past the window's event time and triggers emission. This solves the backfill problem: the data falls into its real window, and the watermark advances and fires after each window ends. This approach also tolerates out-of-order data to some extent, because out-of-order records are not discarded; the latest accumulated result is recorded. Finally, semantic consistency: because it is based on event time, the result is highly consistent with what is computed offline when the out-of-orderness is not severe. The above is the standardized PV/UV solution.
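A minimal sketch of the cumulate-window approach in Flink 1.13+ SQL, using the same hypothetical `app_event_log` table as above; every minute it emits the UV accumulated since the start of the day:

```sql
-- Day-level cumulative window that steps forward every minute: each emit
-- contains the UV accumulated from the start of the day to window_end.
SELECT
  window_start,
  window_end,
  page_id,
  COUNT(DISTINCT did) AS uv_so_far
FROM TABLE(
  CUMULATE(
    TABLE app_event_log,
    DESCRIPTOR(event_time),
    INTERVAL '1' MINUTE,    -- step
    INTERVAL '1' DAY))      -- maximum window size
GROUP BY window_start, window_end, page_id;
```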
2. DAU Computing
2.1 Background Introduction
The following introduces DAU computing:
We monitor active devices, new devices, and returning devices across the whole app.
An active device is one seen that day; a new device is one seen that day but never seen before; a returning device is one seen that day but not in the previous N days. In the computation, however, we may need 5-8 different topics to calculate these indicators.
Let's first look at how this logic is computed offline.
First, we collect the active devices from all sources, merge them, deduplicate at day level under a dimension, and then join a dimension table that contains each device's first and last seen times, i.e. the times of the device's first and most recent access up to yesterday.
With this information we can do the logical computation, and we find that new and returning devices are in fact sub-tags of active devices: a new device applies one piece of logic, and a returning device applies 30-day logic. Based on such a solution, can we simply write one SQL job to solve the problem?
In fact we did this at the beginning, but ran into several problems:
- The first problem: there are 6-8 data sources, and the caliber of the whole-app metrics is often fine-tuned; with a single job, every fine-tune means changing that job, and the stability of a single job is very poor.
- The second problem: the data volume is in the trillions, which causes two issues: a single job at that scale has very poor stability, and the KV storage used for real-time dimension table association cannot guarantee stability for any RPC service interface at trillion-scale data volumes.
- The third problem: we have strict latency requirements, under one minute. The whole link must avoid batch processing, and if any single point has task performance issues we also need to ensure high performance and scalability.
2.2 Technical solutions
In response to these problems, here is how we do it:
As shown in the example above, the first step deduplicates the three data sources A, B, and C at minute level by dimension and DID, producing three minute-level deduplicated sources; these are then unioned together and go through the same logic.
This brings the entry data volume down from trillions to tens of billions. After the minute-level deduplication there is a day-level deduplication, which reduces the data source from tens of billions to billions.
At the billion level, associating dimension data as a service becomes feasible: we call the user-portrait RPC interface for the association and finally write to the target topic. That target topic is imported into the OLAP engine to serve a number of different uses, including mobile services, large-screen services, and indicator dashboard services.
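A minimal sketch of the first two steps in Flink SQL, under assumed names (`source_a/b/c` with fields `did`, `dim`, and an event-time attribute `event_time`); an internal implementation may use custom operators instead:

```sql
-- Minute-level deduplication of one source: keep the first record per
-- (did, dim, minute). dedup_b and dedup_c are built the same way.
CREATE VIEW dedup_a AS
SELECT did, dim, event_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY did, dim, FLOOR(event_time TO MINUTE)
           ORDER BY event_time ASC) AS rn
  FROM source_a)
WHERE rn = 1;

-- Union the deduplicated sources before the day-level deduplication step.
CREATE VIEW unioned_devices AS
SELECT * FROM dedup_a
UNION ALL SELECT * FROM dedup_b
UNION ALL SELECT * FROM dedup_c;
```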
This solution has three advantages, namely stability, timeliness and accuracy.
The first is stability. Loose coupling here simply means that when the logic of source A and the logic of source B need changes, they can be modified separately. Task scalability: because all logic is split into very fine granularity, a problem such as a traffic spike in one place does not affect the later parts, so scaling is relatively simple; in addition, the back end is service-oriented and the state is controllable. The second is timeliness: we achieve millisecond latency with rich dimensions, overall 20+ dimensions for multi-dimensional aggregation. Finally, accuracy: we support data verification, real-time monitoring, unified model export, and so on. At this point we face another problem: out-of-order data. For the three jobs above, every job restart introduces a delay of at least about two minutes, which causes the downstream data sources to be out of order.
2.3 Delay computation scheme
How do we handle the out-of-order situation described above?
We have a total of three processing options:
The first option is to deduplicate by "did + dimension + minute", with the Value being "whether this key has been seen before". For example, for the same did, a record at 04:01 outputs a result; records at 04:02 and 04:04 likewise output results. If another 04:01 record arrives it is discarded, but a 04:00 record would still output a result.
This option has a problem: because state is kept per minute, keeping 20 minutes of state is twice the size of keeping 10 minutes, and the state size eventually becomes hard to control, so we moved to option 2.
The second option rests on the assumption that the data source has no out-of-order data. The key is "did + dimension" and the Value is a timestamp, updated as shown in the figure above. A record arrives at 04:01 and a result is output. A record arrives at 04:02 for the same did: the timestamp is updated and a result is still output. 04:04 follows the same logic, updating the timestamp to 04:04. If a 04:01 record then arrives, it sees that the timestamp is already 04:04 and the record is discarded. This approach greatly reduces the required state, but it has zero tolerance for out-of-order data; since we could not guarantee that, we came up with option 3.
Option 3 adds a ring-like buffer on top of option 2's timestamp, tolerating out-of-order data within the buffer.
For example, a record arrives at 04:01 and a result is output; a record arrives at 04:02, updating the timestamp to 04:02 while also recording that the same device was seen at 04:01; when a 04:04 record arrives, the buffer is shifted by the corresponding time difference. This logic tolerates a certain amount of disorder.
Comparing the three options:
Option 1 tolerates 16 minutes of out-of-order data, with a single-job state size of about 480 GB. It guarantees accuracy, but job recovery and stability are completely uncontrollable, so we abandoned it;
Option 2 has a state size of about 30 GB and tolerates zero out-of-order data, so the results are inaccurate; since we have very high accuracy requirements, we abandoned this option too;
Compared with option 1, option 3's state grows only slightly, and the overall effect matches option 1. Option 3 tolerates 16 minutes of out-of-order data, and 10 minutes is more than enough to restart a job during a normal update, so we finally chose option 3.
3. Operation scenario
3.1 Background introduction
Operation scenarios can be divided into four parts:
The first is large-screen data support, including analysis data for individual live broadcasts and for the whole app, which needs minute-level latency and has high update requirements;
The second is live dashboard (Kanban) support: live dashboard data involves analysis along specific dimensions and support for specific groups of people, so the requirements on dimensional richness are relatively high;
The third is data strategy lists, mainly for predicting popular works and hit items; they require hour-level data and have relatively low update requirements;
The fourth is C-side real-time indicator display, where the query volume is large but the query patterns are relatively fixed.
Let's analyze the different requirements generated by these four states.
The first two types are basically the same, differing mainly in query pattern: some are specific business scenarios and some are general business scenarios.
Types 3 and 4 have lower requirements for updates and higher requirements for throughput, and the curves in between need no consistency. The fourth type's query pattern is more of a single-entity query, for example querying what the indicators of a piece of content are, and the QPS requirements are relatively high.
3.2 Technical solutions
For the above four scenarios, how do we do it?
First look at the basic detail layer (left of the figure). The data source has two links, one of which is the consumption stream, e.g. live-broadcast consumption information such as views, likes, and comments. After a round of basic cleaning, dimension management is done; the upstream dimension information comes from Kafka, which writes content dimensions, including some user dimensions, into KV storage.
After these dimensions are associated, the data is finally written to the Kafka DWD fact layer; to improve performance we add a two-level cache here.
As shown above, we read DWD-layer data and do basic summarization. The core is window-based dimensional aggregation, generating data at four granularities: a multi-dimensional summary topic for the whole app, one for the live broadcast room, one for the author, and one for the user; these are the general-dimension data.
As shown in the lower part of the figure, based on this general-dimension data we then process personalized-dimension data, i.e. the ADS layer. That data then gets dimension expansion, including content expansion and operational dimension expansion, followed by aggregation, for example the e-commerce real-time topic, the institutional service real-time topic, and the big-V live real-time topic.
Splitting into two links has the advantage that one deals with general dimensions and the other with personalized dimensions. The general-dimension link has higher guarantee requirements, while the personalized-dimension link carries a lot of personalized logic. If the two were coupled together, tasks would frequently have problems and it would be unclear which task is responsible for what, making it impossible to build a stable layer.
As shown on the right, we ended up using three different engines. Simply put, Redis queries serve the C-side scenarios, and OLAP queries serve the large-screen and business dashboard scenarios.
3. Tencent Kandian real-time data warehouse case
Why does Tencent Kandian (the information-feed business) build a real-time data warehouse? Because the raw volume of reported data is very large, with trillions of reports per day, and the reporting format is messy: it lacks content dimension information and user portrait information, so downstream cannot use it directly. The real-time data warehouse we provide, built for the Tencent information-feed business scenario, joins content dimensions and user portraits and aggregates at various granularities, so downstream can use real-time data very conveniently.
1) Solution selection
Now look at the solution selection for our multi-dimensional real-time data analysis system. We compared the leading solutions in the industry and selected the one that best fits our business scenario.
- The first part is real-time data warehouse selection: we chose the industry's relatively mature Lambda architecture. Its advantages are high flexibility, fault tolerance, maturity, and low migration cost; the disadvantage is that real-time and offline data use two sets of code, so one caliber may be modified while the other is not. We have to reconcile the data every day and alert on any anomaly.
- The second part is real-time compute engine selection: Flink was designed for stream processing from the start, Spark Streaming is strictly speaking micro-batching, and Storm is no longer used much. Considering Flink's exactly-once accuracy, lightweight checkpoint fault tolerance, low latency, high throughput, and ease of use, we chose Flink as the real-time compute engine.
- The third part is the real-time storage engine: we need dimension indexes, high concurrency, pre-aggregation, and high-performance real-time multi-dimensional OLAP queries. HBase, TDSQL, and ES cannot meet these requirements; Druid has the defect that it organizes segments by time, so it cannot keep the same content on the same segment, and a global TopN can only be approximate. So we chose ClickHouse, the MPP database engine that has been popular in the last couple of years.
2) Design goals and difficulties
Our multi-dimensional real-time data analysis system is divided into three modules. The difficulties lie mainly in the first two: the real-time computing engine and the real-time storage engine.
- The difficulty of the real-time storage engine is supporting high-concurrency writes, highly available distributed storage, and high-performance index queries.
For the specific implementation of these modules, let's look at the architectural design of our system.
3) Architecture design
The front end uses the open-source component Ant Design, with an Nginx server serving the static pages and reverse-proxying browser requests to the back-end server.
The back-end service is written on Tencent's self-developed RPC back-end service framework and performs some secondary caching.
The real-time data warehouse part is divided into an access layer, a real-time computing layer, and a real-time data warehouse storage layer.
- The access layer splits the original message queue of tens of millions of records per second into micro-queues of different behavior data; taking Kandian video as an example, after splitting, the data is only millions of records per second;
- The real-time computing layer is mainly responsible for converting multi-row behavior data into single rows with multiple columns, and for real-time association of user portrait data and content dimension data;
- The real-time data warehouse storage layer is designed to fit the Kandian business and be easy to use downstream. For now we provide two message queues as two layers of the real-time data warehouse: the DWM layer is aggregated at content ID + user ID granularity, i.e. one record contains the content ID and user ID, the B-side content data, the C-side user data, and the C-side user portrait data; the DWS layer is aggregated at content ID granularity, and one record contains the content ID, B-side data, and C-side data. You can see that the traffic of the content ID + user ID queue drops further to 100,000/s and the content ID queue to 10,000/s, with a much clearer format and richer dimension information.
The real-time storage part is divided into a real-time write layer, an OLAP storage layer, and a back-end interface layer.
- The real-time write layer is mainly responsible for hash-routing the data and writing it;
- The OLAP storage layer uses an MPP storage engine and designs indexes and materialized views that fit the business, to store massive data efficiently;
- The back-end interface layer provides efficient multi-dimensional real-time query interfaces.
4) Real-time computing
The two most complex parts of this system are real-time computing and real-time storage. Let's first introduce the real-time computing part, which is divided into real-time association and the real-time data warehouse.
1. Real-time high-performance dimension table association
The difficulty of real-time dimension table association: if a real-time stream of millions of records per second is joined directly against HBase, associating just 1 minute of data takes hours, causing serious data delays.
We propose several optimizations (the first of which is sketched after this list):
- First, in the Flink real-time computation link, we aggregate in 1-minute windows and convert the multiple rows of behavior data within a window into a single row with multiple columns. After this step, the original hour-level association time drops to about ten minutes, which is still not enough.
- Second, we add a layer of Redis cache in front of the HBase content lookups: accessing 1,000 records in HBase takes seconds, while Redis access is in milliseconds, roughly 1,000 times faster. To avoid wasting cache on expired data, the cache TTL is set to 24 hours, and cache consistency is guaranteed by listening to the HBase write proxy. This brings the association time from about ten minutes down to seconds.
- Third, many irregular content IDs are reported that do not exist in the content HBase table, which causes cache penetration. So in the real-time computation we filter out these content IDs directly, preventing cache penetration and saving some time.
- Fourth, because the cache entries expire on a timer, a cache avalanche could occur. To prevent it, we do peak shaving and valley filling in the real-time computation and stagger the cache expiry times.
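A minimal sketch of the first optimization in Flink SQL, under assumed names (a `user_action` stream with `content_id`, `action`, and an event-time attribute `event_time`): one minute of behavior rows becomes one row of counters per content ID, so the dimension association runs once per content per minute instead of once per event.

```sql
-- Turn multi-row behavior data into one row with multiple columns per
-- minute, before the downstream dimension table association.
SELECT
  content_id,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  SUM(CASE WHEN action = 'click'  THEN 1 ELSE 0 END) AS click_cnt,
  SUM(CASE WHEN action = 'expose' THEN 1 ELSE 0 END) AS expose_cnt,
  SUM(CASE WHEN action = 'like'   THEN 1 ELSE 0 END) AS like_cnt
FROM user_action
GROUP BY content_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```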
Before and after optimization, the data volume drops from tens of billions to billions, and the time drops from hours to tens of seconds, a reduction of about 99%.
2. Downstream services
The difficulty of the real-time data warehouse is that it is a relatively new field and every company's business differs a lot; designing a convenient, easy-to-use real-time data warehouse that fits the Kandian business scenario is the hard part.
What does the real-time data warehouse do? Externally, it is several message queues; different message queues store real-time data at different aggregation granularities, including content ID, user ID, C-side behavior data, B-side content dimension data, and user portrait data.
How do we build the real-time data warehouse? It is simply the output of the real-time computing engine introduced above, stored in message queues, which can then be reused by many downstream consumers.
Compare developing a real-time application before and after the real-time data warehouse exists. Without it, we would need to consume the raw queue of tens of millions of records per second, do complex data cleaning, and then perform user portrait association and content dimension association just to get real-time data in the required format; the cost of development and scaling is high, and every new application has to go through the same process again. With the data warehouse, to develop a real-time application at content ID granularity you can directly subscribe to the DWS-layer message queue at a TPS of 10,000/s; the development cost is much lower, resource consumption is much smaller, and scalability is much stronger.
As a practical example, to develop the real-time data screen of our system, all of the above steps used to be necessary just to get the data. Now you only need to consume the DWS-layer message queue and write one piece of Flink SQL, consuming only 2 CPU cores and 1 GB of memory.
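A minimal sketch of that kind of job, assuming a hypothetical DWS topic exposed as a table `dws_content_stats` with per-content counters and an event-time attribute; one Flink SQL statement rolls it up for the screen:

```sql
-- Roll the content-ID-granularity DWS stream up to per-minute totals for
-- the data screen; all table and column names are illustrative.
INSERT INTO screen_metrics
SELECT
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS stat_minute,
  SUM(expose_cnt) AS total_expose,
  SUM(click_cnt)  AS total_click
FROM dws_content_stats
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
```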
Taking 50 consumers as an example, before and after establishing the real-time data warehouse, downstream development of a real-time application can reduce resource consumption by 98%, including computing resources, storage resources, labor costs, and the learning cost for developers; and the more consumers there are, the more is saved. Taking the Redis storage part alone, millions of yuan a month can be saved.
5) Real-time storage
Having introduced real-time computing, let's introduce real-time storage. This part is covered in three sections: the first is distributed high availability, the second is massive data writes, and the third is high-performance storage and queries.
1. Distributed-High Availability
Following ClickHouse's official advice, we achieve high availability with the help of ZooKeeper. Data is written to only one replica of a shard and then to ZooKeeper; ZooKeeper informs the other replicas of the same shard, which pull the data over to ensure consistency.
A message queue is not used for data synchronization here because ZooKeeper is more lightweight; on write, any replica can be written and the others obtain consistent data via ZooKeeper. Even if another node fails to fetch the data the first time, it will retry whenever it finds itself inconsistent with what ZooKeeper records, so consistency is still guaranteed.
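A minimal sketch of what such a ZooKeeper-coordinated replicated table looks like in ClickHouse DDL; the table, columns, and ZooKeeper path are assumptions for illustration:

```sql
-- One replica of the same shard is written; ZooKeeper (the path below)
-- coordinates replication so the other replicas pull the new parts.
CREATE TABLE content_stats_local
(
    stat_date   Date,
    stat_minute DateTime,
    content_id  String,
    click_cnt   UInt64,
    expose_cnt  UInt64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/content_stats_local',  -- ZK path per shard
    '{replica}')                                       -- replica macro
PARTITION BY stat_date
ORDER BY (stat_date, stat_minute, content_id);
```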
2. Massive data – write
The first problem in data writing is that if massive data is written to ClickHouse record by record, ZooKeeper's QPS becomes too high, so the solution is to write in batches instead. How big should the batch be? If it is too small it does not relieve the pressure on ZooKeeper; if it is too big the upstream memory pressure becomes too high. Through experiments we settled on a batch size of several hundred thousand records.
The second problem is that as data volume grows, QQ Kandian video content alone may write tens of billions of records per day, and the default approach of writing through a distributed table creates a disk bottleneck on a single machine. In particular, ClickHouse's underlying MergeTree is similar to the LSM-Tree underlying HBase and RocksDB: the merge process causes write amplification, which increases disk pressure. At peak, tens of millions of records per minute take tens of seconds to write, and an ongoing merge blocks write requests and makes queries very slow. We made two optimizations: one is RAID on the disks to improve disk IO; the other is to split the table by shard before writing and write directly to the different shards, so the disk pressure per machine becomes 1/N.
The third problem is that although writes are split by shard, this introduces a common distributed-system problem: a local Top is not the global Top. For example, if records for the same content ID land on different shards, then when computing the global Top 100 contents by reads, a content ID may be in the Top 100 on shard 1 but not on the other shards, so part of its data is missing from the final aggregation and the result is affected. Our optimization was to add a routing layer before writing, so that all records with the same content ID are routed to the same shard, which solves the problem.
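The article's routing is done in its own write layer before writing the local tables; one declarative way to express the same routing rule is a Distributed table with a sharding key, sketched below with the local table from the earlier example and an assumed cluster name `ck_cluster`:

```sql
-- Rows with the same content_id always hash to the same shard, so a
-- per-content Top-N or a single-content query touches only one shard.
CREATE TABLE content_stats_all AS content_stats_local
ENGINE = Distributed(
    ck_cluster,                -- cluster name (assumed)
    default,                   -- database
    content_stats_local,       -- local table on each shard
    cityHash64(content_id));   -- sharding key = the routing rule
```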
After covering writes, the next step is to introduce Clickhouse’s high-performance storage and query.
3. High Performance – Storage – Query
A key point of ClickHouse's high query performance is the sparse index. Its design matters a lot: a good design speeds queries up, a bad one hurts them. Given our business scenario, most queries involve time and content ID, e.g. how a certain content performed across various groups of people in the last N minutes, so I built the sparse index on date, minute-granularity time, and content ID. For queries about a single content, this sparse index can cut file scanning by 99%.
Another problem is that we now have too much data and too many dimensions. For QQ Kandian video content, tens of billions of records flow in per day, and some dimensions have hundreds of categories. If all dimensions were pre-aggregated at once, the data volume would explode exponentially, queries would get slower, and a lot of storage would be taken up. Our optimization is to build a corresponding pre-aggregated view for each dimension combination that is actually needed, trading space for time, which shortens query time.
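A minimal sketch of one such pre-aggregated view in ClickHouse, assuming a hypothetical detail table `content_actions_local` that also carries a `province` dimension; the sort key doubles as the sparse index discussed above:

```sql
-- Pre-aggregate one needed dimension combination (content_id x province)
-- instead of all dimensions at once, trading storage space for query time.
CREATE MATERIALIZED VIEW content_province_mv
ENGINE = SummingMergeTree()
ORDER BY (stat_date, stat_minute, content_id, province)
AS
SELECT
    stat_date,
    stat_minute,
    content_id,
    province,
    sum(click_cnt)  AS click_cnt,
    sum(expose_cnt) AS expose_cnt
FROM content_actions_local
GROUP BY stat_date, stat_minute, content_id, province;
```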
Another issue with distributed-table queries: when querying a single content ID, the distributed table fans the query out to all shards and then merges the results, but because of the routing, a content ID exists on only one shard and the other shards run for nothing. For this kind of query, our optimization is for the back end to route first, using the same rule, and query the target shard directly, which removes (N-1)/N of the load and greatly shortens query time. And since we serve OLAP queries and the data only needs eventual consistency, read/write splitting across master and replica further improves performance.
We also added a 1-minute result cache in the back end; for identical queries, the back end returns the cached result directly.
4. Expansion
Finally, our expansion approach; we also surveyed some common solutions in the industry.
HBase, for example, stores its raw data on HDFS, so expansion only means adding Region Servers and does not involve migrating the original data. ClickHouse's shard data, however, is local; it is a relatively low-level storage engine and cannot be expanded as easily as HBase.
Redis uses hash slots, similar to consistent hashing, a classic distributed caching scheme. Although a Redis slot briefly returns ASK and is unavailable for reads during rehashing, migration is generally convenient: data moves from the old h[0] to h[1], and h[0] is finally deleted. But ClickHouse is mostly used for OLAP batch queries rather than point lookups, and because of its columnar storage it does not support deletion well, so a consistent-hashing scheme does not fit.
Our current expansion approach is to consume another copy of the data and write it to a new ClickHouse cluster, running the two clusters in parallel for a while. Because real-time data is kept for 3 days, after 3 days the back-end services switch to the new cluster directly.
4. Youzan real-time data warehouse case
1) Hierarchical design
To standardize data organization and management, data warehouses usually have multiple layers, and in complex processing scenarios a temporary layer is introduced to land intermediate results and ease downstream processing. A real-time data warehouse, considering timeliness, should keep its layering as lean as possible to reduce the chance of errors in intermediate steps, but in general it still follows the layering ideas of the offline data warehouse.
The real-time data warehouse layered architecture is shown in the following figure.
– ODS (real-time data access layer)
The ODS layer, i.e. the real-time data access layer, collects real-time data from the various business systems through data collection tools, structures unstructured data, and keeps the original data with almost no filtering. The data at this layer comes mainly from three sources: NSQ messages produced by the business side, binlogs of the business databases, and tracking logs and application logs; all three are ultimately written into Kafka.
ODS layer table naming convention: department name.application name.data warehouse level_subject domain prefix_database name/message name. For example:
- a binlog-sourced real-time data warehouse table: deptname.appname.ods_subjectname_tablename
- an NSQ message from the business side: deptname.appname.ods_subjectname_msgname
– DWS (real-time detail middle layer)
The DWS layer, i.e. the real-time detail middle layer, is driven by business processes: the most fine-grained detail-layer fact tables are built around each specific business process event. Take the transaction process: there are order events, payment events, shipment events, and so on, and we build detail tables based on these independent events. At this layer, fact detail data is also divided by the subject domains of the offline data warehouse and organized with dimensional modeling, with appropriate redundancy for important dimension fields. Based on Youzan's real-time requirements, construction focuses on subject domains such as transactions, marketing, customers, stores, and products. The data of this layer comes from the ODS layer and is ETL-processed with Flink SQL; the main work includes standardized naming, data cleaning, dimension completion, and multi-stream association, with the results finally written to Kafka.
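A minimal sketch of the dimension-completion step in Flink SQL under assumed table names: an ODS order stream is widened through a lookup join against an HBase-backed dimension table (the join requires a processing-time attribute `proc_time` on the stream; all names are illustrative):

```sql
-- Widen the ODS order stream with shop dimension attributes (dimension
-- completion) and write the result to the DWS detail middle layer.
INSERT INTO dws_trade_order
SELECT
  o.order_id,
  o.shop_id,
  s.shop_name,          -- redundant dimension attribute for downstream use
  s.city,
  o.pay_amount,
  o.event_time
FROM ods_order_binlog AS o
LEFT JOIN dim_shop FOR SYSTEM_TIME AS OF o.proc_time AS s
  ON o.shop_id = s.shop_id;
```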
DWS layer table naming convention: department name.application name.data warehouse level_subject domain prefix_table name. For example:
- the middle-layer table for real-time event A: deptname.appname.dws_subjectname_tablename_eventnameA
- the middle-layer table for real-time event B: deptname.appname.dws_subjectname_tablename_eventnameB
– DIM (real-time dimension table layer)
The DIM layer, i.e. the real-time dimension table layer, stores dimension data; it is mainly used to complete dimensions when the real-time detail middle layer is widened. The data of this layer is mainly stored in HBase, and more suitable storage media will be provided later based on QPS and data volume.
DIM layer table naming convention: application name_data warehouse level_subject domain prefix_table name. For example, an HBase-stored real-time dimension table: appname_dim_tablename
– DWA (real-time summary layer)
The DWA layer, i.e. the real-time summary layer, aggregates DWS-layer data along multiple dimensions for downstream business use. In practice, different business parties aggregate dimensions in different ways, and different technical solutions are used according to the need. The first method is real-time aggregation with Flink SQL, writing the result indicators into databases such as HBase and MySQL; this was our early solution, whose advantage is flexible business logic and whose disadvantage is that the aggregation granularity is fixed and hard to extend. The second method aggregates with a real-time OLAP tool and is our current common solution; its advantage is that the aggregation granularity is easy to extend, and its disadvantage is that the business logic needs to be pre-processed in the middle layer.
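A minimal sketch of the first method under assumed names, continuing the `dws_trade_order` example above: Flink SQL aggregates per shop per day and upserts the result into a MySQL sink table.

```sql
-- Day-level GMV per shop, continuously updated; dwa_shop_gmv_daily is
-- assumed to be a JDBC (MySQL) sink table declared with
-- PRIMARY KEY (stat_date, shop_id) NOT ENFORCED so results are upserted.
INSERT INTO dwa_shop_gmv_daily
SELECT
  DATE_FORMAT(event_time, 'yyyy-MM-dd') AS stat_date,
  shop_id,
  SUM(pay_amount)          AS gmv,
  COUNT(DISTINCT order_id) AS order_cnt
FROM dws_trade_order
GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd'), shop_id;
```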
DWA layer table naming convention: application name_data warehouse level_subject domain prefix_aggregation granularity_data range. For example, an HBase-stored real-time summary table at daily granularity for a certain subject domain: appname_dwa_subjectname_aggname_daily
– APP (real-time application layer)
The APP layer, i.e. the real-time application layer: data at this layer has already been written into the application systems' storage, for example written to Druid as data sets for BI dashboards, written to HBase and MySQL to provide unified data service interfaces, or written to ClickHouse to provide real-time OLAP services. Because this layer is very close to the business, the real-time data warehouse imposes no uniform naming requirements here.
2) Real-time ETL
Many components are involved in the ETL processing of a real-time data warehouse. Let's take stock of the components required to build one and the application scenarios of each, as shown in the following figure:
3) Real-time data recovery
Once a real-time task is online, it must provide continuous, accurate, and stable service. Unlike offline tasks, which are scheduled daily and leave enough time to fix a bug, a real-time task with a bug must be handled by strictly following a pre-established process, step by step, otherwise problems easily occur. Bugs have many causes, such as code bugs, abnormal data, or real-time cluster issues; the figure below shows the process of fixing a real-time task bug and recovering the data.
5. Tencent’s full-scenario real-time data warehouse construction case
A data warehouse system contains many big data components, such as Hive, HBase, HDFS, and S3, and compute engines such as MapReduce, Spark, and Flink. Users build big data storage and processing platforms according to their needs; data is processed and analyzed on the platform, and result data is saved to relational and non-relational databases that support fast queries, such as MySQL and Elasticsearch. The application layer can then do BI report development, user portraits, or interactive queries based on OLAP tools such as Presto on top of this data.
1) Pain points of the Lambda architecture
In the whole process, we often use an offline scheduling system to run Spark analysis tasks periodically (T+1 or every few hours) for data input, output, or ETL work. Data delay is unavoidable throughout offline processing: whether at data ingestion or during intermediate analysis, the delay is relatively large, at the hour or even day level. For scenarios with real-time requirements, we therefore often build a separate real-time pipeline, for example a real-time stream processing system based on Flink + Kafka.
On the whole, there are many components in the data warehouse architecture, which greatly increases the complexity of the entire architecture and the cost of operation and maintenance.
As shown below, this is the Lambda architecture that many companies adopted before or are still adopting. The Lambda architecture divides the data warehouse into an offline layer and a real-time layer, corresponding to two independent data processing pipelines, batch and stream; the same data is processed at least twice, and the same set of business logic has to be developed and adapted twice. The Lambda architecture should already be familiar to everyone, so I will focus on some of the pain points we encountered while building a data warehouse with it.
For example, in a real-time scenario where some user-related indicators are computed in real time, when we want to see the current PV and UV, we put these data into the real-time layer to compute, and the indicator values are presented in real time. But if, at the same time, we want to understand the growth trend of users, we need to compute over the data of the past day, which has to be done through a batch scheduling task, such as starting a Spark job on the scheduling system at two or three o'clock in the morning to re-run all of the day's data.
Obviously, because the two pipelines run at different times over the same data, this process can produce inconsistent results. Moreover, an update to even one or a few records requires re-running the entire offline analysis link, so the cost of updating data is very high; and two computing platforms, offline and real-time, have to be maintained at the same time, so the development and operation and maintenance costs of the whole stack are very high.
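To make the real-time half of this split concrete, here is a minimal Flink SQL sketch that computes per-minute PV/UV from a hypothetical page-view stream in Kafka; in a Lambda setup, a separate Spark batch job would recompute the same metrics over the full day.

```sql
-- Minimal sketch of the real-time half: per-minute PV/UV over a hypothetical
-- page-view stream in Kafka. The batch half would recompute the same metrics
-- with a nightly Spark job over the full day's data.
CREATE TABLE ods_page_view (
  user_id    STRING,
  page_id    STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic'     = 'ods_page_view',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format'    = 'json'
);

SELECT
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*)                AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM ods_page_view
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
```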
In order to solve the various problems caused by the Lambda architecture, the Kappa architecture was born, which should be very familiar to everyone.
2) Pain points of the Kappa architecture
Let's talk about the Kappa architecture. As shown in the figure below, its middle layer is in practice a message queue, and Flink is used to connect the entire link in series. The Kappa architecture solves the high O&M and development costs caused by the different engines of the offline and real-time processing layers in the Lambda architecture, but it has its own pain points.
First of all, when building real-time business scenarios, Kappa can be used to build near real-time scenarios; but if you want to do some simple OLAP analysis or further data processing in the middle layers of the data warehouse, such as writing DWD-layer data to Kafka, you need to access it with a separate Flink job. And when the DWD-layer data in Kafka then has to be imported into ClickHouse, Elasticsearch, MySQL, or Hive for further analysis, the complexity of the whole architecture obviously increases.
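For concreteness, here is a hedged sketch of what a Kafka-backed DWD layer looks like in Flink SQL; topics, fields, and the cleansing logic are hypothetical. Every further consumer of the DWD layer has to read the dwd_events topic with yet another job, which is exactly the complexity described above.

```sql
-- Hedged sketch of a Kappa-style middle layer: the DWD layer itself is a Kafka topic,
-- so every further consumer needs another Flink job reading dwd_events.
CREATE TABLE ods_events (
  user_id STRING,
  action  STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic'     = 'ods_events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format'    = 'json'
);

CREATE TABLE dwd_events (
  user_id STRING,
  action  STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic'     = 'dwd_events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format'    = 'json'
);

-- Simple cleansing/standardization from ODS to DWD.
INSERT INTO dwd_events
SELECT user_id, LOWER(action), ts
FROM ods_events
WHERE user_id IS NOT NULL;
```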
Secondly, the Kappa architecture depends heavily on message queues. The accuracy of computation over the whole link depends strictly on the ordering of the upstream data in the message queue, and the more message queues are chained together, the greater the chance of out-of-order data. The ODS-layer data is generally accurate, but it may become out of order when sent to the next Kafka topic, and again when the DWD layer is sent to the DWS layer, so data inconsistency becomes more and more serious.
Third, because Kafka is a sequential storage system, there is no way to apply OLAP optimization strategies directly on top of it; for example, predicate pushdown is difficult to implement on sequential storage like Kafka.
So is there an architecture that can meet the requirements of both real-time and offline computing, reduce operation and maintenance and development costs, and solve the pain points encountered when building a Kappa architecture around message queues? The answer is yes, and it will be discussed in more detail later.
3) Summary of pain points
Fourth, Flink + Iceberg builds a real-time data warehouse
1. Near real-time data access
Iceberg supports not only read/write splitting, but also concurrent reads, incremental reads, and small-file merging, and it can deliver second-to-minute-level latency. Based on these advantages, we try to use these capabilities of Iceberg to build, on top of Flink, a real-time, full-link, batch-stream-unified real-time data warehouse architecture.
As shown in the figure below, each Iceberg commit changes the visibility of data, for example turning data from invisible to visible; it is in this process that near real-time data recording is achieved.
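A minimal Flink SQL sketch of this kind of near real-time ingestion, assuming a Hadoop-type Iceberg catalog on HDFS and reusing the hypothetical ods_events Kafka table from the Kappa sketch above; the catalog name, warehouse path, and fields are all assumptions.

```sql
-- Hedged sketch: near real-time ingestion into an Iceberg table via Flink SQL.
-- Each checkpoint-driven commit makes the newly written files visible.
CREATE CATALOG iceberg_catalog WITH (
  'type'         = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse'    = 'hdfs://namenode:8020/warehouse/iceberg'
);

CREATE DATABASE IF NOT EXISTS iceberg_catalog.dw;

CREATE TABLE iceberg_catalog.dw.dwd_events (
  user_id STRING,
  action  STRING,
  ts      TIMESTAMP(3)
);

-- ods_events is the hypothetical Kafka source table defined in the Kappa sketch above.
INSERT INTO iceberg_catalog.dw.dwd_events
SELECT user_id, action, ts FROM ods_events;
```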
2. Real-time data warehouse – data lake analysis
Previously, the system needed to ingest data first, for example using Spark offline scheduled tasks to run, pull, and extract data and finally write it into a Hive table, which incurs a relatively large delay. With Iceberg's table format, Flink or Spark Streaming can be used in the middle to complete near real-time data ingestion.
Based on the above features, let's revisit the Kappa architecture discussed earlier. Its pain points have been described above; since Iceberg is an excellent table format that supports both a streaming reader and a streaming sink, can we consider replacing Kafka with Iceberg?
The underlying storage that Iceberg relies on is cheap storage like HDFS or S3, and Iceberg supports storage formats such as Parquet, ORC, and Avro. With columnar storage (Parquet/ORC), basic OLAP optimizations such as predicate pushdown can be applied, and computation can be done directly in the middle layer. Based on Iceberg's snapshot-based streaming reader, the latency of offline tasks can be greatly reduced from the day level to the hour level, transforming the system into a near real-time data lake analysis system.
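A hedged sketch of consuming the Iceberg middle layer incrementally with Flink's streaming read, reusing the hypothetical table above; the option values are illustrative, and dynamic table options may need to be enabled on older Flink versions.

```sql
-- Hedged sketch: incrementally consume new Iceberg snapshots with Flink's streaming
-- read instead of tailing a Kafka topic.
SET 'execution.runtime-mode' = 'streaming';

SELECT action, COUNT(DISTINCT user_id) AS uv
FROM iceberg_catalog.dw.dwd_events
/*+ OPTIONS('streaming'='true', 'monitor-interval'='60s') */
GROUP BY action;
```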
In the intermediate processing layer, you can use Presto for some simple queries. Because Iceberg supports streaming reads, Flink can also be connected directly to the middle layer of the system to run batch or streaming computing tasks there, with the intermediate results further processed and output downstream.
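For the ad-hoc side, a simple Presto/Trino-style query against the same hypothetical Iceberg table might look like the following; the time filter is the kind of predicate that can be pushed down to the columnar files.

```sql
-- Hedged sketch of an ad-hoc Presto/Trino-style query on the hypothetical
-- Iceberg table; the filter on ts can be pushed down to the Parquet/ORC files.
SELECT action, COUNT(DISTINCT user_id) AS uv
FROM iceberg.dw.dwd_events
WHERE ts >= TIMESTAMP '2021-01-01 00:00:00'
GROUP BY action;
```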
Advantages and disadvantages of replacing Kafka:
In general, the advantages of replacing Kafka with Iceberg mainly include:
- stream-batch unification of the storage layer
- the middle layer supports OLAP analysis
- perfect support for efficient backtracking of historical data
- cost reduction
Of course, there are also certain drawbacks, such as:
- data latency changes from real-time to near real-time
- interfacing with other data systems requires additional development work
Second-level analysis – data lake acceleration:
Since Iceberg itself stores all data files on HDFS, HDFS read/write performance still cannot fully meet our needs for second-level analysis, so we will support a cache such as Alluxio underneath Iceberg and use its caching capability to accelerate the data lake. This architecture is part of our future planning and construction.
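Purely as a speculative sketch of this future direction, one way to put Alluxio in front of the data files is to point the Iceberg warehouse at an alluxio:// path, assuming Alluxio is mounted over the underlying HDFS and its client jar is on the Flink classpath; the host, port, and path are hypothetical.

```sql
-- Speculative sketch only: serve hot Iceberg data files from the Alluxio cache by
-- placing the warehouse on an alluxio:// path (Alluxio mounted over the same HDFS).
CREATE CATALOG iceberg_alluxio WITH (
  'type'         = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse'    = 'alluxio://alluxio-master:19998/warehouse/iceberg'
);
```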