208.Flink(三):窗口的使用,处理函数的使用

目录

一、窗口

1.窗口的概念

2.窗口的分类

(1)按照驱动类型分

(2)按照窗口分配数据的规则分类

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

*1)按键分区窗口(Keyed Windows)

*2)非按键分区(Non-Keyed Windows)

(2)代码中窗口API的调用

(3)窗口分配器

(4)窗口函数

*1)增量聚合函数

^1)归约函数(ReduceFunction)

^2)聚合函数(AggregateFunction)

*2)全窗口函数(full window functions)

*3)增量聚合和全窗口函数的结合使用

(5)触发器(Trigger)

(6)移除器(Evictor)

(7)窗口的简单原理

*1)一个数据来了,怎么认为他是哪个窗口内的数据?

*2)窗口特性

*3)窗口的生命周期

4.时间语义

(1)Flink中的时间语义

(2)Flink以事件时间为默认时间语义

5.水位线(Watermark)

(1)水位线的概念

*1)有序流中的水位线

*2)乱序流中的水位线

(2)水位线和窗口的工作原理

(3) 生成水位线

*1)总体原则

*2)有序流中内置水位线设置

*3)乱序流中内置水位线设置

*4)自定义水位线生成器(周期式、断点式)

*5)在数据源中发送水位线

(6)迟到数据的处理

*1)设置乱序容忍度

*2)设置窗口延迟关闭

*3)侧输出流

(7)基于时间的合流——双流联结(Join)

*1)窗口联结(Window Join)

*2)间隔联结(Interval Join)

二、处理函数

1.基本处理函数(ProcessFunction)

(1)处理函数的功能和使用

(2)ProcessFunction解析

(3)处理函数的分类

2.按键分区处理函数(KeyedProcessFunction)

(1)定时器(Timer)和定时服务(TimerService)

(2)KeyedProcessFunction注意点及实现

3.应用案例:Top N

(1)方法一:ProcessAllWindowFunction

(2)方法二:

4.侧输出流


一、窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

1.窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.窗口的分类

(1)按照驱动类型分

*1)时间窗口

一定时间作为一个窗口

*2)计数窗口

达到多少数量作为一个窗口

(2)按照窗口分配数据的规则分类

*1)滚动窗口

以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

*2)滑动窗口

窗口大小 + 步长。

如果步长 = 窗口大小,其实就是滚动窗口的情况。

步长 > 窗口大小,会有数据被漏掉。

步长 < 窗口大小,窗口会有重叠

*3)会话窗口

基于会话对数据分组

*4)全局窗口

全局有效,没有结束时间

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

*1)按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

stream.keyBy(...).window(...)

*2)非按键分区(Non-Keyed Windows)

窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

stream.windowAll(...)

(2)代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型。

.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

(3)窗口分配器

窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

(4)窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 ---  时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
//        sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
//        sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
//        sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
//        sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
//        sensorKS.window(GlobalWindows.create())  // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
//        sensorKS.countWindow(5)  // 滚动窗口,窗口长度=5个元素
//        sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
//        sensorWS
//                .reduce()
//        .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
//        sensorWS.process()env.execute();}
}

*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/143583.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

接口测试——接口协议抓包分析与mock_L2

目录&#xff1a; 抓包工具charles抓包工具fiddler抓包工具证书配置app抓包实战练习接口测试实战练习 1.抓包工具charles 工具介绍 支持 SSL 代理支持流量控制支持重发网络请求&#xff0c;方便后端调试支持修改网络请求参数支持网络请求的截获并动态修改可以自动将 json 或…

探索Moonbeam路由流动性的强大功能

Moonbeam的GMP预编译作为MRL的接口&#xff0c;有助于将带有Token的消息从GMP协议&#xff08;通过XCMP&#xff09;传输到与Moonbeam链接的平行链。 为何是个重磅消息&#xff1f;因为这项技术使得将流动性从外部区块链转移到其他波卡平行链成为可能&#xff01; 这里补充一…

Python 3.12.0 正式版即将发布!

导读Python 3.12.0 发布了第 2 个 RC 版本&#xff0c;也是最后一个 RC。正式版将于 2023 年 10 月 2 日星期一发布。 开发团队表示&#xff0c;进入候选版本阶段后&#xff0c;只接受经过 review 且修复明确错误的代码。RC2 是发现并修复重要问题的最后机会。 从该版本开始&a…

Mini Linux嵌入式设备服务器

Digi International推出了具有Digi Embedded Linux的Digi Connect ME 9210。Digi Embedded Linux是为在Digi嵌入式模块和微控制器上开发而优化的最新版本。高性能嵌入式开发服务器大约只有一对骰子大小&#xff0c;是嵌入式Linux上最小的。这使OEM可以在空间受限的设备中使用Li…

【产品运营】如何做好B端产品规划

产品规划是基于当下掌握的多维度信息&#xff0c;为追求特定目的&#xff0c;而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息&#xff08;客户需求、市场趋势、竞争对手、竞争策略等&#xff09;&#xff0c;为追求特定目的&#xff08;商业增长、客户满意等&…

SSM - Springboot - MyBatis-Plus 全栈体系(十三)

第三章 MyBatis 一、MyBatis 简介 1. 简介 MyBatis 最初是 Apache 的一个开源项目 iBatis, 2010 年 6 月这个项目由 Apache Software Foundation 迁移到了 Google Code。随着开发团队转投 Google Code 旗下&#xff0c; iBatis3.x 正式更名为 MyBatis。代码于 2013 年 11 月迁…

一文教你如何配置路由策略

【微|信|公|众|号&#xff1a;厦门微思网络】 微思-课程介绍 组网需求 如图1所示&#xff0c;某公司的部门A和部门B相距较远&#xff0c;Router_1和Router_6分别作为这两个部门的出口设备&#xff0c;AS 100内部使用OSPF作为IGP。现要求&#xff1a; 通过部署BGP&#xff0c;使…

每日一练 | 华为认证真题练习Day115

1、FEC(Forwarding Equivalence Class)转发等价类&#xff0c;是一组具有某些共性的数据流的集合&#xff1b;FEC可以根据地址进行划分&#xff0c;但是不能根据业务类型、QoS等要素进行划分。 A. 对 B. 错 2、关于OSI参考模型中网络层的功能说法正确的是&#xff1f; A. OS…

阿里云服务器共享型和企业级独享有什么区别?

阿里云ECS云服务器共享型和企业级有什么区别&#xff1f;企业级就是独享型&#xff0c;共享型和企业级云的主要区别CPU调度模式&#xff0c;共享型是非绑定CPU调度模式&#xff0c;企业级是固定CPU调度模式&#xff0c;共享型云服务器在高负载时计算性能可能出现波动不稳定&…

PHP自动识别采集何意网址文章正文内容

在做PHP采集内容时&#xff0c;用过querylist采集组件&#xff0c;但是这个插件采集页面内容时&#xff0c;都必须要写个采集选择器。这样比较麻烦&#xff0c;每个文章页面都必须指定一条采集规则 。就开始着手找一个插件可以能自动识别任意文章url正文内容并采集的&#xff0…

Python:Django框架的Hello wrold示例

Django是Python的目前很常用的web框架&#xff0c;遵循MVC设计模式。 以下介绍如何安装Django框架&#xff0c;并生成最简单的项目&#xff0c;输出Hello world。(开发工具VScode) 一、安装Django 在VScode终端控制台执行以下指令安装Django python install django 如果要查…

前端新轮子Nue,号称替代Vue、React和Svelte

新的简约前端开发工具集Nue.js 于周三发布。在 Hacker News 上介绍它时&#xff0c;前端开发者和Nue.js 的创作者Tero Piirainen表示&#xff0c;它是 React、Vue、Next.js、Vite、Svelte 和 Astro 的替代品。他在 Nue.js的 FAQ 中进一步解释说&#xff0c;它是为网站和响应式用…

Chrome获取RequestId

Chrome获取RequestId 参考&#xff1a;https://help.aliyun.com/zh/redis/how-do-i-obtain-the-id-of-a-request 在浏览器页面按下F12键&#xff0c;打开开发者工具页面&#xff1b; 在开发者工具页面&#xff0c;单击Network(网络)&#xff1b; 在playload(载荷)窗口中找到目…

Nginx代理victoriametrics集群配置

1,首先安装nginx yum install -y nginx 2,生成密钥文件 安装htpasswd工具 yum install -y httpd-tools 生成密钥文件,prometheus为用户名 htpasswd -c /etc/nginx/conf.d/passwd prometheus 3,修改nginx配置文件nginx.conf,增加如下内容 upstream vmselect {server 10.…

【新版】系统架构设计师 - 案例分析 - 架构设计<SOA与微服务>

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 案例分析 - 架构设计&#xff1c;SOA与微服务&#xff1e;例题1例题2例题3例题4 架构 - 案例分析 - 架构设计&#xff1c;SOA与微服务&#xff1e; 这里SOA与微服务的例题只对应找寻了几个&#x…

微信小程序 工具使用(HBuilderX)

微信小程序 工具使用:HBuilderX 一 HBuilderX 的下载二 工具的配置2.1 工具 --> 设置 --> 运行配置2.1.1 微信开发者工具路径2.1.2 node 运行配置 2.2 插件 工具 --> 插件安装2.2.1 下载插件 三 微信小程序端四 同步运行五 BUG5.1 nodemon在终端无法识别 一 HBuilderX…

ubuntu中的系统消息中显卡显示llvmpipe (LLVM 10.0.0, 256 bits)

这是我在使用ubuntu系统时出现的问题&#xff0c;网上搜到很多解决的办法&#xff0c;我是一顿操作&#xff0c;后来看到这位老哥的帖子解决了。 集Linux / Ubuntuwin10双系统安装记录(2):AMD核显驱动引发的问题 - 知乎上一篇中我们提到了 astroR2&#xff1a;Linux / Ubuntuw…

如何正确监测蓄电池健康?狠狠学到!

蓄电池在现代生活和工业中发挥着关键作用&#xff0c;它们为无数设备和系统提供了必要的电力支持。然而&#xff0c;蓄电池的性能和可靠性对许多应用至关重要。监控蓄电池状态和性能变得越来越重要&#xff0c;以确保它们在需要时始终可用。 为此&#xff0c;蓄电池监控系统应运…

LabVIEW开发具有栅极感应漏极电流的电荷泵

LabVIEW开发具有栅极感应漏极电流的电荷泵 由操作压力引起的接口陷阱一直是一个长期问题&#xff0c;因为它们会降低栅极电介质的质量&#xff0c;引起器件参数的不必要变化&#xff0c;例如导通状态电流、亚阈值摆幅、阈值电压和跨导。因此&#xff0c;表征界面疏水阀对于确保…

机器人过程自动化(RPA)入门 4. 数据处理

到目前为止,我们已经了解了RPA的基本知识,以及如何使用流程图或序列来组织工作流中的步骤。我们现在了解了UiPath组件,并对UiPath Studio有了全面的了解。我们用几个简单的例子制作了我们的第一个机器人。在我们继续之前,我们应该了解UiPath中的变量和数据操作。它与其他编…