Flink CDC 配置（pom.xml）
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- NOTE(review): many dependencies below (Spring Boot starters, spring-integration-redis, jedis,
         spring-kafka, xxl-job-core, mybatis-plus-boot-starter, mysql-connector-java, micrometer) and the
         spring-boot-maven-plugin declare no <version>. They only resolve when versions are supplied by a
         parent POM or a BOM imported in <dependencyManagement>; neither is present in this file, so add
         one (or explicit versions) before building. -->
    <groupId>org.example</groupId>
    <artifactId>cdc</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <version.debezium>1.9.8.Final</version.debezium>
        <flink.version>1.19.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): com.ververica 3.0.x releases of the MySQL CDC connector predate Flink 1.19;
             confirm compatibility (later Flink CDC releases are published under org.apache.flink). -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
                <!-- Jedis (declared below) is used instead of Lettuce. -->
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <!-- Debezium: all three artifacts share the ${version.debezium} property.
             NOTE(review): the flink-sql-connector-mysql-cdc jar above is a bundled connector; confirm these
             explicit Debezium artifacts do not duplicate or conflict with the classes it already contains. -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>

        <!-- NOTE(review): this entry had only <exclusions> and no groupId/artifactId, which Maven rejects
             ("dependencies.dependency.groupId is missing"). It is kept here, commented out, until the
             intended artifact coordinates are restored.
        <dependency>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-slf4j-impl</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-jul</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-core</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        -->

        <!-- NOTE(review): spring-boot-starter-web transitively brings in spring-boot-starter-logging
             (Logback), which the starters above exclude; add the same exclusion here if that is intended. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
注意：下面 Application 中设置的 JVM 默认时区（以及 MySqlSource 的 serverTimeZone）一定要与 MySQL 服务器的时区 ID 一致。
/**
 * Process entry point.
 *
 * <p>Applies JVM-wide settings and then boots the Spring application. The settings are the
 * default time zone, the Log4j2 context selector and a Nacos TLS flag.
 *
 * <p>NOTE(review): no {@code @SpringBootApplication} annotation (and no imports) is visible for
 * this class in this snippet. Without it, {@code SpringApplication.run} performs no component
 * scanning or auto-configuration, so the {@code @Component} beans would not be created. Confirm
 * the annotation is present in the real source file.
 */
public class Application {

    /** JVM default time zone; should match the MySQL server time zone and the CDC source's serverTimeZone. */
    private static final String JVM_TIME_ZONE = "UTC";

    /**
     * Applies JVM defaults, then starts Spring.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        applyJvmDefaults();
        SpringApplication.run(Application.class, args);
    }

    /**
     * Sets process-wide defaults before Spring starts.
     *
     * <p>Presumably done this early so Log4j2 sees the async context selector when logging
     * initializes.
     */
    private static void applyJvmDefaults() {
        TimeZone.setDefault(TimeZone.getTimeZone(JVM_TIME_ZONE));
        System.setProperty(
                "log4j2.contextSelector",
                "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
        System.setProperty("com.alibaba.nacos.client.naming.tls.enable", "true");
    }
}
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class CustomSink extends RichSinkFunction<String> {private ObjectMapper mapper = new ObjectMapper();@Overridepublic void invoke(String value, Context context) throws Exception {System.out.printf("数据发生变化: %s%n", value);TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {};Map<String, Object> result = mapper.readValue(value, valueType);Map<String, Object> payload = (Map<String, Object>) result.get("payload");String op = (String) payload.get("op") ;// 不对读操作处理if (!"r".equals(op)) {MonitorMySQLCDC.queue.put(result);}}
}
import DebeziumProperties;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;@Component
public class MonitorMySQLCDC implements InitializingBean {// 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品)public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();private final StringRedisTemplate stringRedisTemplate;// 保存到redis中key的前缀private final String PREFIX = "users:";// 数据发生变化后的sink处理private final CustomSink customSink;@Resourceprivate DebeziumProperties debeziumProperties;public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {this.customSink = customSink;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void afterPropertiesSet() throws Exception {// 启动异步线程,实时处理队列中的数据new Thread(() -> {while (true) {try {Map<String, Object> result = queue.take();this.doAction(result);} catch (Exception e) {e.printStackTrace();}}}).start();DebeziumProperties.Datasource datasource = debeziumProperties.getDatasource();Properties jdbcProperties = new Properties();jdbcProperties.setProperty("useSSL", "false");MySqlSource<String> source = MySqlSource.<String>builder().hostname(datasource.getHostname()).port(datasource.getPort()).serverTimeZone("UTC")// 可配置多个数据库.databaseList("energy_pay_server")// 可配置多个表.tableList("energy_pay_server.tb_request_log").username(datasource.getUser()).password(datasource.getPassword()).jdbcProperties(jdbcProperties)// 包括schema的改变.includeSchemaChanges(true)// 反序列化设置// .deserializer(new StringDebeziumDeserializationSchema()).deserializer(new JsonDebeziumDeserializationSchema(true))// 启动模式;关于启动模式下面详细介绍/*** initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。** earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取** latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。** specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。** timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。*/.startupOptions(StartupOptions.initial()).build();// 环境配置Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) 
;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;// 设置 6s 的 checkpoint 间隔env.enableCheckpointing(6000);// 设置 source 节点的并行度为 4env.setParallelism(4);env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")// 添加Sink.addSink(this.customSink);env.execute();}@SuppressWarnings("unchecked")private void doAction(Map<String, Object> result) throws Exception {Map<String, Object> payload = (Map<String, Object>) result.get("payload");String op = (String) payload.get("op");switch (op) {// 更新和插入操作case "u":case "c":Map<String, Object> after = (Map<String, Object>) payload.get("after");String id = after.get("id").toString();System.out.printf("操作:%s, ID: %s%n", op, id);stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after));break;// 删除操作case "d":Map<String, Object> after1 = (Map<String, Object>) payload.get("before");String id1 = after1.get("id").toString();stringRedisTemplate.delete(PREFIX + id1);}}}