Flink滑动窗口(Sliding)中window和windowAll的区别

滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。

窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据

滑动窗口(window)

比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。

代码展示:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4Window {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS//每隔5秒计算最近15秒的数据.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}

滑动窗口(windowAll) 

将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加

 代码展示:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4WindowAll {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS//每隔5秒计算最近15秒的数据//windowAll:将同一个窗口的数据发一起进行计算.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowAllDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5598.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

AWS云服务器选择哪个区域最好?

选择AWS云服务器的区域&#xff08;Region&#xff09;是一个非常重要的决策&#xff0c;因为它会影响你的应用性能、成本和合规性。以下是九河云总结的一些选择AWS区域时需要考虑的关键因素&#xff1a; 1. 地理位置和延迟 选择离你的用户或客户最近的区域可以最大程度减少网…

高频面试题(含笔试高频算法整理)基本总结回顾30

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…

6.气泵控制原理---单双向可控硅控制原理、斩波电路、项目举例说明

最近在项目开发中&#xff0c;涉及到气泵的相关控制&#xff0c;在这里进行比较系统的学习。对交直流气泵区别进行一个说明 &#xff0c;同时了解一下单向和双向可控硅&#xff0c;最后根据项目实例进行了解。因为个人比较偏软件&#xff0c;有什么不对的地方欢迎指正。 一交流…

element-plus table tableRowClassName 无效

官网上给的是 .el-table .warning-row {--el-table-tr-bg-color: var(--el-color-warning-light-9); } .el-table .success-row {--el-table-tr-bg-color: var(--el-color-success-light-9); } 但是 如果 加上了 scoped 这样样式是无效的 在 vue3 中用样式穿透 即可生…

Python自动化测试框架详解!

随着技术的进步和自动化技术的出现&#xff0c;市面上出现了一些自动化测试框架。只需要进行一些适用性和效率参数的调整&#xff0c;这些自动化测试框架就能够开箱即用&#xff0c;大大节省了开发时间。而且由于这些框架被广泛使用&#xff0c;他们具有很好的健壮性&#xff0…

【数据结构实战】从0打造你的专属顺序表

专栏&#xff1a;《数据结构实战篇》 生活中有着无穷无尽的数据需要存储&#xff0c;大到全国人口普查&#xff0c;小到微信、QQ好友列表&#xff0c;都需要有一个合理的存储方式才能使得我们的数据更方便管理&#xff0c;线性表就是其中之一 一、线性表 线性表&#xff08;li…

RFID标签实现托盘智能化管理

一、RFID技术概述 1.1 RFID技术原理 RFID技术&#xff0c;即无线射频识别技术&#xff0c;是一种利用无线电波进行非接触式自动识别和数据交换的技术。其核心优势在于能够实现远距离、快速、批量的识别&#xff0c;相较于传统的条形码技术&#xff0c;RFID技术在物资管理领域展…

net core 生成URL HtmlHelper

HtmlHelper Url.Action Url.RouteUrl RedirectToAction public IActionResult Privacy(){return RedirectToAction("Index");}Html.ActionLink Html.BeginForm Html.ActionLink 与 Url.Action 1.两者者是根据给定的Controller,Action 生成链接&#xff0c; 但是H…

零日漏洞被谷歌的 AI 工具发现

谷歌的 AI 研究工具 Big Sleep 取得了重大突破&#xff0c;发现了 SQLite 中的漏洞&#xff0c;SQLite 是全球使用最广泛的数据库引擎之一。 Google Project Zero 和 Google DeepMind 团队最近在官方博客文章中分享了这一里程碑&#xff0c;标志着 AI 驱动的漏洞检测在现实世界…

Github 2024-11-07 Go开源项目日报 Top10

根据Github Trendings的统计,今日(2024-11-07统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Go项目10HTML项目1Kubernetes: 容器化应用程序管理系统 创建周期:3618 天开发语言:Go协议类型:Apache License 2.0Star数量:106913 个Fork数…

Java学习笔记之类

文章目录 类和对象的基础属性/成员变量⚠️ 属性的注意事项和细节⚠️ 构建函数 类和对象的区别类和对象的内存分配机制练习 成员方法方法定义⚠️ 方法使用细节⚠️ 形参列表细节⚠️ 方法内部细节⚠️ 方法调用细节方法入门代码01⚠️ 行参和成员方法重名方法入门代码02方法递…

链表删除相关算法题|删除值为x的节点|删除最小值节点|删除值在区间内的节点|删除重复节点|删除绝对值相等的节点(C)

删除值为x的节点 在带头结点的单链表L中&#xff0c;删除所有值为X的结点&#xff0c;并释放其空间&#xff0c;假设值为的结点不唯一 算法思想 删除单链表的节点需要三个指针 一个是遍历链表的工作指针cur&#xff0c;一个是指向cur的上一个节点的指针prev&#xff0c;一个…

C++:哈希表的实现

一、哈希表的基本概念 1、负载因子&#xff1a;假设哈希表中已经映射存储了N个值&#xff0c;哈希表的大小为M&#xff0c;那么负载因子 N / M&#xff0c;负载因子有些地⽅也翻译为载荷因子/装载因子等&#xff0c;他的英文为load/factor。负载因子越大&#xff0c;哈希冲突的…

2024年11月软考考前注意事项

一、重要时间节点 准考证打印时间&#xff1a; 大部分省市的准考证打印时间从11月4日起开始&#xff0c;但上海、甘肃等地区则稍晚&#xff0c;从11月6日起开放打印。 请务必注意所在地区的具体打印时间&#xff0c;并尽早打印准考证&#xff0c;以免因错过时间而影响考试。…

书生大模型实战营Linux+InternStudio 关卡任务

一、端口映射 使用以下命令进行端口映射 ssh -p {YOUR_PORT} rootssh.intern-ai.org.cn -CNg -L 7860:127.0.0.1:7860 -o StrictHostKeyCheckingno 命令解释&#xff1a; -p 37367&#xff1a;是指定 SSH 连接的端口为 37367。rootssh.intern-ai.org.cn&#xff1a;表示要以…

道品科技智能水肥一体化技术要点及实施效果

## 一、引言 水肥一体化技术是现代农业中一种重要的耕作方式&#xff0c;旨在通过合理配置水资源与肥料&#xff0c;提高作物产量和质量&#xff0c;达到节水、增效和环保的目的。随着全球人口的增加和耕地资源的减少&#xff0c;水肥一体化技术在农业生产中的应用愈加重要。 …

sqlserver使用bak文件恢复数据库

进入数据库 sqlcmd -S localhost -U SA -P password备份文件 #备份格式BACKUP DATABASE your_database_name TO DISK path_to_backup_file.bak;#举例 1> BACKUP DATABASE XJZDataTest TO DISK /root/mssql.bak; 2> go使用备份文件恢复数据库 1、查询备份文件中的数据…

CSP/信奥赛C++刷题训练:经典深搜例题(1):洛谷1605 :迷宫

CSP/信奥赛C刷题训练&#xff1a;经典深搜例题&#xff08;1&#xff09;&#xff1a;洛谷1605 &#xff1a;迷宫 题目描述 给定一个 N M N \times M NM 方格的迷宫&#xff0c;迷宫里有 T T T 处障碍&#xff0c;障碍处不可通过。 在迷宫中移动有上下左右四种方式&#x…

yolov8涨点系列之Concat模块改进

文章目录 Concat模块修改步骤(1) BiFPN_Concat3模块编辑(2)在__init_.pyconv.py中声明&#xff08;3&#xff09;在task.py中声明yolov8引入BiFPN_Concat3模块yolov8.yamlyolov8.yaml引入C2f_up模块 在YOLOv8中&#xff0c; concat模块主要用于将多个特征图连接在一起。其具体…

越权访问漏洞

V2Board Admin.php 越权访问漏洞 ## 漏洞描述 V2board面板 Admin.php 存在越权访问漏洞&#xff0c;由于部分鉴权代码于v1.6.1版本进行了修改&#xff0c;鉴权方式变为从Redis中获取缓存判定是否存在可以调用… V2Board Admin.php 越权访问漏洞 漏洞描述 V2board面板 Admin.ph…