大数据Flink(一百一十八):Flink SQL水印操作(Watermark)

文章目录

Flink SQL水印操作(Watermark)

一、为什么要有WaterMark

二、​​​​​​​​​​​​​​Watermark解决的问题

三、​​​​​​​​​​​​​​代码演示


Flink SQL水印操作(Watermark)

一、​​​​​​​为什么要有WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、​​​​​​​​​​​​​​Watermark解决的问题

上面的问题在于如何将迟来的EventTime 为11的元素正确处理?

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下: 

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。

watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。

三、​​​​​​​​​​​​​​代码演示

  • 使用Socket模拟接收数据
  • 设置WaterMark
    • 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
  • 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

若输入第一条数据:hello,2022-03-25 16:39:45

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000

3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算

根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

 Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。

数据乱序的场景

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃

乱序时间的设置:

为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据

修改最大乱序时间(新建的表仅水印与之前不同)

CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

在监听终端中,输入数据

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink输出结果:  

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1538691.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

《黑神话悟空》黄眉打法技巧图文攻略详解

​黄眉是黑神话悟空第三章的关底的boss,很多的玩家都非常的好奇这个boss到底要怎么打,这里小编就为大家带来了黄眉这个boss的打法,我们不要使用法术,只使用禁字诀就可以击败这个boss,详细的内容可以在这里进行了解和查…

DevEco Profiler调优工具(二)

一、Profiler调优模板 3、Snapshot Insight 4、CPU Insight 5、Frame Insight 6、Launch Insight

硬件(驱动开发)

一、OSC基本架构(片上系统) OSC(On-chip System Control,片上系统控制)基本架构通常涉及片上系统中的各个组件如何进行协调与控制,以实现高效的处理、通信和管理。OSC架构在现代微处理器和系统单芯片&…

WebApi开发中依赖注入和RESTful 详解

Web API 开发中的依赖注入和 RESTful 详解 在现代 Web API 开发中,依赖注入(Dependency Injection, DI)和 RESTful 架构 是两个极为重要的概念。本文将详细探讨它们的定义、应用场景及在 Web API 开发中的最佳实践。 一、依赖注入 (Depende…

[PICO VR眼镜]眼动追踪串流Unity开发与使用方法,眼动追踪打包报错问题解决(Eye Tracking/手势跟踪)

前言 最近在做一个工作需要用到PICO4 Enterprise VR头盔里的眼动追踪功能,但是遇到了如下问题: 在Unity里面没法串流调试眼动追踪功能,根本获取不到Device,只能将整个场景build成APK,安装到头盔里,才能在…

【java面向对象二】static(一)

文章目录 前言一、static修饰成员变量二、static修饰成员变量的应用场景三、static修饰成员方法四、搞懂main方法总结 前言 学习static修饰类变量,类方法,以及main方法的使用。 一、static修饰成员变量 static 叫静态,可以修饰成员变量&…

高密原型验证系统解决方案(下篇)

0 引言 我们在上篇中和大家探讨了用户在进行大规模 复杂 SoC 设计原型验证时在全局时钟及复位同步, 大规模设计分割以及高速接口与先进 Memory 控制 器 IP 验证等方面遇到的关键困难,并提出了相应的 解决方案帮助用户来克服这些困难。接下来我们会 和用户…

Django ORM(多表)

文章目录 前言一、关联关系模型二、一对多写入数据二、多对多写入数据二、跨表查询1.查找test 标签的文章2.查找作者名为 test 的文章及标签 三、跨表删除 前言 表与表之间的关系可分为以下三种: 一对一: 一对一关系表示一个模型的每个实例与另一个模型的每个实例…

华为OD机试 - 字符串划分(Java 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(E卷D卷A卷B卷C卷)》。 刷的越多,抽中的概率越大,私信哪吒,备注华为OD,加…

Python | Leetcode Python题解之第416题分割等和子集

题目&#xff1a; 题解&#xff1a; class Solution:def canPartition(self, nums: List[int]) -> bool:n len(nums)if n < 2:return Falsetotal sum(nums)if total % 2 ! 0:return Falsetarget total // 2dp [True] [False] * targetfor i, num in enumerate(nums…

最低成本的游戏串流方案分享 如何自己打造云电脑?

今天教大家如何最低成本实现串流 出门在外也可以随时随地游玩端游大作 硬件准备&#xff1a;一台电脑 手机/平板一台 软件&#xff1a;Gameviewer远程 为啥不用moonlight等其他软件呢 因为设置公网穿透等复杂操作对小白来说不太友好 而GameViewer从安装到使用仅需一键 对比同类…

MATLAB系列07:输入/输入函数

MATLAB系列07&#xff1a;输入/输入函数 8. 输入/输入函数8.1 函数textread8.2 关于load和save命令的进一步说明8.3 MATLAB文件过程简介8.4 文件的打开和关闭8.4.1 fopen函数8.4.2 fclose函数 8.5 二进制 I/O 函数8.5.1 fwrite 函数8.5.2 fread函数 8.6 格式化 I/O 函数8.6.1 f…

【C++ 差分数组 前后缀分解】P7404家庭菜园

本文涉及知识点 C差分数组 C前后缀分解 P7404家庭菜园 出自洛谷&#xff0c;我简述一下。 已知数组a&#xff0c;长度为n(1<n<2e5),1 <a[i] <1e9。一次操作如下&#xff1a;将a[i…j]全1。问最少操作多少次&#xff0c;使得a成为山形数组&#xff0c;即存在k&am…

力扣题解2332

大家好&#xff0c;欢迎来到无限大的频道。 今日继续给大家带来力扣题解。 题目描述&#xff08;中等&#xff09;​&#xff1a; 坐上公交的最晚时间 给你一个下标从 0 开始长度为 n 的整数数组 buses &#xff0c;其中 buses[i] 表示第 i 辆公交车的出发时间。同时给你一…

算法题——最小会议室数量

题目1: 1.假定会议时间从凌晨零点开始&#xff0c;即 0<si<ei<1440&#xff0c;si和ei代表当天某一时刻从零点开始计算的开始和结束分钟数&#xff0c;如[90,360]&#xff0c;表示会议时间从1:30到6:00。 2.不考虑房间的容量问题&#xff0c;并且公司为了提升大家的体…

计算机组成原理-存储系统(二)半导体存储器

2.1DAM芯片 分类&#xff1a; DRAM芯片&#xff1a;使用栅极电容存储信息SRAM芯片&#xff1a;使用双稳态触发器存储信息 核心区别&#xff1a;储存元不一样 2.2DRAM和SRAM的比较 对于DRAM中&#xff1a; 1&#xff1a;电容内存储了电荷0&#xff1a;电容内未存储电荷 DR…

HTML讲解(一)body部分

目录 1.什么是HTML 2.HTML基本框架 3.标题声明 4.修改标题位置 5.段落声明 6.修改段落位置 7.超链接访问 8.图像访问 9.改变网页背景及文本颜色 10.添加网页背景图 11.超链接改变颜色 12.设置网页边距 小心&#xff01;VS2022不可直接接触&#xff0c;否则&#xff…

Qt窗口——QMenuBar

文章目录 QMenuBar示例演示给菜单栏设置快捷键给菜单项设置快捷键添加子菜单添加分割线添加图标 QMenuBar Qt中采用QMenuBar来创建菜单栏&#xff0c;一个主窗口&#xff0c;只允许有一个菜单栏&#xff0c;位于主窗口的顶部、主窗口标题栏下面&#xff1b;一个菜单栏里面有多…

【技术解析】消息中间件MQ:从原理到RabbitMQ实战(深入浅出)

文章目录 【技术解析】消息中间件MQ&#xff1a;从原理到RabbitMQ实战(深入浅出)1.简介1.1 什么是消息中间件1.2 传统的http请求存在那些缺点1.3 Mq应用场景有那些1.4 为什么需要使用mq1.5 Mq与多线程之间区别1.6 Mq消息中间件名词1.7主流mq区别对比1.8 Mq设计基础知识 2.Rabbi…

python画图|3D bar进阶探索

前述学习过程只能怪&#xff0c;已经探究了3D直方图的基础教程&#xff0c;详见下述链接&#xff1a; python画图|3D直方图基础教程-CSDN博客 实际上&#xff0c;基础文章直接进入了堆叠教程&#xff0c;相对来说基础的程度不够&#xff0c;因此有必要再次探索。 【1】官网教…