After a physical machine went down, many jobs in the real-time cluster failed over. Most of them recovered successfully, but some jobs belonging to one department kept failing over and never recovered. The job exception log in the WebUI showed the following:

 2021-11-09 16:01:11
 java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.reflect.UndeclaredThrowableException
     at com.sun.proxy.$Proxy54.submitTask(Unknown Source)
     at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:72)
     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$10(Execution.java:756)
     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
     ... 7 more
 Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.
     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
     ... 11 more

Resolving the Exception

We extracted the key information from the exception log above:

 Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize. 

The RPC message size exceeds the maximum akka framesize. According to the Flink website, the default value of this parameter is "10485760b", and the parameter is described as follows:

 Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.

In other words, this parameter is the maximum message size allowed for communication between the JobManager and the TaskManagers. If a Flink job fails because a message exceeds this limit, the fix is to increase the value, which must be given with a size unit.
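To make the gap concrete, here is a minimal sketch that compares the size reported in the log with the default limit. The class FrameSizeMath and its helper parseBytes are hypothetical names made up for this illustration. The helper only understands the plain "b" suffix and is not Flink's own parser.

```java
import java.util.Locale;

public class FrameSizeMath {
    // Default akka.framesize quoted above: 10485760 bytes (10 MiB).
    public static final String DEFAULT_FRAMESIZE = "10485760b";
    // Size reported in the exception log.
    public static final long OBSERVED_RPC_BYTES = 56_424_326L;

    // Hypothetical helper: parses a plain byte value such as "10485760b".
    // It handles only the "b" suffix and is NOT Flink's own parser.
    public static long parseBytes(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (!v.endsWith("b")) {
            throw new IllegalArgumentException("expected a value ending in 'b': " + value);
        }
        return Long.parseLong(v.substring(0, v.length() - 1));
    }

    // True when a message of messageBytes would be rejected by the limit.
    public static boolean exceedsLimit(long messageBytes, long limitBytes) {
        return messageBytes > limitBytes;
    }

    public static void main(String[] args) {
        long limit = parseBytes(DEFAULT_FRAMESIZE);
        System.out.println("limit bytes:    " + limit);
        System.out.println("observed bytes: " + OBSERVED_RPC_BYTES);
        System.out.println("exceeds limit:  " + exceedsLimit(OBSERVED_RPC_BYTES, limit));
    }
}
```

The observed message is more than five times the default limit, so any new value has to be at least 56424326 bytes, plus some headroom.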

Analysis of the Cause

Flink uses Akka as the RPC framework between its components (JobManager, TaskManager, ResourceManager). The maximum size of messages sent between the JobManager and the TaskManagers defaults to 10485760b. If a message exceeds this limit, the RPC call fails and an error is reported. This can be seen in the source code where the exception is thrown:

protected RpcInvocation createRpcInvocationMessage(String methodName, Class[] parameterTypes, Object[] args) throws IOException {
    Object rpcInvocation;
    if (this.isLocal) {
        // Local calls are passed by reference, so no size check is needed.
        rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
    } else {
        try {
            // Remote calls are serialized, so their size can be measured.
            RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(methodName, parameterTypes, args);
            if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
                // Exception location
                throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
            }
            rpcInvocation = remoteRpcInvocation;
        } catch (IOException var6) {
            LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", var6);
            throw var6;
        }
    }
    return (RpcInvocation)rpcInvocation;
}
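The same kind of guard can be tried outside Flink. The following is a minimal sketch, assuming plain Java serialization as a stand-in for what RemoteRpcInvocation does internally. FrameSizeGuard, serializedSize, and checkedSize are hypothetical names, not Flink classes or methods, and the message text is modeled on the log line above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FrameSizeGuard {
    private final long maximumFramesize;

    public FrameSizeGuard(long maximumFramesize) {
        this.maximumFramesize = maximumFramesize;
    }

    // Serializes the payload with plain Java serialization and returns the byte count.
    // Assumption: this only approximates how Flink measures a remote invocation.
    public static int serializedSize(Serializable payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        return bytes.size();
    }

    // Returns the serialized size, or throws if it is larger than the configured limit.
    public int checkedSize(Serializable payload) throws IOException {
        int size = serializedSize(payload);
        if (size > maximumFramesize) {
            throw new IOException(
                "The rpc invocation size " + size + " exceeds the maximum akka framesize.");
        }
        return size;
    }

    public static void main(String[] args) throws IOException {
        FrameSizeGuard guard = new FrameSizeGuard(1024); // tiny limit for the demo
        System.out.println("small payload size: " + guard.checkedSize("small message"));
        try {
            guard.checkedSize(new byte[4096]); // larger than the 1024-byte limit
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the check happens on the sending side, before anything goes over the wire, which is why the failure surfaces as an IOException in the caller's stack trace.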

As for why the RPC message between the JobManager and the TaskManager is so large, the initial explanation is as follows. After a task hits an exception, it calls the RPC interface updateTaskExecutionState(TaskExecutionState taskExecutionState) to notify the Flink JobManager, which changes the state of the corresponding task and restarts it. The taskExecutionState parameter, however, contains an error attribute. When a task produces too many error stack traces, the serialized parameter exceeds the maximum data size allowed by the RPC interface (that is, the maximum akka framesize), so the call to updateTaskExecutionState fails. The JobManager then never learns that the task is already in the failed state and cannot restart it, which sets off a chain reaction.
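How quickly an error attribute can grow is easy to measure. The following is a rough sketch, assuming plain Java serialization and synthetic stack frames. ErrorPayloadSize, errorWithFrames, and the com.example.Job class names are all made up for this illustration, and the real TaskExecutionState wraps its error differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class ErrorPayloadSize {
    // Builds an exception that carries the given number of synthetic stack frames.
    // The class and method names in the frames are invented for the demo.
    public static Throwable errorWithFrames(int frames) {
        StackTraceElement[] trace = new StackTraceElement[frames];
        for (int i = 0; i < frames; i++) {
            trace[i] = new StackTraceElement("com.example.Job" + i, "process", "Job.java", i);
        }
        RuntimeException error = new RuntimeException("task failed");
        error.setStackTrace(trace);
        return error;
    }

    // Returns the number of bytes the throwable occupies after Java serialization.
    public static int serializedSize(Throwable error) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(error);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        for (int frames : new int[] {10, 1_000, 100_000}) {
            System.out.println(frames + " frames -> "
                + serializedSize(errorWithFrames(frames)) + " bytes");
        }
    }
}
```

The serialized size grows with the number of frames, so an error that accumulates a very large trace can push a state update past the frame size limit.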

The Workaround

Stop the task, add the akka.framesize parameter to flink-conf.yaml, and increase its value.

akka.framesize: "62914560b"

Then restart the task. You can check the JobManager Configuration page to see whether the parameter has taken effect.
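Before restarting, the edited file can also be checked with a few lines of code. This is a minimal sketch, assuming flink-conf.yaml uses the flat "key: value" layout shown above. FlinkConfCheck and parse are hypothetical names, and the parser is deliberately simplistic, not a full YAML reader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlinkConfCheck {
    // Parses flat "key: value" lines, skipping blank lines and # comments.
    // Assumption: no nested YAML structures appear in the file.
    public static Map<String, String> parse(String confText) {
        Map<String, String> entries = new LinkedHashMap<>();
        for (String line : confText.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int sep = trimmed.indexOf(": ");
            if (sep < 0) {
                continue; // not a key/value line
            }
            String key = trimmed.substring(0, sep).trim();
            String value = trimmed.substring(sep + 2).trim();
            // Strip one pair of surrounding double quotes, if present.
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1);
            }
            entries.put(key, value);
        }
        return entries;
    }

    public static void main(String[] args) {
        String conf = "# example content\nakka.framesize: \"62914560b\"\n";
        System.out.println("akka.framesize = " + parse(conf).get("akka.framesize"));
    }
}
```

In practice the text would be read from the real flink-conf.yaml. The inline string here only keeps the example self-contained.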



