Iceberg provides two configurations:

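  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
          .booleanType()
          .defaultValue(true)
          .withDescription("If is false, parallelism of source are set by config.\n" +
              "If is true, source parallelism is inferred according to splits number.\n");

  public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
          .intType()
          .defaultValue(100)
          .withDescription("Sets max infer parallelism for source operator.");
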
  • table.exec.iceberg.infer-source-parallelism: Defaults to true, which means the source parallelism is inferred from the number of splits. When set to false, the parallelism is taken from the Flink configuration instead.
  • table.exec.iceberg.infer-source-parallelism.max: Defaults to 100. This is the upper bound on the inferred parallelism of the source operator.

These two options are read in only one place, the inferParallelism method of FlinkSource:

int inferParallelism(FlinkInputFormat format, ScanContext context) {
  // Read table.exec.resource.default-parallelism; the default is -1.
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
  // Read table.exec.iceberg.infer-source-parallelism; the default is true.
  if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    // Read table.exec.iceberg.infer-source-parallelism.max; the default is 100.
    int maxInferParallelism = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
    Preconditions.checkState(
        maxInferParallelism >= 1,
        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
    // Get the number of splits for the table.
    int splitNum;
    try {
      FlinkInputSplit[] splits = format.createInputSplits(0);
      splitNum = splits.length;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
    }
    parallelism = Math.min(splitNum, maxInferParallelism);
  }

  if (context.limit() > 0) {
    int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
    parallelism = Math.min(parallelism, limit);
  }

  // parallelism must be positive.
  parallelism = Math.max(1, parallelism);
  return parallelism;
}

The number of splits, splitNum, is obtained by the following code:

FlinkInputSplit[] splits = format.createInputSplits(0);
splitNum = splits.length;

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
  // Called in Job manager, so it is OK to load table from catalog.
  // Load the catalog.
  tableLoader.open();
  final ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
  try (TableLoader loader = tableLoader) {
    // Load the table.
    Table table = loader.loadTable();
    return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
  } finally {
    workerPool.shutdown();
  }
}

static FlinkInputSplit[] planInputSplits(Table table, ScanContext context, ExecutorService workerPool) {
  // The work is done mainly by the planTasks method.
  try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
    boolean exposeLocality = context.exposeLocality();

    Tasks.range(tasks.size())
        .stopOnFailure()
        .executeWith(exposeLocality ? workerPool : null)
        .run(index -> {
          CombinedScanTask task = tasks.get(index);
          String[] hostnames = null;
          if (exposeLocality) {
            hostnames = Util.blockLocations(table.io(), task);
          }
          splits[index] = new FlinkInputSplit(index, task, hostnames);
        });
    return splits;
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to process tasks iterable", e);
  }
}

The planTasks method relies mainly on scan.planTasks(), whose code is as follows:

public CloseableIterable<CombinedScanTask> planTasks() {
  CloseableIterable<FileScanTask> fileScanTasks = planFiles();
  CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, targetSplitSize());
  return TableScanUtil.planTasks(splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}


planFiles collects the files to scan:

public CloseableIterable<FileScanTask> planFiles() {
  Snapshot snapshot = snapshot();
  if (snapshot != null) {
    LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
        snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
        context.rowFilter());

    Listeners.notifyAll(
        new ScanEvent(table.name(), snapshot.snapshotId(), context.rowFilter(), schema()));

    return planFiles(ops, snapshot,
        context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
  } else {
    LOG.info("Scanning empty table {}", table);
    return CloseableIterable.empty();
  }
}

// Split the files.
public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);

  Iterable<FileScanTask> splitTasks = FluentIterable
      .from(tasks)
      .transformAndConcat(input -> input.split(splitSize));
  // Capture manifests which can be closed after scan planning
  return CloseableIterable.combine(splitTasks, tasks);
}
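To make the effect of input.split(splitSize) concrete, here is a minimal sketch that cuts a single file into consecutive byte ranges of at most splitSize bytes. The class SplitFilesSketch and its split method are hypothetical names used only for illustration, and the sketch assumes plain fixed-size splitting; the real Iceberg implementation may also take format-specific information into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Iceberg code.
final class SplitFilesSketch {

  /** Returns {offset, length} pairs that cover a file of fileLength bytes. */
  static List<long[]> split(long fileLength, long splitSize) {
    if (splitSize <= 0) {
      throw new IllegalArgumentException("Invalid split size (negative or 0): " + splitSize);
    }
    List<long[]> ranges = new ArrayList<>();
    for (long offset = 0; offset < fileLength; offset += splitSize) {
      // The last range may be shorter than splitSize.
      ranges.add(new long[] {offset, Math.min(splitSize, fileLength - offset)});
    }
    return ranges;
  }

  public static void main(String[] args) {
    // A 300-byte file with a 128-byte split size yields ranges of 128, 128 and 44 bytes.
    for (long[] range : split(300L, 128L)) {
      System.out.println("offset=" + range[0] + " length=" + range[1]);
    }
  }
}
```

A smaller split size therefore produces more pieces per file, which in turn gives the later packing step more units to distribute.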

public static CloseableIterable<CombinedScanTask> planTasks(CloseableIterable<FileScanTask> splitFiles,
                                                            long splitSize, int lookback, long openFileCost) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
  Preconditions.checkArgument(lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
  Preconditions.checkArgument(openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);

  // Check the size of delete file as well to avoid unbalanced bin-packing
  Function<FileScanTask, Long> weightFunc = file -> Math.max(
      file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
      (1 + file.deletes().size()) * openFileCost);

  return CloseableIterable.transform(
      CloseableIterable.combine(
          new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
          splitFiles),
      BaseCombinedScanTask::new);
}
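The packing step decides how many CombinedScanTasks, and therefore how many splits, a table produces. The following is a simplified sketch of the idea, not the actual BinPacking.PackingIterable: it assumes a first-fit strategy over at most lookback open bins and closes the oldest bin when that bound is exceeded. The class BinPackingSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Iceberg code.
final class BinPackingSketch {

  /** Weight of one split: its bytes including deletes, but at least the open cost per file. */
  static long weight(long dataBytes, long deleteBytes, int deleteFileCount, long openFileCost) {
    return Math.max(dataBytes + deleteBytes, (1L + deleteFileCount) * openFileCost);
  }

  /** Packs weights into bins of at most targetWeight; each bin stands for one combined task. */
  static List<List<Long>> pack(List<Long> weights, long targetWeight, int lookback) {
    if (lookback <= 0) {
      throw new IllegalArgumentException("Invalid lookback (negative or 0): " + lookback);
    }
    List<List<Long>> done = new ArrayList<>();
    List<List<Long>> open = new ArrayList<>();
    List<Long> openSums = new ArrayList<>();
    for (long w : weights) {
      int target = -1;
      for (int i = 0; i < open.size(); i++) {
        if (openSums.get(i) + w <= targetWeight) {
          target = i; // first open bin with enough room
          break;
        }
      }
      if (target < 0) {
        open.add(new ArrayList<>());
        openSums.add(0L);
        target = open.size() - 1;
      }
      open.get(target).add(w);
      openSums.set(target, openSums.get(target) + w);
      if (open.size() > lookback) {
        // Simplification: close the oldest open bin.
        done.add(open.remove(0));
        openSums.remove(0);
      }
    }
    done.addAll(open);
    return done;
  }

  public static void main(String[] args) {
    List<Long> files = Arrays.asList(100L, 90L);
    System.out.println(pack(files, 100L, 10).size()); // target equals the largest file: 2 bins
    System.out.println(pack(files, 256L, 10).size()); // a larger target fits both files: 1 bin
  }
}
```

Setting the target size to the length of the largest file, as in the first call, keeps every file in its own bin, so two files give two splits.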




Once the number of splits for the table's files is known, the parallelism is decided:

parallelism = Math.min(splitNum, maxInferParallelism);

The result is the smaller of the configured maximum parallelism and the number of splits. For example, if the maximum source parallelism is set to 50 but the table's files yield 40 splits, the source parallelism is 40.

If table.exec.iceberg.infer-source-parallelism is set to false, the value of table.exec.resource.default-parallelism is used instead (the default is -1). A value of -1 is later raised to 1 by the final Math.max(1, parallelism) step.

Continue with the next piece of code:

if (context.limit() > 0) {
  int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
  parallelism = Math.min(parallelism, limit);
}

Here limit corresponds to the LIMIT value in a SQL query. If a limit is set, it also takes part in the parallelism calculation. For example, if limit is 1, the parallelism becomes the smaller of the current parallelism and 1.

// parallelism must be positive. 
parallelism = Math.max(1, parallelism);
return parallelism;

The final code guarantees a minimum degree of parallelism of 1.
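The whole decision can be condensed into a pure function. This is a minimal sketch that replaces Flink's ReadableConfig and Iceberg's ScanContext with plain parameters; the class InferParallelismSketch and its parameter names are hypothetical, and a limit of -1 is assumed to mean that no limit is set.

```java
// Hypothetical illustration, not Iceberg code.
final class InferParallelismSketch {

  static int inferParallelism(
      int defaultParallelism, boolean inferEnabled, int maxInferParallelism, int splitNum, long limit) {
    int parallelism = defaultParallelism;
    if (inferEnabled) {
      if (maxInferParallelism < 1) {
        throw new IllegalStateException("max infer parallelism cannot be less than 1");
      }
      // Never use more subtasks than there are splits, and never exceed the configured maximum.
      parallelism = Math.min(splitNum, maxInferParallelism);
    }
    if (limit > 0) {
      // A LIMIT smaller than the parallelism caps it further.
      int cappedLimit = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      parallelism = Math.min(parallelism, cappedLimit);
    }
    // Parallelism must be positive.
    return Math.max(1, parallelism);
  }

  public static void main(String[] args) {
    System.out.println(inferParallelism(-1, true, 50, 40, -1));  // 40: fewer splits than the maximum
    System.out.println(inferParallelism(-1, true, 100, 0, -1));  // 1: empty table
    System.out.println(inferParallelism(-1, true, 100, 2, 1));   // 1: capped by the limit
    System.out.println(inferParallelism(-1, false, 100, 2, 3));  // 1: inference off, default -1 raised to 1
  }
}
```

Keeping the logic free of Flink types makes each branch easy to check against the scenarios discussed in this article.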

Now let’s look at where inferParallelism is called. The only call site is the build() method of the FlinkSource class:

public DataStream<RowData> build() {
  Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
  FlinkInputFormat format = buildFormat();

  ScanContext context = contextBuilder.build();
  TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

  if (!context.isStreaming()) {
    // Takes effect only in batch mode.
    int parallelism = inferParallelism(format, context);
    return env.createInput(format, typeInfo).setParallelism(parallelism);
  } else {
    StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);

    String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
    String readerOperatorName = String.format("Iceberg table (%s) reader", table);

    return env.addSource(function, monitorFunctionName)
        .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
  }
}

The testInferedParallelism method of the TestFlinkScanSql test class exercises this logic:

public void testInferedParallelism() throws IOException {
  Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);

  TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
  FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
  ScanContext scanContext = ScanContext.builder().build();

  // Empty table, infer parallelism should be at least 1
  int parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
  DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  helper.appendToTable(dataFile1, dataFile2);

  // Make sure to generate 2 CombinedScanTasks
  long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
  sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);

  // 2 splits (max infer is the default value 100, max > splits num), the parallelism is splits num: 2
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);

  // 2 splits and limit is 1, max infer parallelism is default 100,
  // which is greater than splits num and limit, the parallelism is the limit value: 1
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, ScanContext.builder().limit(1).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits and max infer parallelism is 1 (max < splits num), the parallelism is 1
  Configuration configuration = new Configuration();
  configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, max infer parallelism is 1, limit is 3, the parallelism is max infer parallelism: 1
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, infer parallelism is disabled, the parallelism is flink default parallelism 1
  configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}

Note ⚠️: This code belongs to the flink module of the Iceberg project. For streaming reads, Iceberg does not configure the source parallelism.
