When a Flink job is submitted on Kubernetes with the default command, we specify the CPU and memory of the job's JM/TM, and the pods generated for the job end up with identical resource request and limit values. At runtime, however, the job actually uses far less than the limit, so machine resources are wasted: the utilization water level stays low, yet the machine can no longer accept new pods.

For example, the following command specifies the JM/TM resources:

./bin/flink run-application -p 1 -t kubernetes-application \
  -c com.zhisheng.Test \
  -Dkubernetes.cluster-id=flink-log-alert-test1 \
  -Dtaskmanager.memory.process.size=6g \
  -Djobmanager.memory.process.size=2g \
  -Dkubernetes.jobmanager.cpu=0.5 \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dtaskmanager.numberOfTaskSlots=1 \
  ....




1. Add parameters that set the request and limit separately for JM/TM memory and CPU.

If feasible, this approach would need 8 new parameters. However, the Flink memory model makes setting the memory request/limit separately very complicated; in practice only the CPU parameters could be set this way, and the existing parameters would become unusable.

2. Add limit-factor parameters for JM/TM memory and CPU.

The user-configured memory or CPU value defaults to the request value, and the limit factor must be >= 1. This approach needs four new parameters and is simpler than the first method. However, with the current user resource configuration on the YARN cluster, most jobs already waste a certain amount of resources (the resources applied for are much larger than those actually used), so with this approach the resource-waste problem would remain unsolved after user jobs are migrated to the K8s cluster.

3. Add request-factor parameters for JM/TM memory and CPU.

The user-configured memory or CPU value defaults to the limit value, and the request factor must be <= 1; we can configure a reasonable value based on production data, such as 0.5. This approach also needs four new parameters, but its advantage over the second is that the resource allocation of most user jobs becomes more reasonable: the same machine resources can run more pods, which raises the machines' resource utilization.
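To make the trade-off concrete, here is a small arithmetic sketch contrasting option 2 and option 3. The factor values 1.5 and 0.5 are illustrative assumptions, not Flink defaults:

```java
public class FactorComparison {
    public static void main(String[] args) {
        int userMemMb = 6144; // user-configured TM memory, e.g. taskmanager.memory.process.size=6g

        // Option 2: the user value becomes the request; limit = request * limitFactor (>= 1).
        double limitFactor = 1.5; // illustrative value
        int option2LimitMb = (int) (userMemMb * limitFactor);
        System.out.println("option 2: request=" + userMemMb + "Mi, limit=" + option2LimitMb + "Mi");

        // Option 3: the user value becomes the limit; request = limit * requestFactor (<= 1).
        double requestFactor = 0.5; // illustrative value
        int option3RequestMb = (int) (userMemMb * requestFactor);
        System.out.println("option 3: request=" + option3RequestMb + "Mi, limit=" + userMemMb + "Mi");
    }
}
```

With option 3 the scheduler only reserves 3072Mi per TM pod, so a node with the same allocatable memory can fit roughly twice as many such pods, while the limit still lets each pod use its full configured 6g when needed.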

Code development

1. Add the configuration options to KubernetesConfigOptions.java:

public static final ConfigOption<Double> JOB_MANAGER_CPU_REQUEST_FACTOR =
    key("kubernetes.jobmanager.cpu.request-factor")
        .doubleType()
        .defaultValue(1.0) // assumed default: 1.0 keeps request == limit
        .withDescription(
            "The request factor of cpu used by job manager. "
                + "The resources request cpu will be set to cpu * request-factor.");

public static final ConfigOption<Double> JOB_MANAGER_MEMORY_REQUEST_FACTOR =
    key("kubernetes.jobmanager.memory.request-factor")
        .doubleType()
        .defaultValue(1.0)
        .withDescription(
            "The request factor of memory used by job manager. "
                + "The resources request memory will be set to memory * request-factor.");

// taskmanager
public static final ConfigOption<Double> TASK_MANAGER_CPU_REQUEST_FACTOR =
    key("kubernetes.taskmanager.cpu.request-factor")
        .doubleType()
        .defaultValue(1.0)
        .withDescription(
            "The request factor of cpu used by task manager. "
                + "The resources request cpu will be set to cpu * request-factor.");

public static final ConfigOption<Double> TASK_MANAGER_MEMORY_REQUEST_FACTOR =
    key("kubernetes.taskmanager.memory.request-factor")
        .doubleType()
        .defaultValue(1.0)
        .withDescription(
            "The request factor of memory used by task manager. "
                + "The resources request memory will be set to memory * request-factor.");

2. Add methods to read the parameters in KubernetesJobManagerParameters.java and KubernetesTaskManagerParameters.java respectively:

// KubernetesJobManagerParameters
public double getJobManagerCPURequestFactor() {
    final double requestFactor =
        flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU_REQUEST_FACTOR);
    checkArgument(
        requestFactor <= 1,
        "%s should be less than or equal to 1.",
        KubernetesConfigOptions.JOB_MANAGER_CPU_REQUEST_FACTOR.key());
    return requestFactor;
}

public double getJobManagerMemoryRequestFactor() {
    final double requestFactor =
        flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_MEMORY_REQUEST_FACTOR);
    checkArgument(
        requestFactor <= 1,
        "%s should be less than or equal to 1.",
        KubernetesConfigOptions.JOB_MANAGER_MEMORY_REQUEST_FACTOR.key());
    return requestFactor;
}

// KubernetesTaskManagerParameters
public double getTaskManagerCPURequestFactor() {
    final double requestFactor =
        flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_REQUEST_FACTOR);
    checkArgument(
        requestFactor <= 1,
        "%s should be less than or equal to 1.",
        KubernetesConfigOptions.TASK_MANAGER_CPU_REQUEST_FACTOR.key());
    return requestFactor;
}

public double getTaskManagerMemoryRequestFactor() {
    final double requestFactor =
        flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_MEMORY_REQUEST_FACTOR);
    checkArgument(
        requestFactor <= 1,
        "%s should be less than or equal to 1.",
        KubernetesConfigOptions.TASK_MANAGER_MEMORY_REQUEST_FACTOR.key());
    return requestFactor;
}

3. Modify the KubernetesUtils.getResourceRequirements() method in KubernetesUtils.java to take the request-factor parameters:

/**
 * Get resource requirements from memory and cpu.
 *
 * @param mem Memory in mb.
 * @param memoryRequestFactor Memory request factor.
 * @param cpu cpu.
 * @param cpuRequestFactor cpu request factor.
 * @param externalResources external resources
 * @return KubernetesResource requirements.
 */
public static ResourceRequirements getResourceRequirements(
        int mem,
        double memoryRequestFactor,
        double cpu,
        double cpuRequestFactor,
        Map<String, Long> externalResources) {
    // CPU and memory each get a factor (for example 0.5); the resource value
    // configured by the user becomes the limit, and request = limit * factor.
    final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
    final Quantity cpuRequestQuantity = new Quantity(String.valueOf(cpu * cpuRequestFactor));
    final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);
    final Quantity memRequestQuantity =
        new Quantity(((int) (mem * memoryRequestFactor)) + Constants.RESOURCE_UNIT_MB);

    final ResourceRequirementsBuilder resourceRequirementsBuilder =
        new ResourceRequirementsBuilder()
            .addToRequests(Constants.RESOURCE_NAME_MEMORY, memRequestQuantity)
            .addToRequests(Constants.RESOURCE_NAME_CPU, cpuRequestQuantity)
            .addToLimits(Constants.RESOURCE_NAME_MEMORY, memQuantity)
            .addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity);

    // Add the external resources to resource requirement.
    for (Map.Entry<String, Long> externalResource : externalResources.entrySet()) {
        final Quantity resourceQuantity =
            new Quantity(String.valueOf(externalResource.getValue()));
        resourceRequirementsBuilder
            .addToRequests(externalResource.getKey(), resourceQuantity)
            .addToLimits(externalResource.getKey(), resourceQuantity);

        LOG.info(
            "Request external resource {} with config key {}.",
            resourceQuantity.getAmount(),
            externalResource.getKey());
    }

    return resourceRequirementsBuilder.build();
}
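To see concretely what the modified method computes, this dependency-free sketch reproduces the same arithmetic with plain strings in place of fabric8's Quantity and ResourceRequirementsBuilder; the values match the example job's TaskManager settings and the factors configured later in flink-conf.yaml:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ResourceRequirementsSketch {
    public static void main(String[] args) {
        int mem = 6144;                   // taskmanager.memory.process.size=6g, in mb
        double memoryRequestFactor = 0.8;
        double cpu = 1.0;                 // kubernetes.taskmanager.cpu
        double cpuRequestFactor = 0.5;

        Map<String, String> requests = new LinkedHashMap<>();
        Map<String, String> limits = new LinkedHashMap<>();

        // The user-configured value is the limit; request = limit * factor.
        requests.put("memory", (int) (mem * memoryRequestFactor) + "Mi");
        requests.put("cpu", String.valueOf(cpu * cpuRequestFactor));
        limits.put("memory", mem + "Mi");
        limits.put("cpu", String.valueOf(cpu));

        System.out.println("requests=" + requests);
        System.out.println("limits=" + limits);
    }
}
```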

4. Update the call sites of the above method in InitJobManagerDecorator and InitTaskManagerDecorator accordingly.

The final configuration can be defined in flink-conf.yaml as follows:


kubernetes.jobmanager.cpu.request-factor: 0.5 
kubernetes.jobmanager.memory.request-factor: 0.8
kubernetes.taskmanager.cpu.request-factor: 0.5
kubernetes.taskmanager.memory.request-factor: 0.8

Of course, these defaults can also be overridden by the parameters on the user's job submission command line.
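Assuming the flink-conf.yaml factors above and the example job's TaskManager settings (kubernetes.taskmanager.cpu: 1, taskmanager.memory.process.size: 6g), the resource section of the generated TM pod would look roughly like this (an illustrative sketch, not captured from a real cluster):

```yaml
resources:
  requests:
    cpu: "0.5"        # 1 * 0.5
    memory: "4915Mi"  # 6144 * 0.8
  limits:
    cpu: "1.0"
    memory: "6144Mi"
```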


