Flink CDC MySQL同步MySQL错误记录

1、启动 Flink SQL

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

在这里插入图片描述

2、新建源表

问题1:Encountered “(”
处理方法:去掉int(11),改为int

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int(11) DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
Was expecting one of:"CONSTRAINT" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` datetime ,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'Flink SQL> 

创建成功:

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` TIMESTAMP(3),
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[INFO] Execute statement succeed.Flink SQL> 

3、创建目标表

Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );

4、将源表加载到目标表

错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.

Flink SQL> insert into t_user select * from ods_t_user;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.Flink SQL> 

原因:方向搞反了,插入表应该是目标表

Flink SQL> insert into ods_t_user select * from t_user;
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block dataFlink SQL> 

错误2:unexpected block data
解决办法:
(1)更新jar包如下

[appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
total 223320
-rw-r--r-- 1 appuser appuser    196491 May 19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 appuser appuser    542620 May 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 appuser appuser    266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
-rw-r--r-- 1 appuser appuser    345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
-rw-r--r-- 1 appuser appuser    102472 May 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 appuser appuser   8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1 appuser appuser    180248 May 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 appuser appuser  21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 appuser appuser  15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 appuser appuser  38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 appuser appuser   3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 appuser appuser    208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser    301872 May 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser   1790452 May 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 appuser appuser     24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-r--r-- 1 appuser appuser   2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
[appuser@whtpjfscpt01 flink-1.17.1]$

(2)重启flink

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host whtpjfscpt01.
Starting taskexecutor daemon on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

(3)重新执行

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
>  );
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );
[INFO] Execute statement succeed.Flink SQL>

(4)成功执行

Flink SQL> insert into ods_t_user select * from t_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c2e69d061f3777c031b0acb4ec03d13a

在这里插入图片描述

错误3:无目标表
在这里插入图片描述

 CREATE TABLE demo.ods_t_user (`uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',`did` int(11) DEFAULT NULL COMMENT 'dept id',`username` varchar(14) DEFAULT NULL,`add_time` datetime DEFAULT NULL,PRIMARY KEY (`uid`) 
) 

在这里插入图片描述
源表添加新纪录

INSERT INTO test.t_user(did,username)values('3','test'); 

目标表自动同步数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145516.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

1 论文笔记:Efficient Trajectory Similarity Computation with ContrastiveLearning

2022CIKM 1 intro 1.1 背景 轨迹相似度计算是轨迹分析任务(相似子轨迹搜索、轨迹预测和轨迹聚类)最基础的组件之一现有的关于轨迹相似度计算的研究主要可以分为两大类: 传统方法 DTW、EDR、EDwP等二次计算复杂度O(n^2)缺乏稳健性 会受到非…

【Linux】进程控制基础知识

目录 一,fack回顾 二,进程终止 1.进程终止,操作系统做了什么? 2.进程终止,常见的方式 1.main函数的,return 返回码 2. exit()函数 三,进程等待 1. 回收进程方法 (1. wait…

Node.js 学习笔记

小插件Template String Converter 当输入${}时,自动为其加上 反引号 一、node入门 node.js是什么 node的作用 开发服务器应用 开发工具类应用 开发桌面端应用 1.命令行工具 命令的结构 常用命令 切换到D盘——D: 查看D盘目录——dir 切换工作目录——c…

FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程

FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程 FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程实例 FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程 实例 ffmpeg -i test_1920x1080.mp4 -acodec copy -vcodec libx264 -s 1280x…

ElasticSearch - 基于 DSL 、JavaRestClient 实现数据聚合

目录 一、数据聚合 1.1、基本概念 1.1.1、聚合分类 1.1.2、特点 1.2、DSL 实现 Bucket 聚合 1.2.1、Bucket 聚合基础语法 1.2.2、Bucket 聚合结果排序 1.2.3、Bucket 聚合限定范围 1.3、DSL 实现 Metrics 聚合 1.4、基于 JavaRestClient 实现聚合 1.4.1、组装请求 …

XSS详解

XSS一些学习记录 XXS短标签、属性、事件、方法短标签属性事件函数弹窗函数一些对于绕过有用的函数一些函数使用payload收集 浏览器编码问题XML实体编码URL编码JS编码混合编码 一些绕过方法利用constructor原型污染链构造弹框空格绕过圆括号过滤绕过其他的一些绕过 参考 XXS短标…

Zygisk-IL2CppDumper对抗方案

众所周知,Unity引擎中有两种脚本编译器,分别是 Mono 和 IL2CPP 。这两种脚本编译器各有优势,同时也存在一些安全性问题,本文将从游戏安全角度对其进行分析并提供对策。 Mono 是由跨平台的开源.NET 实现,它允许开发者使…

关于 自定义的RabbitMQ的RabbitMessageContainer注解-实现原理

概述 RabbitMessageContainer注解 的主要作用就是 替换掉Configuration配置类中的各种Bean配置; 采用注解的方式可以让我们 固化配置,降低代码编写复杂度、减少配置错误情况的发生,提升编码调试的效率、提高业务的可用性。 为什么说“降低…

PMSM——转子位置估算基于QPLL

文章目录 前言仿真模型观测器速度观测位置观测转矩波形电流波形 前言 今后是电机控制方向的研究生的啦,期待有同行互相交流。 仿真模型 观测器 速度观测 位置观测 转矩波形 电流波形

QSS之QScrollArea

QScrollArea在实际的开发过程中经常使用,主要是有些界面一屏显示不下,所以得用QScorllArea带滚动条拖动显示剩余的界面。默认的QScrollArea滚动条不满设计的风格,因此我们必须设置自已的滚动条风格,QScrollBar分为水平horizontal和…

Spring整合RabbitMQ——生产者

1.生产者整合步骤 添加依赖坐标,在producer和consumer模块的pom文件中各复制一份。 配置producer的配置文件 配置producer的xml配置文件 编写测试类发送消息

2023 年前端 UI 组件库概述,百花齐放!

UI组件库提供了各种常见的 UI 元素,比如按钮、输入框、菜单等,只需要调用相应的组件并按照需求进行配置,就能够快速构建出一个功能完善的 UI。 虽然市面上有许多不同的UI组件库可供选择,但在2023年底也并没有出现一两个明确的解决…

【C++】map、set,multiset和multimap的使用及底层原理【完整版】

目录 一、map和set的使用 1、序列式容器和关联式容器 2、set的使用讲解 3、map的使用讲解 二、multiset和multimap 1、multiset和multimap的使用 2、OJ题:前k个高频单词 一、map和set的使用 1、序列式容器和关联式容器 序列式容器:vector/list/s…

基于微信小程的流浪动物领养小程序设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言系统主要功能:具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding)有保障的售后福利 代码参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计…

CSS 选择器Day01

CSS 定义:层叠样式表(Cascading Style Sheets,缩写为 CSS),是一种用于定义网页或文档的外观和样式的标记语言。 CSS是一种 样式表 语言,用来描述 HTML 文档的呈现 (美化内容)。它用于控制文本的字体、颜色、间距、布局、背景等各…

如何使用Docker安装最新版本的Redis并设置远程访问(含免费可视化工具)

文章目录 安装Docker安装Redisredis.conf文件远程访问Redis免费可视化工具相关链接Docker是一种开源的应用容器引擎,使用Docker可以让我们快速部署应用环境,本文介绍如何使用Docker安装最新版本的Redis。 安装Docker 首先需要安装Docker,具体的安装方法可以参考Docker官方文…

C++标准模板库STL——list的使用及其模拟实现

1.list的介绍 list的文档介绍 1. list是可以在常数范围内在任意位置进行插入和删除的序列式容器,并且该容器可以前后双向迭代。 2. list的底层是双向链表结构,双向链表中每个元素存储在互不相关的独立节点中,在节点中通过指针指向 其前一个…

Error: node: unknown or unsupported macOS version: :dunno 错误解决

一、原因 今天安装 brew install node报错了,错误信息如下: 二、解决方案 1)查找homebrew-cask安装位置 echo $(brew --repo homebrew/homebrew-cask) // 输出 /opt/homebrew/Library/Taps/homebrew/homebrew-cask2)使用 gi…

Win11下无法打开丛林之狐,提示未检测到DirectX 8.1

新装的win11系统,打开丛林之狐提示未检测到DirectX 8.1. 运行dxdiag检查DirectX版本: DX版本已经是12了: 最终参考了这篇文章解决了: 罪恶都市出现XX-directx version 8.1处理方法 - 知乎 控制面板 > 程序 > 启用或关闭Wi…

AI-FGNet降噪算法

上一篇文章介绍AI-CGNet降噪算法和AI-GruNet降噪算法,本篇文章介绍一个新的轻量级降噪做法AI-FGNet。 一、模型结构 AI-FGNet网络相比AI-GruNet,额外添加一层全连接实现特征的维度变换,作为频谱压缩、控制计算量的一种手段。此外&#xff0c…