Flink学习连载文章11--双流Join

双流 Join 和两个流合并是不一样的

两个流合并:两个流变为 1 个流 union connect

双流 join: 两个流 join,其实这两个流还是原来的,只是满足条件的数据会变为一个新的流。

可以结合 sql 语句中的 union 和 join 的区别。

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join -- 类似于我们以前学过的内连接 inner join
  • coGroup -- 类似于我们以前学过的 内连接,左连接,右连接
  • intervalJoin -- 一个流中的数据可以关联另一个流中一个时间段的所有数据

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

时间窗口如果是 00~05 ,05 这个时间点的数据是不包含在本窗口内的,而是归于下一个窗口,所谓的前包后不包。

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即可,不用深究 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们一起看一下如何实现上图所示的滑动窗口 Join:

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @基本功能: 演示join的滑动窗口* @program:FlinkDemo* @author: 闫哥* @create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream).where(tuple3 -> tuple3.f0).equalTo(tuple3 -> tuple3.f0).window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print("绿色的流:");orangeDataStream.print("橘色的流:");resultStream.print("最终的结果:");//5. execute-执行env.execute();}
}

假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

2. CoGroup

CoGroup 算子是将两条数据流按照 Key 进行分组,然后将相同 Key 的数据进行处理。要实现 CoGroup 功能需要为两个输入流分别指定 KeySelector 和 WindowAssigner。它的调用方式类似于 Join 算子,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流或者右流的数据,基于此我们可以实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

目前,这些分组中的数据是在内存中保存的,因此需要确保保存的数据量不能太大,否则,JVM 可能会崩溃。

CoGroup 通用用法如下:

stream.coGroup(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<CoGroupFunction>);

下面我们看一下如何使用 CoGroup 算子实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

最大的优势是可以实现内连接,左连接,右连接,但是缺点是内存压力大,而上面的join只能实现内连接。

CoGroup 从写法上,是coGroup 和 join的区别,而且apply 里面的函数也是不一样的,一定要注意观察。

2.1 InnerJoin

下面我们看一下如何使用 CoGroup 实现内连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。InnerJoin 只有在两个流对应窗口中都存在元素时,才会输出。

我们以滚动窗口为例来实现 InnerJoin

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuCoGroupDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);coGroup.where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {// 凭借这两个迭代器实现内连接,左右连接// 内连接  外面这个循环和里面的循环必须都有数据才会进行输出,典型的内连接for (Tuple3<String, Integer, String> t1 : i1) {for (Tuple3<String, Integer, String> t2 : i2) {collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);}}}}).print();//5. execute-执行env.execute();}}

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 InnerJoin ,只需要两个迭代器中的元素两两组合即可。两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
2.2 LeftJoin

下面我们看一下如何使用 CoGroup 实现左连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要绿色流窗口中有元素时,就会输出。即使在橘色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 LeftJoin

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuCoGroupLeftDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);coGroup.where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {// 凭借这两个迭代器实现内连接,左右连接// 内连接for (Tuple3<String, Integer, String> t1 : i1) {boolean noEelement = true;for (Tuple3<String, Integer, String> t2 : i2) {noEelement = false;collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);}if(noEelement){collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+null);}}}}).print();//5. execute-执行env.execute();}}

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 green Iterable,橘色流为 orange Iterable,如果要实现 LeftJoin ,需要保证 orange Iterable 中没有元素,green Iterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 orange Iterable 是否有元素,如果 orange Iterable 中没有元素,单独输出 greenIterable 中的元素即可。Join 效果如下所示:

2.3 RightJoin

下面我们看一下如何使用 CoGroup 实现右连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要橘色流窗口中有元素时,就会输出。即使在绿色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 RightJoin

// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
// 绿色流
.where(new KeySelector<Tuple3<String, String, String>, String>() {@Overridepublic String getKey(Tuple3<String, String, String> tuple3) throws Exception {return tuple3.f0;}
})
// 橘色流
.equalTo(new KeySelector<Tuple3<String, String, String>, String>() {@Overridepublic String getKey(Tuple3<String, String, String> tuple3) throws Exception {return tuple3.f0;}
})
// 滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply(new RightJoinFunction());// 右连接
private static class RightJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {@Overridepublic void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {for (Tuple3<String, String, String> orangeTuple : orangeIterable) {boolean noElements = true;for (Tuple3<String, String, String> greenTuple : greenIterable) {noElements = false;LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2);collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);}if (noElements) {LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",orangeTuple.f0, "null, " + orangeTuple.f1, "null, " + orangeTuple.f2);collector.collect("null, " + orangeTuple.f2);}}}
}

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 RightJoin,实现原理跟 LeftJoin 一样,需要保证 greenIterable 中没有元素,orangeIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 greenIterable 是否有元素,如果 greenIterable 中没有元素,单独输出 orangeIterable 中的元素即可。

3. Interval Join

Interval Join 不同于 Join以及CoGroup 原因是 Join和CoGroup 他们是窗口Join ,必须给定窗口的 ,Interval Join不需要给窗口。Interval Join 必须先分组才能使用。

Flink 中基于 DataStream 的 Join,只能实现在同一个窗口的两个数据流进行 Join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内 Join。Flink 基于 KeyedStream 提供的 Interval Join 机制可以对两个keyedStream 进行 Join, 按照相同的 key 在一个相对数据时间的时间段内进行 Join。按照指定字段以及右流相对左流偏移的时间区间进行关联:

b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]

或者

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b分别是上图中绿色流和橘色流中的元素,并且有相同的 key。只需要保证 lowerBound 永远小于等于 upperBound 即可,均可以为正数或者负数。

从上面可以看出绿色流可以晚到 lowerBound(lowerBound为负的话)时间,也可以早到 upperBound(upperBound为正的话)时间。也可以理解为橘色流中的每个元素可以和绿色流中指定区间的元素进行 Join。需要注意的是 Interval Join 当前仅支持事件时间(EventTime):

public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {if (timeBehaviour != TimeBehaviour.EventTime) {throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");}
}

下面我们具体看看如何实现一个 Interval Join:

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuIntervalJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.keyBy(tup -> tup.f0).intervalJoin(orangeStream.keyBy(tup -> tup.f0)).between(Time.seconds(-2),Time.seconds(1)).process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

需要注意的是 Interval Join 当前仅支持事件时间(EventTime),所以需要为流指定事件时间戳(毫秒值)。

两条流输入元素如下所示:

绿色流:
c,0,2021-03-23 12:09:00
c,1,2021-03-23 12:09:01
c,6,2021-03-23 12:09:06
c,7,2021-03-23 12:09:07橘色流:
c,0,2021-03-23 12:09:00
c,2,2021-03-23 12:09:02
c,3,2021-03-23 12:09:03
c,4,2021-03-23 12:09:04
c,5,2021-03-23 12:09:05
c,7,2021-03-23 12:09:07

总结:

join、coGroup 都是基于窗口的join, join算子本身只支持内连接

coGroup 只可以实现内连接,左右连接

intervalJoin 是一个范围join, 一个数据流上的某一时刻数据可以join 另外一个数据流上的某一范围的数据。跟窗口无关。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/35468.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

vue3+wangeditor富文本编辑器详细教程

一、前言 在这篇教程中&#xff0c;我将指导如何使用 Vue 3 和 WangEditor 创建一个功能丰富的富文本编辑器。WangEditor 是一个轻量级的富文本编辑器&#xff0c;它非常适合集成到 Vue 项目中。这个例子展示了如何配置富文本编辑器&#xff0c;包括工具栏、编辑器配置以及如何…

Python学习39天

my_tools.py文件提供工具函数 """ 此文件编写工具函数&#xff0c;供程序员使用 my_tools """def read_confirm_select():"""让用户输入&#xff1a;Y/N&#xff0c;不区分大小写&#xff0c;将用户输入值转为小写返回&#xff…

LCA - Lowest Common Ancestor

LCA - Lowest Common Ancestor https://www.luogu.com.cn/problem/SP14932 题目描述 A tree is an undirected graph in which any two vertices are connected by exactly one simple path. In other words, any connected graph without cycles is a tree. - Wikipedia T…

unity打包web,发送post请求,获取地址栏参数,解决TypeError:s.replaceAll is not a function

发送post请求 public string url "http://XXXXXXXXX";// 请求数据public string postData "{\"user_id\": 1}";// Start is called before the first frame updatevoid Start(){// Post();StartCoroutine(PostRequestCoroutine(url, postData…

恒创科技:如何区分网站的域名主机名

如何区分网站的域名主机名?它们都是网址机制的一部分&#xff0c;当你在地址栏输入它们&#xff0c;就能访问互联网上想去的地方。你可曾思考过主机名和域名的区别呢? 简单来说&#xff0c;域名就像网址&#xff0c;而主机名用于标识网络中的设备。不过&#xff0c;这只是表面…

【技巧学习】ArcGIS如何计算水库库容量?

ArcGIS如何计算水库库容量? 一、数据获取 DEM数据来源于地理空间数据云&#xff0c;该网站是由中科院计算机网络信息中心于2008年创立的地学大数据平台。 二、填洼 将DEM数据中凹陷的区域填充至与倾斜点同样高度&#xff0c;这里的【Z限制】说的是设定一个特定的值&#x…

机器学习——感知机模型

文章目录 前言1.感知机模型介绍1.1基本概念1.2数学表达1.3几何解释1.4优缺点 2.二分类应用2.1应用介绍2.2准备数据集2.2.1环境检查2.2.2数据集介绍2.2.3获取数据2.2.4划分数据集 2.3可视化训练集2.4训练过程2.4.1首轮梯度下降2.4.2多轮梯度下降 2.5可视化分类结果2.6在验证集验…

11.20[JAVAEXP3]重定向细究【DEBUG】

设置了根域名访问为testServlet,让他重定向到首页为test.jsp&#xff0c;事实上也都触发了&#xff0c;但是最后显示的为什么不是test.jsp生成页面&#xff0c;依然还是index.jsp生成的页面&#xff1f;&#xff1f; 重定向是通过Dispatcher进行的&#xff0c;而不是sendRedir…

YOLOv11模型改进-注意力-引入卷积和注意力融合模块(CAFM) 提升小目标和遮挡检测

本篇文章将介绍一个新的改进机制——卷积和注意力融合模块CAFM&#xff0c;并阐述如何将其应用于YOLOv11中&#xff0c;显著提升模型性能。首先&#xff0c;CAFM是为了融合卷积神经网络&#xff08;CNNs&#xff09;和 Transformer 的优势&#xff0c;同时对全局和局部特征进行…

APM装机教程(五):测绘无人船

文章目录 前言一、元生惯导RTK使用二、元厚HXF260测深仪使用三、云卓H2pro遥控器四、海康威视摄像头 前言 船体&#xff1a;超维USV-M1000 飞控&#xff1a;pix6c mini 测深仪&#xff1a;元厚HXF160 RTK&#xff1a;元生惯导RTK 遥控器&#xff1a;云卓H12pro 摄像头&#xf…

基于MinIO打造高可靠分布式“本地”文件系统

MinIO是一款高性能的对象存储服务&#xff0c;而S3协议是由亚马逊Web服务&#xff08;AWS&#xff09;制定的一种标准协议&#xff0c;用于云存储服务之间的数据交换。MinIO与S3协议的关系在于&#xff0c;MinIO实现了S3协议的接口&#xff0c;这意味着用户可以使用与AWS S3相同…

Luma 视频生成 API 对接说明

Luma 视频生成 API 对接说明 随着 AI 的应用变广&#xff0c;各类 AI 程序已逐渐普及。AI 已逐渐深入到人们的工作生活方方面面。而 AI 涉及的行业也越来越多&#xff0c;从最初的写作&#xff0c;到医疗教育&#xff0c;再到现在的视频。 Luma 是一个专业高质量的视频生成平…

基础算法——搜索与图论

搜索与图论 图的存储方式2、最短路问题2.1、Dijkstra算法&#xff08;朴素版&#xff09;2.2、Dijkstra算法&#xff08;堆优化版&#xff09;2.3、Bellman-Ford算法2.4、SPFA求最短路2.5、SPFA判负环2.6、Floyd算法 图的存储方式 2、最短路问题 最短路问题可以分为单源最短路…

Online Monocular Lane Mapping

IROS 2023 港科大 文章链接&#xff1a;http://arxiv.org/abs/2307.11653 github&#xff1a;GitHub - HKUST-Aerial-Robotics/MonoLaneMapping: Online Monocular Lane Mapping Using Catmull-Rom Spline (IROS 2023) 动机 摆脱高精地图&#xff0c;使用车端的传感器来实现车端…

29.两数相除 python

两数相除 题目题目描述示例 1:示例 2:提示&#xff1a;题目链接 题解解题思路python实现代码解释提交结果 题目 题目描述 给你两个整数&#xff0c;被除数 dividend 和除数 divisor。将两数相除&#xff0c;要求 不使用 乘法、除法和取余运算。 整数除法应该向零截断&#x…

MicroBlaze软核开发(二):GPIO

实现功能&#xff1a;使用 MicroBlaze软核&#xff0c;配置GPIO用拨码开关控制LED灯 Vivado版本&#xff1a;2018.3 目录 引言 vivado部分&#xff1a; 一、配置GPIO 二、生成HDL文件编译 SDK部分&#xff1a; 一、导出硬件启动SDK 二、新建应用程序工程 三、编写程序代…

sdk项目的git 标记新tag的版本号

在 Git 中&#xff0c;tag 是用来标记某个特定的提交点&#xff08;通常是发布版本或重要的里程碑&#xff09;的工具。通过 git tag&#xff0c;你可以为版本号创建标记&#xff0c;帮助团队跟踪不同版本的代码。 如果你想创建一个新的版本号标签&#xff0c;可以按照以下步骤…

40分钟学 Go 语言高并发:服务注册与发现

服务注册与发现 一、系统架构设计 让我们先通过流程图了解服务注册与发现的整体架构&#xff1a; 二、核心组件实现 1. 服务注册中心 package discoveryimport ("context""sync""time" )// ServiceInstance 服务实例 type ServiceInstance…

〔 MySQL 〕索引

目录 1. 没有索引&#xff0c;可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘&#xff1a; 在看看磁盘中一个盘片​编辑 扇区 定位扇区​编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…

微信小程序里的小游戏研发需要什么技术栈

研发小程序里的小游戏通常需要以下技术栈&#xff1a; 前端技术 HTML5 / CSS3&#xff1a;用于构建游戏的界面布局和样式。JavaScript&#xff1a;作为核心编程语言&#xff0c;实现游戏的逻辑和交互。小程序开发框架&#xff1a;如微信小程序的开发框架&#xff0c;了解其 API…