如何在 Java 中使用 Canal 同步 MySQL 数据到 Redis

文章目录

  • 一、引言
  • 二、工作原理
    • 1. MySQL主备复制原理
    • 2. canal 工作原理
  • 三、环境准备
    • 1. 安装和配置 MySQL
    • 2. 安装和配置 Canal
    • 3. 安装和配置 Redis
  • 四、开发 Java 应用
    • 1. 添加依赖
    • 2. 编写 Canal 客户端代码
    • 3. 运行和测试
      • 3.1 启动 Canal 服务:
      • 3.2 启动 Redis 服务:
      • 3.3 启动 Java 应用:
      • 3.4 测试数据同步:
  • 五、注意事项
  • 六、结论
  • 七、参考资料

一、引言

在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。
项目地址:alibaba/canal

二、工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2. canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三、环境准备

1. 安装和配置 MySQL

Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重启 MySQL 服务以使配置生效:

sudo service mysql restart

2. 安装和配置 Canal

下载并解压 Canal 服务端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

启动 Canal 服务:

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。

3. 安装和配置 Redis

确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

四、开发 Java 应用

1. 添加依赖

在你的 pom.xml 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.1.5</version></dependency>
</dependencies>

2. 编写 Canal 客户端代码

创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:

package org.hbin.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;import java.net.InetSocketAddress;
import java.util.List;public class CanalToRedisSync {public static void main(String[] args) {// 创建 Canal 连接InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");// 连接到 Canal 服务connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 创建 Redis 客户端Jedis jedis = new Jedis("127.0.0.1", 6379);while (true) {Message message = connector.getWithoutAck(100); // 获取最多 100 条记录long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {handleEntry(message.getEntries(), jedis);}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}}private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}CanalEntry.EventType eventType = rowChange.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} else if (eventType == CanalEntry.EventType.INSERT) {syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} else {System.out.println("-------> before");syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());System.out.println("-------> after");syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());}}}}private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();StringBuilder value = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());} else {value.append(column.getName()).append(":").append(column.getValue()).append(",");}}System.out.println("Insert: " + key.toString() + " -> " + value.toString());jedis.hset(schema + ":" + table, key.toString(), value.toString());}private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();StringBuilder value = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());} else {value.append(column.getName()).append(":").append(column.getValue()).append(",");}}System.out.println("Update: " + key.toString() + " -> " + value.toString());jedis.hset(schema + ":" + table, key.toString(), value.toString());}private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());}}System.out.println("Delete: " + key.toString());jedis.hdel(schema + ":" + table, key.toString());}
}

3. 运行和测试

3.1 启动 Canal 服务:

sh /opt/canal/bin/startup.sh

3.2 启动 Redis 服务:

确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

3.3 启动 Java 应用:

编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。

3.4 测试数据同步:

在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。
相关SQL如下:

drop database if exists canal;
create database canal;
use canal;drop table if exists user;
create table user(`id` bigint AUTO_INCREMENT primary key,`name` varchar(20) NOT NULL,`age` tinyint DEFAULT 0,`detail` varchar(100) DEFAULT '',`create_time` date,`update_time` date
);insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

输出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事项

  • 性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。
  • 错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。
  • 安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。

六、结论

通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。希望本文对你有所帮助。

七、参考资料

  • canal QuickStart
  • canal ClientExample

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/7182.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Redis设计与实现 学习笔记 第十六章 Sentinel

Sentinel&#xff08;哨岗、哨兵&#xff09;是Redis的高可用性&#xff08;high availability&#xff09;解决方案&#xff1a;由一个或多个Sentinel实例&#xff08;instance&#xff09;组成的Sentinel系统可以监视任意多个主服务器&#xff0c;以及这些主服务器属下的从服…

I.MX6U 裸机开发3. GPIO操作控制LED灯

I.MX6U 裸机开发3. GPIO操作控制LED灯 一、创建项目目录及源文件1. 新建目录2. 远程开发环境3. 创建源文件 二、代码编写1. 打开时钟2. 配置端口复用功能为GPIO3. 配置端口电气属性4. 设置GPIO方向&#xff08;GDIR寄存器&#xff09;5. 输出6. 死循环等待 三、编译程序1. 整体…

雷军-2022.8小米创业思考-11-新零售:用电商思维做新零售,极致的效率+极致的体验。也有弯路,重回极致效率的轨道上。

第十一章 新零售 当我们说到小米模式的时候&#xff0c;其实我们说的是两件东西&#xff1a; 一是小米模式的本质&#xff0c;即高效率的商业模式&#xff1b; 另一件是小米这家公司具象的商业模式&#xff0c;这是小米在实践中摸索、建立的一整套业务模型。 从2015年到202…

Java:多态的调用

1.什么是多态 允许不同类的对象对同一消息做不同的响应。即同一消息可以根据发送对象的不同而采用多种不同的行为方式。&#xff08;发送消息就是函数调用&#xff09;。多态使用了一种动态绑定&#xff08;dynamic binding&#xff09;技术&#xff0c;指在执行期间判断所引用…

基于Python的学生宿舍管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

基于springboot+vue实现的网上预约挂号管理系统 (源码+L文+ppt)4-104

结合现有六和医院网上预约挂号管理系统的特点&#xff0c;应用新技术&#xff0c;构建了六和医院网上预约挂号管理系统。首先从需求出发&#xff0c;对目前传统的六和医院网上预约挂号管理进行了详细的了解和分析。根据需求分析结果&#xff0c;对系统进行了设计&#xff0c;并…

C++初阶(九)--初识模板

目录 引入 一、什么是模板 二、函数模板 1.函数模板的概念 2.函数模板的格式 template关键字 模板参数列表 3.函数模板的原理 4.函数模板的实例化 5.数模板的匹配原则 三、类模板 1.类模板的定义格式 2.类模板的实例化 引入 在编程的世界里&#xff0c;我们经常…

C语言 | Leetcode C语言题解之第537题复数乘法

题目&#xff1a; 题解&#xff1a; bool parseComplexNumber(const char * num, int * real, int * image) {char *token strtok(num, "");*real atoi(token);token strtok(NULL, "i");*image atoi(token);return true; };char * complexNumberMulti…

牛客网项目总结

下面这幅图是牛客网项目的架构图&#xff0c;最下层是Spring Boot&#xff0c;表示我们所有的技术都是基于Spring Boot&#xff0c;上面一层是Spring&#xff0c;Spring上面是Spring MVC、Spring MyBatis 和 Spring Security。 通过Spring MVC 解决前后端请求交互的问题&#…

想画一个沙漠掘金游戏地图

想画一个沙漠掘金游戏地图 沙漠掘金生成一个地图htmljs 沙漠掘金 沙漠掘金是一个企业培训课程游戏&#xff0c;规则大致是&#xff1a; 玩家从大本营出发&#xff0c;到达矿山掘金后返回&#xff0c;如果规定的天数未回来&#xff0c;则失败&#xff0c;如果回来&#xff0c;…

【Java爬虫的淘宝寻宝记】—— 淘宝商品类目的“藏宝图”

引言&#xff1a; 在淘宝这个广袤的“商品宇宙”中&#xff0c;每一件商品都是一颗璀璨的星球&#xff0c;而商品类目就是连接这些星球的星际航道。今天&#xff0c;我们将派遣一位勇敢的Java爬虫宇航员&#xff0c;去揭开这些星际航道背后的秘密——商品类目。准备好了吗&…

内网穿透-SSF内网穿透反向socks代理之渗透内网thinkphp主机上线msf

1 ssf 简介 Secure Socket Funneling socks正反向代理&#xff0c;linux版较好的免杀 1.1下载地址 https://github.com/securesocketfunneling/ssf 1.2下载编译好的执行文件 https://github.com/securesocketfunneling/ssf/releases/tag/3.0.0 2.环境 kali 攻击机 网卡 桥…

【HarmonyOS】键盘遮挡输入框UI布局处理

【HarmonyOS】键盘遮挡输入框UI布局处理 问题背景&#xff1a; 在开发输入框UI时&#xff0c;特别是登录页面的密码输入框靠下&#xff0c;或者是评论底部的pop弹框。 当我们输入框获得焦点后&#xff0c;键盘自下而上显示&#xff0c;一般情况下会遮挡住我们的UI布局。 导致…

ssm基于BS的仓库在线管理系统的设计与实现+vue

系统包含&#xff1a;源码论文 所用技术&#xff1a;SpringBootVueSSMMybatisMysql 免费提供给大家参考或者学习&#xff0c;获取源码看文章最下面 需要定制看文章最下面 目 录 第一章 绪论 1 1.1 研究背景 1 1.2 研究意义 1 1.3 研究内容 2 第二章 开发环境与技术3 …

QML项目实战:自定义TextField

目录 一.添加模块 import QtQuick.Controls 1.2 import QtQuick.Controls.Styles 1.4 import QtGraphicalEffects 1.15 二.自定义TextField 1.属性设置 2.输入框设置 3.按钮开关 三.效果 1.readonly为false 2.readonly为true 四.代码 一.添加模块 import QtQuick.…

C++ | Leetcode C++题解之第523题连续的子数组和

题目&#xff1a; 题解&#xff1a; class Solution { public:bool checkSubarraySum(vector<int>& nums, int k) {int m nums.size();if (m < 2) {return false;}unordered_map<int, int> mp;mp[0] -1;int remainder 0;for (int i 0; i < m; i) {r…

AFL++实战入门与afl-fuzz流程解析(源码流程图)

简介 本项目为模糊测试的零基础教学,适合了解 pwn 且会使用 Linux 的 gcc、gdb 的读者。模糊测试旨在通过向程序投喂数据使其崩溃,从而获取崩溃样本以寻找程序漏洞。本文前半部分介绍 AFL++ 的 docker 环境配置,帮助读者解决入门时的环境和网络问题; 后半部分全面解析 afl…

Java 网络编程(一)—— UDP数据报套接字编程

概念 在网络编程中主要的对象有两个&#xff1a;客户端和服务器。客户端是提供请求的&#xff0c;归用户使用&#xff0c;发送的请求会被服务器接收&#xff0c;服务器根据请求做出响应&#xff0c;然后再将响应的数据包返回给客户端。 作为程序员&#xff0c;我们主要关心应…

用Python脚本执行安卓打包任务

这个样例是基于windows系统写的python打包安卓的脚本&#xff1a; 一、配置AndroidStudio下的打包任务 1.在Android项目根目录下的build.gradle文件配置生成Release包的任务&#xff1a; task cleanAll(type: Delete) {delete rootProject.buildDirrootProject.subprojects.e…

C++学习笔记----9、发现继承的技巧(六)---- 有趣且令人迷惑的继承问题(7)

6、非公共继承 在前面所有的例子中&#xff0c;父类总是用public关键字列出。你可能会想是否父类也可以是private或protected。实际上&#xff0c;是可以的&#xff0c;但都不像public一样常见。如果不对父类进行访问指示符的指定&#xff0c;对于类来讲就是private继承&#x…