Flink's side output feature lets us split a stream: based on conditions, one stream can be divided into several different streams, each of which then gets its own processing logic. Below we walk through the source code behind side outputs.

Let's look at the demo below: one stream is split into three, a main stream and two side outputs.

To see each operator more clearly, I disabled operator chaining, and the DAG for the job looks like this:

This makes things clearer: starting from the process operator, one data stream is split into three. Note that operator chaining is not disabled by default, so normally all of these operators would be chained together into a single operator chain.

Let's first look at the source code for the first output, the main stream, which is emitted with out.collect(value). Here out is actually a TimestampedCollector object.

The collect method of TimestampedCollector uses an output object that it holds to emit the data. In this case that object is actually a CountingOutput, a wrapper around another Output whose main job is to update the metric that counts emitted records and then pass the data on.
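The wrapping described above can be sketched in plain Java. This is a simplified model, not Flink's actual source: the one-method Output interface, the long counter standing in for the metric, and the class name CountingOutputSketch are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CountingOutputSketch {

    /** Simplified stand-in for Flink's Output interface (assumption: one method only). */
    interface Output<T> {
        void collect(T record);
    }

    /** Wraps another Output: bumps a counter, then forwards the record unchanged. */
    static final class CountingOutput<T> implements Output<T> {
        private final Output<T> delegate;
        private long numRecordsOut; // stands in for the real metric counter

        CountingOutput(Output<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            numRecordsOut++;          // update the "records out" count
            delegate.collect(record); // hand the record to the wrapped output
        }

        long getNumRecordsOut() {
            return numRecordsOut;
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        CountingOutput<String> out = new CountingOutput<String>(sink::add);
        out.collect("a");
        out.collect("b");
        System.out.println("forwarded=" + sink + ", counted=" + out.getNumRecordsOut());
    }
}
```

The point of the sketch is only that the wrapper adds bookkeeping and leaves the routing decision to whatever Output it wraps.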

CountingOutput in turn holds an output object, which here is a BroadcastingOutputCollector. As the name suggests, it broadcasts data downstream. But if the data is broadcast, doesn't every downstream stream receive every record? How does that achieve splitting? With this question in mind, let's look at how BroadcastingOutputCollector's collect method is implemented.

BroadcastingOutputCollector also holds Output objects; all of these classes implement the Output interface, which is used to send data downstream. Here outputs is an array of Output representing all downstream outputs, and because there are three output streams above, the array contains three Output objects. The loop calls collect on each one to send the data downstream. Because I disabled operator chaining, the process operator and the downstream Print operators are not in the same OperatorChain, so data is passed between them through RecordWriterOutput; otherwise it would be CopyingChainingOutput or ChainingOutput. I won't go into which Output is used when; I may cover that separately later if there is time.

Next, look at RecordWriterOutput's collect method. It first checks whether outputTag is null. If it is not null, the method returns immediately without doing anything; otherwise the data is pushed to the downstream operator. Only side outputs define an outputTag; the main (normal) stream has none, so for the main stream the method goes on to call pushToRecordWriter, which writes the data downstream. In other words, although the record is broadcast to every downstream Output, the two side outputs simply return, and only the main stream's Output actually pushes the data downstream. That answers the question raised above.
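The main-stream path can be modeled in a few lines of plain Java. This is a sketch under stated assumptions rather than Flink's code: EdgeOutput is a hypothetical stand-in for RecordWriterOutput, a String replaces OutputTag, and an in-memory list replaces the record writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MainOutputSketch {

    /** Simplified stand-in for Flink's Output interface. */
    interface Output<T> {
        void collect(T record);
    }

    /** Hypothetical model of one downstream edge; a null tag marks the main output. */
    static final class EdgeOutput<T> implements Output<T> {
        private final String outputTag;         // assumption: String instead of OutputTag
        final List<T> sent = new ArrayList<>(); // records "pushed to the record writer"

        EdgeOutput(String outputTag) {
            this.outputTag = outputTag;
        }

        @Override
        public void collect(T record) {
            if (outputTag != null) {
                return; // side-output edge: not responsible for main-stream records
            }
            sent.add(record);
        }
    }

    /** Forwards every record to every downstream output. */
    static final class BroadcastingOutput<T> implements Output<T> {
        private final List<Output<T>> outputs;

        BroadcastingOutput(List<Output<T>> outputs) {
            this.outputs = outputs;
        }

        @Override
        public void collect(T record) {
            for (Output<T> output : outputs) {
                output.collect(record);
            }
        }
    }

    public static void main(String[] args) {
        EdgeOutput<String> mainEdge = new EdgeOutput<>(null);
        EdgeOutput<String> testEdge = new EdgeOutput<>("test");
        EdgeOutput<String> test1Edge = new EdgeOutput<>("test1");
        List<Output<String>> outputs = Arrays.asList(mainEdge, testEdge, test1Edge);

        new BroadcastingOutput<String>(outputs).collect("value");
        System.out.println("main=" + mainEdge.sent + ", test=" + testEdge.sent + ", test1=" + test1Edge.sent);
    }
}
```

Every edge sees the record, but only the edge without a tag keeps it, which is how a broadcast still ends up on exactly one stream.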

Next, look at the source code for the second output, the side output emitted with ctx.output(test, value). Here ctx is actually a ProcessOperator#ContextImpl object.

If the outputTag is null, an exception is thrown immediately: this is a side output, so an OutputTag must be defined. If the outputTag is not null, the collect method of output is called to send the data downstream. The output here is a field held by the parent class AbstractStreamOperator; it is the same CountingOutput as above, but the collect method called is a different overload.
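As a rough illustration of that guard, here is a plain-Java sketch. Context and TaggedOutput are hypothetical simplifications (a String stands in for OutputTag), and the exception type and message are assumptions, not a quote from Flink.

```java
import java.util.ArrayList;
import java.util.List;

class ContextOutputSketch {

    /** Simplified stand-in for the tagged collect overload of Output. */
    interface TaggedOutput<T> {
        void collect(String outputTag, T record);
    }

    /** Hypothetical model of the context object handed to the process function. */
    static final class Context<T> {
        private final TaggedOutput<T> output; // in the article this is the operator's output field

        Context(TaggedOutput<T> output) {
            this.output = output;
        }

        void output(String outputTag, T value) {
            if (outputTag == null) {
                // a side output without a tag cannot be routed, so fail fast
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            output.collect(outputTag, value); // delegate to the tagged overload
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Context<String> ctx = new Context<String>((tag, value) -> seen.add(tag + ":" + value));
        ctx.output("test", "value");
        System.out.println(seen);
    }
}
```

Rejecting a null tag at this point means the layers further down can assume every tagged record carries a usable tag.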

This collect method takes one more parameter than the one above: an OutputTag, which is the OutputTag object defined when using the side output. It then calls the collect method of its own output to send the data. As before, that output is the BroadcastingOutputCollector, and again a different overload is called, one with an additional OutputTag parameter.

The logic here is the same as above: it again loops over the outputs and calls collect on each to send the data.

In this overload of RecordWriterOutput's collect method, the incoming OutputTag is first compared with the member variable this.outputTag. If they are equal, the data is sent; otherwise nothing is done. So each record is emitted to only one downstream stream, which is what implements the so-called splitting.

As you can see, the isResponsibleFor method simply calls the equals method of OutputTag to determine whether the two objects are equal.
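The tag comparison can be sketched the same way. In this simplified model, Tag is a hypothetical stand-in for OutputTag whose equality is based on its id (an assumption about the real class), and EdgeOutput again stands in for RecordWriterOutput.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class SideOutputSketch {

    /** Hypothetical stand-in for OutputTag; equality is assumed to depend on the id only. */
    static final class Tag {
        private final String id;

        Tag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }

        /** True when the edge owning 'owner' should emit a record tagged 'other'. */
        static boolean isResponsibleFor(Tag owner, Tag other) {
            return other.equals(owner); // owner is null on the main edge, so this is false there
        }
    }

    /** Hypothetical model of one downstream edge; a null tag marks the main output. */
    static final class EdgeOutput<T> {
        private final Tag outputTag;
        final List<T> sent = new ArrayList<>();

        EdgeOutput(Tag outputTag) {
            this.outputTag = outputTag;
        }

        void collect(Tag tag, T record) {
            if (Tag.isResponsibleFor(outputTag, tag)) {
                sent.add(record); // only the edge with an equal tag emits
            }
        }
    }

    /** Broadcasts a tagged record; 'tag' is assumed non-null (checked further upstream). */
    static <T> void broadcast(List<EdgeOutput<T>> outputs, Tag tag, T record) {
        for (EdgeOutput<T> output : outputs) {
            output.collect(tag, record);
        }
    }

    public static void main(String[] args) {
        EdgeOutput<String> mainEdge = new EdgeOutput<>(null);
        EdgeOutput<String> testEdge = new EdgeOutput<>(new Tag("test"));
        EdgeOutput<String> test1Edge = new EdgeOutput<>(new Tag("test1"));
        List<EdgeOutput<String>> outputs = Arrays.asList(mainEdge, testEdge, test1Edge);

        broadcast(outputs, new Tag("test"), "value");
        System.out.println("main=" + mainEdge.sent + ", test=" + testEdge.sent + ", test1=" + test1Edge.sent);
    }
}
```

Because the comparison goes through equals rather than reference identity, a separately constructed tag with the same id still selects the matching edge in this model.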

The third output, the side output test1 emitted with ctx.output(test1, value), works exactly the same way as the second one, test, so I won't walk through the code again.

That completes the splitting itself. So how do we get hold of the resulting data streams afterwards? Through the getSideOutput method.

The getSideOutput method first builds a SideOutputTransformation object and then builds a DataStream object from it. We can then apply different processing logic to each DataStream produced by the split, which is how one DataStream ends up divided into multiple DataStreams.

To summarize what the source code shows: when a stream is split, every record is sent to all downstream Outputs by broadcast. For main-stream data, only the Output whose OutputTag is null processes the record; the side outputs have a non-null OutputTag, so they return immediately without doing anything. For side-output data, each Output checks whether the two OutputTags are equal, so a record is only ever sent to the matching downstream side output. That is how the splitting logic is implemented.

