当前位置: 首页 > news >正文

kafka jdbc connector适配kadb数据实时同步

  • 测试结论

源端增量获取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各种方式说明如下:

bulk: 一次同步整个表的数据

incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新和删除

timestamp: 使用时间戳标识增量数据,每次更新数据都要修改时间戳,时间戳严格递增

timestamp+incrementing: 使用两个列,一个为自增列,一个为时间戳列。综合incrementing和timestamp的功能

  • 环境说明

本文在kafka的standalone模式下,适配kafka jdbc connector从源端mysql数据库实时同步数据到kadb中。验证1. 增量数据获取及增量数据获取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系统:centos 7.6
  4. jdbc connector版本:10.8.3。下载地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驱动:mysql-connector-java-5.1.39-bin.jar
  6. kadb驱动:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必须为17或者18版本,否则kafka安装报错)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector参考资料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector参考资料

https://kafka.apache.org/documentation/

  • 环境部署
  1. kafka部署

解压

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

产生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下载jdbc connector,将解压的内容保存到kafka解压目录的plugins下(plugins目录需自己创建内容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目标端jdbc驱动

将源端mysql的jdbc驱动文件和目标端kadb驱动文件拷贝至kafka的解压目录的libs目录下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 连接器配置文件:connect-standalone.properties

添加插件路径参数:(绝对路径)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source                

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector    //固定值,使用jdbc connector的类

# topic名称列表,源端和目标端的topic必须一致

topics=test

# 配置jdbc连接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量获取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目标端配置文件:connect-kadb-sink.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 为当前connector创建的最大线程数

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必须设置

# topic名称列表

topics=test

# 配置jdbc连接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自动创建表

auto.create=true

# 写入模式

insert.mode=insert

  1. 启动connect

bin/connect-standalone.sh

config/connect-standalone.properties                //connect配置参数

config/connect-mysql-source.properties    //源端配置参数

config/connect-kadb-sink.properties            //目标端参数

  1. 测试
  1. mysql源端创建表,目标端会自动创建对应的表

mysql> desc test

    -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type        | Null | Key | Default | Extra          |

+-------+-------------+------+-----+---------+----------------+

| a     | int(11)     | NO   | PRI | NULL    | auto_increment |    //使用increment ing方式,必须是自增列

| b     | varchar(10) | YES  |     | NULL    |                |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入数据

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目标端数据

1 | aaa

 2 | bbb

 3 | ccc

 4 | ddd

 5 | dddd

(844 rows)

test_sink=#

  1. 源端数据

mysql> select * from test;

+---+------+

| a | b    |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令参考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

http://www.xdnf.cn/news/10963.html

相关文章:

  • Spring Boot 核心注解全解:@SpringBootApplication背后的三剑客
  • 力扣每日打卡 2364. 统计坏数对的数目 (中等)
  • HTTP测试智能化升级:动态变量管理实战与效能跃迁
  • Spring开发系列教程(20)——Spring MVC
  • Flutter 自定义插件基础
  • 【unity实战】Animator启用root motion根运动动画,实现完美的动画动作匹配
  • 精准检测新选择:国产OLI-P偏振串扰分析仪正式发布
  • PHP连接MYSQL数据库
  • easyExcel单元格合并
  • React 受控表单绑定基础
  • 下载electron 22.3.27 源码错误集锦
  • 【我的创作纪念日】回望初心,分享收获,展望前行
  • <C#>.NET WebAPI 的 FromBody ,FromForm ,FromServices等详细解释
  • vscode中markdown一些插件用不了解决方式
  • 1187. 【动态规划】竞赛总分
  • ctfshow-大赛原题-web702
  • JAVA Web_定义Servlet_处理POST请求【练习】
  • 如何校验一个字符串是否是可以正确序列化的JSON字符串呢?
  • 2025-04-19 Python 强类型编程
  • 华为OD机试真题——最长的顺子(2025A卷:100分)Java/python/JavaScript/C++/C语言/GO六种最佳实现
  • 6.数据手册解读—运算放大器(二)
  • 航电系统通信与数据链技术分析
  • L1-7 矩阵列平移
  • 【Win】 cmd 执行curl命令时,输出 ‘命令管道位置 1 的 cmdlet Invoke-WebRequest 请为以下参数提供值: Uri: ’ ?
  • 使用手机归属地查询API,使效率事半功倍
  • MATLAB 控制系统设计与仿真 - 36
  • Java Web 之 Servlet 100问
  • Spring-Ioc容器的加载过程?
  • 分享传统制造业AI大模型优化升级解决方案
  • ​​从Shell到域控:内网渗透中定位域控制器的8种核心方法​