Flink relies on in-memory computing, and a shortage of memory during execution has a significant impact on Flink's efficiency. You can monitor GC (Garbage Collection), evaluate memory usage and the remaining headroom, determine whether memory has become a performance bottleneck, and optimize accordingly.
Monitor the YARN container GC logs of node processes, and optimize GC if full GC occurs frequently.
GC configuration: In the client's “conf/flink-conf.yaml” configuration file, add the following parameters to the “env.java.opts” configuration item:
-Xloggc:/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M
GC logs have been added by default.
Adjust the ratio of the old generation to the young generation. In the client's “conf/flink-conf.yaml” configuration file, add the “-XX:NewRatio” parameter to the “env.java.opts” configuration item. For example, “-XX:NewRatio=2” means the ratio of the old generation to the young generation is 2:1: the young generation occupies 1/3 of the heap and the old generation occupies 2/3.
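As an illustrative sketch (the values are placeholders, not recommendations), the GC-log options and the generation ratio above end up in a single “env.java.opts” entry of “conf/flink-conf.yaml”:
env.java.opts: -Xloggc:/gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -XX:NewRatio=2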
When partitioning causes data skew, you need to consider optimizing the partitioning. Avoid non-parallel operations: some DataStream operations, such as windowAll, force a parallelism of 1. When using keyBy, try not to use String keys.
Configure the parallelism appropriately. The degree of parallelism controls the number of tasks and therefore the number of blocks the data is divided into. Adjust the parallelism so that the number of tasks and the amount of data each task processes match the processing capacity of the machines. Check CPU and memory usage: when tasks and data are concentrated on individual nodes instead of being distributed evenly, increase the parallelism so that tasks and data are spread more evenly across nodes and the cluster's computing power is fully used. As a rule of thumb, the parallelism is set to 2-3 times the total number of CPU cores in the cluster.
The parallelism of a task can be specified at the following four levels (listed from highest to lowest priority); adjust the parallelism parameters according to the actual memory, CPU, data volume, and application logic.
The parallelism of an individual operator, data source, or sink can be specified by calling the setParallelism() method. For example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
The Flink program runs in the execution environment. The execution environment defines a default degree of parallelism for all executed operators, data sources, and data sinks.
The default degree of parallelism for the execution environment can be specified by calling the setParallelism() method. For example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
The parallelism can also be set when the client submits a job to Flink. For the CLI client, it is specified via the “-p” parameter, for example: ./bin/flink run -p 10 /examples/WordCount-java.jar
At the system level, the default parallelism for all execution environments can be specified by modifying the “parallelism.default” option in the “flink-conf.yaml” file in the Flink client conf directory.
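For example, a hedged sketch of the corresponding entry in “flink-conf.yaml” (the value 10 is only illustrative):
parallelism.default: 10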
In Flink on YARN mode there are two processes: JobManager and TaskManager, which take on most of the responsibility during task scheduling and execution.
Therefore, the parameter configuration of JobManager and TaskManager has a great impact on the execution of Flink applications. You can optimize the performance of your Flink cluster by performing the following operations:
1. Configure JobManager memory. The JobManager is responsible for task scheduling and for message communication with the TaskManagers and the ResourceManager. When the number of tasks and their parallelism increase, the JobManager memory needs to increase accordingly. Set a suitable memory size for the JobManager according to the actual number of tasks.
- Add the “-jm MEM” parameter to set the memory when using the yarn-session command.
- When using the yarn-cluster command, add the “-yjm MEM” parameter to set the memory.
2. Configure the number of TaskManagers. Each core of a TaskManager can run one task at a time, so increasing the number of TaskManagers increases task concurrency. When resources are sufficient, the number of TaskManagers can be increased accordingly to improve efficiency.
- Add the “-n NUM” parameter to set the number of TaskManagers when using the yarn-session command.
- When using the yarn-cluster command, add the “-yn NUM” parameter to set the number of TaskManagers.
3. Configure the number of TaskManager slots. A TaskManager with multiple cores can run multiple tasks at the same time, which likewise increases task concurrency. However, since all slots share the TaskManager's memory, the memory and the number of slots need to be balanced.
- Add the “-s NUM” parameter to set the number of slots when using the yarn-session command.
- When using the yarn-cluster command, add the “-ys NUM” parameter to set the number of slots.
4. Configure TaskManager memory. TaskManager memory is mainly used for task execution, communication, and so on. A large task may require more resources, in which case the memory can be increased accordingly.
- Add the “-tm MEM” parameter to set the memory when using the yarn-session command.
- When using the yarn-cluster command, add the “-ytm MEM” parameter to set the memory.
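Putting these flags together, a hedged sketch of how they might be combined on the command line (assuming an older Flink release where the “-n”/“-yn” flags are still supported; all values are placeholders):
./bin/yarn-session.sh -n 3 -s 4 -jm 2048 -tm 4096
./bin/flink run -m yarn-cluster -yn 3 -ys 4 -yjm 2048 -ytm 4096 /examples/WordCount-java.jar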
Design operations and partitions reasonably so that task sharding is optimized. When writing the program, partition the data as evenly as possible so that no task's data is skewed and the whole job is not held up by a single long-running task.
The following are several partitioning methods:
- Shuffle partitioning: elements are partitioned randomly. dataStream.shuffle();
- Rebalancing (round-robin partitioning): elements are partitioned round-robin so that the load of each partition is balanced. Useful for performance optimization when there is data skew. dataStream.rebalance();
- Rescaling: partitions elements round-robin to a subset of the downstream operations. This is useful if you want to distribute the data from each parallel instance of a source to a subset of mappers to spread the load, but do not want the full rebalancing of rebalance(). dataStream.rescale();
- Broadcast: broadcasts each element to all partitions. dataStream.broadcast();
- Custom partitioning: use a user-defined partitioner to select the target task for each element. Because users know their own data best, they can partition by some feature of the data to optimize task execution. A simple example is as follows:
// fromElements constructs a simple Tuple2 stream
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
    Tuple2.of("hello", 1), Tuple2.of("test", 2), Tuple2.of("world", 100));
// Define the partitioner used for the key: it returns which partition the element belongs to;
// this value plus 1 is the id of the corresponding subtask
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
    @Override
    public int partition(Tuple2<String, Integer> key, int numPartitions) {
        return (key.f0.length() + key.f1) % numPartitions;
    }
};
// Use the whole Tuple2 as the key for partitioning
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return value;
    }
}).print();
Flink communication relies mainly on netty, so netty settings are particularly important while a Flink application is running: the quality of the network communication directly determines the speed of data exchange and the efficiency of task execution.
The following settings can be modified in the client's “conf/flink-conf.yaml” configuration file. The defaults are already a reasonably good solution; modify them carefully to avoid performance degradation.
“taskmanager.network.netty.num-arenas”: The default is “taskmanager.numberOfTaskSlots”, which is the number of netty arenas.
“taskmanager.network.netty.server.numThreads” and “taskmanager.network.netty.client.numThreads”: the default is “taskmanager.numberOfTaskSlots”, which is the number of threads on netty's client and server side.
“taskmanager.network.netty.client.connectTimeoutSec”: The default is 120s, which indicates the timeout for the taskmanager’s client connection.
“taskmanager.network.netty.sendReceiveBufferSize”: The default is the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem), usually 4 MB, which is the size of netty's send and receive buffers.
“taskmanager.network.netty.transport”: The default is “nio”, which is netty's transport mode; the two options are “nio” and “epoll”.
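For illustration only, these options live in the same “conf/flink-conf.yaml” file; the values below assume a TaskManager with 2 slots and otherwise just restate the defaults described above, not a tuning recommendation:
taskmanager.network.netty.num-arenas: 2
taskmanager.network.netty.server.numThreads: 2
taskmanager.network.netty.client.numThreads: 2
taskmanager.network.netty.client.connectTimeoutSec: 120
taskmanager.network.netty.sendReceiveBufferSize: 4194304
taskmanager.network.netty.transport: nio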
When data is skewed (one part of the data is particularly large), task execution times become seriously inconsistent even though there is no GC (Garbage Collection) problem. Possible remedies:
- Redesign the key: use a finer-grained key so that task sizes become reasonable.
- Modify the degree of parallelism.
- Call the rebalance operation to partition the data evenly.
Buffer timeout setting
- Since data is exchanged over the network during task execution, the timeout of the buffer used for data passed between servers can be set via setBufferTimeout.
- With setBufferTimeout(-1), the buffer is flushed only when it is full, which maximizes throughput. With setBufferTimeout(0), latency is minimized: data is flushed as soon as it is received. With a setBufferTimeout value greater than 0, the buffer is flushed after that timeout elapses. For example:
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
1. What is a checkpoint? Simply put, Flink periodically persists its state in order to achieve fault tolerance and exactly-once semantics. This persistence process is called a checkpoint; it is a snapshot of the global state of a Flink job at a certain point in time.
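As a minimal sketch (the 60-second interval and exactly-once mode are only illustrative), checkpointing is switched on through the execution environment:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a global snapshot every 60 seconds with exactly-once guarantees
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);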
When we want to implement global state preservation for a distributed system, the traditional scheme introduces a unified clock that the master node broadcasts to every slave node; when a node receives this unified clock, it records its current state.

However, the unified-clock approach has its own problems: if a node's GC pause is relatively long, or the network between the master and the slaves happens to fluctuate so that delivery of the clock is delayed or fails, that slave's data becomes inconsistent with the other machines, eventually leading to split brain. To solve this we would have to make the master and slaves highly available (HA), but the more complex a system is, the more unstable it becomes and the more costly it is to maintain.
Instead, Flink drives its checkpoints by injecting a marker called a barrier into the data stream.

The figure above shows an example of a barrier: from the first upstream task to the last downstream task, every time a task passes the blue barrier in the figure, the snapshot-saving function is triggered. Let's illustrate this briefly with an example.

This is a simple ETL process: first we take data from Kafka and perform a transformation on it, then send it to a downstream Kafka.
In this example no chaining tuning is performed, so the forward strategy is used, that is, “the output of a task is sent to exactly one task as input”. This also has the advantage that if both tasks run in the same JVM, unnecessary network overhead is avoided.
With parallelism set to 2, the DAG diagram is as follows:



Every Flink job has a JobManager, and inside the JobManager there is a checkpoint coordinator that manages the whole checkpoint process. We can configure a time interval at which the checkpoint coordinator sends a checkpoint event to each source task in the container, that is, the first tasks in the pipeline (task1 and task2 in the parallel graph).
When a source operator receives a barrier, it suspends its own data processing, takes a snapshot of its current state, saves it to the specified persistent storage, and finally sends an ack (acknowledgement) asynchronously to the CheckpointCoordinator. At the same time it broadcasts the barrier to all downstream operators and resumes its own data processing.
Each operator keeps taking snapshots in this way and broadcasting the barrier downstream, until finally the barrier reaches the sink operator and the snapshot is complete. Note that an operator may have multiple upstream data sources; the corresponding barriers must all be aligned before the checkpoint is triggered, so when a checkpoint takes a long time it may be because barrier alignment takes a long time.

As shown in the figure, this is the initialization stage of our container: e1 and e2 are the data just consumed from Kafka, and at the same time the CheckpointCoordinator has sent a barrier.
At this point Task1 completes its checkpoint: the effect is to record the offset as 2 (e1, e2). Task1 then broadcasts the barrier to the downstream operators; the input of Task3 is the output of Task1.
Now suppose the program counts records. The checkpoint of Task3 records the count as 2 (because the data received from Task1 is e1 and e2), and then the barrier is broadcast further downstream. When the barrier reaches the sink operator, the snapshot is complete.
Meanwhile the source keeps producing data and new checkpoints keep being created. If the container goes down and restarts, data recovery is required. In the checkpoint just completed, the offset is 2 and the count is 2, so we restore from that state: Task1 starts consuming from e3. This is the recover operation.
■ Precautions for checkpoint

The three points noted below can affect the throughput of the system and need attention during actual development:
The following content is a summary from the Flink Chinese community, for your reference.
Locating problems in Flink jobs
1. Problem location mantra
“One pressure, two checks, three metrics; latency and throughput are the core. Always keep an eye on resource usage, and look at GC first.”
“One pressure” refers to backpressure: when a problem occurs, look at backpressure first. “Two checks” refers to checkpoints: is the time spent aligning data very long, and is the state very large? Both are closely related to system throughput. “Three metrics” refers to what the Flink UI displays; our main focus is latency, throughput, system resources, and GC logs.
- Look at backpressure: usually the downstream of the last backpressured subTask is one of the bottlenecks of the job.
- Look at the checkpoint duration: the checkpoint duration can affect the overall throughput of the job to a certain extent.
- Look at the core metrics: metrics are the basis for accurately judging a task's performance, and latency and throughput are the most critical ones.
- Resource utilization: improving resource utilization is the ultimate goal.
■ Common performance problems

- When focusing on backpressure, people often overlook the performance problems caused by the serialization and deserialization of data.
- Some data structures, such as HashMap and HashSet, require their keys to be hashed. When keyBy is used on large volumes of data with such keys, the performance impact is very large.
- Data skew is our classic problem and will be expanded on later.
- If the downstream is MySQL, HBase, or similar, use batch operations: keep the data in a buffer and send it once certain conditions are met. The purpose is to reduce interactions with the external system and cut network overhead (a minimal sketch follows this list).
- Frequent GC: whether CMS or G1 is used, a GC pauses the whole job. Long GC pauses also prevent the JobManager and TaskManager from sending heartbeats on time; the JobManager then considers the TaskManager lost and starts a new one.
- Windows: a window is a means of slicing infinite data into finite blocks. For example, with sliding windows, size = 5min does not count as a large window, but step = 1s means data is processed once every second, which causes heavy overlap of the data and a large data volume.
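For the batch-write point above, here is a minimal sketch of a buffering sink (the class name and threshold are hypothetical; a production version would also flush on a timer and participate in checkpoints):
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BatchingSink extends RichSinkFunction<String> {
    private static final int BATCH_SIZE = 500; // illustrative threshold
    private transient List<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            flush(); // one round trip to MySQL/HBase instead of one per record
        }
    }

    @Override
    public void close() {
        flush(); // write out whatever is still buffered
    }

    private void flush() {
        // write the buffered records to the external system in a single batch, then clear the buffer
        buffer.clear();
    }
}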

We can deduplicate data with structures such as Set or Map in combination with Flink state. However, these deduplication schemes keep growing with the amount of data, which eventually causes a sharp drop in performance, for example the write-performance problem caused by the hash collisions analyzed above, GC problems caused by excessive memory usage, and TaskManager disconnections.
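As a minimal sketch of the state-based approach (the class name is hypothetical; it assumes the stream has already been keyed by the deduplication id):
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DedupFunction extends KeyedProcessFunction<String, String, String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        // one flag per key, kept in Flink state
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(value); // emit only the first occurrence of each key
        }
    }
}

Note that this state also grows with the number of distinct keys, which is exactly the limitation described above.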

Schemes 2 and 3 also deduplicate through data-structure means; interested readers can look into them on their own, and they will not be expanded on here.

Data skew is a high-frequency problem that everyone encounters, and there are many solutions.
The first scenario is when the parallelism is set lower than the number of partitions, which causes the uneven consumption mentioned above.

The second is the case where the keys are unevenly distributed; their distribution can be broken up by adding random prefixes so that the data is not concentrated in a few tasks.
Aggregate the same key locally on each node first, similar to the local combiner in MapReduce. After this map-side pre-aggregation, each node holds only one record per key, because multiple records with the same key have been aggregated. When other nodes then pull the same key from all nodes, the amount of data to pull is greatly reduced, which reduces disk IO and network transfer overhead.
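A minimal sketch of the random-prefix idea (it assumes a DataStream<Tuple2<String, Long>> named source holding key/count pairs; the names and the prefix range of 10 are illustrative):
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Phase 1: add a random prefix so a hot key is spread over up to 10 subtasks, then aggregate locally
DataStream<Tuple2<String, Long>> partial = source
        .map(t -> Tuple2.of(ThreadLocalRandom.current().nextInt(10) + "#" + t.f0, t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .sum(1);

// Phase 2: strip the prefix and aggregate the partial results into the final counts
DataStream<Tuple2<String, Long>> result = partial
        .map(t -> Tuple2.of(t.f0.substring(t.f0.indexOf('#') + 1), t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .sum(1);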

Flink's memory structure was just described, so we know that tuning mainly targets the non-heap memory (network buffers and the managed memory pool) and the heap memory, both of which are basically controlled through parameters.
We need to adjust these parameters according to our own situation; only some suggestions are given here. As for the managed memory buffer, Flink streaming jobs currently do not use much of it, so we generally set its fraction to less than 0.3.

Heap memory tuning is about the JVM. The main step is to change the default garbage collector to G1, because the default Parallel Scavenge collector collects the old generation serially (single-threaded), so its Full GC takes a long time. Below is some introduction to G1; there is plenty of material online, so it will not be expanded on here.
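A hedged sketch of the corresponding “conf/flink-conf.yaml” entries (the managed-memory key name assumes an older Flink release, and the values are placeholders rather than recommendations):
env.java.opts: -XX:+UseG1GC
taskmanager.memory.fraction: 0.3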


