
Flink CDC Configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>cdc</artifactId><version>1.0.0-SNAPSHOT</version><packaging>jar</packaging><properties><version.debezium>1.9.8.Final</version.debezium><flink.version>1.19.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-redis</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.9.8.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version></dependency><!-- kafka 
--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId><exclusions><exclusion><artifactId>spring-boot-starter-logging</artifactId><groupId>org.springframework.boot</groupId></exclusion></exclusions></dependency><dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId></dependency><dependency><exclusions><exclusion><artifactId>log4j-api</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion><exclusion><artifactId>log4j-slf4j-impl</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion><exclusion><artifactId>log4j-jul</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion><exclusion><artifactId>log4j-core</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><jvmArguments>-Dfile.encoding=UTF-8</jvmArguments><fork>true</fork></configuration></plugin></plugins><resources><resource><directory>src/main/resources</directory><includes><include>**/*</include></includes></resource></resources></build></project>

The JVM time zone set here must match the MySQL server's time zone ID; a quick way to check what the server actually reports is sketched after the bootstrap class below.

import java.util.TimeZone;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        // Keep the JVM default time zone identical to the MySQL server's time zone ID
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        // Use Log4j2 async loggers
        System.setProperty("log4j2.contextSelector",
                "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
        System.setProperty("com.alibaba.nacos.client.naming.tls.enable", "true");
        SpringApplication.run(Application.class, args);
    }
}
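To see which time zone the MySQL server itself reports, a plain JDBC query works. This is a minimal sketch, not part of the original article; the URL, user and password are placeholders to be replaced with the same values the CDC datasource uses.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TimeZoneCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; replace with your CDC datasource values
        String url = "jdbc:mysql://localhost:3306/energy_pay_server?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             Statement stmt = conn.createStatement();
             // @@global.time_zone is often "SYSTEM"; @@system_time_zone then shows the effective zone
             ResultSet rs = stmt.executeQuery("SELECT @@global.time_zone, @@system_time_zone")) {
            if (rs.next()) {
                System.out.println("global.time_zone = " + rs.getString(1));
                System.out.println("system_time_zone = " + rs.getString(2));
            }
        }
    }
}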
import java.util.Map;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class CustomSink extends RichSinkFunction<String> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(String value, Context context) throws Exception {
        System.out.printf("Data changed: %s%n", value);
        TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {};
        Map<String, Object> result = mapper.readValue(value, valueType);
        Map<String, Object> payload = (Map<String, Object>) result.get("payload");
        String op = (String) payload.get("op");
        // Skip snapshot read ("r") events; only forward real changes
        if (!"r".equals(op)) {
            MonitorMySQLCDC.queue.put(result);
        }
    }
}
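For reference, with JsonDebeziumDeserializationSchema(true) the sink receives a JSON document whose payload object carries op ("c", "u", "d", "r") plus before/after row images. The standalone sketch below parses a made-up event the same way the sink does; the column names are illustrative, not taken from tb_request_log.

import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EnvelopeParseDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        // Assumed sample change event, not captured from a real binlog
        String sample = "{\"payload\":{\"op\":\"c\",\"before\":null,"
                + "\"after\":{\"id\":1,\"request_uri\":\"/demo\"}}}";
        Map<String, Object> event = new ObjectMapper()
                .readValue(sample, new TypeReference<Map<String, Object>>() {});
        Map<String, Object> payload = (Map<String, Object>) event.get("payload");
        System.out.println(payload.get("op"));    // c
        System.out.println(payload.get("after")); // {id=1, request_uri=/demo}
    }
}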

import DebeziumProperties;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class MonitorMySQLCDC implements InitializingBean {

    // Queue that temporarily holds changed records (in a real production environment, use an MQ product instead)
    public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();

    private final StringRedisTemplate stringRedisTemplate;

    // Prefix for the keys written to Redis
    private final String PREFIX = "users:";

    // Sink that handles records after they change
    private final CustomSink customSink;

    @Resource
    private DebeziumProperties debeziumProperties;

    public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
        this.customSink = customSink;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Start a background thread that processes queued records in real time
        new Thread(() -> {
            while (true) {
                try {
                    Map<String, Object> result = queue.take();
                    this.doAction(result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        DebeziumProperties.Datasource datasource = debeziumProperties.getDatasource();
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("useSSL", "false");

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname(datasource.getHostname())
                .port(datasource.getPort())
                .serverTimeZone("UTC")
                // One or more databases can be listed
                .databaseList("energy_pay_server")
                // One or more tables can be listed
                .tableList("energy_pay_server.tb_request_log")
                .username(datasource.getUser())
                .password(datasource.getPassword())
                .jdbcProperties(jdbcProperties)
                // Also capture schema changes
                .includeSchemaChanges(true)
                // Deserializer setting
                // .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema(true))
                // Startup mode; the available options:
                /*
                 * initial (default): takes an initial snapshot of the monitored tables on first start,
                 * then continues reading the latest binlog.
                 *
                 * earliest-offset: skips the snapshot phase and reads from the earliest available binlog position.
                 *
                 * latest-offset: never snapshots the monitored tables on first start; the connector reads only
                 * from the end of the binlog, i.e. it only sees changes made after it starts.
                 *
                 * specific-offset: skips the snapshot phase and reads from a given binlog position, specified by
                 * binlog file name and position, or by a GTID set when GTIDs are enabled on the cluster.
                 *
                 * timestamp: skips the snapshot phase and reads binlog events starting from a given timestamp.
                 */
                .startupOptions(StartupOptions.initial())
                .build();

        // Environment configuration
        Configuration config = new Configuration();
        config.set(RestOptions.PORT, 9090);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // Checkpoint every 6 s
        env.enableCheckpointing(6000);
        // Set the default parallelism to 4
        env.setParallelism(4);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
                // Attach the sink
                .addSink(this.customSink);
        env.execute();
    }

    @SuppressWarnings("unchecked")
    private void doAction(Map<String, Object> result) throws Exception {
        Map<String, Object> payload = (Map<String, Object>) result.get("payload");
        String op = (String) payload.get("op");
        switch (op) {
            // Insert and update: write the "after" row image to Redis
            case "u":
            case "c":
                Map<String, Object> after = (Map<String, Object>) payload.get("after");
                String id = after.get("id").toString();
                System.out.printf("Operation: %s, ID: %s%n", op, id);
                stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after));
                break;
            // Delete: remove the key derived from the "before" row image
            case "d":
                Map<String, Object> before = (Map<String, Object>) payload.get("before");
                String id1 = before.get("id").toString();
                stringRedisTemplate.delete(PREFIX + id1);
        }
    }
}
