Data services are a key component of the data middle office. As the unified entry and exit point between the data warehouse and upper-layer applications, the data service exposes the warehouse as a single logical DB and provides a unified API to control data inflow and outflow, so that users can access different types of data in one consistent way.

The data service of the e-commerce platform Vipshop has been under construction since 2019, growing from nothing into a platform that now provides data services to more than 30 To B and To C business parties. This article introduces the background, architecture design, and core functions of VIP.com's self-developed data service, Hera.

Background introduction

Before the data warehouse had a unified data service, the access methods it provided were often inefficient and made it hard to keep data metrics consistent. The following situations were particularly prominent:

  • The advertising audience systems (USP, DMP) had to export crowd data from the data warehouse to local files as streams through HiveServer every day. Each crowd segment ranged from hundreds of thousands to hundreds of millions of rows, there were more than 20,000 segments, each took 30+ minutes to run, and some large segments exceeded 1 hour; when resources were tight, crowd delivery was seriously delayed.

  • When data warehouse data was consumed by data products, a separate interface had to be built for each table, and the application side had to handle each access method (such as Presto and ClickHouse) through a different interface. Interfaces multiplied, maintenance became inconvenient, and development efficiency suffered. Each storage type also pulled in its own third-party client jars, such as clickhouse-client and presto-client.

  • Different data products needed the same common metrics, such as sales, order count, PV, and UV, but each product implemented them with its own caliber and logic. Nothing was shared, every product rebuilt the same indicators, and when the same metric showed different values across products it was hard to tell which one was correct.

Figure 1. Data inflow and outflow mode

Data services were created to solve these problems. Their main advantages today are: shielding the underlying storage and compute engines behind one API (one service); hierarchical storage of warehouse data; SQL generation for different engines; adaptive SQL execution and a unified cache architecture to guarantee business SLAs; and support for registering data and authorizing it to any caller, which improves data delivery efficiency.

Each dataset is identified by a unique ID, so a data product queries data by ID instead of accessing the underlying warehouse table directly. The metrics service also unifies metric calibers and supports the rapid construction of new data products.
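As an illustration only, here is a minimal sketch of what ID-based access through a unified API could look like; DataQueryClient, the dataset ID, and the parameters are hypothetical, not Hera's published interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side view of a unified data service API.
 *  All names are illustrative; Hera's real interface may differ. */
public interface DataQueryClient {

    /** Placeholder for whatever result structure the service returns. */
    interface QueryResult { }

    /** Query a registered dataset by its unique ID; the service decides
     *  which engine (Presto, ClickHouse, Spark, ...) actually runs it. */
    QueryResult query(String datasetId, Map<String, Object> conditions, int timeoutSeconds);

    static QueryResult example(DataQueryClient client) {
        Map<String, Object> conditions = new HashMap<>();
        conditions.put("dt", "2021-06-01");
        conditions.put("brand_id", 12345);
        // The caller never touches the physical table or the engine client jars.
        return client.query("sales_summary_daily", conditions, 30);
    }
}
```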

Architecture design

Data services bring operational and commercial value to the business; at their core they give users self-service data analysis capabilities. Hera's overall architecture follows a typical master/slave model, with the data flow and the control flow kept on separate links to ensure high availability. The data service system is divided into three layers:

  1. Application access layer: when applying for access, the business can choose the data service API (TCP client), HTTP, or the OSP service interface (the internal RPC framework) according to its requirements.

  2. Data service layer: executes the tasks submitted by the business and returns the results. Its main functions include routing strategy, multi-engine support, engine resource configuration, dynamic assembly of engine parameters, SQL Lispengine generation, adaptive SQL execution, a unified data query cache, and FreeMarker-based dynamic SQL generation (a FreeMarker sketch follows this list).

  3. Data layer: business queries are served from the data warehouse, ClickHouse, MySQL, or Redis, and users always go through the same API.
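To make the FreeMarker-based dynamic SQL generation point concrete, here is a minimal sketch of rendering a SQL statement from a FreeMarker template under assumed names: the template text, table, and parameters are invented for illustration and are not Hera's actual templates.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

import freemarker.template.Configuration;
import freemarker.template.Template;

public class SqlTemplateDemo {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration(Configuration.VERSION_2_3_30);

        // In a real service the template would come from the dataset's registered definition;
        // this one is made up for illustration.
        String templateText =
              "SELECT brand_id, SUM(sales_amt) AS sales_amt\n"
            + "FROM dws_sales_summary\n"
            + "WHERE dt = '${dt}'\n"
            + "<#if brandId??>  AND brand_id = ${brandId?c}\n</#if>"   // ?c avoids locale number formatting
            + "GROUP BY brand_id";
        Template template = new Template("query", new StringReader(templateText), cfg);

        Map<String, Object> params = new HashMap<>();
        params.put("dt", "2021-06-01");
        params.put("brandId", 12345);

        StringWriter out = new StringWriter();
        template.process(params, out);   // renders the final SQL string
        System.out.println(out);
    }
}
```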

Figure 2. Overall architecture of the data service

The scheduling system roughly consists of the following modules:

  • Master: manages all Worker, TransferServer, and AdhocWorker nodes, and is responsible for scheduling and distributing jobs;

  • Worker: executes ETL and data file export jobs and launches the AdhocWorker process (ad hoc tasks run in a thread pool inside the AdhocWorker process); ETL jobs are completed through child processes;

  • Client: A client that submits SQL jobs programmatically;

  • ConfigCenter: pushes unified configuration and other runtime-related settings to the cluster, and hosts the SQLParser (which parses, replaces, and rewrites SQL statements according to given rules so they can run on different compute engines);

  • TransferServer: the file transfer service.

Figure 3. Data Service Scheduling Flowchart

Main Functions

The main functions of Hera Data Service are: multi-queue scheduling, multi-engine query, multiple task types, file export, resource isolation, dynamic assembly of engine parameters, adaptive engine execution, and SQL building.

Multi-queue scheduling policy

The data service supports different scheduling queues according to user, task type, and weight, to meet the SLAs of different task types.

Multi-engine query

The data service supports all OLAP and database types currently used in the company, including Spark, Presto, ClickHouse, Hive, MySQL, and Redis. The most suitable query engine is selected based on the specific scenario and requirements of the business.

Multiple task types

The task types supported by the data service are ETL, ad hoc, file export, and data import. Combined with the multi-engine capability, this yields feature combinations such as Spark ad hoc and Presto ad hoc.

File export

File export mainly supports exporting large amounts of data from the data warehouse for business analysis and processing, such as supplier issuance and information push.

The specific execution process is as follows: the user submits the SQL to be exported; after the distributed engine finishes executing it, the landed file is uploaded to HDFS/Alluxio, and the client then pulls the file to the local machine over TCP. Petabyte-scale data export takes at most 10 minutes; overall, export time has dropped from the original 30+ minutes to at most 3 minutes, an improvement of 10 to 30 times. The specific process is as follows:

Figure 4. Data service file download flowchart
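Hera's client pulls the landed file over TCP from the TransferServer; as a rough, hypothetical stand-in for that last step, the sketch below simply copies a landed result file from HDFS (or Alluxio via its Hadoop-compatible interface) to local disk with the Hadoop FileSystem API. The paths are made up.

```java
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/** Illustrative only: copy an exported result file that the engine has landed on
 *  HDFS to the local disk. Hera's real client streams it over TCP from the TransferServer. */
public class ExportFilePuller {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path remote = new Path("hdfs://nameservice/tmp/hera/export/job_12345/part-00000.csv");

        try (FileSystem fs = FileSystem.get(URI.create(remote.toString()), conf);
             FSDataInputStream in = fs.open(remote);
             OutputStream out = Files.newOutputStream(Paths.get("/tmp/job_12345.csv"))) {
            // 64 KB copy buffer; streams are closed by try-with-resources
            IOUtils.copyBytes(in, out, 64 * 1024, false);
        }
    }
}
```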

Resource isolation (worker resources and computing resources)

Services are generally divided into core and non-core, and they differ in resource allocation and scheduling. Isolation can be achieved at the physical level for both the workers that execute tasks and the engine resources, minimizing interference between different services.

Dynamic assembly of engine parameters

The execution of online business needs to be tuned according to business conditions, for example dynamically restricting a user's resource usage or switching the whole cluster, which requires dynamically modifying job parameters; OLAP engines in particular are often tuned per task with different parameters. To solve such problems, the data service automatically assembles engine parameters according to the engine type, supports adjusting them dynamically, and also allows OLAP execution parameters to be set for specific tasks, execution accounts, and business types.
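The assembly rules below are an illustration only: the parameter names are ordinary Presto/Spark settings, but the defaulting and override order (engine type, then business type, then task) is an assumption, not Hera's actual configuration model.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of per-engine parameter assembly. */
public class EngineParamAssembler {

    public Map<String, String> assemble(String engineType,
                                        Map<String, String> businessDefaults,
                                        Map<String, String> taskOverrides) {
        Map<String, String> params = new HashMap<>();
        // 1. engine-level defaults
        if ("presto".equalsIgnoreCase(engineType)) {
            params.put("query_max_run_time", "10m");
            params.put("query_priority", "1");
        } else if ("spark".equalsIgnoreCase(engineType)) {
            params.put("spark.executor.memory", "4g");
            params.put("spark.sql.shuffle.partitions", "400");
        }
        // 2. business-type defaults, then 3. task-level overrides win last
        params.putAll(businessDefaults);
        params.putAll(taskOverrides);
        return params;
    }
}
```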

Adaptive engine execution

When a query is executed, it may fail because engine resources are insufficient or the data types of the query conditions do not match. To improve the query success rate and guarantee the service SLA, ad hoc queries use adaptive engine execution: when one engine reports an error, the query is switched to another engine and continues. The specific adaptive execution logic is shown in the following figure:

Figure 5. Adaptive engine execution
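As a minimal sketch of this fallback idea, assuming a hypothetical EngineExecutor interface (not Hera's actual classes): each engine is tried in priority order with its own rewritten SQL, and the next engine is used when one fails.

```java
import java.util.List;

/** Minimal sketch of "try the next engine on failure". */
public class AdaptiveExecutor {

    public interface ResultSetHandle { }

    public interface EngineExecutor {
        String name();
        /** Runs the SQL already rewritten for this engine; throws on failure. */
        ResultSetHandle submit(String sql) throws Exception;
    }

    /** Engines are tried in priority order (e.g. Presto first, then Spark). */
    public ResultSetHandle execute(List<EngineExecutor> engines, List<String> rewrittenSql)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < engines.size(); i++) {
            try {
                return engines.get(i).submit(rewrittenSql.get(i));
            } catch (Exception e) {
                last = e;  // e.g. insufficient resources or type mismatch: fall through to the next engine
            }
        }
        throw last != null ? last : new IllegalStateException("no engine configured");
    }
}
```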

SQL building

Data service SQL building is based on dimensional fact modeling and supports the single-table, star, and snowflake models (a sketch of a possible model description follows the list below).

  • Single-table model: one fact table, usually a DWS or ADS summary fact table.

  • Star model: one fact table (such as a DWD detail fact table) + N dimension tables, for example an order detail table (fact table, FK = item ID) + a product dimension table (dimension table, PK = item ID).

  • Snowflake model: one fact table (such as a DWD detail fact table) + N dimension tables + M dimension tables not directly joined to the fact table, for example an order detail table (fact table, FK = item ID) + a product dimension table (dimension table, PK = item ID, FK = category ID) + a category dimension table (dimension table, PK = category ID).
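For illustration, here is a hypothetical sketch of how such a model might be described to the SQL builder; the class shape and the table/column names are invented, not Hera's metadata format.

```java
import java.util.List;

/** Hypothetical description of a dimensional model that a SQL builder could consume. */
public class DimensionalModel {

    public static class Join {
        String dimensionTable;  // e.g. "dim_product" or "dim_category"
        String leftTable;       // the fact table for a star join, or another dimension for a snowflake join
        String leftKey;         // FK on leftTable, e.g. "item_id"
        String dimensionKey;    // PK on the dimension table, e.g. "item_id"
    }

    String factTable;           // e.g. "dwd_order_detail"
    List<Join> joins;           // empty list -> single-table model

    // A star model (order detail + product dimension) would render roughly as:
    //   SELECT ... FROM dwd_order_detail f
    //   LEFT JOIN dim_product p ON f.item_id = p.item_id
    // and a snowflake model adds, for example:
    //   LEFT JOIN dim_category c ON p.category_id = c.category_id
}
```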

Figure 6. SQL dimensional model

Task scheduling

Task scheduling is based on the Netty library for sending and receiving cluster messages. The system uses a single thread pool object (EventLoopGroup) for sending and receiving messages, while the user's business logic is handed off to a separate thread pool.

Another reason for choosing Netty is its "zero copy" capability: when a large amount of data is returned, the result can be sent to the caller directly as a file.
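A minimal sketch of these two ideas, assuming a Netty server that keeps I/O on its event loops, runs the business handler on a separate executor, and streams a large result file with a zero-copy FileRegion; the handler logic and file path are illustrative, not Hera's code.

```java
import java.io.File;
import java.io.RandomAccessFile;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class ResultServer {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup io = new NioEventLoopGroup();                          // message send/receive
        DefaultEventExecutorGroup business = new DefaultEventExecutorGroup(16);  // user business logic

        ServerBootstrap b = new ServerBootstrap()
            .group(boss, io)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // the business handler runs on its own executor, not the I/O event loop
                    ch.pipeline().addLast(business, new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ReferenceCountUtil.release(msg);
                            // For a large result set, transfer the landed file with zero copy;
                            // DefaultFileRegion closes the channel's file when the transfer is released.
                            RandomAccessFile raf = new RandomAccessFile(new File("/tmp/result.dat"), "r");
                            ctx.writeAndFlush(new DefaultFileRegion(raf.getChannel(), 0, raf.length()));
                        }
                    });
                }
            });
        b.bind(9000).sync().channel().closeFuture().sync();
    }
}
```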

Multi-queue + multi-user scheduling

Business requirements usually include both time-sensitive and time-insensitive jobs. To improve job stability and system configurability, Hera provides multi-queue job scheduling.

When submitting a job, the user can explicitly specify a queue name. When the job reaches the cluster, it is added to the corresponding queue if that queue has room; otherwise a specific error is returned to the client, such as "task queue is full", "queue name does not exist", or "queue has been closed", and the client can choose whether to retry the submission.

When a job is added to a queue, the master immediately tries to schedule the jobs in that queue, selecting a suitable job to run based on the following conditions:

  1. Each queue has its own weight and a configured share of the cluster's total resources, such as the maximum amount of memory it may use and the maximum number of tasks it may run.

  2. The tasks in a queue also have their own weights, and the time each job joined the queue is recorded; when sorting the jobs in a queue, the time already spent in the queue and the total timeout are used to compute a final score.

  3. Besides the scheduling system's own strategy, the load of the external compute clusters must also be considered: after a job is taken from a queue it is filtered again, or the filtering is done first and the scoring afterwards.

One possible job scoring model is as follows:

Queue dynamic factor = queue size / queue capacity * (1 - number of running jobs / queue parallelism)

This means that if a queue has a large proportion of jobs waiting and a small proportion of jobs running, its jobs get a larger factor; in other words, when queue weights are equal, jobs in this queue should be scheduled first.

Job weight = 1 - (current time - enqueue time) / timeout

This means that, within the same queue, a job with less time left before its timeout will time out sooner and should therefore have a greater chance of being selected.

Score = job weight + queue dynamic factor + queue weight

The meaning of this equation is: across all queues, the first factor deciding whether a job is scheduled is the queue weight; for example, a job in a queue with weight 10 is scheduled before a job in a queue with weight 1, regardless of the job's own weight (i.e. regardless of how likely it is to time out). When two queues have the same weight, the one with the larger dynamic factor wins; for example, a queue with a dynamic factor of 0.5 is served before one with 0.3, again regardless of individual job weights. Within the same queue, if two jobs have weights 0.2 and 0.5, the job with weight 0.2 is scheduled first to avoid more timeouts.

To briefly describe the sorting process: first, all queues are sorted by queue weight; for queues with the same weight, the dynamic factor of each queue is calculated and used to order them; within each queue, jobs are ordered by their timeout ratio; finally, each queue is traversed in turn, selecting enough jobs to run until all jobs are running or the cluster limit is reached. "Enough" here means that each queue has a maximum degree of parallelism and a maximum share of resources; these two limits together prevent a single queue whose capacity and parallelism are set too large from exceeding the whole cluster and starving the other queues.
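For concreteness, the three formulas above transcribe directly into code; this sketch keeps only the arithmetic and leaves out the real queue bookkeeping.

```java
/** Direct transcription of the scoring formulas given in the text. */
public class JobScoring {

    /** queue dynamic factor = queue size / queue capacity * (1 - running / parallelism) */
    public static double queueDynamicFactor(int queued, int capacity, int running, int parallelism) {
        return (double) queued / capacity * (1.0 - (double) running / parallelism);
    }

    /** job weight = 1 - (current time - enqueue time) / timeout; smaller means closer to timing out */
    public static double jobWeight(long nowMs, long enqueueTimeMs, long timeoutMs) {
        return 1.0 - (double) (nowMs - enqueueTimeMs) / timeoutMs;
    }

    /** score = job weight + queue dynamic factor + queue weight */
    public static double score(double jobWeight, double dynamicFactor, double queueWeight) {
        return jobWeight + dynamicFactor + queueWeight;
    }
}
```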

SQL job flow

The user submits the original SQL through the client; take Presto SQL as an example. When the client submits the job and specifies the SQL route, the SQL statement is first sent to the SQLParser service before being sent to the master, where it is parsed into SQL statements that the back-end compute clusters can support, such as Spark, Presto, and ClickHouse. To reduce the number of RPC interactions, SQLParser returns all possible rewritten SQL statements at once.

After the client receives the possible SQL statements returned by the SQLParser service, the job object is populated and the actual run against the master begins.

After receiving the job submitted by the user, the master distributes the task to a suitable worker according to its scheduling strategy and starts execution. The worker first submits the SQL job with its default execution engine, such as Presto, to the corresponding compute cluster; if the result cannot be obtained for some reason, it tries other compute engines. It is also possible to submit the job to multiple compute clusters at the same time and cancel the others as soon as one cluster returns a result, but this requires the other clusters' entry points to support cancellation.
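A minimal sketch of the "submit to several clusters and keep the first answer" variant, assuming a hypothetical ClusterClient whose cancel operation the target engines actually support:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative race between clusters; ClusterClient and its methods are placeholders. */
public class RaceSubmitter {

    public interface ClusterClient {
        String runQuery(String sql) throws Exception;  // blocking submit-and-wait
        void cancel(String sql);                       // best-effort cancel, placeholder signature
    }

    public static String raceFirstResult(List<ClusterClient> clusters, String sql) {
        ExecutorService pool = Executors.newFixedThreadPool(clusters.size());
        List<CompletableFuture<String>> futures = clusters.stream()
                .map(c -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return c.runQuery(sql);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }, pool))
                .toList();

        // Block until whichever cluster answers first (a failure also completes anyOf;
        // a production version would keep waiting for the remaining clusters instead).
        String result = (String) CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).join();

        // Cancel the rest; cancelling the winner again is assumed to be a harmless no-op.
        clusters.forEach(c -> c.cancel(sql));
        pool.shutdownNow();
        return result;
    }
}
```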

When the SQL job is completed, the result is returned to the worker. To return the query result to the client more efficiently, the worker extracts the client information from the task object sent by the master and sends the result directly to the client until a confirmation message is received, at which point the task is finished.

During the entire flow of a job, the task concept is propagated through the scheduling system and goes through several status updates that identify the new, waiting, running, succeed, and failed phases.

Figure 9. SQL job processing process

Metrics collection

The data service collects two kinds of metrics: static metrics, which describe basic information about the master/worker/client, and dynamic metrics, which describe the runtime information of the master/worker. This section covers the collection and use of dynamic cluster information. Taking the worker as an example, once it registers successfully with the master it starts a scheduled heartbeat and reports its runtime information to the master in the heartbeat request, chiefly memory usage: the worker estimates how much memory its currently running tasks occupy, so that the master can take memory into account when distributing subsequent tasks. The master aggregates the state of the whole cluster it manages, such as snapshots of each task queue, worker snapshots, and the cluster's runtime configuration, and a parameter controls whether this information is printed for debugging.
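As an illustration of the heartbeat idea, here is a sketch of a worker periodically reporting its estimated memory usage to the master; the report payload, interval, and MasterChannel interface are assumptions, not Hera's protocol.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a worker-side heartbeat that reports estimated memory usage. */
public class WorkerHeartbeat {

    public interface MasterChannel { void send(HeartbeatReport report); }

    public record HeartbeatReport(long timestamp, long estimatedMemoryBytes, int runningTasks) { }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, AtomicLong> taskMemoryEstimate = new ConcurrentHashMap<>();

    /** Called by the worker when it (re)estimates a running task's footprint. */
    public void updateEstimate(String taskId, long bytes) {
        taskMemoryEstimate.computeIfAbsent(taskId, k -> new AtomicLong()).set(bytes);
    }

    public void start(MasterChannel master) {
        scheduler.scheduleAtFixedRate(() -> {
            long total = taskMemoryEstimate.values().stream().mapToLong(AtomicLong::get).sum();
            // The master can use this figure when deciding where to dispatch the next task.
            master.send(new HeartbeatReport(System.currentTimeMillis(), total, taskMemoryEstimate.size()));
        }, 0, 10, TimeUnit.SECONDS);
    }
}
```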

Resolved Performance Issues

The data service mainly addresses SLA issues such as crowd computing, seamless data migration, and data product SLAs. Crowd computing is used as the example below:

Problems encountered in crowd computing:

  1. The data locality of crowd computing tasks is poor;

  2. HDFS has data hotspot issues;

  3. HDFS reads and writes themselves have a long tail.

The new architecture after the data service transformation:

  • Colocate computing and storage, so that data does not have to be read repeatedly over the network, wasting network traffic.

  • Reduce the extra impact of the HDFS read/write long tail on crowd computing, and reduce the impact of crowd computing on HDFS stability.

  • Crowd computing tasks fall somewhere between online production tasks and offline tasks. The goal is to guarantee the reliability and stability of this kind of application and better empower the company's business by running crowd computing through the data service.

Figure 10. Mixed deployment of Alluxio and Spark clusters

Alluxio-based cache table synchronization

Changing a Hive table's location from an HDFS path to an Alluxio path means the table's data is stored in Alluxio. The scheme we use is not to write data into the Alluxio table directly from the ETL task, but to let Alluxio pull the data from HDFS into a table with the same Hive table structure; in other words, we create an Alluxio cache table of the HDFS table.

Since Alluxio cannot sense partition changes in the partitioned table, we developed a scheduled task that automatically detects partition changes in the source table so that the Hive table's data is synchronized to Alluxio.

The specific steps are as follows (a sketch follows the list):

  1. A scheduled task polls to detect whether the source table has new partitions.

  2. The data service executes a SYN2ALLUXIO task.

  3. The task's execution script adds the same partitions to the Alluxio table as exist in the HDFS table.

  4. After the partitions are added, Alluxio automatically completes the data synchronization from the mounted HDFS path.
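A hypothetical sketch of what such a SYN2ALLUXIO-style sync step could look like using Hive JDBC: it compares the partitions of the HDFS-backed table and its Alluxio cache table and adds the missing ones, letting Alluxio load the data on first access from the mounted HDFS path. The table names, JDBC URL, and single-level partition layout are all assumptions.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;

public class AlluxioPartitionSync {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://hiveserver:10000/dm", "hera", "");
             Statement stmt = conn.createStatement()) {

            Set<String> source = showPartitions(stmt, "dm.crowd_base");          // HDFS-backed table
            Set<String> cached = showPartitions(stmt, "dm_alluxio.crowd_base");  // Alluxio cache table

            for (String partition : source) {
                if (!cached.contains(partition)) {
                    // A partition spec looks like "dt=2021-06-01"; quoting assumes a single partition column.
                    String spec = partition.replace("=", "='") + "'";
                    stmt.execute("ALTER TABLE dm_alluxio.crowd_base ADD IF NOT EXISTS PARTITION (" + spec + ")");
                }
            }
        }
    }

    private static Set<String> showPartitions(Statement stmt, String table) throws Exception {
        Set<String> partitions = new HashSet<>();
        try (ResultSet rs = stmt.executeQuery("SHOW PARTITIONS " + table)) {
            while (rs.next()) {
                partitions.add(rs.getString(1));
            }
        }
        return partitions;
    }
}
```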

Figure 11. Alluxio cache table synchronization

Crowd computing tasks

The previous section described how the Alluxio and HDFS Hive tables are kept in sync. The next step is to run the Spark tasks that compute the crowds on the mixed Spark and Alluxio cluster, making full use of data locality and the isolation of computing resources to improve crowd computing efficiency.

The crowd service performs its calculations by calling the data service. The data service decides whether to use the Alluxio table based on whether the underlying table's partitions have been synchronized to Alluxio; if they have, the Alluxio table is used as the base table to compute the crowd.

Relying on the data service's scheduling system, with user SQL rewriting and the mixed deployment of Alluxio and Spark compute nodes, crowd computing tasks are accelerated by 10% to 30%.

Although Hera Data Service already supports many production jobs today, there is still much to improve:

  • Different engines implement the same functionality with differently named functions. This is especially prominent when comparing Presto with ClickHouse: Presto's strpos(string, substring), for example, is position(haystack, needle[, start_pos]) in ClickHouse, and the order of their parameters is inconsistent. How to support these differences between engines more elegantly requires further thought (a toy mapping sketch follows this list).

  • Crowd computing will adopt the industry-standard ClickHouse BitMap solution to improve crowd computing efficiency and expand the business boundaries of the data service.
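Purely as an illustration of the problem, here is a toy sketch of mapping one engine's function names onto another's; a real SQLParser would rewrite the parsed AST and also carry per-function argument reordering rules.

```java
import java.util.Map;

/** Toy function-name mapping using the strpos/position example above. */
public class FunctionRewriter {

    // Presto: strpos(string, substring)  ->  ClickHouse: position(haystack, needle[, start_pos])
    private static final Map<String, String> PRESTO_TO_CLICKHOUSE = Map.of("strpos", "position");

    /** Rewrites a single, already-extracted function call for ClickHouse. */
    public static String rewriteCall(String prestoFunction, String... args) {
        String target = PRESTO_TO_CLICKHOUSE.getOrDefault(prestoFunction, prestoFunction);
        return target + "(" + String.join(", ", args) + ")";
    }

    public static void main(String[] args) {
        System.out.println(rewriteCall("strpos", "url", "'vip.com'"));  // position(url, 'vip.com')
    }
}
```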
