滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。
窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据
滑动窗口(window)
比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。
代码展示:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4Window {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS//每隔5秒计算最近15秒的数据.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}
滑动窗口(windowAll)
将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加
代码展示:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4WindowAll {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS//每隔5秒计算最近15秒的数据//windowAll:将同一个窗口的数据发一起进行计算.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowAllDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}