
Flink from Beginner to Proficient: article series

Interval Join

Interval Join joins two keyed streams over a time interval. For example, each element of the left keyed stream is joined, by key, with the data from the last 2 minutes (the interval) of another stream (the right keyed stream).

Interval Join syntax

leftKeyedStream
  // Join with another keyed stream
  .intervalJoin(rightKeyedStream)
  // Time interval: set the lower and upper bounds
  .between(Time.minutes(-10), Time.seconds(0))
  // Exclude the lower bound (optional)
  .lowerBoundExclusive()
  // Exclude the upper bound (optional)
  .upperBoundExclusive()
  // Custom ProcessJoinFunction that handles the joined elements
  .process(ProcessJoinFunction)

Note:

Both streams are cached in internal state. When a leftElement arrives, Flink retrieves the rightElements in the corresponding time range from state and then executes the ProcessJoinFunction on each joined pair.

Time interval: by default, a leftElement joins the rightElements whose event time falls in the range [leftElementEventTime + lowerBound, leftElementEventTime + upperBound], with both bounds included.

Example: with leftElementEventTime = 2019-11-16 17:30:00, lowerBound = -10 minutes, and upperBound = 0, this leftElement joins, by key, the rightElements in the time range [2019-11-16 17:20:00, 2019-11-16 17:30:00].

Interval Join currently only supports Event Time.

For larger data volumes, use RocksDBStateBackend.
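
To make the bounds concrete, the range check from the notes can be sketched in plain Java, outside Flink. The class IntervalBounds and its inRange method are illustrative names, not part of the Flink API, and the two boolean flags only mimic the effect of lowerBoundExclusive() and upperBoundExclusive().

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Plain-Java illustration of the interval-join time range; this is not Flink code. */
final class IntervalBounds {

    /**
     * Returns true when rightTime lies between leftTime + lowerBound and
     * leftTime + upperBound. Each bound is inclusive unless its flag is set.
     */
    static boolean inRange(LocalDateTime leftTime, LocalDateTime rightTime,
                           Duration lowerBound, Duration upperBound,
                           boolean lowerExclusive, boolean upperExclusive) {
        LocalDateTime lower = leftTime.plus(lowerBound);
        LocalDateTime upper = leftTime.plus(upperBound);
        boolean aboveLower = lowerExclusive ? rightTime.isAfter(lower) : !rightTime.isBefore(lower);
        boolean belowUpper = upperExclusive ? rightTime.isBefore(upper) : !rightTime.isAfter(upper);
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Values taken from the example in the notes
        LocalDateTime left = LocalDateTime.of(2019, 11, 16, 17, 30, 0);
        Duration lower = Duration.ofMinutes(-10);
        Duration upper = Duration.ZERO;
        LocalDateTime atLowerBound = LocalDateTime.of(2019, 11, 16, 17, 20, 0);

        // Default behaviour: both bounds inclusive
        System.out.println(inRange(left, atLowerBound, lower, upper, false, false));
        // Same element with the lower bound excluded
        System.out.println(inRange(left, atLowerBound, lower, upper, true, false));
    }
}
```

With the example values, a rightElement at exactly 17:20:00 is accepted by default and rejected once the lower bound is made exclusive.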

Example: join each user's click with that user's browsing in the last 10 minutes

Test data

Left data stream
A user clicked on a page at a certain time
{"userID": "user_2", "eventTime": "2019-11-16 17:30:02", "eventType": "click", "pageID": "page_1"}

Right data stream
A user browsed a product at a certain time, together with the price of the product
{"userID": "user_2", "eventTime": "2019-11-16 17:30:01", "eventType": "browse", "productID": "product_1", "productPrice": 10}
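
The example code imports the beans UserClickLog and UserBrowseLog but the article does not show them. The following is a minimal sketch of what UserBrowseLog might look like, inferred from the JSON fields above and from the toString format in the printed results; the field types and the whole class body are assumptions. UserClickLog would be analogous, with pageID in place of productID and productPrice.

```java
/**
 * Hypothetical sketch of the browse-event bean; the real
 * com.bigdata.flink.beans.UserBrowseLog is not shown in the article.
 * In a real project this would be a public class in its own file.
 */
class UserBrowseLog {
    private String userID;
    private String eventTime;   // formatted as "yyyy-MM-dd HH:mm:ss"
    private String eventType;
    private String productID;
    private int productPrice;   // type assumed from the sample value 10

    public UserBrowseLog() {}   // no-argument constructor for bean-style deserialization

    public String getUserID() { return userID; }
    public void setUserID(String userID) { this.userID = userID; }
    public String getEventTime() { return eventTime; }
    public void setEventTime(String eventTime) { this.eventTime = eventTime; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getProductID() { return productID; }
    public void setProductID(String productID) { this.productID = productID; }
    public int getProductPrice() { return productPrice; }
    public void setProductPrice(int productPrice) { this.productPrice = productPrice; }

    @Override
    public String toString() {
        return "UserBrowseLog{userID='" + userID + "', eventTime='" + eventTime
                + "', eventType='" + eventType + "', productID='" + productID
                + "', productPrice=" + productPrice + "}";
    }
}

/** Small demo that fills the bean with the sample record and prints it. */
final class UserBrowseLogDemo {
    public static void main(String[] args) {
        UserBrowseLog browse = new UserBrowseLog();
        browse.setUserID("user_2");
        browse.setEventTime("2019-11-16 17:30:01");
        browse.setEventType("browse");
        browse.setProductID("product_1");
        browse.setProductPrice(10);
        System.out.println(browse);
    }
}
```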

Code

package com.bigdata.flink.dataStreamWindowJoin.tumblingTimeWindow.intervalJoin;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.UserBrowseLog;
import com.bigdata.flink.beans.UserClickLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.Properties;

/**
 * Summary:
 *  Interval Join
 */

@Slf4j
public class Test {
    public static void main(String[] args) throws Exception {

        // Hard-coded for local testing; this overrides any arguments passed on the command line
        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/dataStreamWindowJoin/application.properties"};

        // 1. Parse the command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
        String clickTopic = parameterTool.getRequired("clickTopic");
        String clickTopicGroupID = parameterTool.getRequired("clickTopicGroupID");

        // 2. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. Add the Kafka data sources
        // Browse stream
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = env
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseBoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)));

        // Click stream
        Properties clickProperties = new Properties();
        clickProperties.put("bootstrap.servers", kafkaBootstrapServers);
        clickProperties.put("group.id", clickTopicGroupID);
        DataStream<UserClickLog> clickStream = env
                .addSource(new FlinkKafkaConsumer010<>(clickTopic, new SimpleStringSchema(), clickProperties))
                .process(new ClickKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new ClickBoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)));

        //browseStream.print();
        //clickStream.print();

        // 4. Interval Join
        // Join each user's click with that user's browsing in the last 10 minutes
        clickStream
                .keyBy("userID")
                .intervalJoin(browseStream.keyBy("userID"))
                // Time interval: set the lower and upper bounds
                // Lower bound: 10 minutes earlier; upper bound: the current event time
                .between(Time.minutes(-10), Time.seconds(0))
                // Custom ProcessJoinFunction that handles the joined elements
                .process(new ProcessJoinFunction<UserClickLog, UserBrowseLog, String>() {
                    @Override
                    public void processElement(UserClickLog left, UserBrowseLog right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " =Interval Join=> " + right);
                    }
                })
                .print();

        env.execute();
    }
    /** Parses JSON strings from Kafka into UserBrowseLog beans. */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                // Named browseLog so that it does not shadow the Slf4j logger "log"
                UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
                if (browseLog != null) {
                    out.collect(browseLog);
                }
            } catch (Exception ex) {
                log.error("Exception while parsing Kafka data...", ex);
            }
        }
    }

    /** Parses JSON strings from Kafka into UserClickLog beans. */
    static class ClickKafkaProcessFunction extends ProcessFunction<String, UserClickLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserClickLog> out) throws Exception {
            try {
                // Named clickLog so that it does not shadow the Slf4j logger "log"
                UserClickLog clickLog = JSON.parseObject(value, UserClickLog.class);
                if (clickLog != null) {
                    out.collect(clickLog);
                }
            } catch (Exception ex) {
                log.error("Exception while parsing Kafka data...", ex);
            }
        }
    }
    /** Extracts the event time of browse events from the eventTime field. */
    static class BrowseBoundedOutOfOrdernessTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {
        BrowseBoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserBrowseLog element) {
            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
            DateTime dateTime = DateTime.parse(element.getEventTime(), dateTimeFormatter);
            return dateTime.getMillis();
        }
    }

    /** Extracts the event time of click events from the eventTime field. */
    static class ClickBoundedOutOfOrdernessTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserClickLog> {
        ClickBoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserClickLog element) {
            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
            DateTime dateTime = DateTime.parse(element.getEventTime(), dateTimeFormatter);
            return dateTime.getMillis();
        }
    }
}

Result

Send the following data to the two topics.

Click log topic
{"userID": "user_2", "eventTime": "2019-11-16 17:30:00", "eventType": "click", "pageID": "page_1"}

Browse log topic
{"userID": "user_2", "eventTime": "2019-11-16 17:19:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:20:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:22:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:26:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:30:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:31:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}

In the output, you can see that the click by user_2 at 2019-11-16 17:30:00 was joined with that user's browsing in the time range [2019-11-16 17:20:00, 2019-11-16 17:30:00]. The browse events at 17:19:00 and 17:31:00 fall outside the range and are not joined.

UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:26:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:20:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:22:00', eventType='browse', productID='product_1', productPrice=10}
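
The result can be cross-checked without a cluster. The following is a rough single-key, in-memory sketch of the buffering described in the notes: each side is kept in a list, and an arriving element is matched against the other side's buffer using the inclusive time range. IntervalJoinReplay and its methods are made-up names for illustration; real Flink state handling, keying, and watermark-based cleanup are left out.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Single-key, in-memory imitation of an interval join; not the Flink implementation. */
final class IntervalJoinReplay {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final Duration lowerBound;
    private final Duration upperBound;
    private final List<LocalDateTime> leftBuffer = new ArrayList<>();   // buffered click times
    private final List<LocalDateTime> rightBuffer = new ArrayList<>();  // buffered browse times
    private final List<String> output = new ArrayList<>();

    IntervalJoinReplay(Duration lowerBound, Duration upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** A left element arrives: buffer it and match it against all buffered right elements. */
    void onLeft(String eventTime) {
        LocalDateTime left = LocalDateTime.parse(eventTime, FMT);
        leftBuffer.add(left);
        for (LocalDateTime right : rightBuffer) {
            emitIfInRange(left, right);
        }
    }

    /** A right element arrives: buffer it and match it against all buffered left elements. */
    void onRight(String eventTime) {
        LocalDateTime right = LocalDateTime.parse(eventTime, FMT);
        rightBuffer.add(right);
        for (LocalDateTime left : leftBuffer) {
            emitIfInRange(left, right);
        }
    }

    /** Emits a pair when right is in [left + lowerBound, left + upperBound], both inclusive. */
    private void emitIfInRange(LocalDateTime left, LocalDateTime right) {
        boolean inRange = !right.isBefore(left.plus(lowerBound)) && !right.isAfter(left.plus(upperBound));
        if (inRange) {
            output.add(left.format(FMT) + " =Interval Join=> " + right.format(FMT));
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        IntervalJoinReplay join = new IntervalJoinReplay(Duration.ofMinutes(-10), Duration.ZERO);
        // The click from the click log topic
        join.onLeft("2019-11-16 17:30:00");
        // The six browse events from the browse log topic
        String[] browseTimes = {
            "2019-11-16 17:19:00", "2019-11-16 17:20:00", "2019-11-16 17:22:00",
            "2019-11-16 17:26:00", "2019-11-16 17:30:00", "2019-11-16 17:31:00"
        };
        for (String browseTime : browseTimes) {
            join.onRight(browseTime);
        }
        join.output().forEach(System.out::println);
    }
}
```

Replaying the test data this way yields four pairs, for the browse events at 17:20:00, 17:22:00, 17:26:00, and 17:30:00. The order of the pairs may differ from the Flink output above.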
