Summary: This article describes some of Qunar’s practices for using Flink + Iceberg 0.11. The content includes:

  1. Background and pain points

  2. Iceberg architecture

  3. Pain point 1: Kafka data loss

  4. Pain point 2: near real-time Hive under heavy pressure

  5. Iceberg optimization practices

  6. Summary


I. Background and pain points

1. Background

We encountered some problems when using Flink for real-time data storage and transmission, such as Kafka data loss and the performance of the Flink + Hive near real-time data warehouse. The new features of Iceberg 0.11 solve these problems in our business scenarios. Compared with Kafka, Iceberg has its own advantages in certain scenarios, and here we share some of our practice based on Iceberg.

2. Original architecture scheme

The original architecture used Kafka to store real-time data, including logs, orders, tickets, and more. Flink SQL or the Flink DataStream API was then used to consume and forward the data. We developed an internal platform for submitting SQL and DataStream jobs, and real-time jobs are submitted through it.

3. Pain points

  • Kafka storage is expensive and the data volume is large. Because of this pressure, Kafka's data retention is set relatively short; when consumption is backpressured or backlogged and data is not consumed within that window, the data expires and is lost.

  • Flink supports near real-time reads and writes on Hive. To relieve the pressure on Kafka, we put data with lower real-time requirements into Hive and used minute-level partitions. But as metadata grows, the pressure on the Hive metastore increases, queries become slower, and so does the pressure on the database where Hive metadata is stored.

II. Iceberg Architecture

1. Iceberg architecture analysis


Terminology

Data files: the files that actually store the data. They are usually kept in the table's data directory and end with ".parquet".

Manifest files: each row in a manifest file is a detailed description of one data file, including its status, file path, partition information, and column-level statistics (such as the minimum and maximum value and the number of nulls for each column). With this file, irrelevant data can be filtered out and retrieval speed improved.

Snapshot: a snapshot represents the state of a table at a point in time. Each snapshot version lists all the data files at that moment. Data files are recorded in different manifest files, the manifest files are recorded in a manifest list file, and one manifest list file represents one snapshot.
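As an aside, these structures can be inspected directly: Iceberg exposes them as queryable metadata tables, for example from Spark SQL. The catalog, database and table names below are placeholders.

 -- Spark SQL, illustrative names: list a table's snapshots
 SELECT snapshot_id, committed_at, operation
 FROM Iceberg_catalog.Iceberg_db.tbl1.snapshots;

 -- List the manifest files that make up the current snapshot
 SELECT path, length, added_data_files_count, existing_data_files_count
 FROM Iceberg_catalog.Iceberg_db.tbl1.manifests;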

2. Iceberg query plan

A query plan is the process of finding the "files required for a query" in a table.

Manifest files include a partition data tuple and column-level statistics for each data file. During planning, query predicates are automatically converted into predicates on the partition data and applied first to filter data files. Next, column-level value counts, null counts, lower bounds, and upper bounds are used to eliminate files that cannot match the query predicate.

Each snapshot ID is associated with a group of manifest files (a manifest list), and each group contains many manifest files.

Each manifest file records the metadata of the data files it tracks, including the maximum and minimum values of the file's columns. The query then uses this metadata to index into the specific data files, so data can be queried faster.
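For illustration, a hypothetical query against the partitioned, sorted table used later in this article (table and column names are assumptions) can be planned almost entirely from manifest information:

 -- days is the partition column, province_id a sorted column (hypothetical schema).
 -- Planning keeps only data files whose partition is days = '2021-02-01', then drops
 -- files whose province_id [lower_bound, upper_bound] range does not overlap [10, 20],
 -- all without opening any parquet file.
 SELECT * FROM Iceberg_catalog.Iceberg_db.tbl1
 WHERE days = '2021-02-01' AND province_id BETWEEN 10 AND 20;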

III. Pain point 1: Kafka data loss

1. Introduction to pain points

Usually we choose Kafka for real-time data storage and log transfer. Kafka storage is inherently expensive, and data retention is time-limited: once the consumption backlog lasts past the expiration time, the unconsumed data is lost.

2. Solutions

Put business data with low real-time requirements, for example data that can tolerate a delay of 1-10 minutes, into Iceberg. Iceberg 0.11 supports real-time reads via SQL while also keeping historical data. This relieves the pressure on the online Kafka cluster and ensures that data can be read in near real time without being lost.

3. Why can Iceberg only achieve near real-time ingestion into the lake?

  1. Iceberg commits a transaction at file granularity. This makes it impossible to commit transactions in seconds, otherwise the number of files will swell;

  2. There are no online service nodes, so for high-throughput, low-latency writes a purely real-time response cannot be provided;

  3. Flink writes data in units of checkpoints. After physical data is written to Iceberg it cannot be queried directly; only when a checkpoint is triggered is the metadata file written, and the data then changes from invisible to visible. Each checkpoint also takes a certain amount of time to execute.

4. Flink ingestion analysis

Component introduction

IcebergStreamWriter is mainly used to write records into the corresponding avro, parquet or orc files, generate a corresponding Iceberg DataFile, and send it to the downstream operator.

The other component, IcebergFilesCommitter, is mainly used to collect all the DataFile files when a checkpoint arrives and commit the transaction to Apache Iceberg, completing the data write for that checkpoint.

A list of DataFile files is maintained for each checkpointId (i.e. a map<checkpointId, list of DataFile>). Even if a checkpoint in between fails to commit its transaction, its DataFile files are still kept in state, and the data can still be committed to the Iceberg table by a subsequent checkpoint.

5. Flink SQL Demo

Flink + Iceberg real-time ingestion process: consume Kafka data, write it into Iceberg, and read the data from Iceberg in near real time.

■ 5.1 Preliminary work

  • enable streaming mode

 set execution.type = streaming

  • enable table SQL hints so that the OPTIONS clause can be used

 set table.dynamic-table-options.enabled=true

  • register an Iceberg catalog for operating on Iceberg tables

 CREATE CATALOG Iceberg_catalog WITH (
   'type'='iceberg',
   'catalog-type'='hive',
   'uri'='thrift://localhost:9083'
 );

  • write Kafka data into the Iceberg table

 insert into Iceberg_catalog.Iceberg_db.tbl1
   select * from Kafka_tbl;
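The Kafka_tbl consumed above is not defined in this article; a minimal sketch of such a source table, assuming the standard Flink Kafka connector and an illustrative topic and schema, might look like this:

 -- Hypothetical Kafka source table; topic, servers, and columns are illustrative.
 CREATE TABLE Kafka_tbl (
   days STRING,
   province_id BIGINT,
   order_id STRING
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'order_topic',
   'properties.bootstrap.servers' = 'localhost:9092',
   'properties.group.id' = 'iceberg_demo',
   'scan.startup.mode' = 'latest-offset',
   'format' = 'json'
 );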


  • read data from Iceberg in near real time (streaming read), optionally starting from a specified snapshot

 insert into Iceberg_catalog.Iceberg_db.tbl2
   select * from Iceberg_catalog.Iceberg_db.tbl1
   /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'start-snapshot-id'='3821550127947089987') */;

■ 5.2 Parameter interpretation

  • monitor-interval: the interval for continuously monitoring newly committed data files (default: 1s).

  • start-snapshot-id: read data starting from the specified snapshot ID. Each snapshot ID is associated with a group of manifest metadata files, and each metadata file maps to its own real data files; through the snapshot ID, a particular version of the data is read.

6. Pitfalls encountered

When writing data to Iceberg from the SQL Client, the data directory kept being updated but the metadata directory had no data, so queries returned nothing, because Iceberg queries need the metadata to index the real data files. The SQL Client does not enable checkpointing by default; it has to be enabled through a configuration file. As a result, the data directory received data while the metadata directory did not.
PS: Checkpointing must be turned on whether you write into the lake through SQL or the DataStream API.
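A minimal sketch of enabling it cluster-wide, assuming the usual flink-conf.yaml (the interval value is illustrative), so that jobs submitted from the SQL Client also commit Iceberg metadata periodically:

 # flink-conf.yaml (illustrative values)
 # Trigger a checkpoint every minute so Iceberg commits become visible.
 execution.checkpointing.interval: 1 min
 execution.checkpointing.mode: EXACTLY_ONCE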

7. Real-time query results

Two screenshots, taken one second apart, show the effect of querying Iceberg in real time: the query result changes as soon as the underlying data changes.

IV. Pain point 2: the Flink + Hive near real-time architecture is getting slower and slower

1. Introduction to pain points

Although the Flink + Hive near real-time architecture supports real-time reads and writes, as the number of tables and partitions grows it faces the following problems:

  • Changing Hive partitions to hour/minute level improves the near real-time behavior of the data, but the pressure on the metastore is obvious: too much metadata makes query-plan generation slow and also affects the stability of other online services.

  • As the metadata grows, so does the pressure on the database that stores the Hive metadata; after a while that database needs to be expanded, for example its storage space.

2. Solution

Migrate the original near real-time Hive pipelines to Iceberg. Why can Iceberg handle the problem of large amounts of metadata, while Hive tends to become a bottleneck when its metadata is large?

  • Iceberg maintains metadata on an extensible distributed file system and has no centralized metadata system;

  • Hive maintains partition-level metadata in the metastore (too many partitions put a lot of pressure on MySQL), while the metadata inside a partition is actually maintained in files (a starting job needs to list a large number of files to decide whether each file needs to be scanned, and the whole process is very time-consuming).

V. Optimization Practice

1. Small file processing

  • Before Iceberg 0.11, small file merging was done by periodically triggering the batch API. Files could be merged this way, but a set of Actions code had to be maintained, and the merging was not real-time.

 Table table = findTable(options, conf);
 Actions.forTable(table)
     .rewriteDataFiles()
     .targetSizeInBytes(10 * 1024) // 10 KB
     .execute();
  • New in Iceberg 0.11: support for streaming small-file merging.

Data is written with a hash shuffle on the partition/bucket key, so files are merged directly from the source: one task processes the data of a given partition and commits its own DataFile files, i.e. a task only handles the data of its corresponding partition. This avoids the problem of many tasks each handling and committing many small files, requires no extra maintenance code, and only needs the property write.distribution-mode to be specified when creating the table; the property is also understood by other engines, such as Spark.

 CREATE TABLE city_table (
   province BIGINT,
   city STRING
 ) PARTITIONED BY (province, city) WITH (
   'write.distribution-mode'='hash'
 );
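As a usage sketch, a continuous write into this table (the source table below is an assumption) lets the hash distribution route all records of one partition to one writer task:

 -- Hypothetical streaming source with province and city columns.
 -- With 'write.distribution-mode'='hash', rows are shuffled by the partition key,
 -- so each writer task produces files only for its own partitions instead of
 -- every task writing a small file into every partition.
 insert into city_table select province, city from city_source;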

2. Iceberg 0.11 sorting

■ 2.1 Sorting introduction

Before Iceberg 0.11, Flink did not support the Iceberg sort feature, so it could previously only be combined with Spark to get sorting in batch mode. 0.11 adds support for the sort feature, which means we can also benefit from it in real time.

The essence of sorting is to scan faster: after the data is clustered by the sort key and arranged from smallest to largest, the min-max statistics can filter out a large amount of irrelevant data.

■ 2.2 Sorting demo

 insert into Iceberg_table select days, province_id from Kafka_tbl order by days, province_id;

3. Iceberg sorted manifest details

Field interpretation

  • file_path: the physical file location.

  • partition: the partition the file belongs to.

  • lower_bounds: the minimum values of the sort fields in this file, e.g. the minimum of days and of province_id.

  • upper_bounds: the maximum values of the sort fields in this file, e.g. the maximum of days and of province_id.
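One way to inspect these per-file bounds, assuming the same table is also registered in a Spark SQL catalog (names are placeholders), is Iceberg's files metadata table:

 -- Spark SQL: one row per data file, with the partition and the per-column
 -- lower/upper bounds that the query plan uses for pruning.
 SELECT file_path, partition, lower_bounds, upper_bounds
 FROM Iceberg_catalog.Iceberg_db.Iceberg_table.files;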

The partition and the column upper/lower bounds are used to determine whether the file at file_path needs to be read. After the data is sorted, the column-level information of each file is also recorded in the metadata. The query plan locates files from the manifest, with no need to record this information in the Hive metadata, which relieves the pressure on Hive metadata and improves query efficiency.

Taking advantage of Iceberg 0.11's sort feature, we use days as the partition key and sort by days, hours, and minutes. The manifest files then record this sort order, which improves query efficiency during retrieval: we get the retrieval benefits of Hive partitioning while avoiding the pressure caused by excessive Hive metadata.

VI. Summary

Compared with the previous version, Iceberg 0.11 adds many useful features; we summarize them as follows:

Before Iceberg 0.11, the sort feature was integrated with Spark but not with Flink. At that time we migrated a batch of Hive tables in bulk with Spark + Iceberg 0.10. The benefit on the BI side: originally, BI built multi-level partitions to speed up Hive queries, which led to too many small files and too much metadata. During the migration into the lake, we used Spark to sort by the conditions BI queries most frequently, combined with implicit partitioning. This improved BI retrieval speed, avoided the small-file problem, and, since Iceberg has its own metadata, also reduced the pressure on Hive metadata.

Iceberg 0.11 supports sorting from Flink, which is a useful feature: we can move the original Flink + Hive partitioning scheme to Iceberg sorting, which achieves the effect of Hive partitioning while also reducing small files and improving query efficiency.

Real-time reading of data can be realized through SQL alone. Data with low real-time requirements, for example where the business can accept a delay of 1-10 minutes, can be put into Iceberg, which reduces the pressure on Kafka while still allowing near real-time reads and keeping historical data.

Prior to Iceberg 0.11, small file merges needed to be maintained using Iceberg’s merge API, which required incoming table information as well as timing information, and merges were done in batches, not in real time. In terms of code, it increases maintenance and development costs; In terms of timeliness, it is not real-time. 0.11 Use hash to merge data in real time from the source, just specify the (‘write.distribution-mode’=’hash’) attribute when creating a table in SQL, without manual maintenance.