玩转数据-大数据-Flink SQL 中的时间属性

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145788.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

排序篇(二)----选择排序

排序篇(二)----选择排序 1.直接选择排序 基本思想&#xff1a; 每一次从待排序的数据元素中选出最小&#xff08;或最大&#xff09;的一个元素&#xff0c;存放在序列的起始位置&#xff0c;直到全部待排序的数据元素排完 。 直接选择排序: ​ 在元素集合array[i]–array[…

Spring修炼之路(1)基础入门

一、简介 1.1Spring概述 Spring框架是一个轻量级的Java开发框架&#xff0c;它提供了一系列底层容器和基础设施&#xff0c;并可以和大量常用的开源框架无缝集成&#xff0c;可以说是开发Java EE应用程序的必备。Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器&…

虾皮商品详情数据接口

虾皮商品详情数据接口可以提供众多API读取内容&#xff0c;可传输大量数据&#xff0c;数据更新速度尤其快&#xff0c;保证了跨境电商接口服务数据的及时性及准确性&#xff1b;安全性强&#xff1a;使用SSL及虾皮网自主的安全技术&#xff0c;确保了跨境电商接口服务数据的安…

【2023保研】双非上岸东南网安

个人情况 学校&#xff1a;henu 专业&#xff1a;信息安全 排名&#xff1a;1/66 英语&#xff1a;六级500 竞赛&#xff1a;蓝桥杯PB国一&#xff0c;ISCC国一&#xff0c;密码数学挑战赛国三&#xff0c;还有其他一些省级水奖 论文&#xff1a;一篇EI在投&#xff08;三作通…

[Qt]QListView 重绘实例之二:列表项覆盖的问题处理

0 环境 Windows 11Qt 5.15.2 MinGW x64 1 系列文章 简介&#xff1a;本系列文章&#xff0c;是以纯代码方式实现 Qt 控件的重构&#xff0c;尽量不使用 Qss 方式。 《[Qt]QListView 重绘实例之一&#xff1a;背景重绘》 《[Qt]QListView 重绘实例之二&#xff1a;列表项覆…

elementui引入弹出框报错:this.$alert is not defined 解决方案

1.按需引入文件element.js 注意&#xff1a;引入Message&#xff0c;MessageBox两个组件就行&#xff0c;alert包括在MessageBox里面了。 之前我引入了Alert组件&#xff0c;发现不行 2.在vue的prototype里注册伪名字 3.组件里直接调用就行了 4.实现效果 我发现elementui调用…

【C++进阶】:C++11

C11 一.统一列表的初始化1.{}初始化2.initializer_list 二.声明1.decltype2.nullptr 三.右值引用和移动语义1.左值和右值1.转义语句2.完美转发 四.可变参数模板1.基本概念2.STL里emplace类接口 五.lambda表达式六.新的类功能 一.统一列表的初始化 1.{}初始化 在C98中&#xf…

图像处理: 马赛克艺术

马赛克 第一章 马赛克的历史渊源 1.1 马赛克 艺术中的一种表面装饰&#xff0c;由紧密排列的、通常颜色各异的小块材料&#xff08;如石头、矿物、玻璃、瓷砖或贝壳&#xff09;组成。与镶嵌不同的是&#xff0c;镶嵌是将要应用的部件放置在已挖空以容纳设计的表面中&#xff0…

【教学类-35-03】学号+姓名+班级(小3班)学号字帖(A4竖版2份)

图片展示: 背景需求: 本周排到小3班&#xff0c;还没有来得及设计小班主题活动书的内容&#xff0c;于是就把小2班的学号字帖微调一下&#xff0c;做一份竖版2份的学号字帖。 让幼儿熟悉自己的学号&#xff0c;让我也熟悉幼儿的名字和学号 材料准备&#xff1a; 描字写&#…

关于RabbitMQ你了解多少?

关于RabbitMQ你了解多少&#xff1f; 文章目录 关于RabbitMQ你了解多少&#xff1f;基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性…

妙不可言的Python之旅----(一)

初识Python python的起源 1989年&#xff0c;为了打发圣诞节假期&#xff0c;Gudio van Rossum吉多 范罗苏姆&#xff08;龟叔&#xff09;决心开发一个新的解释程序&#xff08;Python雏形&#xff09; 1991年&#xff0c;第一个Python解释器诞生 Python这个名字&#xff…

Golang中的包和模块设计

Go&#xff0c;也被称为Golang&#xff0c;是一种静态类型、编译型语言&#xff0c;因其简洁性和对并发编程的强大支持而受到开发者们的喜爱。Go编程的一个关键方面是其包和模块系统&#xff0c;它允许创建可重用、可维护和高效的代码。本博客文章将深入探讨在Go中设计包和模块…

Servlet操作与用法(保姆式教学)

Servlet介绍 什么是servlet Servlet&#xff08;Servlet Applet的缩写&#xff0c;全称 Java Servlet&#xff09;&#xff1a;适用于Java编写的服务器程序&#xff0c;其主要功能是在于交互式的浏览和修改数据&#xff0c;生成动态Web内容。狭义的Servlet是指Java语言实现的…

MySQL学习笔记27

MySQL主从复制的核心思路&#xff1a; 1、slave必须安装相同版本的mysql数据库软件。 2、master端必须开启二进制日志&#xff0c;slave端必须开启relay log 日志。 3、master主服务器和slave从服务器的server-id号不能一致。 4、slave端配置向master端来同步数据。 master…

云安全之访问控制的常见攻击及防御

访问控制攻击概述 访问控制漏洞即应用程序允许攻击者执行或者访问某种攻击者不具备相应权限的功能或资源。 常见的访问控制可以分为垂直访问控制、水平访问控制及多阶段访问控制 (上下文相关访问控制)&#xff0c;与其相应的访问控制漏洞为也垂直越权漏洞(普通用户可以访问或…

【QT】使用toBase64方法将.txt文件的明文变为非明文(类似加密)

目录 0.环境 1.背景 2.详细代码 2.1 .h主要代码 2.2 .cpp主要代码&#xff0c;主要实现上述的四个方法 0.环境 windows 11 64位 Qt Creator 4.13.1 1.背景 项目需求&#xff1a;我们项目中有配置文件&#xff08;类似.txt&#xff0c;但不是这个格式&#xff0c;本文以…

计算机网络之传输层

计算机网络 - 传输层 计算机网络 - 传输层 UDP 和 TCP 的特点UDP 首部格式TCP 首部格式TCP 的三次握手TCP 的四次挥手TCP 可靠传输TCP 滑动窗口TCP 流量控制TCP 拥塞控制 1. 慢开始与拥塞避免2. 快重传与快恢复 网络层只把分组发送到目的主机&#xff0c;但是真正通信的并不是…

leetcode-----二叉树习题

目录 前言 1. 二叉树的中序遍历 2. 相同的树 3. 二叉树的最大深度 4. 二叉树的最小深度 5.二叉树的前序遍历 6. 二叉树的后序遍历 7. 对称二叉树 前言 前面我们学习过了二叉树的相关知识点&#xff0c;那么今天我们就做做练习&#xff0c;下面我会介绍几道关于二叉树的…

JUnit介绍

JUnit是用于编写和运行可重复的自动化测试的开源测试框架&#xff0c; 这样可以保证我们的代码按预期工作。JUnit可广泛用于工业和作为支架(从命令行)或IDE(如Eclipse)内单独的Java程序。 JUnit提供&#xff1a; 断言测试预期结果。 测试功能共享通用的测试数据。 测试套件轻…

Java实现word excel ppt模板渲染与导出及预览 LibreOffice jodconverter

Java Office 一、文档格式转换 文档格式转换是office操作中经常需要进行一个操作&#xff0c;例如将docx文档转换成pdf格式。 java在这方面有许多的操作方式&#xff0c;大致可以分为内部调用&#xff08;无需要安装额外软件&#xff09;&#xff0c;外部调用&#xff08;需…