Canal 扩展篇(阿里开源用于数据同步备份,监控表和表字段(日志))

1.Canal介绍

Canal把自己伪装成从数据库,获取mysql主数据库的日志(binlog)信息,所以要想使用canal就得先开启数据库日志

https://github.com/alibaba/canal

Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,工作原理如下:

Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )

Canal 解析 binary log 对象(原始为 byte 流)

可以用于以下业务场景:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

2.原理

1. 准备MySql主库(开启Mysql日志)

MySql binlog介绍

1.在服务器新建文件夹 mysql/data,新建文件 mysql/conf.d/my.cnf

其中my.cnf 内容如下

[mysqld]
log_timestamps=SYSTEM
default-time-zone='+8:00'server-id=1log-bin=mysql-binbinlog-do-db = mall # 要监听的库binlog_format=ROW

配置解读:

server-id :指定当前服务的id,必须指定,否则会报错

log-bin :设置binlog文件的存放地址和文件名,叫做mysql-bin,此处指定的目录前缀是mysql容器的数据存放目录,所以可以在挂载目录中看到相关的文件,命名格式是mysql-bin.000001开始慢慢叠加

binlog-do-db :指定针对哪个数据库记录binlog的events事件,此处记录mall库

如果在 MySQL 配置中没有显式地配置 binlog-do-db 参数,那么 MySQL 的二进制日志(binlog)将会记录所有数据库的变更操作。

binlog-do-db 参数用于指定需要被记录到 binlog 中的数据库。通过配置 binlog-do-db,你可以选择只记录指定的数据库的变更操作,而忽略其他数据库的变更操作。

如果没有配置 binlog-do-db 参数,或者将其设置为一个空值(例如 binlog-do-db=),那么 MySQL 将会监听并记录所有数据库的变更操作到 binlog 中。

4. binlog_format

在 MySQL 配置中,binlog_format 是用于配置二进制日志(binlog)的格式。binlog 是 MySQL 中用于记录数据库的变更操作的日志文件。

binlog_format 可以设置为以下几种值:

1. STATEMENT:以 SQL 语句的形式记录数据库的变更操作。这种格式记录的是每个执行的 SQL 语句,可以通过 replay SQL 语句的方式来还原数据变更。

2. ROW:以行的形式记录数据库的变更操作。这种格式记录的是每一行数据的变更情况,包括被修改、插入或删除的数据。ROW 格式记录了更加详细和精确的变更信息,但相对于 STATEMENT 格式来说,会占用更多的存储空间。

3. MIXED:混合模式,根据具体的情况自动选择 STATEMENT 或 ROW 格式来记录数据库的变更操作。MIXED 模式会根据 SQL 语句的类型和特性来决定使用哪种格式,以达到性能和存储空间的平衡。

binlog_format 的选择需要根据实际需求和应用场景来决定。不同的格式具有不同的优缺点,需要根据具体情况来进行权衡和选择

2.启动数据库

docker run --name mysql01 \
-p 3306:3306 \
-v /opt/mysql/conf.d:/etc/mysql/conf.d \
-v /opt/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0

3.校验是否开启成功

show variables like 'log_%';show variables like 'binlog_format';
show variables like 'server_id';
--查看所有日志
show binlog events;
--查看最新的日志
show master status
-- 查询指定的binlog日志
show binlog events in 'XTZJ-20221008CY-bin.000020'
--清空所有的 binlog 日志文件reset master

2.DML与DDL

MySQL中,DML(Data Manipulation Language)和DDL(Data Definition Language)是两种不同类型的SQL语句,它们分别用于不同的数据库操作目的:

DML(数据操作语言): DML语句主要用于对数据库表中的实际数据进行操作,主要包括以下几种命令:

  1. INSERT:向表中插入新的行数据。

  2. UPDATE:更新表中已存在的行数据。

  3. DELETE:从表中删除满足特定条件的行数据。

  4. SELECT:从表中检索数据,虽然SELECT不改变数据本身,但因其属于对数据的操作,所以也被归类于DML。

DML操作通常发生在事务中,可以被用户手动控制事务的开启、提交和回滚,确保数据的一致性和完整性。

DDL(数据定义语言): DDL语句主要用于创建、修改或删除数据库的结构元素,例如:

  1. CREATE:创建新的数据库、表、索引、视图等。

  2. ALTER:更改现有数据库对象的结构,例如增加或删除列,修改列的数据类型,重命名表等。

  3. DROP:删除数据库对象,如表、索引、视图等。

  4. TRUNCATE:清空表的内容,但保留表的结构。

2. 安装canal

1.新建文件夹logs, 新建文件canal.properties instance.properties docker.compose.yml

如下图

registry.cn-beijing.aliyuncs.com/all100/canal-server:v1.1.5

docker-compose.yml

version: '3'
services:canal:container_name: canal_latestimage: canal/canal-server:v1.1.5restart: alwaysports:- 11111:11111volumes:                                - ./canal.properties:/home/admin/canal-server/conf/canal.properties- ./instance.properties:/home/admin/canal-server/conf/example/instance.properties- ./logs:/home/admin/canal-server/logs

instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# position info
canal.instance.master.address=192.168.14.3:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

canal.properties

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = localcanal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

修改canal.properties内容如下

修改instance.properties内容如下

2.启动容器

docker-compose up -d

3.查看结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1558954.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot2.x教程:(五)日志分割

日志分割 1、概述2、为什么选择Logback2.1、创建配置文件2.2、配置说明2.3、修改应用程序配置2.4、启动应用程序 3、总结 大家好,我是欧阳方超,可以扫描下方二维码关注我的公众号“欧阳方超”,后续内容将在公众号首发。 1、概述 在现代应用程…

ubuntu18.04系统中图形化界面

一、Ubuntu 18.04 中,使用 GDM 作为默认的图形用户界面(GUI)管理器。GDM 是 GNOME Display Manager 的缩写,它是用于 Ubuntu 的显示管理器,负责处理登录和会话管理。 通过命令行重启 Ubuntu 18.04 上的图形界面服务&am…

技术路线图用什么画?用这个在线工具轻松完成绘制!

在当今快速发展的技术世界中,技术路线图已成为企业和团队不可或缺的战略规划工具。它不仅能够清晰地展示技术发展方向,还能帮助团队成员、利益相关者和投资者更好地理解和参与技术战略的制定过程。但不可否认的是,创建一个有效的技术路线图并…

付费计量系统实体和接口(6)

13.7.3 Sub-classification of the Metering functions计量功能的子分级 The Metering function primarily deals with the measurement of the quantity of delivered electrical energy to the consumer. These measurements are made available for use by other functions …

qt 3D编程

Qt 3D是一个用于构建交互式3D图形应用的库,它是Qt库的一 部分。Qt 3D提供了一组C和QMLAPI,帮助开发者快速构 建3D应用程序。 一、核心模块 Qt3DCore 功能:提供3D场景中的基本概念,如实体(Entity)、组件&…

R语言运行地理探测器模型

地理探测器(GeoDetector)是一种用于空间分析的统计模型,它能够探测空间分异性以及揭示其背后驱动力的一组方法。它的核心思想是基于这样的假设:如果某个自变量对某个因变量有重要影响,那么自变量和因变量的空间分布应该…

java的LinkedList

java的LinkedList 什么是LinkedListLinkedList的模拟实现LinkedList的使用ArrayList和LinkedList的区别 什么是LinkedList LinkedList的官方文档 LinkedList的底层是双向链表结构,由于链表没有将元素存储在连续的空间中,元素存储在单独的结点中&#xf…

【Redis】Set类型的常用命令与应用场景

目录 1.命令小结 2.命令解析 3.编码方式与应用场景 1.命令小结 (1)set的特点 1)set中存放的数据也都是String类型 2)set集合中的元素是无须的 3)set集合中的元素是唯一的,不可重复 (2&a…

MySql 之 Binglog 复制

复制是一种将数据从一个 MySQL 数据库服务器异步复制到另一个的技术。使用 MySQL 复制选项,您可以复制所有数据库、选定的数据库甚至选定的表,具体取决于您的使用情况。 前提条件 确保在源服务器上启用了二进制日志记录。确保复制配置中的所有服务器都有…

uniapp——h5的控制台调试、h5调试

介绍 小程序在调试的时候可以打开调试模式,可以看到console.log的打印情况。 但是H5运行到手机上没有默认的调试的模式,但是可以人为手动加一个。 如何实现 1、main.js文件 import Vconsole from ‘vconsole’ /** 关闭正式环境打印日志&#xff…

Centos7.5 安装和配置jdk17

目录 一、下载JDK17包 二、将安装包放入服务器 三、解压jdk包到/usr/lib/jvm 四、修改JDK环境配置 1、打开配置文件 2、最后一行插入 3、立即生效 4、检查版本 一、下载JDK17包 访问网址:Java Downloads | Oraclehttps://www.oracle.com/java/technologies/downloads…

音频功放工作原理:【B类】

上一节我们讲了A类音频功放的工作原理,也知道了它的优缺点: A类功放优点:高增益,低失真,音质好 A类功放缺点:热量高,效率低,功率小 为了解决A类功放的缺点,业界又引入…

重学SpringBoot3-集成Redis(十)之实时统计和分析

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(十)之实时统计和分析 1. 实时统计和分析的常见场景2. 使用 Redis 数据结构进行实时统计3. 使用Redis String实现计数器…

原来机器学习那么简单——K近邻回归

引言: 在正文开始之前,首先给大家介绍一个不错的人工智能学习教程:https://www.captainbed.cn/bbs。其中包含了机器学习、深度学习、强化学习等系列教程,感兴趣的读者可以自行查阅。 一、什么是K近邻回归? K近邻回归&…

10.9QT对话框以及QT的事件机制处理

MouseMoveEvent(鼠标移动事件) widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);// 设置窗口为无边框,去掉标题栏等装饰this->setWi…

电脑缺失msvcr120.dll怎样修复,马上教你6种修复方法

在用电脑的时候,经常会碰到各种错误提示,比如“msvcr120.dll丢失”,导致的结果就是某些程序无法正常启动。那么,这个dll文件到底是啥,为什么会丢失,怎么解决呢?将通过这篇文章详细解释一下&…

智能优化算法-引力搜索优化算法(GSA)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 引力搜索优化算法 (Gravitational Search Algorithm, GSA) 是一种基于牛顿万有引力定律的元启发式优化算法,由Rashedi等人于2009年提出。GSA通过模拟天体之间的引力作用来搜索最优解,适用…

[ROS2]解决PyQt5和sip的各种报错问题 stderr: qt_gui_cpp

前言 编译ros环境的时候遇到了qt_gui_cpp各种编译问题,但是鉴于网上解决方法基本没有,故记录下来帮助后来者。整篇文章总结下来就是一句话:PyQt5和sip安装过程或安装版本有问题,需要重新安装。 问题与解决方法 如果PyQt5你是正…

P-Tuning v2:一种普遍有效的提示调整方法

人工智能咨询培训老师叶梓 转载标明出处 预训练语言模型通过微调(fine-tuning)来适应特定任务虽然效果显著,但存在训练成本高、参数存储量大等问题。为了解决这些问题,清华大学的研究者们提出了一种名为P-Tuning v2的提示调整&am…

colab+ngork本地访问多模态大模型

allenai/Molmo-7B-D-0924 1)colab准备环境,我这里用的是l4 2)安装对应的python库 !pip install transformers Pillow requests einops!pip install accelerate>0.26.0 bitsandbytes!pip install --no-deps accelerate bitsandbytes !p…