Contents

  • Flow query requirements
  • What is real-time data
  • Instant query system
  • Big data requirements
  • Technical architecture of the real-time offline integrated system
  • Data flow of the real-time offline integrated system
    • Real-time offline integrated access
    • Data warehouse hierarchical normalization
    • Precomputing scheme (Kylin + Kudu)
    • Real-time offline development of a unified data access entry
    • Transparent tiered data storage
  • Looking ahead
Flow query requirements

The requirement is based on terabyte-level online data and must support online queries of payment bill details. As with bank statement flows, it is common to query several years of records.

Supported query dimensions: accounting period, arrears status, date range, fee account type, house classification, housing project, related contract information, and statistics columns.

What is real-time data

> Real-time can be broken down into real-time collection, real-time computation, and high-performance, low-latency output of result data.

Real-time data refers to data collected in real time from source systems, as well as the intermediate or result data produced by real-time computation over that collected data. Real-time data has temporal validity and loses its value as time passes.

Instant query system

Housing rental fees, water and electricity charges, property management fees, and similar data have no fixed validity period; an office lease, for example, may be prepaid for 5 or 10 years. For the business, this kind of data is still online data and cannot be archived. When data cannot be archived for a long time it keeps accumulating, which is a serious challenge for a lightweight database such as MySQL. Even with sharding in place, queries with complex conditions can still exhaust memory during aggregation. Worse, the system is poorly designed at the DAL layer, so refactoring it to support flow queries would cost far too much. From a requirements perspective no OLTP is involved; only an OLAP solution is needed. To avoid changing the business systems or restructuring the database, we decided to introduce an instant query system.
The business requirements translate into the following technical requirements:
  • Bill detail queries can combine more than 70 conditions arbitrarily, so precomputation techniques such as Kylin cannot solve them; online queries over an N-year range must be supported
  • Complex conditional queries must be supported, such as multi-table joins and nested multi-level LEFT JOINs (a sample query is sketched after this list)
  • To minimize SQL changes on the business side, standard SQL must be supported as far as possible
  • Frequently changing business data must be synchronized and updated in real time
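To make the first two points concrete, a bill detail query typically looks like the sketch below; the table and column names here are hypothetical, not the actual production schema. Because any subset of dozens of filters can be combined on top of multi-table LEFT JOINs, the result cannot be served from a precomputed cube:

-- Hypothetical bill detail query: arbitrary filter combinations over multi-table LEFT JOINs
SELECT b.bill_id,
       b.billing_cycle,
       b.fee_item_type_id,
       c.contract_no,
       h.house_name,
       p.project_name,
       b.amount
FROM tb_bill b
LEFT JOIN tb_contract c ON b.contract_id = c.contract_id
LEFT JOIN tb_house    h ON b.house_id    = h.house_id
LEFT JOIN tb_project  p ON h.project_id  = p.project_id
WHERE b.create_date >= TIMESTAMP '2017-01-01 00:00:00'
  AND b.arrears_status = 1
  AND p.project_name LIKE '%park%'
ORDER BY b.create_date DESC
LIMIT 20;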
Based on the above technical requirements, and after a round of technology selection, the final architecture is as follows:

Architecture implementation

Real-time data synchronization — Confluent Platform

Debezium: the business database is MySQL. For query results in the instant query system to match those of the business system, business data must be synchronized in real time and made queryable in real time. Debezium is a low-latency streaming tool that captures database changes and writes the change logs to Kafka, relying on Kafka and Kafka Connect for its durability, reliability, and fault tolerance.
Confluent Platform: moving data from MySQL to Kudu requires a stable, efficient, elastically scalable data integration solution that synchronizes heterogeneous data sources quickly and reliably. Built on the widely adopted Kafka, Kafka Connect is the natural choice: it makes it simple to define connectors that move large collections of data into and out of Kafka, and in distributed mode it is highly scalable with automatic fault tolerance. The Confluent Platform bundles many Kafka Connect implementations, including debezium-connector, which makes it easy to extend the data integration service later. In addition, the Confluent Platform provides Avro serialization support through the Kafka Schema Registry, which improves serialization performance. However, the community edition of the Confluent Platform is still fairly limited in functionality; the monitoring and management UI, for example, is only provided by Confluent Control Center. For this reason we developed our own "Data Integration Service" with monitoring and operations features; a follow-up article will introduce it in detail and provide an online demo.
Kudu-connector: a Kudu connector (source and sink) is available for the Confluent Platform, but it depends on Impala and Hive. From the perspective of our requirements and architecture these dependencies are unnecessary, so to stay lightweight and avoid excessive dependencies we implemented a lightweight Kudu-connector ourselves (source code: https://github.com/dengbp/big-well). The Kudu-connector relies on the synchronization management module to configure which tables to synchronize and the synchronization rules, to create the target tables, and finally to create the connectors that synchronize the data. The Kudu-connector cannot yet create target tables automatically; this will be implemented in the next release.
Parameters for creating the source and sink connectors:

//source connector: curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "test_data-bill_rating_rule_def-connector-souces","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "127.0.0.1","database.port": "3306","database.user": "platform","database.password": "platform@0708","database.server.id": "169798195","database.server.name": "test_data_connector","database.whitelist": "test_data","table.whitelist": "test_data.bill_rating_rule_def","database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092","database.history.kafka.topic": "dbhistory.inventory","include.schema.changes": "true","database.history.skip.unparseable.ddl": "true","decimal.handling.mode": "string","event.deserialization.failure.handling.mode": "ERROR"}}'
//sink connector: curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "test_data-bill_rating_rule_def-connector-sink-49","config": {"connector.class": "com.yr.connector.KuduSinkConnector","tasks.max": "16","topics": "test_data_connector.test_data.bill_rating_rule_def","topic.table.map": "test_data_connector.test_data.bill_rating_rule_def:bill_rating_rule_def","table.list": "bill_rating_rule_def","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092","database.history.kafka.topic": "dbhistory.inventory","kudu.masters": "kudu167:7051,cdh6-slave1:7051,cmhk-b-sl-236:7051,cdh6-slave2:7051,cdh6-slave3:7051,cdh6-slave4:7051,cdh6-slave5:7051,cdh6-slave6:7051,cdh6-slave7:7051,cdh6-slave8:7051,cdh6-slave9:7051","max.retries": "3","retry.backoff.ms": "1000","behavior.on.exception": "FAIL","linger.ms": "1000","batch.size": "5000","max.buffered.records": "8000","flush.timeout.ms": "6000","offset.flush.timeout.ms": "5000"}}'

Real-time data warehouse — Kudu
Kudu is a distributed columnar store that supports row-level updates, handles OLAP scenarios quickly, and integrates tightly with other big data frameworks (it will be integrated with the Hive Metastore below to give upper-layer developers a unified entry point to the underlying data). It is highly available in its own right and, as a member of Cloudera's big data ecosystem, leaves plenty of room for the system to grow. For this requirement we mainly synchronize the bill table and the related tables used in bill queries, such as lease contract data, project data, housing data, and bill types. Based on the characteristics of the business data, the bill target table is created with a hash partition on the bill ID and bill type plus a range partition on the bill creation time. This both distributes the data evenly and keeps each period's data in its own shard, and the time partitions can be extended continuously. The other related tables likewise use a combination of hash and range partitioning, chosen according to how they are joined in queries.
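As a rough sketch of this partitioning scheme, a bill target table could be created through Impala DDL along the following lines; the table and column names are illustrative, not the production schema:

-- Illustrative DDL: hash partitions on bill id and fee type, range partitions on creation time
CREATE TABLE bill_detail (
  bill_id          STRING,
  fee_item_type_id STRING,
  create_date      TIMESTAMP,
  house_id         STRING,
  contract_id      STRING,
  amount           DECIMAL(16,2),
  PRIMARY KEY (bill_id, fee_item_type_id, create_date)
)
PARTITION BY HASH (bill_id, fee_item_type_id) PARTITIONS 16,
RANGE (create_date) (
  PARTITION '2017-01-01' <= VALUES < '2018-01-01',
  PARTITION '2018-01-01' <= VALUES < '2019-01-01'
)
STORED AS KUDU;

New yearly range partitions can then be added over time (for example via ALTER TABLE ... ADD RANGE PARTITION in Impala), which is what "continuing to extend the time partitions" refers to.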
(Network diagram)

Query engine — second-level response — Presto

Presto does not rely on the Hive metadata service and is simple to deploy. On SQL syntax, although there are a few deviations from MySQL behavior (for example, paging requires an ORDER BY and time comparisons require an explicit TIMESTAMP), its overall support for standard SQL is very high, which keeps the cost of changing the current business systems low. When connecting a service, apart from tuning a few SQL statements, you only need to configure an additional JDBC data source. That is the ideal case; in practice the entire business system was built on MySQL, so over a long development history MySQL-specific syntax inevitably crept in, which causes awkward SQL compatibility problems. For this we chose to do secondary development on the official presto-jdbc driver to support MySQL syntax as far as possible, such as GROUP BY behavior and time comparisons.
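A minimal illustration of the kind of incompatibility involved, using a hypothetical table: a time filter that MySQL evaluates against a plain string literal needs an explicit TIMESTAMP literal in Presto.

-- MySQL: implicit string-to-datetime comparison is accepted
SELECT house_id, SUM(amount) AS total
FROM tb_bill
WHERE create_date >= '2019-01-01'
GROUP BY house_id;

-- Presto: the same comparison needs an explicit TIMESTAMP literal
SELECT house_id, SUM(amount) AS total
FROM tb_bill
WHERE create_date >= TIMESTAMP '2019-01-01 00:00:00'
GROUP BY house_id;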

Expanding business coverage

The system has grown from the initial bill detail query into a common instant query system for the entire platform. All terabyte-and-above data involved in OLAP queries is connected to the instant query system, and the deployment has grown from a dozen or so nodes to more than thirty.

Deployment configuration

Service name         Nodes   Configuration
Confluent Platform   7       32-core 64 GB
Kudu                 11      32-core 64 GB
Presto               15      32-core 64 GB
Zeppelin             1       6-core 32 GB

Big data requirements

In the asset leasing management business, besides statistical analysis of customer complaints, satisfaction surveys, water and electricity usage, equipment failures, and so on, we also need to help customers do precision marketing for their leasing business, crawl publicly available peer data from the web, provide competitive product analysis, and guide customers' business decisions.

Technical architecture of the real-time offline integrated system


Data flow of the real-time offline integrated system

Real-time offline integrated access

The big data sources fall mainly into the following categories:
  • The first source is the MySQL databases of the internal systems (business analysis)
  • The second source is the application app (user behavior tracking)
  • The third source is data collected from external systems over the network (peer data, used for competitive and industry analysis)
  • Log files (business access logs and business data written to log files)
Some real-time data can be produced with only simple cleaning, for example heterogeneous data synchronization and the instant query system described above; such data does not need to enter the ODS layer. To unify access with the instant query system, all source data is connected in real time by the data integration service to the ODS layer (HDFS) or the APP layer (Kudu).

Data warehouse hierarchical normalization

It is popular to divide the data warehouse into four layers (readers unfamiliar with data warehouse layering should look up an introductory article), and we are no exception. However, storage and access at each layer need to solve the integration problem, for reasons tied to the technical architecture we use. Next, let's look at how each type of data stream is stored after it arrives and after layer-by-layer analysis, starting from the diagram above:
For data that needs to be real-time, once it enters Kafka the application data is written directly to Kudu or MySQL through ETL and served to applications. This is equivalent to adding ETL capability to the earlier instant query system, which is no longer just a simple Kafka connector. Data that needs offline analysis, customized queries, or analysis with low real-time requirements enters the ODS layer through the data integration channel via Hive. The precomputed results produced by our programs are then placed in the upper data layers (DW and APP); our principle is that the higher the layer, the more the data goes into the real-time warehouse Kudu. For offline computing, queries that can be solidified may no longer have a guaranteed response time as data volume and computational complexity grow, even with the instant query system above (adding compute nodes does not help once the query tree can no longer be split), so we chose a precomputation solution.

Precomputing scheme (Kylin + Kudu)

As is well known, queries in enterprises generally fall into two types: ad hoc queries and customized queries. For ad hoc query needs we use Presto and Impala as engines (why two? This has to do with how our requirements evolved and with the company's system architecture: Presto, with its support for standard SQL, reduces the amount of SQL rework the business side has to do for existing functionality; put simply, it is there for compatibility with the status quo, and its environment dependencies are simple, making it easy to deploy. Impala is mainly used for new big data features and makes it convenient to aggregate hot and cold data together). Customized queries mostly come from user operations or real-time analysis of offline business data; with Hive or SparkSQL as the query engine, responses would likely take minutes or even tens of minutes, which clearly cannot meet the demand. For a long time, enterprises could only precompute the data in the warehouse, store the results in the APP or DW layer, and serve those to users. But, as noted above, as business complexity and data volume grow, the development and maintenance costs of that approach rise sharply. The solution, therefore, is to return solidified queries at sub-second latency. We use Apache Kylin: we only need to define the query dimensions in advance, and Kylin computes the results for us and stores them in result tables. This both solves fast querying over massive data and reduces the cost of manually developing and maintaining precomputation programs.
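For a sense of what "solidified" means here, a query like the one below (dimensions and table names are made up for illustration) is fixed in its dimensions and measures, so Kylin can precompute the aggregates in a cube and answer it in sub-second time instead of scanning the detail data:

-- Illustrative solidified aggregate: fixed dimensions (cycle, project, fee type), fixed measures
SELECT billing_cycle,
       project_name,
       fee_item_type_id,
       SUM(amount) AS total_amount,
       COUNT(*)    AS bill_count
FROM tb_bill_detail
GROUP BY billing_cycle, project_name, fee_item_type_id;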
However, Kylin stores its computed results in HBase by default, while the figure above shows Kudu rather than HBase, because we integrated Kylin with Kudu ourselves.

> Kylin uses the Kudu storage engine

For the storage engine, we introduced a self-developed storage-kudu module to replace the default storage-hbase. Kylin is organized around three modules: the data source, the build engine, and the storage engine. As for data already sitting in Kudu, since Hive support for Kudu is solved below, Kylin can also load data from Kudu through Hive. We use the Spark build engine provided by Kylin, and Spark also supports integration with Kudu. From the source code's point of view, a Kylin storage engine extension must implement the IStorage interface, which has two roles: adaptToBuildEngine, which adapts the storage to the Cube build engine, and createQuery, which creates queries against the Cube. The remaining details of accessing data in Kudu basically use Spark's Kudu API support directly.

Real-time offline development of a unified data access entry

Some analysis data, such as user satisfaction surveys and utility usage statistics, already exists in the instant query system, and there is no need to synchronize another copy to HDFS. To reduce storage costs and avoid multiple copies of the same data, we at least need to make the data in Kudu accessible from Hive. However, the Hive version we use does not support operating on Kudu tables, and the Kudu support in the latest Hive 4.0 is not yet complete.

Problems to be solved:
  • Kudu table data that lives in the instant query system must be accessible through Hive; modeled after Impala, this means creating an external table in Hive that maps to the Kudu table (a sketch is given below)
  • Hive should be able to create, query, update, and delete Kudu tables just as Impala can
  • Kylin should be able to use Kudu tables
  • The consistency of the data structures and metadata information must be ensured
Hive, Kudu metadata integration:
From the information published on the Hive official website and from source code analysis, the core classes KuduStorageHandler, KuduSerDe, KuduInputFormat, and KuduOutputFormat already implement part of the functionality on a branch, while KuduMetaHook is not yet implemented. From the source code, KuduStorageHandler inherits DefaultStorageHandler and implements HiveStoragePredicateHandler; on that basis we implemented the interaction with HMS so that the relevant operations on Kudu metadata can be performed and Kudu tables can be discovered and operated on (we upgraded Hive to 4.0.0 on CDH 6.3.2; the same versions are used throughout this article).
The real-time system synchronizes table data to Kudu in real time and also needs to create Hive external tables that map the Kudu tables into Hive; this is likewise implemented through KuduStorageHandler and covers querying, modifying, and deleting data. In the "Synchronization Management" module of the data integration service, every time a data synchronization task is created, it connects to Hive and creates an external mapping table for the Kudu table.
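A rough sketch of such a mapping table is shown below; the property names follow the upstream Hive-Kudu integration, while the table name and master list are only examples, not the project's actual DDL:

-- Illustrative Hive external table mapped onto an existing Kudu table
CREATE EXTERNAL TABLE bill_detail_hive
STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
TBLPROPERTIES (
  'kudu.table_name'       = 'bill_detail',
  'kudu.master_addresses' = 'kudu167:7051,cdh6-slave1:7051,cmhk-b-sl-236:7051'
);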
In this way, whether the upper layer uses SparkSQL, Kylin, or HQL to access HDFS or Kudu tables, developers and data consumers have a single, unified entry point.

Transparent tiered data storage

In the overall architecture there are two places where data is stored: Kudu and HDFS. Most of the data in Kudu is the instant query system's data plus the APP-layer and DWS-layer data produced by business analysis. Real-time data can be flushed to HDFS once it no longer changes, and data such as the APP layer's gradually turns cold over time. When data turns cold it needs to be migrated to HDFS, but after migration the data seen by a query would be incomplete. How can data be migrated smoothly without affecting the completeness of queries?

With part of the data in Kudu and part in HDFS, query completeness is solved mainly through a View.
Cold data in Kudu is migrated to HDFS every day. To determine which data is cold, the business side provides, for each table, a hot/cold time window based on the business situation; data beyond that window is migrated to HDFS by the program. After each migration completes, the View must be created or modified, otherwise some data could not be found. The View defines the query time ranges over Kudu and HDFS, for example:

> CREATE VIEW tb_uhome_acct_item_view AS
> SELECT COMMUNITY_ID, STAGE_ID, NAME, UNIT, HOUSE_NAME, BILL_AREA, PAY_USERID, BILLING_CYCLE, FEE_ITEM_TYPE_ID, RULE_NAME, RES_INST_NAME, HOUSE_STATUS_TYPE, HOUSE_STATUS, REAL_CYCLE, CONCAT(BILL_DATE_START, BILL_DATE_END), LEASE_POSITION, OBJ_CODE
> FROM tb_uhome_acct_item
> WHERE create_date >= "2017-01-01"
> UNION ALL
> SELECT COMMUNITY_ID, STAGE_ID, NAME, UNIT, HOUSE_NAME, BILL_AREA, PAY_USERID, BILLING_CYCLE, FEE_ITEM_TYPE_ID, RULE_NAME, RES_INST_NAME, HOUSE_STATUS_TYPE, HOUSE_STATUS, REAL_CYCLE, CONCAT(BILL_DATE_START, BILL_DATE_END), LEASE_POSITION, OBJ_CODE
> FROM tb_uhome_acct_item_hdfs
> WHERE create_date < "2017-01-01"

Each side of the data is bounded by a range on the create_date field, and the View is modified after each successful migration, so that at no moment will a query fail to find some of the data.
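For example, if a migration moves everything before 2018-01-01 over to HDFS, the boundary in the view above would be advanced accordingly; the date here is only an example of the follow-up step, using the standard ALTER VIEW syntax:

-- Hypothetical view update after migrating data before 2018-01-01 to HDFS
ALTER VIEW tb_uhome_acct_item_view AS
SELECT COMMUNITY_ID, STAGE_ID, NAME, UNIT, HOUSE_NAME, BILL_AREA, PAY_USERID, BILLING_CYCLE, FEE_ITEM_TYPE_ID, RULE_NAME, RES_INST_NAME, HOUSE_STATUS_TYPE, HOUSE_STATUS, REAL_CYCLE, CONCAT(BILL_DATE_START, BILL_DATE_END), LEASE_POSITION, OBJ_CODE
FROM tb_uhome_acct_item
WHERE create_date >= "2018-01-01"
UNION ALL
SELECT COMMUNITY_ID, STAGE_ID, NAME, UNIT, HOUSE_NAME, BILL_AREA, PAY_USERID, BILLING_CYCLE, FEE_ITEM_TYPE_ID, RULE_NAME, RES_INST_NAME, HOUSE_STATUS_TYPE, HOUSE_STATUS, REAL_CYCLE, CONCAT(BILL_DATE_START, BILL_DATE_END), LEASE_POSITION, OBJ_CODE
FROM tb_uhome_acct_item_hdfs
WHERE create_date < "2018-01-01"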

In addition, for the Kudu tables synchronized through the connector in the earlier instant query system, the data integration system also needs to create an Impala external table at synchronization time, mapping the Kudu table into Impala so that Impala can find it.
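A minimal sketch of that Impala mapping, again with illustrative names only:

-- Illustrative Impala external table pointing at an existing Kudu table
CREATE EXTERNAL TABLE bill_detail_impala
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'bill_detail');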

Looking ahead

1. Based on the integrated architecture, we can provide more capabilities in the future, making more storage engines support the Hive Metastore and enriching HMS's metadata services.
2. Data delay monitoring: monitor the delay and LAG of the messages in each Kafka topic, so as to monitor latency across the entire data link.
3. Continue optimizing Hive's Kudu support. Querying data that sits partly in Kudu and partly in HDFS through Hive is not yet perfect, and some DDL still needs improvement.
4. Continue the secondary development of Kylin, using Spark to build Cubes dynamically according to the dimension and measure requirements of users collected in the data integration service.
 
