Abstract: This article is compiled from the “Tens of Billions of Real-time Data Entering the Lake” shared by Chen Junjie, senior engineer of Tencent Data Lake R&D, on April 17 at Flink Meetup in Shanghai. Contents include: Tips:
Click on the end of the article Read the original article to see more technical dry goods~
GitHub address 
welcome to give  Flink likes and sends star~

1. Tencent Data Lake introduction

As can be seen from the figure above, the entire platform is relatively large, including data access, upper-layer analysis, intermediate management (such as task management, analysis management and engine management), and then to the lowest table format.

Second, the landing scenario of tens of billions of data landing

1. Traditional platform architecture

As shown in the figure above, the traditional platform architecture in the past was nothing more than two, one was the Lambda architecture and the other was the Kappa architecture:

    > In the Lambda architecture, batches and streams are separated, so O&M requires two sets of clusters, one for Spark/Hive and one for Flink. There are several problems:

      the

    • first is that the cost of operation and maintenance is relatively large;
    • The second is the cost of development. For example, on the business side, one has to write Spark, the other has to write Flink or SQL, and in general, the development cost is not particularly friendly to data analysts.
  • The second is the Kappa architecture. In fact, it is the message queue, to the underlying transmission, and then to do some analysis later. It is characterized by being relatively fast and has a certain real-time performance based on Kafka.

Both architectures have their pros and cons, the biggest being that storage can be discordant, resulting in fragmented data links. At present, our platform has been connected to Iceberg, and the following will explain the problems encountered and the process of solving them according to different scenarios.

2. Scenario 1: Hand Q security data enters the lake

Mobile QQ security data entering the lake is a very typical scenario.

The current business scenario is that the message queue TubeMQ is landed into ODS to Iceberg through Flink, and then Flink is used to do some user table associations, and then made into a wide table to do some queries, put it in COS, and may do some analysis in BI scenarios.

This process may seem ordinary, but you should know that the user association dimension table in hand Q is 2.8 billion, and the daily message queue is tens of billions, so it will face certain challenges.

■ Small File Challenge

1, Flink Writer generates small files

Flink writes without shuffle, and the distributed data is out of order, resulting in many small files.

2. High

delay requirements

The checkpoint interval is short, the commit interval is small, and the file size is a problem.

3. Small files explode

In a few days, the metadata and data small files exploded at the same time, and the cluster pressure was huge.

4. Merge small files and enlarge the problem

In order to solve the problem of small files, open Action to merge small files, resulting in more files.

5. There is no time to delete the data

Delete snapshots and orphan files, but there are too many scanned files, and the pressure on namenode is huge.

Solution

1: Flink synchronous merge

  • adds small file merge operators;

  • Added Snapshot automatic cleanup mechanism.

1)snapshot.retain-last.nums
2)snapshot.retain-last.minutes

Spark asynchronous merge<

ul class=”list-paddingleft-2″

  • >

    Add background services for small file merging and orphan file deletion;

  • Add small file filtering logic to gradually delete small files;

  • Increase per-partition merge logic to avoid generating too many deleted files at once and causing task OOM.

  • ■ Flink synchronous merge


    After committing all the data files, a Commit Result will be generated. We will take the Commit Result to generate a compressed task, then send it into multiple Task Managers to do the rewrite work, and finally commit the result to the Iceberg table.

    Of course, the key to this is what CompactTaskGenerator does. In the beginning, we wanted to merge as much as possible, so we did a scan of the table and scanned many files. However, its table is very large, and there are many small files, and a sweep makes the whole Flink hang up immediately.

    We came up with a way to scan the data incrementally each time we merged. From the previous Replace Operation to the present, do an increment to see how much has been added in between, and which ones are in line with the Rewrite strategy.

    There are actually many configurations in this, to see how many snapshots are reached, or how many files can be merged, and these places can be set by the user themselves. Of course, we also have default values to ensure that users can use these functions without realizing it.

    ■ Fanout Writer’s pit

    When Fanout Writer, you may encounter multiple tiers if you have a large amount of data. For example, the data of hand Q is divided into provinces and cities; But after the division, it was still very large, so it was divided into buckets. At this time, each Task Manager may be divided into many partitions, and each partition opens a Writer, and the Writer will be very large, resulting in insufficient memory.

    Here we do two things:

      > the first is KeyBy support. Do KeyBy actions according to the partitions set by the user, and then gather the same partitions in a Task Manager so that it does not open so many partitioned writers. Of course, this approach will bring some performance losses.

    • The second is to do LRU Writer, maintaining a Map in memory.

    3. Scenario 2: News platform index analysis

    The above is an online indexing architecture of news articles based on Iceberg streaming. On the left is Spark collects the dimension table above HDFS, and on the right is the access system, after collection, Flink and the dimension table will be used to make a Window-based Join, and then written to the index pipeline.

    ■ Function

    • quasi-real-time detail layer;

    • real-time streaming consumption;

    • Streaming MERGE INTO;

    • multidimensional analysis;

    • Offline analysis.

    ■ Scene

    characteristics

    The above scene has the following characteristics:

      > order of magnitude : Index single table exceeds 100 billion, single batch 20 million, daily average 100 billion;

    • Latency requirements: End-to-end data visibility in minutes;

    • Data source: full-volume, quasi-real-time increment, message flow;

    • Consumption mode: streaming consumption, batch loading, point check, row update, multi-dimensional analysis.

    Challenge: MERGE INTO

    has users who have proposed the demand for Merge Into, so we think about it from three aspects:

      > Function: Merge the flow table after each batch join into the real-time index table for downstream use;

    • performance: the downstream has high requirements for index timeliness, and it is necessary to consider that merge into can catch up with the upstream batch consumption window;

    • ease of use: Table API? Or the Action API? Or the SQL API?

    ■ Solution

    • refer to Delta Lake Design JoinRowProcessor;

    • Take advantage of Iceberg’s WAP mechanism to write temporary snapshots.

    • optionally skip Cardinality-check;

    • When writing, you can choose to only hash and not sort.

    • supports the Dataframe API;

    • Spark 2.4 supports SQL;

    • Spark 3.0 uses the community version.

    4. Scenario 3

    : Advertising data analysis

    ■ Advertising data mainly has the following characteristics:

      > Order of magnitude: 100 billion petabytes of data per day, a single 2K;

    • Source: SparkStreaming incremental into the lake;

    • Data characteristics: labels keep increasing, schema keeps changing;

    • How to use: Interactive query analysis.

    ■ Challenges encountered and corresponding solutions

    :

    • Challenge 1: Schema nesting complexity, Nearly 10,000 columns after tiling, OOM as soon as it is written.

    Solution: By default, each Parquet Page Size is set to 1M, which needs to be set according to the Executor memory.

    • Challenge 2: 30-day data basic cluster burst.

      Solution: Provides actions for lifecycle management, and documents distinguish between lifecycle and data lifecycle.

    1)column projection;
    2)predicate push down。

    Third,

    future

    planning for the future is mainly divided into the core side and the platform side.

    1. On the kernel

    side

    In the future, we hope to have the following plans on the kernel side

    :

    More data access

    • incremental lake support;

    • V2 Format support;

    • Row Identity support.

    Faster query
    • index support;

    • Alloxio acceleration layer support;

    • MOR optimization.

    Better data governance

      data
    • governance action;

    • SQL Extension support;

    • Better metadata management.

    2. On the platform side
    ,

    we have the following plans:

    ■ Data governance as a service
    ■ Incremental entry into the lake support
      Spark

    • consumption CDC into the lake;

    • Flink consumes CDC into the lake.

    ■ Indicator monitoring alarm

    4. Summary

    After the application and practice of mass production, we have three aspects of summary

    :

    • usability: Through the actual operation of multiple business lines, it was confirmed that Iceberg can withstand the test of tens of billions or even hundreds of billions per day.

    • Ease of use: The threshold for use is relatively high, and more work needs to be done to let users use it.

    • scenario support: The currently supported lake entry scenarios are not as many as Hudi, and the incremental reading is also relatively missing, which needs to be made up for by everyone’s efforts.