[实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响

table.local-time-zone

  • table.local-time-zone
  • DataStream-to-Table Conversion(拓展知识)
  • 代码测试
  • flinksql代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)

如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)

DataStream-to-Table Conversion(拓展知识)

datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时

        // 水位线 允许乱序WatermarkStrategy<String> waterStrategy = WatermarkStrategy.<String>forMonotonousTimestamps() //ofSeconds(20).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {try {Mybook book= JSON.parseObject(element,Mybook.class);return boo.time+8*60*60*1000  //转成东八区}catch (Exception e){return recordTimestamp;}}}).withIdleness(Duration.ofSeconds(timeWindowIdleness));SingleOutputStreamOperator<UserSlotGame> processStream = env.fromSource(source, waterStrategy, "readKafka").process(new ProcessFunction<String, UserSlotGame>() {@Overridepublic void processElement(String value, Context ctx, Collector<UserSlotGame> out) throws Exception {// 省略}}) ;

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (`operation_code` int DEFAULT NULL,`update_time` varchar(255) DEFAULT NULL, -- 注意这是字符串`product_id` varchar(255) DEFAULT NULL,`product_name` varchar(255) DEFAULT NULL,`price` float DEFAULT NULL,`time_long` bigint NOT NULL DEFAULT '0' -- 注意这是long
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ciINSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:01:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:02:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_001', 'scooter', 12.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_002', 'basketball', 19.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(4, '2024-01-01 18:00:00', 'p_001', 'scooter', 12.99, 1730346179000);

flinksql代码

package com.pg.TableAndDataStreamApi;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;/*
*
* */
public class version_table {private static final String SOURCE="CREATE TABLE source_table(\n" +"\toperation_code int,\n" +"\tupdate_time string,\n" +"\tup_t AS TO_TIMESTAMP(update_time),\n" +"\ttime_long bigint,\n" +"\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n" +"    ) WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://ip:3306/flink',\n" +"   'driver'='com.mysql.cj.jdbc.Driver',\n "+"   'username'='用户名',\n"+"   'password'='密码',\n"+"   'table-name' = 'versioned_rates'\n" +")";public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql(SOURCE);Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");tableEnv.getConfig().addConfiguration(configuration);// 从 MySQL 表中选择所有行Table t = tableEnv.sqlQuery("select * from source_table");t.execute().print();}
}

执行结果截图

TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的

  1. 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
  2. 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1246.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

通过哪些性能指标来评估微调后的大模型实际业务效果?【大模型行业应用落地系列】

ct) 大模型应用场景探讨 ● 通过哪些性能指标来评估微调后的大模型实际业务效果&#xff1f; **【议题说明】**本议题主要探讨评估微调后大模型在实际业务场景中效果的性能指标&#xff0c;确保模型优化与业务目标一致。探讨该议题对用户企业具有多方面的价值&#xff0c;精确…

12-Docker发布微服务

12-Docker发布微服务 Docker发布微服务 搭建SpringBoot项目 新建一个SpringBoot项目 选择依赖项Spring Web和Spring Boot Actuator 在com.qi.docker_boot下创建controller目录&#xff0c;并在该目录下创建OrderController的java类 OrderControllerjava类的内容如下&#xf…

【大语言模型】ACL2024论文-06 探索思维链COT在多模态隐喻检测中的应用

【大语言模型】ACL2024论文-06 探索思维链COT在多模态隐喻检测中的应用 目录 文章目录 【大语言模型】ACL2024论文-06 探索思维链COT在多模态隐喻检测中的应用目录摘要研究背景问题与挑战如何解决创新点算法模型1. 知识总结模块&#xff08;Knowledge Summarization Module&…

HTML CSS

目录 1. 什么是HTML 2. 什么是CSS ? 3. 基础标签 & 样式 3.1 新浪新闻-标题实现 3.1.1 标题排版 3.1.1.1 分析 3.1.1.2 标签 3.1.1.3 实现 3.1.2 标题样式 3.1.2.1 CSS引入方式 3.1.2.2 颜色表示 3.1.2.3 标题字体颜色 3.1.2.4 CSS选择器 3.1.2.5 发布时间字…

应用在汽车控制系统安全气囊的爱普生可编程晶振SG-8018CG

在汽车安全领域&#xff0c;安全气囊是保护驾乘人员生命安全的关键防线。而作为安全气囊控制系统的关键元件 —— 爱普生可编程晶振 SG - 8018CG&#xff0c;以其卓越的性能成为汽车安全的坚实守护者。 一、高精度频率输出&#xff1a;安全气囊触发的精准之选 在汽车安全气囊控…

第112届全国糖酒会(3月成都)正式官宣!

作为食品饮料行业内备受瞩目的年度盛事&#xff0c;全国糖酒商品交易会&#xff08;简称“糖酒会”&#xff09;一直是各大厂商与经销商展现企业风采、寻觅合作伙伴及签署订单的关键舞台。2024年10月31日&#xff0c;第111届全国糖酒商品交易会&#xff08;秋糖&#xff09;在深…

【Javaee】网络原理-http协议(二)

前言 上一篇博客初步介绍了抓包工具的安装及使用&#xff0c;介绍了http请求报文与响应报文的格式。​​​​​​【Javaee】网络原理—http协议&#xff08;一&#xff09;-CSDN博客 本篇将详细介绍http的方法和http报文中请求头内部键值对的含义与作用&#xff0c;以及常见状…

Python实现摇号系统:详细指南与案例解析

目录 一、摇号系统的基本概念与原理 二、摇号系统的准备工作 三、摇号系统的详细实现步骤 1. 数据读取 2. 随机摇号 3. 结果存储 4. 结果查询 5. 主函数 四、案例解析 五、常见问题与解答 如何确保摇号过程的公平性&#xff1f; 如何处理大量用户数据&#xff1f; …

python将数据集中所有文件名升序制作txt文件(医学影像)

import os import re # 设定图像文件所在的路径 img_path ./2d/images/ #需修改路径 # 获取该路径下的所有文件名 img_list os.listdir(img_path) # 过滤出以.nii结尾的文件名 nii_list [f for f in img_list if f.endswith(.nii)] # 使用正则表达式从文件名中提…

Stable diffusion 3.5本地运行环境配置记录

1.环境配置 创建虚环境 conda create -n sd3.5 python3.10Pytorch(>2.0) conda install pytorch2.2.2 torchvision0.17.2 torchaudio2.2.2 pytorch-cuda12.1 -c pytorch -c nvidiaJupyter能使用Anaconda虚环境 conda install ipykernel python -m ipykernel install --user …

2001-2023年A股上市公司数字化转型数据(MDA报告词频统计)(三种方法)

2001-2023年A股上市公司数字化转型数据&#xff08;MD&A报告词频统计&#xff09;&#xff08;三种方法&#xff09; 1、时间&#xff1a;2001-2023年 2、来源&#xff1a;上市公司MD&A报告 3、指标&#xff1a;年份、股票代码、股票简称、行业名称、行业代码、MD&a…

【力扣专题栏】字母异词分组,如何利用强大的容器(unordered_map)解决该问题?

题解目录 1、题目描述解释2、算法原理解析3、代码编写 1、题目描述解释 2、算法原理解析 3、代码编写 class Solution { public:vector<vector<string>> groupAnagrams(vector<string>& strs) {//创建哈希表unordered_map<string,vector<string&g…

基于python的语音识别与蓝牙通信的温控系统毕设项目

基于python的语音识别与蓝牙通信的温控系统毕设项目 大家好&#xff0c;我是俊星学长&#xff0c;一名在 Java 圈辛勤劳作的码农。今日&#xff0c;要和大家分享的是一款基于python的语音识别与蓝牙通信的温控系统毕设项目。项目源码以及部署相关事宜&#xff0c;请联系小村学…

MySQL第四次作业

一、题目要求 二、创建相关的数据库和表以及表的处理 1. 修改student 表中年龄(sage)字段属性&#xff0c;数据类型由int 改变为smallint 2.为Course表中Cno 课程号字段设置索引,并查看索引 3.为SC表建立按学号(sno)和课程号(cno)组合的升序的主键索引&#xff0c;索引名为SC_I…

Docker-安装

操作系统&#xff1a;Ubuntu 20.04.6 LTS 更新apt sudo apt update 删除旧版本docker sudo apt-get remove docker docker-engine docker.io 安装docker sudo apt install docker.io 查看docker版本 docker --version 启动docker 启动docker sudo systemctl start docker 启用…

Elasticsearch 安装教程:驾驭数据海洋的星际导航仪

目录 一、准备工作1. ES的下载 二、安装步骤三、注意事项四、启动报错1. org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root2. max virtual memory areas vm.max_map_count [65530] is too low, increase to at l…

SparkSQL整合Hive后,如何启动hiveserver2服务

当spark sql与hive整合后&#xff0c;我们就无法启动hiveserver2的服务了&#xff0c;每次都要先启动hive的元数据服务&#xff08;nohup hive --service metastore&#xff09;才能启动hive,之前的beeline命令也用不了&#xff0c;hiveserver2的无法启动&#xff0c;这也导致我…

【网络安全】揭示 Web 缓存污染与欺骗漏洞

未经许可,不得转载。 文章目录 前言污染与欺骗Web 缓存污染 DoS1、HTTP 头部超大 (HHO)2、HTTP 元字符 (HMC)3、HTTP 方法覆盖攻击 (HMO)4、未键入端口5、重定向 DoS6、未键入头部7、Host 头部大小写规范化8、路径规范化9、无效头部 CP-DoS10、HTTP 请求拆分Web 缓存污染与有害…

网络自动化01:netmiko基础、netmiko简单demo

本系列应该是记录我在网络自动化中的学习、使用。具体更新多少期、什么频率都不太清楚。 同时本文的记录方式不会是那么的符合学习的思路&#xff0c;需要更加详细的内容建议阅读官方文档等。 本人学习的路径是基于九净老师的NetDevOps加油站&#xff0c;但本文有所简化&#x…

一篇文章理解CSS垂直布局方法

方法1&#xff1a;align-content: center 在 2024 年的 CSS 原生属性中允许使用 1 个 CSS 属性 align-content: center进行垂直居中。 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewpo…