There are two configuration options in HiveOptions:
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        key("table.exec.hive.infer-source-parallelism")
                .defaultValue(true)
                .withDescription(
                        "If is false, parallelism of source are set by config.\n"
                                + "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        key("table.exec.hive.infer-source-parallelism.max")
                .defaultValue(1000)
                .withDescription("Sets max infer parallelism for source operator.");
- table.exec.hive.infer-source-parallelism: the default value is true, which means the source parallelism is inferred from the number of data partitions and files; if set to false, the parallelism is taken from the configuration.
- table.exec.hive.infer-source-parallelism.max: the default value is 1000, which is the maximum parallelism the source can be inferred to use when reading Hive data.
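As a quick illustration, here is a minimal sketch of how these two options might be set on a batch TableEnvironment before querying Hive. The option keys come from the definitions above; the environment setup and class name are assumptions for the example:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // disable inference: source parallelism then falls back to
        // table.exec.resource.default-parallelism
        conf.setBoolean("table.exec.hive.infer-source-parallelism", false);
        // or keep inference enabled but lower the cap from the default 1000
        conf.setInteger("table.exec.hive.infer-source-parallelism.max", 500);
    }
}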
These two options are used only in the HiveParallelismInference class, a utility class dedicated to parallelism inference for the Hive connector. Its code is as follows:
/** A utility class to calculate parallelism for Hive connector considering various factors. */
class HiveParallelismInference {

    private static final Logger LOG = LoggerFactory.getLogger(HiveParallelismInference.class);

    private final ObjectPath tablePath;
    private final boolean infer;
    private final int inferMaxParallelism;

    private int parallelism;

    HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
        this.tablePath = tablePath;
        // read the table.exec.hive.infer-source-parallelism option
        this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
        // read the table.exec.hive.infer-source-parallelism.max option
        this.inferMaxParallelism =
                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
        Preconditions.checkArgument(
                inferMaxParallelism >= 1,
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
                        + " cannot be less than 1");
        // read the table.exec.resource.default-parallelism option as the initial value
        this.parallelism =
                flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
    }

    /**
     * Apply limit to calculate the parallelism.
     * Here limit is the limit in query: SELECT * FROM xxx LIMIT [limit].
     */
    int limit(Long limit) {
        if (limit != null) {
            parallelism = Math.min(parallelism, (int) (limit / 1000));
        }
        // make sure that parallelism is at least 1
        return Math.max(1, parallelism);
    }

    /**
     * Infer parallelism by number of files and number of splits.
     * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method
     * does nothing.
     */
    HiveParallelismInference infer(
            SupplierWithException<Integer, IOException> numFiles,
            SupplierWithException<Integer, IOException> numSplits) {
        // if table.exec.hive.infer-source-parallelism is set to false, skip inference
        if (!infer) {
            return this;
        }

        try {
            // `createInputSplits` is costly, so we try to avoid calling it
            // by first checking the number of files,
            // which is the lower bound of the number of splits
            int lowerBound = logRunningTime("getNumFiles", numFiles);
            if (lowerBound >= inferMaxParallelism) {
                parallelism = inferMaxParallelism;
                return this;
            }

            int splitNum = logRunningTime("createInputSplits", numSplits);
            parallelism = Math.min(splitNum, inferMaxParallelism);
        } catch (IOException e) {
            throw new FlinkHiveException(e);
        }
        return this;
    }

    private int logRunningTime(
            String operationName, SupplierWithException<Integer, IOException> supplier)
            throws IOException {
        long startTimeMillis = System.currentTimeMillis();
        int result = supplier.get();
        LOG.info(
                "Hive source({}) {} use time: {} ms, result: {}",
                tablePath,
                operationName,
                System.currentTimeMillis() - startTimeMillis,
                result);
        return result;
    }
}

As the class comment says, the parallelism inference is mainly done by the infer method, which takes two suppliers, numFiles and numSplits. It is called in only one place: the getDataStream method of the HiveTableSource class, as shown in the following figure:

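Roughly, the call looks like the following sketch. This is a simplified reconstruction, not the verbatim HiveTableSource code; the variable names hivePartitionsToRead, jobConf, flinkConf and limit are assumptions here, and the two static methods are assumed to live in a HiveSourceFileEnumerator-style utility class:

// simplified sketch of the call site in HiveTableSource#getDataStream
int parallelism =
        new HiveParallelismInference(tablePath, flinkConf)
                .infer(
                        () -> HiveSourceFileEnumerator.getNumFiles(hivePartitionsToRead, jobConf),
                        () ->
                                HiveSourceFileEnumerator.createInputSplits(
                                                0, hivePartitionsToRead, jobConf)
                                        .size())
                .limit(limit);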
Let's take a look at the implementation of these two methods. The getNumFiles method returns the number of files under the Hive table partitions:
public static int getNumFiles(List<HiveTablePartition> partitions, JobConf jobConf)
        throws IOException {
    int numFiles = 0;
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        numFiles += fs.listStatus(inputPath).length;
    }
    return numFiles;
}
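The counting itself is plain Hadoop FileSystem API: list each partition directory and take the length, skipping partitions whose data directory no longer exists. A standalone sketch of the same idea, with a hypothetical warehouse path (in the connector the location comes from the StorageDescriptor):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CountPartitionFiles {
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // hypothetical partition location
        Path inputPath = new Path("/user/hive/warehouse/db.db/tbl/dt=2021-01-01");
        FileSystem fs = inputPath.getFileSystem(jobConf);
        // same check as the connector: a partition may exist in the metastore
        // while its directory has been removed
        int numFiles = fs.exists(inputPath) ? fs.listStatus(inputPath).length : 0;
        System.out.println("numFiles = " + numFiles);
    }
}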
The createInputSplits method splits the files under the Hive table partitions into logical InputSplits. For this, the Flink Hive connector defines a HiveSourceSplit class that wraps the Hadoop InputSplit and carries the Hive partition information.
public static List<HiveSourceSplit> createInputSplits(
        int minNumSplits, List<HiveTablePartition> partitions, JobConf jobConf)
        throws IOException {
    List<HiveSourceSplit> hiveSplits = new ArrayList<>();
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        InputFormat format;
        try {
            format =
                    (InputFormat)
                            Class.forName(
                                            sd.getInputFormat(),
                                            true,
                                            Thread.currentThread().getContextClassLoader())
                                    .newInstance();
        } catch (Exception e) {
            throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);
        }
        ReflectionUtils.setConf(format, jobConf);
        jobConf.set(INPUT_DIR, sd.getLocation());
        // TODO: we should consider how to calculate the splits according to minNumSplits in the future.
        org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);
        for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {
            Preconditions.checkState(
                    inputSplit instanceof FileSplit,
                    "Unsupported InputSplit type: " + inputSplit.getClass().getName());
            hiveSplits.add(new HiveSourceSplit((FileSplit) inputSplit, partition, null));
        }
    }
    return hiveSplits;
}
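The number of splits is what ultimately drives the inferred parallelism, and it is computed by the Hadoop InputFormat itself; for file-based formats this is typically about one split per HDFS block. A standalone sketch of calling getSplits directly, assuming a local text file rather than a Hive table:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class SplitCountExample {
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // hypothetical input path
        FileInputFormat.setInputPaths(jobConf, new Path("/tmp/data.txt"));
        TextInputFormat format = new TextInputFormat();
        format.configure(jobConf);
        // minNumSplits = 1; Hive's createInputSplits passes its minNumSplits straight through
        InputSplit[] splits = format.getSplits(jobConf, 1);
        System.out.println("split count: " + splits.length);
    }
}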
Because both of these methods may take some time to execute, a dedicated logRunningTime helper records how long each call takes. If the number of files is already greater than or equal to the configured maximum parallelism, the parallelism is set directly to that maximum and the more expensive createInputSplits call is skipped. Otherwise, the parallelism is the minimum of the number of InputSplits and the configured maximum:
int lowerBound = logRunningTime("getNumFiles", numFiles);
if (lowerBound >= inferMaxParallelism) {
    parallelism = inferMaxParallelism;
    return this;
}

int splitNum = logRunningTime("createInputSplits", numSplits);
parallelism = Math.min(splitNum, inferMaxParallelism);
Then the limit method caps the parallelism:

/**
 * Apply limit to calculate the parallelism.
 * Here limit is the limit in query: SELECT * FROM xxx LIMIT [limit].
 */
int limit(Long limit) {
    if (limit != null) {
        parallelism = Math.min(parallelism, (int) (limit / 1000));
    }
    // make sure that parallelism is at least 1
    return Math.max(1, parallelism);
}
As the comment says, this method adjusts the parallelism according to the LIMIT clause of the query: it compares the previously inferred parallelism with limit / 1000 and takes the minimum. For example, suppose the Hive table partitions contain many files, say 10001, which exceeds the default maximum of 1000, so the inferred parallelism is 1000. If the query only has LIMIT 100, then limit / 1000 is 0 (integer division), and Math.max(1, parallelism) returns a final source parallelism of 1.
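The arithmetic, with the assumed numbers from the example above:

// worked example with assumed values: 10001 files, max 1000, query LIMIT 100
int parallelism = 1000;                                    // capped at inferMaxParallelism
Long limit = 100L;
parallelism = Math.min(parallelism, (int) (limit / 1000)); // min(1000, 0) = 0
int result = Math.max(1, parallelism);                     // result = 1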
Note ⚠️: the parallelism inference above applies only to batch jobs reading Hive data, not to streaming reads of Hive tables.

For streaming reads, the getDataStream method of the HiveTableSource class does not configure the source parallelism.
