Click on the topzhisheng” to follow, star or pin the foreword together with growth

The DataStream-based join in Flink can only be implemented for joining two data streams in the same window, but in practice, there are often data out-of-order or delays, resulting in inconsistent data progress between the two streams, and data crossing windows will occur, so the data cannot be joined in the same window. Flink is based on the interval join mechanism provided by KeyedStream, and intervaljoin connects two keyedStreams, according to the same key in a period of time relative to the data time.

The code example

associates the order flow with the order flow through the order ID to obtain the member ID in the order flow.

where ds1 is the order flow, ds2 is the order

flow, respectively, ds1 and ds2 through the order id keyBy operation, get two KeyedStreams, and then perform intervalJoin operations;

The between method passes two parameters, lowerBound and upperBound, which control the flow on the right can be associated with the stream on the left in which time range, namely: leftElement.timestamp + lowerBound <= rightElement.timestamp < = leftElement.timestamp + upperBound The equivalent of the left stream can be as late as the lowerBound (if lowerBound is negative) time or as early as the upperBound (if upperBound is positive) time.

DataStream ds = ds1.keyBy(jo -> jo.getString("fk_tgou_order_id"))
                .intervalJoin(ds2.keyBy(jo -> jo.getString( "id")))
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(new ProcessJoinFunction () {


                    public void processElement (JSONObject joItem, JSONObject joOrder, Context context, Collector collector) throws Exception {
                        String order_id = joItem.getString("fk_tgou_order_id");
                        String item_id = joItem.getString("activity_to_product_id");
                        String create_time = df.format(joItem.getLong("create_time"));
                        String member_id = joOrder.getString("fk_member_id");
                        Double price = joItem.getDouble("price");
                        Integer quantity = joItem.getInteger("quantity");
                        collector.collect(new OrderItemBean(order_id, item_id, create_time, member_id, price, quantity));                    }                }); FlinkKafkaProducer010("berkeley-order-item", schema, produceConfig));

Interval Join source

code<1> When using Interval Join, the time type must be specified as EventTime

<2> Both KeyedStreams use the process method after intervalJoin and call the between method; The process method passes a custom ProcessJoinFunction as a parameter, and the three parameters of ProcessJoinFunction are the element type of the left flow, the element type of the right flow, and the element type of the output stream.

<3> intervalJoin, the bottom layer is to connect two KeyedStreams to get ConnectedStreams, so that the state can be shared between the two data streams, for intervalJoin is that the data of the same key of the two streams can be accessed to each other. ConnectedStreams’ keyby????

<4> The operation performed on top of ConnectedStreams is the IntervalJoinOperator

there are two parameters that control whether to include the upper and lower bounds , which is included by default.

a.initializeState() method, which initializes two state objects,

is used to store the data of two streams separately. where Long corresponds to the timestamp of the data, List < BufferEntry> data

corresponding to the same timestamp

b. processElement1 and processElement2 method

method description is that when the two streams reach, such as the left stream has data arrived, go to the right stream to find the data in the corresponding upper and lower bounds. Both methods call the processElement method.

  private  void processElement(
            final StreamRecord record,
            final MapState>> ourBuffer,
            final MapState >> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean  isLeft)
 throws Exception 

        final THIS ourValue = record.getValue();

        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {

            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");        }

        if (isLate(ourTimestamp)) {

            return;        }        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry>> bucket: otherBuffer.entries()) {

            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||

                    timestamp > ourTimestamp + relativeUpperBound) {



            for (BufferEntry entry: bucket.getValue()) {

                if  (isLeft) {                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);

                } else {

                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);                }            }        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

        if (isLeft) {            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);

        } else {

            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);        } } (

1) Get the value and timestamp of the record,

determine whether it is delayed, when the timestamp of the arriving record is less than the water mark, it means that the data is delayed, not processed, and not associated with the data of another stream.

    private boolean isLate(long timestamp) {
        long  currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;    }

2) Add data to the MapState cache state corresponding to its own stream, and the key is the time of the data.

addToBuffer(ourBuffer, ourValue, ourTimestamp);

private static  void addToBuffer(
            final MapState>> buffer,
            final T value,
            final long timestamp)
 throws Exception 
{        List> elemsInBucket = buffer.get(timestamp);

        if (elemsInBucket == null) {

            elemsInBucket = new ArrayList<>();        }

        elemsInBucket.add(new BufferEntry<>(value, false));

        buffer.put(timestamp, elemsInBucket);    }

(3) to traverse the MapState of another stream, if ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound, output the data to the ProcessJoinFunction call, ourTimestamp represents the time of the incoming data, timestamp represents the corresponding join data time

for (Map.Entry>> bucket: otherBuffer.entries()) {
final long timestamp  = bucket.getKey();

   if (timestamp < ourTimestamp + relativeLowerBound ||

           timestamp > ourTimestamp + relativeUpperBound) {



   for (BufferEntry entry: bucket.getValue()) {

       if (isLeft) {           collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);

       } else {

           collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);       } }} corresponding



private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long  resultTimestamp = Math.max(leftTimestamp, rightTimestamp);    collector.setAbsoluteTimestamp(resultTimestamp);    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);    userFunction.processElement(left, right, context, collector); Set


resulting timestamp to the largest of the two streams, and then execute the processElement method

4) Register the scheduled cleanup time

long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; 
if (isLeft) {    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);

else {

    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); }

Scheduled cleaning time is the time of the current incoming data + relativeUpperBound, and when the watermark is greater than this time, it needs to be cleaned.

public void onEventTime(InternalTimer timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();

        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {

            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L ) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);                leftBuffer.remove(timestamp);



            case CLEANUP_NAMESPACE_RIGHT: {

                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);                rightBuffer.remove(timestamp);




                throw new RuntimeException("Invalid namespace " + namespace);        } } } Clean



Suppose that the timestamp of the data that the current stream arrives is 10s, and the time passed in by between is 1s, 5s, upperBound is 5s, and lowerBound is 1s

According to the left flow timestamp +1s< = right timestamp< = left flow timestamp +5s; Right timestamp -5s< = left stream timestamp< = right timestamp -1s a。 If the left flow data arrives, call the processElement1 method, at this time relativeUpperBound is 5, relativeLowerBound is 1, relativeUpperBound >0, so the timing cleaning time is 10+5, that is, 15s, when the time reaches 15s, clear the left flow data, that is, when the right stream is in 15s, the left flow time range that needs to be found is 10s<=left flow timestamp<=14s, so watermark > 15s can clear data for 10s.

b。 If data arrives for the right stream, call the processElement2 method
At this time, relativeUpperBound is -1, relativeLowerBound is -5, and relativeUpperBound <0, so the timing cleaning time is 10s When the time reaches 10s, clear the data of the right stream, that is, when the left stream is in 10s, you need to find the time range of the right stream 11s<=right stream timestamp<=15s, so you can clear the data for 10s.

Original address:

Reply to the official account (zhisheng) Face Scripture, ClickHouse, ES、Flink、 Spring, Java, Kafka, monitoring < keywords such as span class="js_darkmode__80"> to view more articles corresponding to keywords.

Buy Me A Coffee