I. Background and pain points
II. Iceberg architecture
III. Pain point 1: Kafka data loss
IV. Pain point 2: near real-time Hive pressure
V. Iceberg optimization practice
I. Background and pain points
2. Original architecture scheme
3. Pain points
Flink supports near real-time reads and writes on Hive. To share the pressure on Kafka, data that is not very latency-sensitive is written into Hive, with Hive doing minute-level partitioning. But as metadata grows, the pressure on the Hive metastore increases: queries become slower, and so does the load on the database where the Hive metadata is stored.
II. Iceberg Architecture
1. Iceberg architecture analysis
- Data file: the file that actually stores the data, usually kept in the table's data directory and ending in ".parquet".
- Manifest file: each row is a detailed description of one data file, including the data file's status, file path, partition information, and column-level statistics (such as the per-column minimum and maximum values and the number of null values). With this file, irrelevant data can be filtered out and retrieval speed improved.
- Snapshot: the state of a table at a point in time. Each snapshot version contains the list of all data files at a given moment. Data files are recorded in different manifest files, the manifest files are listed in a manifest list file, and one manifest list file represents one snapshot.
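To make this hierarchy concrete, here is a minimal toy model in plain Java (illustrative names only, not the real Iceberg API): a snapshot points to a manifest list, the manifest list names manifest files, and each manifest file describes a set of data files.

```java
import java.util.List;

// Toy model of Iceberg's metadata hierarchy (illustration only):
// snapshot -> manifest list -> manifest files -> data files.
record DataFile(String path) {}
record ManifestFile(List<DataFile> dataFiles) {}
record Snapshot(long snapshotId, List<ManifestFile> manifestList) {}

public class MetadataLayout {
    // Collect every data file path reachable from a snapshot, mirroring
    // how a reader resolves a snapshot down to its physical files.
    static List<String> dataFilesOf(Snapshot s) {
        return s.manifestList().stream()
                .flatMap(m -> m.dataFiles().stream())
                .map(DataFile::path)
                .toList();
    }

    public static void main(String[] args) {
        Snapshot snap = new Snapshot(1L, List.of(
                new ManifestFile(List.of(new DataFile("data/a.parquet"),
                                         new DataFile("data/b.parquet"))),
                new ManifestFile(List.of(new DataFile("data/c.parquet")))));
        // All data files visible in snapshot 1.
        System.out.println(dataFilesOf(snap));
    }
}
```

Reading an older snapshot ID would simply resolve a different manifest list, which is how a specific version of the data is reached.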
2. Iceberg query planning
The manifest file includes a partition data tuple and column-level statistics for each data file. During planning, query predicates are automatically converted into predicates on the partition data and applied first to filter data files. Then the column-level value counts, null counts, lower bounds, and upper bounds are used to eliminate files that cannot match the query predicate.
Each snapshot ID is associated with a set of manifest files, and each set contains many manifest files. Each manifest file records the metadata of the current data block, including the maximum and minimum values of the file's columns; the query then uses this metadata to index into the specific file blocks, so the data can be found faster.
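The pruning step above can be sketched as follows. This is a toy model, not Iceberg's actual planner: each manifest entry carries per-column lower/upper bounds, and for an equality predicate a file can be skipped whenever the value falls outside its [lower, upper] range.

```java
import java.util.List;

// Toy sketch of manifest-based file pruning (not the real Iceberg planner):
// an entry carries one column's lower/upper bound; a file is skipped when
// the predicate's value cannot fall inside that range.
record FileEntry(String path, long lowerBound, long upperBound) {}

public class PruneDemo {
    // Keep only files whose bounds could contain `value` for `col = value`.
    static List<FileEntry> prune(List<FileEntry> entries, long value) {
        return entries.stream()
                .filter(e -> e.lowerBound() <= value && value <= e.upperBound())
                .toList();
    }

    public static void main(String[] args) {
        List<FileEntry> manifest = List.of(
                new FileEntry("data/f1.parquet", 1, 100),
                new FileEntry("data/f2.parquet", 101, 200),
                new FileEntry("data/f3.parquet", 201, 300));
        // A predicate `col = 150` only needs f2; f1 and f3 are skipped.
        System.out.println(prune(manifest, 150));
    }
}
```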
III. Pain point 1: Kafka data loss
1. Introduction to pain points
3. Why can Iceberg only achieve near real-time ingestion into the lake?
- Iceberg commits transactions at file granularity, so transactions cannot be committed at second-level intervals; otherwise the number of files would swell.
- There is no online service node, so purely real-time responses cannot be obtained for real-time, high-throughput, low-latency writes.
- Flink writes at checkpoint granularity: after physical data is written to Iceberg, it cannot be queried directly. Only when a checkpoint is triggered is the metadata file written, turning the data from invisible to visible, and each checkpoint execution takes a certain amount of time.
4. Analysis of Flink writes into the lake
The writer operator is mainly used to write records to the corresponding avro, parquet, or orc files, generate a corresponding Iceberg DataFile, and send it to the downstream operator.
The committer maintains a list of DataFile files for each checkpointId, i.e. a map<checkpointId, List<DataFile>>.
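A rough sketch of this flow in plain Java (illustrative names, not the real connector classes): files are buffered per checkpointId and become queryable only when that checkpoint completes, which is also why data stays invisible between checkpoints.

```java
import java.util.*;

// Toy sketch of the checkpoint-driven commit flow described above
// (assumed names, not the actual Flink/Iceberg connector): the committer
// buffers data files per checkpointId and publishes them on completion.
public class CommitFlow {
    // Files buffered per checkpoint, i.e. map<checkpointId, List<dataFile>>.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> visible = new ArrayList<>(); // committed = queryable

    void collect(long checkpointId, String dataFile) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(dataFile);
    }

    // When a checkpoint completes, commit everything up to and including it.
    void commitUpTo(long checkpointId) {
        var done = pending.headMap(checkpointId, true);
        done.values().forEach(visible::addAll);
        done.clear();
    }

    List<String> visibleFiles() { return visible; }

    public static void main(String[] args) {
        CommitFlow flow = new CommitFlow();
        flow.collect(1, "data/a.parquet");
        flow.collect(2, "data/b.parquet");
        flow.commitUpTo(1); // checkpoint 1 completes
        // Only a.parquet is visible; b.parquet waits for checkpoint 2.
        System.out.println(flow.visibleFiles());
    }
}
```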
5. Flink SQL demo
■ 5.1 Preliminary work
Enable the table SQL hint so that the OPTIONS clause can be used:

set table.dynamic-table-options.enabled=true;

Register the Iceberg catalog in order to operate on Iceberg tables:

CREATE CATALOG iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083'
);
insert into iceberg_catalog.iceberg_db.tbl1
select * from Kafka_tbl;

insert into iceberg_catalog.iceberg_db.tbl2
select * from iceberg_catalog.iceberg_db.tbl1
/*+ OPTIONS('streaming'='true',
            'monitor-interval'='10s',
            'start-snapshot-id'='3821550127947089987') */;
■ 5.2 Parameter interpretation
monitor-interval: the interval for continuously monitoring newly committed data files (default: 1s).
start-snapshot-id: read data starting from the specified snapshot ID. Each snapshot ID is associated with a set of manifest metadata files, and each metadata file maps to its own real data files; through the snapshot ID, a particular version of the data is read.
6. Pitfall record
I once wrote data to Iceberg from the SQL Client: the data directory kept being updated, but the metadata directory had no data, so queries returned nothing, because Iceberg queries need the metadata to index the real data files. The SQL Client does not enable checkpointing by default; it must be enabled through a configuration file. This is why data files were written while the metadata directory got no metadata. Note: checkpointing must be enabled whether you write to the lake through SQL or through the DataStream API.
The following two figures show the effect of querying Iceberg in real time, one second before and one second after the data changed.
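For reference, a hedged example of what that configuration might look like: `execution.checkpointing.interval` is Flink's standard key for periodic checkpoints, shown here as it would appear in flink-conf.yaml (the exact file and interval depend on your deployment):

```yaml
# Enable periodic checkpoints so the Iceberg committer writes metadata;
# the 10s interval is only an example value.
execution.checkpointing.interval: 10s
```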
IV. Pain point 2: the near-real-time Flink + Hive pipeline is getting slower and slower
1. Introduction to pain points
Although the near-real-time Flink + Hive architecture supports real-time reads and writes, as the number of tables and partitions grows it faces the following problems:
- Changing partitions to hour/minute granularity improves data freshness, but the pressure on the metastore becomes obvious: too much metadata makes query-plan generation slow, and it also affects the stability of other online services.
- As metadata grows, so does the load on the database where the Hive metadata is stored; after a period of time that database needs to be expanded, for example its storage space.
2. Solution
We migrated the original near-real-time Hive pipeline to Iceberg. Why can Iceberg handle large amounts of metadata, while Hive tends to become a bottleneck when its metadata grows large?
- Iceberg maintains metadata on a scalable distributed file system and has no centralized metadata system.
- Hive maintains partition-level metadata in the metastore (too many partitions put a lot of pressure on MySQL), while the metadata inside a partition is actually maintained in files (starting a job needs to list a large number of files to decide whether each file needs to be scanned, and the whole process is very time-consuming).
V. Optimization Practice
1. Small file processing
■ 1.1 Before Iceberg 0.11
Before Iceberg 0.11, small files were merged by periodically triggering the batch API. This could merge files, but it required maintaining a set of Actions code, and the merging was not real-time.

Table table = findTable(options, conf);
Actions.forTable(table)
    .rewriteDataFiles()
    .targetSizeInBytes(10 * 1024) // 10 KB
    .execute();
■ 1.2 Iceberg 0.11 new feature: support for streaming small-file merging
Writing with hash distribution by the partition key merges files from the source, since one task then handles a given partition's data. A minimal table definition (the column list here is reconstructed for illustration):

CREATE TABLE city_table (
  province BIGINT,
  city STRING
) PARTITIONED BY (province, city) WITH (
  'write.distribution-mode'='hash'
);
2. Iceberg 0.11 sorting
■ 2.1 Sorting Introduction
Before Iceberg 0.11, Flink did not support Iceberg's sorting feature, so it could only be combined with Spark to get sorting in batch mode. 0.11 adds sorting support, which means we can also get this benefit in real time. The essence of sorting is to scan faster: after aggregating by the sort key, the data is arranged from smallest to largest, so max-min filtering can skip a large amount of invalid data.
■ 2.2 Sorting demo
insert into Iceberg_table select days, province_id from Kafka_tbl order by days, province_id;
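The scan benefit can be illustrated with a small toy experiment in plain Java (not Iceberg itself): write the same keys into fixed-size "files" with and without sorting, and count how many files an equality lookup must read based on per-file min/max bounds.

```java
import java.util.*;

// Toy sketch (not Iceberg): after sorting, each written file covers a
// narrow key range, so min/max pruning skips most files; unsorted writes
// spread every key range across every file.
public class SortBenefit {
    // Split `keys` into files of `size` rows and count files whose
    // [min, max] range could contain `probe`.
    static long filesToRead(List<Integer> keys, int size, int probe) {
        long hit = 0;
        for (int i = 0; i < keys.size(); i += size) {
            List<Integer> file = keys.subList(i, Math.min(i + size, keys.size()));
            int min = Collections.min(file), max = Collections.max(file);
            if (min <= probe && probe <= max) hit++;
        }
        return hit;
    }

    public static void main(String[] args) {
        List<Integer> unsorted = new ArrayList<>();
        for (int i = 0; i < 100; i++) unsorted.add(i % 10); // keys interleaved
        List<Integer> sorted = new ArrayList<>(unsorted);
        Collections.sort(sorted);

        // With 10-row files, the unsorted layout touches every file,
        // while the sorted layout narrows the probe to a single file.
        System.out.println(filesToRead(unsorted, 10, 7)); // 10
        System.out.println(filesToRead(sorted, 10, 7));   // 1
    }
}
```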
3. Iceberg sorted manifest details
Parameter interpretation:
- partition: the partition the file belongs to.
- lower_bounds: the minimum values of the sort fields in this file; the figure below shows my minimums for days and province_id.
- upper_bounds: the maximum values of the sort fields in this file; the figure below shows my maximums for days and province_id.
- file_path: the physical file location.
Through the partition and the column lower/upper bound information, it is determined whether to read the file at file_path, so that after sorting the data, irrelevant files can be filtered out.