7.2、如何理解Flink中的水位线(Watermark)

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {// 每进入一条数据,都会调用一次 onEvent 方法@Override/** 参数说明:*   @event : 进入到该方法的事件数据*   @eventTimestamp : 时间戳提取器提取的时间戳* */public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {//发射水位线output.emitWatermark(new Watermark(eventTimestamp));}// 不需要实现@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PeriodWatermarkGenerator<>();}
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 5.触发程序执行env.execute();}
}

查看运行结果:


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {// 设置变量,用来保存 当前最大的事件时间private long currentMaxTimestamp;// 设置变量,指定最大的乱序时间(等待时间)private final long maxOutOfOrderness = 0000; // 3 秒@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 只更新当前最大时间戳,不再发生水位线if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;}// 周期性 生成水位线// 每个 setAutoWatermarkInterval 时间,调用一次该方法@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));}
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PunctuatedWatermarkGenerator<>();}}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();// TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>()).withTimestampAssigner((event, timestamp) -> event.f1);// 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 创建 内置水位线生成策略WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s.withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...// 3.读取 source时添加水位线
env.fromSource(source, WatermarkStrategy实例, "source name")   .print()
;// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 2.将socket作为数据源(开启socket端口: nc -lk 9999)DataStreamSource<String> source = env.socketTextStream("localhost", 9999);source.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {if (key.equals("a")) {return 0;} else if (key.equals("b")) {return 1;} else {return 2;}}}, value -> value.split(",")[0]).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy//.<Tuple2<String, Long>>forMonotonousTimestamps().<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy()).withTimestampAssigner((element,recordTimestamp) -> element.f1).withIdleness(Duration.ofSeconds(5))  //空闲等待5s).process(new ShowProcessFunction()).setParallelism(1).print();env.execute();}
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/139394.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

在Kubernetes上安装和配置Istio:逐步指南,展示如何在Kubernetes集群中安装和配置Istio服务网格

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

爬虫实践-豆瓣读书Top250

爬虫学习与实践 一、爬虫介绍二、爬虫原理TCP3次握手&#xff0c;4次挥手过程 三、页面解析之数据提取四、正则表达式五、实践1. 抓取百度贴吧2. 拉钩招聘网 六、 进阶版 一、爬虫介绍 网络爬虫&#xff0c;其实叫作网络数据采集更容易理解。就是通过编程向网络服务器请求数据…

xp 系统 安装 python 2.7 ide pip

1 下载python http://www.python.org/ftp/python/ python-2.7.2.msi 安装完需要设置环境变量 2 下载 setuptools setuptools-0.6c11.win32-py2.7.exe https://pypi.tuna.tsinghua.edu.cn/simple/setuptools/ 3 下载 pip &#xff0c;python 2.7 最高支持 pip 20.3.4 https:…

mysql 半同步复制模式使用详解

目录 一、前言 二、mysql主从架构简介 2.1 mysql主从复制架构概述 2.2 为什么使用主从架构 2.2.1 提高数据可用性 2.2.2 提高数据可靠性 2.2.3 提升数据读写性能 2.3 主从架构原理 2.4 主从架构扩展 2.4.1 双机热备&#xff08;AB复制&#xff09; 2.4.2 级联复制 2…

[面试] k8s面试题 2

文章目录 核心组件1.什么是 Kubernetes 中的控制器&#xff08;Controller&#xff09;&#xff1f;请提供一些常见的控制器类型。2.请解释一下 Kubernetes 中的 Ingress 是什么&#xff0c;以及它的作用。3.如何通过命令行在 Kubernetes 中创建一个 Pod&#xff1f;4.Stateful…

关于IDEA没有显示日志输出?IDEA控制台没有显示Tomcat Localhost Log和Catalina Log 怎么办?

问题描述&#xff1a; 原因是;CATALINA_BASE里面没有相关的文件配置。而之前学习IDEA的时候&#xff0c;把这个文件的位置改变了。导致&#xff0c;最后输出IDEA的时候&#xff0c;不会把日志也打印出来。 检查IDEA配置; D:\work_soft\tomcat_user\Tomcat10.0\bin 在此目录下&…

初识软件工程

软件工程是一门涵盖软件开发、维护和管理的学科&#xff0c;它通过应用工程化的原则和方法来提高软件系统的质量和可靠性。在当今数字化和信息化的时代&#xff0c;软件工程对于现代社会的各个领域都具有至关重要的作用。 基本概念&#xff1f; 计算机系统中与硬件相互依存的一…

《深度学习工业缺陷检测》专栏介绍 CSDN独家改进实战

&#x1f4a1;&#x1f4a1;&#x1f4a1;深度学习工业缺陷检测 1&#xff09;提供工业小缺陷检测性能提升方案&#xff0c;满足部署条件&#xff1b; 2&#xff09;针对缺陷样品少等难点&#xff0c;引入无监督检测&#xff1b; 3&#xff09;深度学习 C、C#部署方案&#…

iPhone15线下购买,苹果零售店前门店排长队

今年的苹果新品发布会于北京时间 9 月 13 日凌晨举行&#xff0c;并于 9 月 15 日&#xff08;周五&#xff09;开启订购&#xff0c;9 月 22 日&#xff08;周五&#xff09;起正式发售。 据多位网友反馈&#xff0c;首批苹果 iPhone15 系列手机、Apple Watch Ultra 2 / Seri…

OceanBase杨传辉传递亚运火炬:国产数据库为“智能亚运”提供稳稳支持

9 月 14 日&#xff0c;亚运火炬传递到了浙江台州&#xff0c;OceanBase 的 CTO 杨传辉作为火炬手交接了第 89 棒火炬。 2010 年&#xff0c;杨传辉作为创始成员之一参与自研原生分布式数据库 OceanBase。十年磨一剑&#xff0c;国产数据库 OceanBase 交出了一张优秀的成绩单&a…

【vue2第十九章】手动修改ESlint错误 和 配置自动化修改ESlint错误

目标:认识代码规范 代码规范:一套写代码的约定规则。例如:“赋值符号的左右是否需要空格”&#xff0c;"一句结束是否是要加;”等 为什么要使用代码规范&#xff1f; 在团队开发时&#xff0c;提高代码的可读性。 在创建项目时&#xff0c;我们选择的就是一套完整的代码…

为什么定时发朋友圈会更有效呢?

这是因为在同一时段 发送的好友朋友圈 无法有效分散用户的注意力 导致曝光度难以提升 而通过推广定时发朋友圈 可根据自己的粉丝活跃度 设置发圈时间 让每一条朋友圈都能高效 传递到更多的好友手中 这样&#xff0c;曝光度自然而然地就大大提升了&#xff01; 1.多个号…

java用easyexcel按模版导出

首先在项目的resources下面建一个template包&#xff0c;之后在下面创建一个模版&#xff0c;模版格式如下&#xff1a; 名称为 financeReportBillStandardTemplateExcel.xlsx&#xff1a; {.fee}类型的属性值&#xff0c;是下面实体类的属性&#xff0c;要注意这里面的格式&a…

低代码助力企业数字化转型

在当今这个数字化快速发展的时代&#xff0c;企业面临的竞争越来越激烈&#xff0c;数字化转型已成为企业发展的必经之路。低代码平台作为一种新型的开发工具&#xff0c;正在逐渐成为企业数字化转型的重要助力。本文将从数字化转型背景、低代码平台介绍、低代码平台的应用、低…

【打开新世界大门】看测试老鸟如何把API 测试玩弄在鼓掌之间

【软件测试面试突击班】如何逼自己一周刷完软件测试八股文教程&#xff0c;刷完面试就稳了&#xff0c;你也可以当高薪软件测试工程师&#xff08;自动化测试&#xff09; 一、API 测试的基本步骤 我介绍过当今互联网产品的测试策略往往会采用菱形结构&#xff0c;即重量级 AP…

操作系统:体系结构

1.内核的划分 1.术语解释 时钟管理&#xff1a;利用时钟断实现计时功能。原语是一种特殊的程序,具有原子性。也就是说,这段程序的运行必须一气呵成&#xff0c;不可被“中断”Ubuntu、Centos的开发团队,其主要工作是实现非内核功能&#xff0c;而内核都是用了Linux内核。 内核…

exe文件运行后无输出直接闪退如何找解决办法

一.搜索栏搜事件查看器 二.点开windows日志下的应用程序 三.找到错误处 四.搜索异常代码 点开有错误的详细信息&#xff0c;直接用搜索引擎搜索这个异常代码能大致判断是什么问题&#xff0c;给了一个解决思路&#xff0c;不至于不知道到底哪里出了问题

报式套接字通讯实例

报式套接字通讯实例 使用套接字通讯流程 被动端&#xff08;先运行&#xff09; 1、取得SOCKET 2、给SOCKET取得地址 3、收/发消息 4、关闭SOCKET 主动端 1、取得SOCKET 2、给SOCKET取得地址&#xff08;可省略&#xff09; 3、发/收消息 4、关闭SOCKET 各部分代码实现 pr…

基于Yolov8的野外烟雾检测(4):通道优先卷积注意力(CPCA),效果秒杀CBAM和SE等 | 中科院2023最新发表

目录 1.Yolov8介绍 2.野外火灾烟雾数据集介绍 3.CPCA介绍 3.1 CPCA加入到yolov8 4.训练结果分析 5.系列篇 1.Yolov8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&#xff08;SOTA&#xff09;模型&a…

git:一、GIT介绍+安装+全局配置+基础操作

版本管理系统&#xff08;SVN和Git&#xff09;&#xff1a; 集中式版本控制系统&#xff08;SVN&#xff09; SVN是集中式版本控制系统&#xff0c;版本库是集中放在中央服务器的. 工作流程如下: 1.从中央服务器远程仓库下载代码 2.修改后将代码提交到中央服务器远程仓库…