Iceberg provides two configurations:
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
    ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
        .booleanType()
        .defaultValue(true)
        .withDescription("If is false, parallelism of source are set by config.\n" +
            "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
    ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
        .intType()
        .defaultValue(100)
        .withDescription("Sets max infer parallelism for source operator.");
- table.exec.iceberg.infer-source-parallelism: defaults to true, meaning the source parallelism is inferred from the number of splits; if set to false, the parallelism is taken from the configured value instead.
- table.exec.iceberg.infer-source-parallelism.max: defaults to 100; the maximum parallelism that can be inferred for the source operator.
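For illustration, here is a minimal sketch (not from the source code) of passing these options to the source through the builder's flinkConf, mirroring the Configuration usage in the test shown later; env and tableLoader are assumed to be created elsewhere:

Configuration conf = new Configuration();
// keep inference enabled (the default) and cap the inferred parallelism at 50 (an illustrative value)
conf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
conf.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 50);

DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)                  // assumed StreamExecutionEnvironment
    .tableLoader(tableLoader)  // assumed TableLoader for the Iceberg table
    .flinkConf(conf)
    .build();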
These two parameters are used only in FlinkSource's inferParallelism method:
int inferParallelism(FlinkInputFormat format, ScanContext context) {
  // Read the table.exec.resource.default-parallelism configuration, default is -1
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
  // Read the table.exec.iceberg.infer-source-parallelism configuration, default is true
  if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    // Read the table.exec.iceberg.infer-source-parallelism.max configuration, default is 100
    int maxInferParallelism = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
    Preconditions.checkState(maxInferParallelism >= 1,
        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
    // Get the table's split count
    int splitNum;
    try {
      FlinkInputSplit[] splits = format.createInputSplits(0);
      splitNum = splits.length;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
    }
    parallelism = Math.min(splitNum, maxInferParallelism);
  }

  if (context.limit() > 0) {
    int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
    parallelism = Math.min(parallelism, limit);
  }

  // parallelism must be positive.
  parallelism = Math.max(1, parallelism);
  return parallelism;
}
The splitNum is obtained from the following call:

FlinkInputSplit[] splits = format.createInputSplits(0);
splitNum = splits.length;

createInputSplits is implemented in FlinkInputFormat:

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
  // Called in Job manager, so it is OK to load table from catalog.
  // Open the catalog through the table loader
  tableLoader.open();
  final ExecutorService workerPool =
      ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
  try (TableLoader loader = tableLoader) {
    // Load the table and plan the input splits
    Table table = loader.loadTable();
    return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
  } finally {
    workerPool.shutdown();
  }
}

FlinkSplitPlanner.planInputSplits builds the splits, mainly through the planTasks method:

static FlinkInputSplit[] planInputSplits(Table table, ScanContext context, ExecutorService workerPool) {
  try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
    boolean exposeLocality = context.exposeLocality();

    Tasks.range(tasks.size())
        .stopOnFailure()
        .executeWith(exposeLocality ? workerPool : null)
        .run(index -> {
          CombinedScanTask task = tasks.get(index);
          String[] hostnames = null;
          if (exposeLocality) {
            hostnames = Util.blockLocations(table.io(), task);
          }
          splits[index] = new FlinkInputSplit(index, task, hostnames);
        });
    return splits;
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to process tasks iterable", e);
  }
}

The planTasks method relies mainly on scan.planTasks(), and the code is as follows:
public CloseableIterable<CombinedScanTask> planTasks() {
  CloseableIterable<FileScanTask> fileScanTasks = planFiles();
  CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, targetSplitSize());
  return TableScanUtil.planTasks(splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}

planFiles gets the files that match the scan conditions:

public CloseableIterable<FileScanTask> planFiles() {
  Snapshot snapshot = snapshot();
  if (snapshot != null) {
    LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
        snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
        context.rowFilter());

    Listeners.notifyAll(new ScanEvent(table.name(), snapshot.snapshotId(), context.rowFilter(), schema()));

    return planFiles(ops, snapshot,
        context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
  } else {
    LOG.info("Scanning empty table {}", table);
    return CloseableIterable.empty();
  }
}

splitFiles splits the file scan tasks by the target split size:

public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);

  Iterable<FileScanTask> splitTasks = FluentIterable
      .from(tasks)
      .transformAndConcat(input -> input.split(splitSize));
  // Capture manifests which can be closed after scan planning
  return CloseableIterable.combine(splitTasks, tasks);
}

TableScanUtil.planTasks then bin-packs the split tasks into CombinedScanTasks:

public static CloseableIterable<CombinedScanTask> planTasks(CloseableIterable<FileScanTask> splitFiles,
                                                            long splitSize, int lookback, long openFileCost) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
  Preconditions.checkArgument(lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
  Preconditions.checkArgument(openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);

  // Check the size of delete file as well to avoid unbalanced bin-packing
  Function<FileScanTask, Long> weightFunc = file -> Math.max(
      file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
      (1 + file.deletes().size()) * openFileCost);

  return CloseableIterable.transform(
      CloseableIterable.combine(
          new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
          splitFiles),
      BaseCombinedScanTask::new);
}
After the number of splits for the table's files has been determined, the parallelism is decided:

parallelism = Math.min(splitNum, maxInferParallelism);

The parallelism is the minimum of the configured maximum infer parallelism and the number of splits. For example, if the source's maximum infer parallelism is set to 50 but the table's files are divided into 40 splits, the source parallelism is 40.
If table.exec.iceberg.infer-source-parallelism is set to false, the parallelism of table.exec.resource.default-parallelism prevails (the default is -1).
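As a quick worked example (50 and 40 are illustrative values, not defaults):

// Inference enabled: min of split count and the configured max
int parallelism = Math.min(40, 50);  // -> 40
// Inference disabled: fall back to table.exec.resource.default-parallelism (default -1),
// which the final Math.max(1, parallelism) later clamps to at least 1
int fallback = Math.max(1, -1);      // -> 1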
Continue with the next piece of code:

if (context.limit() > 0) {
  int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
  parallelism = Math.min(parallelism, limit);
}
The limit corresponds to the LIMIT value in the SQL query. If a limit is configured, it also takes part in the parallelism calculation; for example, if the limit is 1, the parallelism becomes the minimum of the current parallelism and 1.
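For illustration, the limit is carried in the ScanContext, built the same way the test below builds it; this is only a sketch, and the 40 splits are an assumed example value:

ScanContext context = ScanContext.builder().limit(1).build();
// with e.g. 40 inferred splits and limit 1: parallelism = Math.min(40, 1) = 1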
// parallelism must be positive.
parallelism = Math.max(1, parallelism);
return parallelism;
The final code guarantees a minimum degree of parallelism of 1.
Now let's look at where inferParallelism is called: only in the build() method of the FlinkSource builder:

public DataStream<RowData> build() {
  Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
  FlinkInputFormat format = buildFormat();

  ScanContext context = contextBuilder.build();
  TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

  if (!context.isStreaming()) {
    // Takes effect only in batch mode
    int parallelism = inferParallelism(format, context);
    return env.createInput(format, typeInfo).setParallelism(parallelism);
  } else {
    StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);

    String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
    String readerOperatorName = String.format("Iceberg table (%s) reader", table);

    return env.addSource(function, monitorFunctionName)
        .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
  }
}
For testing, see the testInferedParallelism method of the TestFlinkScanSql test class:

@Test
public void testInferedParallelism() throws IOException {
  Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);
  TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
  FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
  ScanContext scanContext = ScanContext.builder().build();

  // Empty table, infer parallelism should be at least 1
  int parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
  DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  helper.appendToTable(dataFile1, dataFile2);

  // Make sure to generate 2 CombinedScanTasks
  long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
  sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);

  // 2 splits (max infer is the default value 100, max > splits num), the parallelism is splits num : 2
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);

  // 2 splits and limit is 1, max infer parallelism is default 100,
  // which is greater than splits num and limit, the parallelism is the limit value : 1
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, ScanContext.builder().limit(1).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits and max infer parallelism is 1 (max < splits num), the parallelism is 1
  Configuration configuration = new Configuration();
  configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, max infer parallelism is 1, limit is 3, the parallelism is max infer parallelism : 1
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, infer parallelism is disabled, the parallelism is flink default parallelism 1
  configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}

⚠️ Note: the code analyzed here is the read path under the flink module of the Iceberg project; for streaming reads, Iceberg does not configure the source parallelism.
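Since the parallelism is not inferred in streaming mode, the source falls back to the regular Flink parallelism settings; a minimal sketch, assuming you simply control it through the job's default parallelism (4 is an illustrative value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// in streaming mode the Iceberg reader operator uses the normal Flink parallelism
// settings rather than an inferred value
env.setParallelism(4);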