Flink core: the four cornerstones, fault tolerance, broadcasting, backpressure, serialization, memory management, resource management.

Flink basics: basic concepts, design philosophy, architecture model, programming model, common operators.

Flink source code: job submission process, job scheduling process, internal graph conversion flow.

1. Should you understand the Flink job submission process?

2. How many ways are there to submit a Flink job?

3. When is the Flink JobGraph generated?

4. What happens before the JobGraph is submitted to the cluster?

5. What is PipelineExecutor and what are its implementation classes?

6. What are the characteristics of the Local submission mode, and how is it implemented?

7. What are the remote submission modes?

8. Can you briefly introduce Standalone mode?

9. What are the characteristics of the yarn-session mode?

10. What are the characteristics of the yarn-perJob mode?

11. What are the characteristics of the yarn-application mode?

12. What are the details of the yarn-session submission process?

13. What are the details of the yarn-perJob submission process?

14. What is the difference between the stream graph, job graph, and execution graph?

15. Can you introduce the StreamGraph?

16. Can you introduce the JobGraph?

17. Can you introduce the ExecutionGraph?

18. What is the Flink scheduler?

19. How many types of Flink scheduling behavior are there?

20. How many types of Flink scheduling modes are there?

21. How many types of Flink scheduling strategies are included?

22. What are the states of the Flink job life cycle?

23. What are the states of the Task life cycle?

24. Can you explain Flink's task scheduling process?

25. What is a task slot in Flink?

26. What is slot sharing in Flink?

1. Should you understand the Flink job submission process?

Flink submission process:

On the Flink Client, the main method in the user jar is invoked via reflection to generate the StreamGraph and JobGraph. The JobGraph is submitted to the Flink cluster, where the JobManager receives it and translates it into an ExecutionGraph, then starts scheduling; after a successful startup, the job begins consuming data.

To sum up, Flink's core execution flow turns user API calls into StreamGraph -> JobGraph -> ExecutionGraph.
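As a minimal sketch of that flow (the WordCount-style pipeline below is illustrative, not taken from this article): the user only writes DataStream API calls, and env.execute() is the point at which the client builds the StreamGraph and JobGraph and submits the job.

```java
// Hedged sketch: a WordCount-style job; host/port and class name are placeholders.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)                         // source
           .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
               for (String word : line.split("\\s+")) {
                   out.collect(Tuple2.of(word, 1));
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.INT))               // lambdas need an explicit result type
           .keyBy(t -> t.f0)
           .sum(1)
           .print();                                                    // sink

        // execute() turns the recorded transformations into a StreamGraph,
        // converts it into a JobGraph, and submits it via the configured executor.
        env.execute("WordCount sketch");
    }
}
```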

2. How many ways are there to submit a Flink job?

Flink job submission is divided into two ways:

Local submission mode: runs the code directly in the IDE (e.g. IDEA).

Remote submission mode: Standalone mode, Yarn mode, and K8s mode.

Yarn mode is further divided into three submission modes: Yarn-perJob mode, Yarn-Session mode, and Yarn-Application mode.

3. When is the Flink JobGraph generated?

Both the StreamGraph and the JobGraph are generated on the Flink Client, i.e. before submission to the cluster. The schematic is as follows:

4. What happens before the JobGraph is submitted to the cluster?

(1) The user starts the Flink cluster and submits the job from the command line, e.g. flink run -c WordCount xxx.jar

(2) The run script invokes the CliFrontend entry point. CliFrontend triggers the main method of the user-submitted jar, which hands off to PipelineExecutor#execute, and a specific PipelineExecutor is then triggered according to the submission mode.

(3) During that execution, the user's code is compiled into a StreamGraph, which is optimized into a JobGraph.
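A rough sketch of what this client-side compilation looks like in code (illustration only: getStreamGraph() and StreamingJobGraphGenerator are internal Flink APIs, and the real path goes through CliFrontend and a PipelineExecutor rather than calling them directly):

```java
// Hedged sketch of the client-side StreamGraph -> JobGraph step (internal APIs, Flink 1.12-era).
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;

public class ClientSideGraphs {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();   // placeholder pipeline

        // Step 1: the transformations recorded on env are compiled into a StreamGraph.
        StreamGraph streamGraph = env.getStreamGraph();

        // Step 2: chaining and other optimizations produce the JobGraph that the
        // chosen PipelineExecutor would ship to the cluster.
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        System.out.println("Generated JobGraph with id " + jobGraph.getJobID());
    }
}
```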

5. What is PipelineExecutor and what are its implementation classes?

PipelineExecutor is Flink's pipeline executor. It is an interface and is the key link for getting a job from the Flink Client to the cluster once the JobGraph has been generated. As mentioned earlier, there are several ways to submit a job to the cluster; the most commonly used is Yarn mode, which has three submission modes: Session mode, perJob mode, and Application mode (in Application mode, the JobGraph is generated inside the cluster).

The implementation classes of PipelineExecutor are shown in the following figure:

[Figure: PipelineExecutor implementation classes]

In addition to the modes boxed in the figure above, there is LocalExecutor, which is used when running a Flink MiniCluster for debugging in an IDE environment.

6. What are the characteristics of the Local submission mode, and how is it implemented?

Local is a submission mode that runs in the local IDE environment rather than on a cluster. It is mainly used for debugging. The schematic diagram is as follows:

The Flink program is submitted by the JobClient.

The JobClient submits the job to the JobManager, which is responsible for coordinating resource allocation and job execution. Once resources have been allocated, the tasks are submitted to the corresponding TaskManagers.

Each TaskManager starts a thread to execute its tasks and reports state changes back to the JobManager, such as started, in progress, or completed.

After the job finishes, the results are sent back to the client.

Source code analysis (based on Flink 1.12.2):

(1) Create and obtain the corresponding StreamExecutionEnvironment object (LocalStreamEnvironment), and call its execute method.

(2) Get the StreamGraph.

(3) Execute the specific PipelineExecutor -> obtain the LocalExecutorFactory.

(4) Get the JobGraph: the JobGraph is generated by LocalExecutor, the implementation class produced by the LocalExecutorFactory.

Everything up to this point happens in the Flink Client. Since the job is submitted in Local mode, the next step is to create a MiniCluster and call miniCluster.submitJob with the JobGraph to be executed.

(5) Submit the job.

(6) Return a JobClient.

After miniCluster.submitJob has submitted the JobGraph to the local cluster, a JobClient is returned that contains details of the application such as the JobID and application status. Execution then returns to the calling layer, which corresponds to the StreamExecutionEnvironment class.

The above is the source code execution process in Local mode.
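A minimal sketch of the Local mode described above (pipeline and parallelism are illustrative): createLocalEnvironment() returns a LocalStreamEnvironment, so execute() spins up an embedded MiniCluster inside the IDE process instead of submitting to a remote cluster.

```java
// Hedged sketch: running a job against an embedded MiniCluster for debugging.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalDebugSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2);   // local parallelism of 2

        env.fromElements("a", "b", "a")
           .map(String::toUpperCase)
           .print();

        // Internally: StreamGraph -> JobGraph on the "client" side, then
        // miniCluster.submitJob(jobGraph), which returns a JobClient.
        env.execute("local debug job");
    }
}
```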

7. What are the remote submission modes?

Remote submission modes are divided into Standalone mode, Yarn mode, and K8s mode.

Standalone mode: includes Session mode.

Yarn mode: divided into three submission modes: Yarn-perJob mode, Yarn-Session mode, and Yarn-Application mode.

K8s mode: includes Session mode.

8. Can you briefly introduce Standalone mode?

Standalone mode is a standalone Flink cluster that uses only a single node for submission and is commonly used in Session mode.

The job submission schematic is as follows:

10. What are the characteristics of the yarn-perJob mode?

Submit command: ./bin/flink run -t yarn-per-job --detached xxx.jar

Yarn-Per-Job mode: each job starts its own cluster, giving good isolation and spreading the JM load, and the main method is executed on the client. In per-job mode, each job has its own JobManager, and each TaskManager serves only a single job.

Features:

Each task corresponds to one job. Each submitted job applies to Yarn for its own resources as needed and holds them until the job finishes; the failure of one job does not affect the normal submission and operation of the next job. A job has exclusive use of its Dispatcher and ResourceManager, and resource requests are accepted on demand. This mode is suitable for large-scale, long-running jobs.

11. What are the characteristics of the yarn-application mode?

Submit command: ./bin/flink run-application -t yarn-application xxx.jar

Yarn-Application mode: each job starts its own cluster, giving good isolation and spreading the JM load, and the main method is executed on the JobManager.

Features: in both yarn-per-job and yarn-session modes, the client needs to perform the following three steps:

obtain the dependencies required by the job;

analyze the execution environment and obtain the logical plan, i.e. StreamGraph -> JobGraph;

upload the dependencies and the JobGraph to the cluster.

Only after these steps are completed does env.execute() trigger the Flink runtime to actually start executing the job.

If all users submit jobs from the same client, large dependencies consume a lot of bandwidth, and translating complex job logic into a JobGraph consumes a lot of CPU and memory, so the client's resources become a bottleneck.

To solve this, the community implemented Application mode on top of the traditional deployment modes. The three steps that previously needed the client are moved to the JobManager: the main() method is executed in the cluster (the entry point is the ApplicationClusterEntryPoint), and the client is only responsible for initiating the deployment request.

> To sum up: the Flink community recommends using yarn-perJob or yarn-application mode to submit applications.

The submission flowchart is as follows:

1. Start the cluster

(1) The Flink Client submits the task information to the Yarn ResourceManager:

The Flink Client uploads the application configuration (flink-conf.yaml, logback.xml, log4j.properties) and related files (Flink jars, configuration class files, user jar files, JobGraph objects, etc.) to distributed storage (HDFS).

The Flink Client then submits the task information to the Yarn ResourceManager.

(2) Yarn starts the Flink cluster in two steps:

A request to create a Flink cluster is submitted to the Yarn ResourceManager through the Yarn Client. The Yarn ResourceManager allocates container resources and notifies the corresponding NodeManager to start an ApplicationMaster (one ApplicationMaster is started for each submitted Flink job). The ApplicationMaster contains the JobManager to be started and Flink's own internal ResourceManager.

Run YarnJobClusterEntryPoint in the JobManager process as the entry point for cluster launch.

The Dispatcher and Flink's own internal ResourceManager are initialized, the relevant RPC services are started, and the cluster waits for the Flink Client to submit the JobGraph through the REST interface.

2. Job submission

(3) The ApplicationMaster starts the Dispatcher, and the Dispatcher starts the ResourceManager and the JobMaster (unlike Session mode, the JobMaster is started by the Dispatcher, not by the Client).

(4) The JobMaster is responsible for job scheduling, manages the life cycle of the job and its Tasks, and builds the ExecutionGraph (the parallelized version of the JobGraph and the core data structure of the scheduling layer).

After the two steps above, the job enters the scheduling and execution stage.

3. Job scheduling and execution

(5) JobMaster applies for resources from the ResourceManager and starts scheduling the ExecutionGraph.

(6) The ResourceManager adds the resource request to a waiting queue and, via heartbeat, applies to the Yarn ResourceManager for new containers in which to start TaskManager processes.

(7) Yarn starts the containers, loads the jar files and other related resources from HDFS, and launches the TaskManager in each container; the TaskManager then starts the TaskExecutor.

(8) After the TaskManager starts, it registers with the ResourceManager and reports its slot resources.

(9) The ResourceManager takes the slot request from the waiting queue, confirms resource availability with the TaskManager, and tells the TaskManager which JobMaster the slot should be assigned to.

(10) The TaskManager offers the slot to that JobMaster, and the JobMaster caches the slot in its SlotPool.

(11) The JobMaster schedules the Tasks to execute in the TaskManager's slots.

14. What is the difference between the stream graph, job graph, and execution graph?

Overview of Flink's internal graphs: since Flink now unifies stream and batch processing and the old Batch API is largely deprecated, it is not covered here. For the Flink DataStream API, the internal graph conversions are as follows:

Taking WordCount as an example, the relationship between the stream graph, job graph, execution graph, and physical execution graph, and the task scheduling between them, is as follows:

[Figure: WordCount graph conversions]

For a Flink streaming application, the user code first calls the DataStream API, which converts it into Transformations. These then go through three layers of conversion, StreamGraph -> JobGraph -> ExecutionGraph (all of which are Flink's built-in data structures), and finally Flink's scheduler starts the computing tasks in the cluster, forming the physical execution graph.

15. Can you introduce the StreamGraph?

Stream graph (StreamGraph)

[Figure: StreamGraph]

The StreamGraph has two core objects: StreamNode (node) and StreamEdge (edge).

1) StreamNode

A StreamNode is converted from a Transformation and can be roughly understood as representing one operator. StreamNodes can be physical or virtual and can have multiple inputs and outputs; a physical StreamNode eventually becomes a physical operator, while a virtual one is attached to a StreamEdge.

2) StreamEdge

A StreamEdge is an edge of the StreamGraph that connects two StreamNodes; it carries information such as its upstream and downstream nodes and other metadata.
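To see the StreamNodes and StreamEdges a pipeline produces, the execution plan can be printed on the client before submission. A small sketch (the pipeline is a placeholder); getExecutionPlan() returns the StreamGraph as JSON, which can be rendered with the Flink plan visualizer.

```java
// Hedged sketch: dumping the StreamGraph of a placeholder pipeline as JSON.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamGraphInspectSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to", "be", "or", "not", "to", "be")
           .map(String::toUpperCase)
           .keyBy(w -> w)
           .print();

        // Prints the StreamGraph (its StreamNodes and StreamEdges) as a JSON plan.
        System.out.println(env.getExecutionPlan());
    }
}
```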

16. Can you introduce the JobGraph?

Job graph (JobGraph)

The JobGraph is an optimized version of the StreamGraph: the operator chain (OperatorChain) mechanism merges eligible operators so that they are scheduled on the same Task thread at execution time, avoiding cross-thread and cross-network data transfer.
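Chaining can also be influenced from the DataStream API. A minimal sketch (operators are placeholders): startNewChain() and disableChaining() are standard DataStream methods, and env.disableOperatorChaining() switches chaining off globally.

```java
// Hedged sketch: controlling how operators are chained when the JobGraph is built.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();           // uncomment to disable chaining globally

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .startNewChain()                         // this map starts a new chain
           .filter(s -> !s.isEmpty())
           .disableChaining()                       // this filter is never chained
           .print();

        env.execute("chaining sketch");
    }
}
```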

The JobGraph has three core objects: JobVertex (node), JobEdge (edge), and IntermediateDataSet (intermediate data set).

1) JobVertex

After operator-chaining optimization, multiple StreamNodes that meet the chaining criteria may be fused into one JobVertex, i.e. a JobVertex contains one or more operators. The input of a JobVertex is a JobEdge and its output is an IntermediateDataSet.

2) JobEdge

A JobEdge represents a data transfer channel in the JobGraph. Its upstream data source is an IntermediateDataSet and its downstream consumer is a JobVertex, i.e. data is passed from the IntermediateDataSet to the target JobVertex through the JobEdge. The JobEdge directly determines the data connection relationship between tasks at execution time, i.e. whether it is a point-to-point connection or an all-to-all connection.

3) IntermediateDataSet

An IntermediateDataSet is a logical structure representing the output of a JobVertex, i.e. the data set produced by the operators. Different execution modes correspond to different result partition types, which determines how data is exchanged at execution time.

17. Can you introduce the ExecutionGraph?

Execution graph (ExecutionGraph)

The ExecutionGraph is the core data structure for scheduling Flink job execution. It contains all the parallel task information of the job, the relationships between Tasks, and the data transfer relationships.

Both StreamGraph and JobGraph are generated in Flink Client and then handed off to the Flink cluster.

The JobGraph-to-ExecutionGraph conversion is done in the JobMaster. The important changes during the conversion are: the concept of parallelism is added, the graph becomes a truly schedulable structure, and six core objects are generated.

The six core objects of the ExecutionGraph are: ExecutionJobVertex, ExecutionVertex, IntermediateResult, IntermediateResultPartition, ExecutionEdge, and Execution.

1) ExecutionJobVertex

This object corresponds to a JobVertex in the JobGraph. It contains a set of ExecutionVertex whose count equals the parallelism of the contained operators: if the parallelism is 5, the ExecutionJobVertex contains 5 ExecutionVertex instances.

An ExecutionJobVertex wraps a JobVertex and creates the ExecutionVertex, Execution, IntermediateResult, and IntermediateResultPartition objects that flesh out the ExecutionGraph.

2) ExecutionVertex

The ExecutionJobVertex parallelizes the job and constructs instances that can be executed in parallel; each parallel instance is an ExecutionVertex.

3) IntermediateResult

Also called the intermediate result set, this is a logical concept representing the output of an ExecutionJobVertex. It corresponds one-to-one with an IntermediateDataSet in the JobGraph. Likewise, an ExecutionJobVertex can have multiple intermediate results, depending on how many JobEdges the corresponding JobVertex has.

4) IntermediateResultPartition

Also known as the intermediate result partition, it represents the output partition of one ExecutionVertex and is associated with an ExecutionEdge.

5) ExecutionEdge

An ExecutionEdge represents an input of an ExecutionVertex and connects it to an upstream IntermediateResultPartition. Each ExecutionEdge corresponds to exactly one IntermediateResultPartition and one ExecutionVertex, and an ExecutionVertex can have multiple ExecutionEdges.

6) Execution

An ExecutionVertex is essentially a template for a task. When the task is actually executed, the information in the ExecutionVertex is wrapped into an Execution, which represents one execution attempt of that ExecutionVertex.

In the event of a failure or data recalculation, an ExecutionVertex may have multiple ExecutionAttemptIDs; an Execution is uniquely identified by its ExecutionAttemptID.

Task deployment and task status updates between the JM and TM use the ExecutionAttemptID to identify the message recipient.

From these basic concepts, the following points can also be seen:

Since each JobVertex may have multiple IntermediateDataSets, each ExecutionJobVertex may have multiple IntermediateResults, and therefore each ExecutionVertex may also produce multiple IntermediateResultPartitions.

The main function of the ExecutionEdge is to connect an ExecutionVertex with an IntermediateResultPartition and represent the connection between them.
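As a hedged illustration of the parallelism point above: an operator with setParallelism(5) corresponds to one ExecutionJobVertex containing five ExecutionVertex instances (the pipeline below is a placeholder).

```java
// Hedged sketch: parallelism decides how many ExecutionVertex instances a vertex expands to.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5)
           .map(i -> i + 1).setParallelism(5)   // -> one ExecutionJobVertex with 5 ExecutionVertex
           .print().setParallelism(1);          // -> one ExecutionJobVertex with 1 ExecutionVertex

        env.execute("parallelism sketch");
    }
}
```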

18. What is the Flink scheduler?

The scheduler is the core component of Flink job execution. It manages all processes related to job execution, including JobGraph-to-ExecutionGraph conversion, job lifecycle management (publishing, cancelling, stopping), task lifecycle management (publishing, cancelling, stopping), resource request and release, and job and Task failover.

Scheduler functions:

Job lifecycle management, such as publishing, suspending, and cancelling jobs;

Application, allocation, and release of job execution resources;

Job state management, i.e. state transitions during job publishing and failover in the event of job exceptions;

Job information provision, exposing job details to the outside.

Scheduling involves several important components:

Scheduler: SchedulerNG and its subclasses and implementation classes;

Scheduling strategy: SchedulingStrategy and its implementation classes;

Scheduling mode: ScheduleMode, which covers both streaming and batch scheduling, each with its own modes.

19. How many types of Flink scheduling behavior are there?

There are four scheduling behaviors:

The SchedulingStrategy interface defines the scheduling behaviors, of which there are four:

(1) startScheduling: the scheduling entry point, which triggers the scheduler's scheduling behavior.

(2) restartTasks: Restarting the Task that failed to execute, usually caused by the Task execution exception.

(3) onExecutionStateChange: When the execution state changes.

(4) onPartitionConsumable: When data in IntermediateResultPartition is available for consumption.
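For reference, a simplified sketch of these four behaviors as a Java interface, modeled on Flink's internal SchedulingStrategy (package names and signatures roughly as of Flink 1.12; they may differ in other versions):

```java
// Hedged sketch mirroring org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy (~Flink 1.12).
import java.util.Set;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

public interface SchedulingStrategySketch {

    // (1) Scheduling entry point: triggers the initial scheduling of the job.
    void startScheduling();

    // (2) Restart the given tasks, typically after an execution failure.
    void restartTasks(Set<ExecutionVertexID> verticesToRestart);

    // (3) Called when the state of an execution changes.
    void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState);

    // (4) Called when data in an IntermediateResultPartition becomes consumable.
    void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);
}
```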

20. How many types of Flink scheduling modes are there?

There are three scheduling modes: Eager mode, phased mode (LAZY_FROM_SOURCES), and phased slot-reuse mode (LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST).

1) Eager scheduling

Suitable for stream computing. All required resources are requested at once; if resources are insufficient, the job fails to start.

2) Phased scheduling (LAZY_FROM_SOURCES)

Suitable for batch processing. Scheduling proceeds in stages starting from the source tasks, and all resources required by the current stage are requested at once.

After the upstream tasks have finished, the downstream tasks of the next stage are scheduled and executed: they read the upstream data and perform this stage's computation. Once a stage completes, the next stage is scheduled, and so on in turn until the job finishes.

3) Phased slot-reuse scheduling (LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST)

Suitable for batch processing. Basically the same as phased scheduling, except that this mode uses batch slot requests, so a job can still execute when resources are insufficient, provided there is no shuffle within the stage being executed.

At present, EAGER mode and LAZY_FROM_SOURCES mode share the same resource request logic, while LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST uses separate resource request logic.

21. How many types of Flink scheduling strategies are included?

There are three types of scheduling strategies, all implemented against the SchedulingStrategy interface:

EagerSchedulingStrategy: suitable for stream computing; schedules all tasks at the same time.

LazyFromSourcesSchedulingStrategy: suitable for batch processing; schedules a vertex once its input data is ready (i.e. the upstream has finished processing).

PipelinedRegionSchedulingStrategy: schedules at the granularity of pipelined regions. PipelinedRegionSchedulingStrategy was added in 1.11, and from 1.12 onward scheduling is performed per pipelined region.

A pipelined region is a set of pipeline-connected tasks. This means that for a streaming job with multiple regions, Flink no longer waits for all tasks to acquire slots before starting deployment; instead, any region can be deployed as soon as it has enough task slots. For batch jobs, tasks are no longer assigned slots and deployed individually; instead, once a region has acquired enough slots, all tasks of that region are deployed together.

22. What are the states of the Flink job life cycle?

In a Flink cluster, the JobMaster is responsible for job lifecycle management, and the specific management behavior is implemented in the scheduler and ExecutionGraph.

The full lifecycle state transition for a job is shown in the following figure:

(1) The job is first created, then switches to the running state, and when all the work is completed, it switches to the finished state.

(2) In the event of failure, the job first switches to the failing state and cancels all running tasks. If all vertices have reached a final state and the job is not restartable, the state transitions to failed.

(3) If the job can be restarted, it enters the restarting state; once the restart is complete, it becomes created again.

(4) When the user cancels the job, it will enter the cancelling state, which will cancel all currently running tasks. Once all running tasks have reached their final state, the job transitions to the canceled state.

The finished, canceled, and failed states are globally terminal states and trigger the cleanup of the job, whereas the suspended state is only locally terminal: execution of the job is terminated on the corresponding JobManager, but another JobManager of the cluster can recover the job from persistent HA storage and restart it. For this reason, a job in the suspended state is not completely cleaned up.
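For reference, the job states described above roughly correspond to Flink's JobStatus enum. A hedged sketch, with comments paraphrasing the transitions above (some versions add further states):

```java
// Hedged sketch of the job-level states; mirrors Flink's JobStatus enum by name only.
public enum JobStatusSketch {
    CREATED,      // job accepted but not yet running
    RUNNING,      // tasks are being scheduled and executed
    FINISHED,     // all work completed successfully (globally terminal)
    FAILING,      // a failure occurred, running tasks are being cancelled
    FAILED,       // non-recoverable failure (globally terminal)
    RESTARTING,   // restartable failure, the job will go back to CREATED
    CANCELLING,   // user cancelled, running tasks are being cancelled
    CANCELED,     // cancellation finished (globally terminal)
    SUSPENDED     // locally terminal; another JobManager can resume the job from HA storage
}
```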

23. What are the states of the Task life cycle?

The TaskManager is responsible for Task lifecycle management and notifies the JobMaster of state changes; state changes are tracked through the Executions in the ExecutionGraph, with one Execution per Task.

The life cycle of a Task contains 8 states in total, as shown below.

During the execution of the ExecutionGraph, each parallel task goes through multiple stages, from creation to completion or failure, and the above diagram illustrates the states and possible transitions between them. A task can be performed multiple times (for example, failback).

Each Execution tracks one execution attempt of an ExecutionVertex, and each ExecutionVertex has a current execution and a prior execution.
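For reference, the eight Task states correspond to Flink's internal ExecutionState enum. A hedged sketch (some versions add extra states such as RECONCILING):

```java
// Hedged sketch of the classic eight Task states; mirrors Flink's ExecutionState enum by name only.
public enum TaskStateSketch {
    CREATED,    // the Execution has been created
    SCHEDULED,  // a slot has been requested / is being assigned
    DEPLOYING,  // the task deployment has been sent to the TaskManager
    RUNNING,    // the task is executing
    FINISHED,   // terminal: completed successfully
    CANCELING,  // cancellation requested, waiting for the task to stop
    CANCELED,   // terminal: cancelled
    FAILED      // terminal: failed
}
```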

24. Can you explain Flink's task scheduling process?

The task scheduling flowchart is as follows:

25. What is a task slot in Flink?

Each TaskManager is a JVM process that can execute one or more subtasks in different threads. Task slots are used to control how many tasks a worker (TaskManager) accepts; each worker has at least one task slot.

1. Task slot

Each task slot represents a fixed-size subset of the resources owned by the TaskManager.

In general, the number of slots allocated equals the number of CPU cores: with 8 cores, for example, 8 slots are allocated.

Flink divides a process’s memory into multiple slots.

There are 2 TaskManagers in the figure, each TaskManager has 3 slots, and each slot occupies 1/3 of the memory.

Dividing the memory into separate slots brings the following benefits:

The maximum number of tasks the TaskManager can execute concurrently is controlled (here, 3), since it cannot exceed the number of slots. The purpose of the task slot is to isolate the tasks' managed memory; no CPU isolation takes place.

The slot has exclusive memory space so that multiple different jobs can be run in a TaskManager without affecting each other.

Summary: The number of task slots represents the number of tasks that the TaskManager can execute in parallel.
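As a hedged sketch of the "one slot per CPU core" rule of thumb above: the slot count per TaskManager is controlled by taskmanager.numberOfTaskSlots in flink-conf.yaml, or programmatically via TaskManagerOptions (the value 8 is just the example from the text):

```java
// Hedged sketch: setting the number of task slots per TaskManager in code.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class SlotConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);   // one slot per CPU core on an 8-core machine
        System.out.println(conf.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1));
    }
}
```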

26. What is slot sharing in Flink?

2. Slot sharing

By default, Flink allows subtasks to share slots, even if they are subtasks of different tasks, as long as they are from the same job. The result is a slot that can hold the job’s entire pipeline. Allowing slot sharing has two main benefits:

Only as many task slots as the highest parallelism in the job are needed; as long as this is satisfied, the needs of all other operators are satisfied as well.

Resources are distributed more fairly. A relatively idle slot can take on more tasks. Without task slot sharing, low-load subtasks such as source/map would occupy many resources, while high-load window subtasks would lack resources.

With task slot sharing, the base parallelism in the example can be increased from 2 to 6, improving slot resource utilization while also ensuring that the TaskManager allocates subtasks to slots more fairly.
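Slot sharing can also be steered from the DataStream API with slotSharingGroup(). A minimal sketch (group name and pipeline are illustrative): operators stay in the "default" group unless told otherwise, so giving the heavy window operator its own group keeps it out of the slots used by the lighter source/map subtasks.

```java
// Hedged sketch: putting a heavy windowed operator into its own slot sharing group.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)        // host/port are placeholders
           .map(String::trim)                          // stays in the "default" sharing group
           .keyBy(s -> s)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .reduce((a, b) -> a + b)
           .slotSharingGroup("windowed")               // heavy operator gets its own group
           .print();

        env.execute("slot sharing sketch");
    }
}
```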

end


 


 
