Flink from Beginner to Proficient: article series
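Interval Join
An interval join joins each element of one keyed stream with the elements of another keyed stream whose timestamps fall within a time interval relative to it. Example: for each element of the left keyed stream, join by key the data from the last 2 minutes (the interval) of another stream (the right keyed stream).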

Interval Join syntax
leftKeyedStream
    .intervalJoin(rightKeyedStream)
    // Time interval: set the lower and upper bounds
    .between(Time.minutes(-10), Time.seconds(0))
    // Exclude the lower bound
    .lowerBoundExclusive()
    // Exclude the upper bound
    .upperBoundExclusive()
    // Custom ProcessJoinFunction that handles the joined elements
    .process(ProcessJoinFunction)
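To make the effect of between, lowerBoundExclusive and upperBoundExclusive concrete, here is a minimal plain-Java sketch of the bounds check. It is not Flink code: the class IntervalBoundsSketch and the method inInterval are hypothetical names used only for illustration, and the timestamps reuse the 10-minute example from this article.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Plain-Java illustration only; IntervalBoundsSketch is a hypothetical name, not a Flink class.
final class IntervalBoundsSketch {

    /**
     * Returns true when the right element's timestamp lies in
     * [left + lower, left + upper], optionally excluding either end.
     */
    static boolean inInterval(LocalDateTime left, LocalDateTime right,
                              Duration lower, Duration upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        LocalDateTime min = left.plus(lower);
        LocalDateTime max = left.plus(upper);
        boolean aboveLower = lowerExclusive ? right.isAfter(min) : !right.isBefore(min);
        boolean belowUpper = upperExclusive ? right.isBefore(max) : !right.isAfter(max);
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        LocalDateTime click = LocalDateTime.parse("2019-11-16T17:30:00");
        Duration lower = Duration.ofMinutes(-10); // corresponds to Time.minutes(-10)
        Duration upper = Duration.ZERO;           // corresponds to Time.seconds(0)
        LocalDateTime atLowerBound = LocalDateTime.parse("2019-11-16T17:20:00");

        // Inclusive bounds: an element exactly on the lower bound is joined.
        System.out.println(inInterval(click, atLowerBound, lower, upper, false, false));
        // With the lower bound excluded, the same element is no longer joined.
        System.out.println(inInterval(click, atLowerBound, lower, upper, true, false));
    }
}
```

With inclusive bounds the element at 17:20:00 matches; once the lower bound is excluded it does not, which is the only difference lowerBoundExclusive() is meant to illustrate here.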
Notes:
Both streams are cached in internal state. When a leftElement arrives, Flink looks up the rightElements in state that fall within the corresponding time range and calls the ProcessJoinFunction on each matching pair. The same lookup happens in the other direction when a rightElement arrives.
Time interval: by default, a leftElement joins the rightElements whose event time lies in [leftElementEventTime + lowerBound, leftElementEventTime + upperBound], with both ends inclusive.
Example: with leftElementEventTime = 2019-11-16 17:30:00, lowerBound = -10 minutes and upperBound = 0, this leftElement joins, by key, the rightElements in the time range [2019-11-16 17:20:00, 2019-11-16 17:30:00].
Interval Join currently only supports Event Time.
Because both streams are buffered in state, use RocksDBStateBackend when the data volume is large.
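The buffering described in the notes above can be sketched in plain Java. This is a simplified simulation for a single key, not Flink's actual operator: the class BufferedIntervalJoinSketch and its methods are hypothetical, timestamps are plain numbers (read them as minutes past 17:00), and the per-key scoping and expiry of old state that a real job relies on are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation for a single key; hypothetical class, not Flink's operator.
final class BufferedIntervalJoinSketch {
    private final long lowerBound;
    private final long upperBound;
    private final List<Long> leftBuffer = new ArrayList<>();   // cached left timestamps
    private final List<Long> rightBuffer = new ArrayList<>();  // cached right timestamps
    private final List<String> output = new ArrayList<>();

    BufferedIntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // A left element arrives: cache it, then scan the cached right elements.
    void onLeft(long leftTs) {
        leftBuffer.add(leftTs);
        for (long rightTs : rightBuffer) {
            if (inRange(leftTs, rightTs)) {
                emit(leftTs, rightTs);
            }
        }
    }

    // A right element arrives: cache it, then scan the cached left elements.
    void onRight(long rightTs) {
        rightBuffer.add(rightTs);
        for (long leftTs : leftBuffer) {
            if (inRange(leftTs, rightTs)) {
                emit(leftTs, rightTs);
            }
        }
    }

    // Inclusive check: left + lowerBound <= right <= left + upperBound.
    private boolean inRange(long leftTs, long rightTs) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    private void emit(long leftTs, long rightTs) {
        output.add("left@" + leftTs + " joins right@" + rightTs);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferedIntervalJoinSketch join = new BufferedIntervalJoinSketch(-10, 0);
        join.onRight(22); // browse arrives before the click: only cached
        join.onLeft(30);  // click arrives: matches the cached browse at 22
        join.onRight(26); // browse arrives after the click: matches the cached click
        join.onRight(31); // outside [20, 30]: no match
        join.output().forEach(System.out::println);
    }
}
```

Because each side is cached, a pair is produced regardless of which element arrives first. That is also why state grows with the interval width and the data rate.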
Example: join each user's click with that user's browsing records from the last 10 minutes
Test data
Data stream Left
A user clicked a page at a certain time
{"userID": "user_2", "eventTime": "2019-11-16 17:30:02", "eventType": "click", "pageID": "page_1"}
Data stream Right
A user browsed a product at a certain time, together with the product's price
{"userID": "user_2", "eventTime": "2019-11-16 17:30:01", "eventType": "browse", "productID": "product_1", "productPrice": 10}
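The job in this article deserializes these records into UserClickLog and UserBrowseLog beans, which are not shown. The following is a hypothetical sketch of what they might look like, inferred only from the JSON fields above and from the printed output format; the field types (for example int for productPrice) are assumptions. The beans are nested in a wrapper class BeansSketch only to keep the example in one file. In a real project each would more likely be a public top-level class.

```java
// Hypothetical beans inferred from the sample JSON; the article's real classes are not shown.
final class BeansSketch {

    public static class UserClickLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String pageID;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getPageID() { return pageID; }
        public void setPageID(String pageID) { this.pageID = pageID; }

        @Override
        public String toString() {
            return "UserClickLog{userID='" + userID + "', eventTime='" + eventTime
                    + "', eventType='" + eventType + "', pageID='" + pageID + "'}";
        }
    }

    public static class UserBrowseLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private int productPrice; // assumed to be an integer, as in the sample data

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public int getProductPrice() { return productPrice; }
        public void setProductPrice(int productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{userID='" + userID + "', eventTime='" + eventTime
                    + "', eventType='" + eventType + "', productID='" + productID
                    + "', productPrice=" + productPrice + "}";
        }
    }

    public static void main(String[] args) {
        UserClickLog click = new UserClickLog();
        click.setUserID("user_2");
        click.setEventTime("2019-11-16 17:30:02");
        click.setEventType("click");
        click.setPageID("page_1");
        System.out.println(click);
    }
}
```

The getEventTime() getter matters for the job: the timestamp extractors read the event time through it, and the userID property is the field the streams are keyed by.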
Example code
package com.bigdata.flink.dataStreamWindowJoin.tumblingTimeWindow.intervalJoin;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.UserBrowseLog;
import com.bigdata.flink.beans.UserClickLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.Properties;

/**
 * Summary:
 *   Interval Join
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception {
        // Hard-coded path to the properties file, for local testing
        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/dataStreamWindowJoin/application.properties"};

        // 1. Parse the command line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
        String clickTopic = parameterTool.getRequired("clickTopic");
        String clickTopicGroupID = parameterTool.getRequired("clickTopicGroupID");

        // 2. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. Add the Kafka data sources
        // Browse stream
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = env
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseBoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)));

        // Click stream
        Properties clickProperties = new Properties();
        clickProperties.put("bootstrap.servers", kafkaBootstrapServers);
        clickProperties.put("group.id", clickTopicGroupID);
        DataStream<UserClickLog> clickStream = env
                .addSource(new FlinkKafkaConsumer010<>(clickTopic, new SimpleStringSchema(), clickProperties))
                .process(new ClickKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new ClickBoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)));

        //browseStream.print();
        //clickStream.print();

        // 4. Interval Join
        // Join each user's click with that user's browsing records from the last 10 minutes
        clickStream
                .keyBy("userID")
                .intervalJoin(browseStream.keyBy("userID"))
                // Time interval: set the lower and upper bounds
                // Lower bound: 10 minutes ago; upper bound: the current event time
                .between(Time.minutes(-10), Time.seconds(0))
                // Custom ProcessJoinFunction that handles the joined elements
                .process(new ProcessJoinFunction<UserClickLog, UserBrowseLog, String>() {
                    @Override
                    public void processElement(UserClickLog left, UserBrowseLog right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " =Interval Join=> " + right);
                    }
                })
                .print();

        env.execute();
    }

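    /** Parses Kafka records into UserBrowseLog objects; unparseable records are logged and dropped. */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
                if (browseLog != null) {
                    out.collect(browseLog);
                }
            } catch (Exception ex) {
                // "log" is the Slf4j logger generated by @Slf4j on the enclosing class
                log.error("Failed to parse Kafka data", ex);
            }
        }
    }
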
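    /** Parses Kafka records into UserClickLog objects; unparseable records are logged and dropped. */
    static class ClickKafkaProcessFunction extends ProcessFunction<String, UserClickLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserClickLog> out) throws Exception {
            try {
                UserClickLog clickLog = JSON.parseObject(value, UserClickLog.class);
                if (clickLog != null) {
                    out.collect(clickLog);
                }
            } catch (Exception ex) {
                // "log" is the Slf4j logger generated by @Slf4j on the enclosing class
                log.error("Failed to parse Kafka data", ex);
            }
        }
    }
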
    /** Extracts the event time of a browse record as epoch milliseconds. */
    static class BrowseBoundedOutOfOrdernessTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {
        BrowseBoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserBrowseLog element) {
            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
            DateTime dateTime = DateTime.parse(element.getEventTime(), dateTimeFormatter);
            return dateTime.getMillis();
        }
    }

    /** Extracts the event time of a click record as epoch milliseconds. */
    static class ClickBoundedOutOfOrdernessTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserClickLog> {
        ClickBoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserClickLog element) {
            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
            DateTime dateTime = DateTime.parse(element.getEventTime(), dateTimeFormatter);
            return dateTime.getMillis();
        }
    }
}

Result
Send the following data to the two topics.
Click record topic:
{"userID": "user_2", "eventTime": "2019-11-16 17:30:00", "eventType": "click", "pageID": "page_1"}
Browse record topic:
{"userID": "user_2", "eventTime": "2019-11-16 17:19:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:20:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:22:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:26:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:30:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_2", "eventTime": "2019-11-16 17:31:00", "eventType": "browse", "productID": "product_1", "productPrice": 10}
The output is shown below. The click by user_2 at 2019-11-16 17:30:00 is joined with this user's browse records in the time range [2019-11-16 17:20:00, 2019-11-16 17:30:00]. The browse records at 17:19:00 and 17:31:00 fall outside that range and are not joined.
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:26:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:20:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='browse', productID='product_1', productPrice=10}
UserClickLog{userID='user_2', eventTime='2019-11-16 17:30:00', eventType='click', pageID='page_1'} =Interval Join=> UserBrowseLog{userID='user_2', eventTime='2019-11-16 17:22:00', eventType='browse', productID='product_1', productPrice=10}