flink-connector-mysql-cdc:01 mysql-cdc础配置代码演示

flink-connector-mysql-cdc:

  • 01 mysql-cdc基础配置代码演示
  • 02 mysql-cdc高级扩展
  • 03 mysql-cdc常见问题汇总
  • 04 mysql-cdc-kafka生产级代码分享
  • 05 flink-kafka-doris生产级代码分享
  • 06 flink-kafka-hudi生产级代码分享

flink-cdc版本:3.2.0

flink版本:flink-1.18.0

mysql版本:8.0.26

java版本:1.8

maven版本:3.8.4

目录

1. Mysql数据库设置

1.1 开启binlog日志

1.2 创建用户

1.3 准备测试数据 

2. 编写测试代码

2.1 maven 依赖

2.2 测试代码

3. mysql-cdc扩展

3.1 时区设置

3.2 为每个读取器设置不同的 SERVER ID


1. Mysql数据库设置

1.1 开启binlog日志

编辑 MySQL 配置文件

  • 在 Unix/Linux 系统中,通常是 /etc/my.cnf 或 /etc/mysql/my.cnf

  • 在 Windows 上,可能位于 C:\ProgramData\MySQL\MySQL Server X.Y\my.ini

# 在 mysqld 部分下添加以下内容(如果已经存在,请确认其值):
[mysqld]
log-bin=mysql-bin  # 二进制日志文件前缀,MySQL将生成名为 mysql-bin.000001, mysql-bin.000002 等的文件。
binlog-format=row   # 设置二进制日志格式为行级(row),可选值为 STATEMENT、ROW 和 MIXED;这里推荐使用行级。
expire_logs_days=7  # 设置二进制日志的过期时间,单位为天,超过这个天数后的日志将被自动删除,这里以 7 天为例。
max-binlog-size=100M # 设置单个二进制日志文件的最大大小,超出后将自动创建一个新的日志文件(可以根据需要调整)。

1.2 创建用户

以 “flinkcdc”用户为例

# 创建 MySQL 用户:
CREATE USER 'flinkcdc'@'localhost' IDENTIFIED BY '123456';
# 向用户授予所需的权限:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY '123456';
# 授权所有权限
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'localhost';
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'%';
# 完成用户的权限:
FLUSH PRIVILEGES;

1.3 准备测试数据 

# 使用flinkcdc用户登录数据库# 创建测试数据库
create database cdc_demo;
# 创建测试表
CREATE TABLE cdc_demo.flink_cdc_test (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50) NOT NULL,description TEXT,age INT,balance DECIMAL(10, 2),is_active BOOLEAN DEFAULT TRUE,created_at DATETIME DEFAULT CURRENT_TIMESTAMP,birth_date DATE,long_value BIGINT,last_login TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 插入测试数据
INSERT INTO cdc_demo.flink_cdc_test (name, description, age, balance, is_active, created_at, birth_date, long_value, last_login) VALUES
('Alice Smith', 'Alice is a software engineer with 5 years of experience.', 30, 2500.50, TRUE, '2023-01-01 10:00:00', '1992-05-15', 12345678901234, '2023-05-20 10:00:00'),
('Bob Johnson', 'Bob enjoys hiking and outdoor activities.', 25, 1500.00, TRUE, '2023-02-15 12:30:00', '1998-08-22', 987654321054321, '2023-05-18 14:00:00'),
('Charlie Brown', 'Charlie is an avid reader and coffee lover.', 35, 3200.75, FALSE, '2023-03-22 14:45:00', '1988-01-11', 135792468012345, '2023-05-19 09:20:00'),
('Daisy Miller', 'Daisy loves painting and traveling.', 28, 1800.25, TRUE, '2023-04-05 09:15:00', '1994-11-03', 24681357901234, '2023-05-21 12:30:00'),
('Ethan White', 'Ethan enjoys playing guitar and writing songs.', 40, 5000.00, TRUE, '2023-05-18 16:20:00', '1983-07-30', 98765432102468, '2023-05-22 15:00:00');

2. 编写测试代码

2.1 maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toroidal</groupId><artifactId>flink-connector-mysql-cdc-demo</artifactId><name>flink-connector-mysql-cdc-demo</name><version>1.0-SNAPSHOT</version><repositories><repository><id>aliyunmaven</id><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public/</url></repository><repository><id>mirrorId</id><name>Human Readable Name for this Mirror.</name><url>http://my.repository.com/repo/path</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><flinkcdc.version>3.2.0</flinkcdc.version><mysql.version>8.0.26</mysql.version><log4j.version>2.17.1</log4j.version><lombok.version>1.18.24</lombok.version><fastjson.version>1.2.83</fastjson.version></properties><dependencies><!-- flink Dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><!-- mysql-cdc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flinkcdc.version}</version></dependency><!-- mysql --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- log --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.9.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 设置jar包的入口类(可选) --><mainClass>com.toroidal.mysql.MysqlCdcStreamApp</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

2.2 测试代码

package com.toroidal.mysql;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;/*** @Author Toroidal* @Date 2024/12/04 14:42* @Version 1.0*/
public class MysqlCdcStreamApp {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(4).print().setParallelism(1);env.execute("Print MySQL Snapshot + Binlog");}
}

运行结果 

3. mysql-cdc扩展

3.1 时区设置

mysql-cdc读取出来的 timestamp 字段时区相差8小时,将时区和MySQL服务器时区设置一致即可:

查询当前数据库时区:

SELECT * FROM mysql.time_zone_name;

设置时区为东八时区

.serverTimeZone("Asia/Shanghai")

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test").serverTimeZone("Asia/Shanghai")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

3.2 为每个读取器设置不同的 SERVER ID

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 ID,称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 ID,则可能会导致从错误的 binlog 位置读取。 

.serverId("flink-cdc-01")
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306)// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc.flink_cdc_test").username("flinkcdc").serverTimeZone("Asia/Shanghai").serverId("flink-cdc-01").password("123456")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/33767.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

工业-实时数据采集

1.编写新的 Flume 配置文件&#xff0c;将数据备份到 HDFS 目录 /user/test/flumebackup 下&#xff0c;要求所有主题 的数据使用同一个 Flume配置文件完成。 1. 配置概览 Flume 的主要任务是从多个来源&#xff08;如日志文件&#xff09;读取数据&#xff0c;经过处理后通过…

mmdet 加载预训练模型多卡训练过程中,存在显卡占用显存不均匀

1. 问题描述 基于mmdet https://github.com/open-mmlab/mmdetection代码仓库&#xff0c;修改了自己的检测代码&#xff0c;加载了预训练模型&#xff0c;进行分布式训练。 在训练过程中&#xff0c;出现了显卡的占用显存不均匀的问题。 如图所示&#xff0c;可以看到显卡2 占…

使用ALB将HTTP访问重定向至HTTPS

HTTPS是加密数据传输协议&#xff0c;安全性高。当企业进行HTTPS安全改造后&#xff0c;为了方便用户访问&#xff0c;可以使用ALB在用户无感知的情况下将HTTP访问重定向至HTTPS。 前提条件 您已创建ALB实例&#xff0c;并为该实例添加了HTTP监听和监听端口为443的HTTPS监听。…

力扣92.反转链表Ⅱ

题目描述 题目链接92. 反转链表 II 给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], left …

Java版-速通ETL工具中简单的DAG执行实现

DAG作用 在ETL工具中&#xff0c;一般使用DAG图来进行任务的配置&#xff0c;将任务配置在有向无环图中&#xff0c;执行时候从首层节点&#xff0c;依次往下&#xff0c;下层节点的执行依赖于父节点是否执行完毕的状态&#xff0c;当最后一层的节点执行完成之后&#xff0c;整…

Web安全深度剖析

1.Web安全简介 ​ 攻击者想要对计算机进行渗透&#xff0c;有一个条件是必须的&#xff0c;就是攻击者的计算机与服务器必须能够正常通信&#xff0c;服务器与客户端进行通信依靠的就是端口。 ​ 如今的web应该称之为web应用程序&#xff0c;功能强大&#xff0c;离不开四个要…

策略模式的理解和实践

在软件开发中&#xff0c;我们经常遇到需要在不同算法之间进行选择的情况。这些算法可能实现相同的功能&#xff0c;但使用不同的方法或逻辑。为了增强代码的可维护性和可扩展性&#xff0c;我们可以使用设计模式来优化这些算法的实现和管理。策略模式&#xff08;Strategy Pat…

在 Linux 环境下搭建 OpenLab Web 网站并实现 HTTPS 和访问控制

实验要求 综合练习&#xff1a;请给openlab搭建web网站 ​ 网站需求&#xff1a; ​ 1.基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab!!! ​ 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c…

Java开发利器:IDEA的安装与使用(下)

文章目录 8. 快捷键的使用8.1 常用快捷键8.2 查看快捷键8.3 自定义快捷键8.4 使用其它平台快捷键 9. IDEA断点调试(Debug)9.1 为什么需要Debug9.2 Debug的步骤9.3 多种Debug情况介绍9.3.1 行断点9.3.2 方法断点9.3.3 字段断点9.3.4 条件断点9.3.5 异常断点9.3.6 线程调试9.3.7 …

非对称任意进制转换器(安卓)

除了正常进制转换&#xff0c;还可以输入、输出使用不同的数字符号&#xff0c;达成对数值进行加密的效果 点我下载APK安装包 使用unity开发。新建一个c#代码文件&#xff0c;把代码覆盖进去&#xff0c;再把代码文件添加给main camera即可。 using System.Collections; usin…

神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型

(一) pytorch 官网内置的网络模型 图像处理&#xff1a; Models and pre-trained weights — Torchvision 0.20 documentation (二) CIFAR10数据集的分类网络模型&#xff08;仅前向传播&#xff09;&#xff1a; 下方的网络模型图片有误&#xff0c;已做修改&#xff0c;具…

linux 系列服务器 高并发下ulimit优化文档

系统输入 ulimit -a 结果如下 解除 Linux 系统的最大进程数 要解除或提高 Linux 系统的最大进程数&#xff0c;可以修改 ulimit 设置和 /etc/security/limits.conf 文件中的限制。 临时修改 ulimit 设置 可以使用 ulimit 命令来查看和修改当前会话的最大进程数&#xff1a; 查…

Elasticsearch数据迁移(快照)

1. 数据条件 一台原始es服务器&#xff08;192.168.xx.xx&#xff09;&#xff0c;数据迁移后的目标服务器&#xff08;10.2.xx.xx&#xff09;。 2台服务器所处环境&#xff1a; centos7操作系统&#xff0c; elasticsearch-7.3.0。 2. 为原始es服务器数据创建快照 修改elas…

基于 SpringBoot 构建校园失物招领智能平台:优化校园失物处理流程

4系统设计 4.1系统概要设计 本文通过B/S结构(Browser/Server,浏览器/服务器结构)开发的该校园失物招领系统&#xff0c;B/S结构的优点很多&#xff0c;例如&#xff1a;开发容易、强的共享性、便于维护等&#xff0c;只要有网络&#xff0c;用户可以随时随地进行使用。 系统工作…

图解SSL/TLS 建立加密通道的过程

众所周知&#xff0c;HTTPS 是 HTTP 安全版&#xff0c;HTTP 的数据以明文形式传输&#xff0c;而 HTTPS 使用 SSL/TLS 协议对数据进行加密&#xff0c;确保数据在传输过程中的安全。 那么&#xff0c;HTTPS 是如何做到数据加密的呢&#xff1f;这就需要了解 SSL/TLS 协议了。 …

HTTP协议图--HTTP 工作过程

HTTP请求响应模型 HTTP通信机制是在一次完整的 HTTP 通信过程中&#xff0c;客户端与服务器之间将完成下列7个步骤&#xff1a; 建立 TCP 连接 在HTTP工作开始之前&#xff0c;客户端首先要通过网络与服务器建立连接&#xff0c;该连接是通过 TCP 来完成的&#xff0c;该协议…

BurpSuite工具-Proxy代理用法(抓包、改包、放包)

一、Burp Suite 项目管理 二、Proxy&#xff08;代理抓包模块&#xff09; 1. 简要说明 1.1. Intercept&#xff08;拦截&#xff09; 1.2. HTTP History&#xff08;HTTP 历史&#xff09; 1.3. WebSockets History&#xff08;WebSocket 历史&#xff09; 1.4. Options…

前端测试框架 jasmine 的使用

最近的项目在使用AngulaJs,对JS代码的测试问题就摆在了面前。通过对比我们选择了 Karma jasmine ,使用 Jasmine做单元测试 &#xff0c;Karma 自动化完成&#xff0c;当然了如果使用 Karma jasmine 前提是必须安装 Nodejs。 安装好 Nodejs &#xff0c;使用 npm 安装好必要…

Blender均匀放缩模型

解决办法&#xff1a; 首先选中模型&#xff0c;按下“s”键&#xff0c;如下图所示&#xff0c;此时模型根据鼠标的移动放缩 或者在按下“s”后输入数值&#xff0c;再按回车键Enter&#xff0c;模型会根据你该数值进行均匀放缩 指定放大2倍结果——