Flink笔记整理(五)

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1489068.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构 Day2 链式存储

线性表的链式存储 解决顺序存储的缺点&#xff0c;插入和删除&#xff0c;动态存储问题。特点&#xff1a;线性表链式存储结构的特点是一组任意的存储单位存储线性表的数据元素&#xff0c;存储单元可以是连续的&#xff0c;也可以不连续。可以被存储在任意内存未被占用的位置上…

JavaScript Let

ECMAScript 2015 ES2015 引入了两个重要的 JavaScript 新关键词&#xff1a;let 和 const。 这两个关键字在 JavaScript 中提供了块作用域&#xff08;Block Scope&#xff09;变量&#xff08;和常量&#xff09;。 在 ES2015 之前&#xff0c;JavaScript 只有两种类型的作…

【数据库】联合索引在b+树如何存储

什么是联合索引&#xff1f; 联合索引是一种数据库索引类型&#xff0c;它允许你基于表中两个或多个列的组合来创建索引。这种索引可以提高数据库查询的性能&#xff0c;特别是当查询条件涉及到这些列时。 如&#xff1a; create index id_name_age out users(name,age) 索引…

算法刷题day20|回溯:39. 组合总和、40. 组合总和 II、131. 分割回文串

39. 组合总和 回溯 class Solution { private:vector<vector<int>> result;vector<int> path;void backtracking(vector<int>& candidates, int target, int sum, int startIndex) {if (sum > target) {return;}if (sum target) {result.push…

C#、Net6、WebApi报表方案

目录 1 Pdf表单方案 1.1出现如下错误提示: 1.2 字体路径使用 2 Docx报表模板方案 2.1 pdf方案缺陷 2.2 解决方案 3 Spire.Doc报表方案 3.1 Docx方案缺陷 3.2 解决方案 4 插入复选框 5 WebApi文件流下载接口 6 软件获取方式 1 Pdf表单方案 使用【Adobe Acrobat P…

Python 爬虫入门(一):从零开始学爬虫 「详细介绍」

Python 爬虫入门&#xff08;一&#xff09;&#xff1a;从零开始学爬虫 「详细介绍」 前言1.爬虫概念1.1 什么是爬虫&#xff1f;1.2 爬虫的工作原理 2. HTTP 简述2.1 什么是 HTTP&#xff1f;2.2 HTTP 请求2.3 HTTP 响应2.4 常见的 HTTP 方法 3. 网页的组成3.1 HTML3.1.1 HTM…

掀桌子了!原来是咱们的大屏设计太酷,吓着前端开发老铁了

掀桌子了&#xff01;原来是咱们的大屏设计太酷&#xff0c;吓着前端开发老铁了 艾斯视觉观点认为&#xff1a;在软件开发的世界里&#xff0c;有时候创意和设计的火花会擦得特别亮&#xff0c;以至于让技术实现的伙伴们感到既兴奋又紧张。这不&#xff0c;我们的设计团队刚刚…

SpringBoot入门实战:SpringBoot整合Shiro

1.背景介绍 SpringBoot是一个用于快速开发Spring应用程序的框架。它的核心是对Spring框架的一层封装&#xff0c;使其更加简单易用。SpringBoot整合Shiro是一种将SpringBoot与Shiro整合的方法&#xff0c;以实现身份验证和授权功能。 Shiro是一个强大的Java安全框架&#xff0c…

matlab笔记 - 最小二乘法拟合直线的原理与实现

最小二乘法拟合直线原理与实现 一、引言二、原理概述1. 建模思路2.误差函数3.求解最优参数 三、matlab实现最小二乘法拟合直线1.直接代码实现2.MATLAB内置函数实现 四、扩展统计学与回归分析经济学工程学图像处理机器学习 一、引言 最小二乘法&#xff08;Least Squares Metho…

鸿蒙APP架构及开发入门

1.鸿蒙系统 1.1 什么是鸿蒙 鸿蒙是一款面向万物互联时代的、全新的分布式操作系统。 在传统的单设备系统能力基础上&#xff0c;鸿蒙提出了基于同一套系统能力、适配多种终端形态的分布式理念&#xff0c;能够支持手机、平板、智能穿戴、智慧屏、车机、PC、智能音箱、耳机、…

超燃!纯AI生成《泰坦尼克号》大片!浙大阿里发布MovieDreamer:超长电影生成“梦工厂“

论文链接&#xff1a;https://arxiv.org/pdf/2407.16655 项目主页&#xff1a;https://aim-uofa.github.io/MovieDreamer/ github链接&#xff1a;https://github.com/aim-uofa/MovieDreamer 亮点直击 MovieDreamer&#xff0c;一个新颖的分层框架&#xff0c;将自回归模型与扩…

正则表达式与文本处理

目录 一、正则表达式 1、正则表达式定义 1.1正则表达式的概念及作用 1.2、正则表达式的工具 1.3、正则表达式的组成 2、基础正则表达式 3、扩展正则表达式 4、元字符操作 4.1、查找特定字符 4.2、利用中括号“[]”来查找集合字符 4.3、查找行首“^”与行尾字符“$”…

前端江湖:从菜鸟到大侠的修炼手册

在这个数字编织的梦幻世界里&#xff0c;前端&#xff0c;这个听起来就带着几分仙气与神秘感的词汇&#xff0c;实则是每一位互联网探险家手中的魔法杖。它不仅连接着代码的冰冷逻辑与用户的炽热情感&#xff0c;更在无数次的点击与滑动间&#xff0c;绘制出一幅幅绚丽多彩的交…

通过IP获取对应的经纬度地区

背景 项目现在要通过IP获取对应的地区和经纬度。后面会根据经纬度在地图上展示 直接用大佬给出的ip-info 这是大佬给的项目地址 https://gitee.com/jthinking/ip-info这是运行实例 感谢大佬打赏 ![在这里插入图片描述]

怎样在 Nginx 中配置基于请求客户端指纹识别数据的路由?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01; 文章目录 怎样在 Nginx 中配置基于请求客户端指纹识别数据的路由 怎样在 Nginx 中配置基于请求客户端指纹识别数据的路由 在当今数字化的世界中&#xff0c;网站和应用程…

谷粒商城实战笔记-69-商品服务-API-品牌管理-JSR303自定义校验注解

文章目录 1. 需求介绍2. 创建自定义校验注解2.1 编写自定义校验注解2.1.1 注解定义2.1.2 配置文件 3. 实现自定义校验器3.1 编写自定义校验器 4. 使用自定义校验5. 多类型校验器的支持6. 测试 上一节讲解了如何使用分组校验。 这一节将详细介绍如何在Java中实现自定义校验注解以…

步进电机常见的三种驱动方式

步进电机是一种作为控制用的特种电机, 它的旋转是以固定的角度(称为"步距角")一步一步运行的, 其特点是没有积累误差( 为100%), 所以广泛应用于各种开环控制。 步进电机的运行要有一电子装置进行驱动, 这种装置就是步进电机驱动器, 它是把控制系统发出的脉冲信号转化…

宝塔单ip,新建多站点

报错如上&#xff1a; 那么如何新建多站点呢 先随便写个名字上去&#xff0c;然后再重新绑定别的端口… 这个时候访问99端口即可 。 如果是有域名&#xff0c;则不需要这样做 、直接80端口也可以多站点

富芮坤FR800X系列之按键检测模块设计

FR800X系列按键检测模块 读者对象&#xff1a; 本文档主要适用以下工程师&#xff1a; 嵌入式系统工程师 单片机软件工程师 IOT固件工程师 BLE固件工程师 文章目录 1.概要2.用户如何设计按键检测模块2.1 GPIO初始化2.2按键模块初始化2.3设计中断函数&#xff1a;2.4循环…