Debezium-MySqlConnectorTask

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

MySqlConnectorTask,用于读取MySQL的二进制日志并生成对应的数据变更事件

整体架构流程

技术名词解释

数据库模式(Database Schema)
数据库模式是指数据库中数据的组织结构和定义,它描述了数据库中所有对象(如表、视图、索引、存储过程等)的结构和关系。具体来说,数据库模式包括以下几个方面:
1  表结构:定义了数据库中各个表的名称、列的名称、数据类型、约束条件(如主键、外键、唯一性约束等)。
2  关系:描述了表与表之间的关系,如一对多、多对多等。
3  索引:定义了表上的索引,用于提高查询性能。
4  视图:定义了虚拟表,这些虚拟表基于SQL查询结果,可以简化复杂的查询操作。
5  存储过程和函数:定义了数据库中的存储过程和函数,用于执行特定的业务逻辑。
6  触发器:定义了在特定事件发生时自动执行的操作。

在 DatabaseHistory 接口中的应用
在 DatabaseHistory 接口中,数据库模式的变更记录和恢复功能主要用于以下场景:
    1  记录变更:当数据库模式发生变化时(如添加新表、修改表结构、删除表等),通过 record 方法记录这些变更。
    2  恢复:当需要恢复到某个历史点的数据库模式时,通过 recover 方法恢复到指定的历史状态。
通过这些功能,可以有效地管理和追踪数据库模式的变化,确保数据的一致性和完整性。

技术细节

@Overridepublic void start(Map<String, String> props) {if (context == null) {throw new ConnectException("Unexpected null context");}// Validate the configuration ...final Configuration config = Configuration.from(props);if (!config.validate(MySqlConnectorConfig.ALL_FIELDS, logger::error)) {throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");}// Create and configure the database history ...this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);if (this.dbHistory == null) {throw new ConnectException("Unable to instantiate the database history class " +config.getString(MySqlConnectorConfig.DATABASE_HISTORY));}Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); // do not remove// prefixthis.dbHistory.configure(dbHistoryConfig); // validatesthis.dbHistory.start();this.running.set(true);// Read the configuration ...final String user = config.getString(MySqlConnectorConfig.USER);final String password = config.getString(MySqlConnectorConfig.PASSWORD);final String host = config.getString(MySqlConnectorConfig.HOSTNAME);final int port = config.getInteger(MySqlConnectorConfig.PORT);final String initialBinLogFilename = config.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);final long serverId = config.getLong(MySqlConnectorConfig.SERVER_ID);serverName = config.getString(MySqlConnectorConfig.SERVER_NAME.name(), host + ":" + port);final boolean keepAlive = config.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);final int maxQueueSize = config.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);final long timeoutInMilliseconds = config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);final boolean includeSchemaChanges = config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);final long pollIntervalMs = config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);maxBatchSize = config.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);// Define the filter using the whitelists and blacklists for tables and database names ...Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),config.getString(MySqlConnectorConfig.TABLE_WHITELIST),config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {Predicate<TableId> isBuiltin = (id) -> {return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());};tableFilter = tableFilter.and(isBuiltin.negate());}// Create the queue ...events = new LinkedBlockingDeque<>(maxQueueSize);batchEvents = new ArrayDeque<>(maxBatchSize);// Set up our handlers for specific kinds of events ...tables = new Tables();tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);// Set up the log reader ...client = new BinaryLogClient(host, port, user, password);client.setServerId(serverId);client.setKeepAlive(keepAlive);if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);client.registerEventListener(this::enqueue);client.registerLifecycleListener(traceLifecycleListener());// Set up the event deserializer with additional types ...EventDeserializer eventDeserializer = new EventDeserializer();eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());client.setEventDeserializer(eventDeserializer);// Check if we've already processed some of the log for this database ...source.setServerName(serverName);// Get the offsets for our partition ...Map<String, ?> offsets = context.offsetStorageReader().offset(source.partition());if (offsets != null) {source.setOffset(offsets);// And set the client to start from that point ...client.setBinlogFilename(source.binlogFilename());client.setBinlogPosition(source.binlogPosition());// The event row number will be used when processing the first event ...logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}",serverName, source.binlogFilename(), source.binlogPosition(), source.eventRowNumber());// We have to make our Tables reflect the state of the database at the above source partition (e.g., the location// in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the// TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements// to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string// of DDL. However, we should consume no further than offset we recovered above.try {logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);DdlParser ddlParser = new MySqlDdlParser();dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);tableConverters.loadTables();logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));} catch (Throwable t) {throw new ConnectException("Failure while recovering database schemas", t);}} else {// initializes this position, though it will be reset when we see the first event (should be a rotate event) ...client.setBinlogFilename(initialBinLogFilename);logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",source.binlogFilename(), source.binlogPosition());}// Start the log reader, which starts background threads ...try {logger.debug("Connecting to MySQL server");client.connect(timeoutInMilliseconds);logger.debug("Successfully connected to MySQL server and beginning to read binlog");} catch (TimeoutException e) {double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host+ ":" + port + " with user '" + user + "'", e);} catch (AuthenticationException e) {throw new ConnectException("Failed to authenticate to the MySQL database at " + host + ":" + port + " with user '" + user + "'",e);} catch (Throwable e) {throw new ConnectException("Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);}}

 

  1. 验证配置:从传入的属性中创建配置对象并验证其有效性。
  2. 创建数据库历史记录:根据配置实例化 DatabaseHistory 对象并启动。
  3. 读取配置参数:从配置中读取各种必要的参数,如用户名、密码、主机、端口等。
  4. 定义表过滤器:根据白名单和黑名单定义表过滤器,忽略内置表。
  5. 创建队列:初始化事件队列和批处理队列。
  6. 设置事件处理器:为不同的事件类型设置处理器。
  7. 设置日志读取器:创建并配置 BinaryLogClient,注册事件监听器和生命周期监听器。
  8. 设置事件反序列化器:配置事件反序列化器以处理特定类型的事件。
  9. 恢复数据库状态:检查是否有已处理的日志,如果有则恢复数据库模式。
  10. 连接到 MySQL 服务器:尝试连接到 MySQL 服务器并开始读取二进制日志。

小结

/**
 * 该类负责配置和初始化MySQL连接器,包括设置数据库和表的过滤条件、创建事件队列、注册事件处理器、设置二进制日志客户端、恢复数据库模式等。
 * 主要功能包括:
 * - 应用数据库和表的黑白名单过滤条件。
 * - 配置是否忽略内置表。
 * - 创建事件队列和批处理事件队列。
 * - 注册不同类型的事件处理器。
 * - 初始化二进制日志客户端并设置相关参数。
 * - 检查并恢复已处理的日志位置。
 * - 连接到MySQL服务器并开始读取二进制日志。
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/19570.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux脚本练习

通过shell脚本分析部署nginx网络服务 1.接收用户部署的服务名称 2.判断服务是否安装 ​ 已安装&#xff1b;自定义网站配置路径为/www&#xff1b;并创建共享目录和网页文件&#xff1b;重启服务 ​ 没有安装&#xff1b;安装对应的软件包 3.测试 判断服务是否成功运…

Windows系统编程 - 进程间通信

文章目录 前言概述发送消息WM_COPYDATADLL共享段文件映射文件映射步骤相关API讲解文件映射 进程间的通信&#xff08;有文件版本&#xff09;文件映射 进程间的通信&#xff08;匿名版本&#xff09; 管道相关API讲解父子之间的匿名进程通信GetStdHandleSTARTUPINFO指定句柄测试…

基于yolov8、yolov5的植物类别识别系统(含UI界面、训练好的模型、Python代码、数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下&#xff1a; 算法模型&#xff1a;     yolov8、yolov8 SE注意力机制 或 yolov5、yolov5 SE注意力机制 &#xff0c; 直接提供最少两个训练好的模型。模型十分重要&#xff0c;因为有些同学的电脑没有 GPU&#xff0…

1+X应急响应(网络)系统信息收集分析:

系统信息收集分析&#xff1a; 系统启动项和计划任务分析&#xff1a; 系统进程&#xff0c;服务分析&#xff1a; 内存取证&#xff1a; 系统崩溃转储&#xff1a;

智慧环保平台_大数据平台_综合管理平台_信息化云平台

系统原理   智慧环保是新一代信息技术变革的产物&#xff0c;是信息资源日益成为重要生产要素和信息化向更高阶段发展的表现&#xff0c;是经济社会发展的新引擎。   现今&#xff0c;环保信息化建设进入高速发展阶段。在此轮由物联网掀起的信息浪潮下&#xff0c;环境信息…

如何通过电脑监控软件远程监控一台电脑的所有屏幕画面记录

7-1 本教程介绍一个简单的工具&#xff0c;可以安装在电脑中&#xff0c;按设置的时间间隔&#xff0c;自动对屏幕截图保存&#xff0c;并且可以在有网络的其它电脑上远程提取截图文件。 该软件用于自动记录电脑的屏幕画面内容和变化&#xff0c;如果你有这方面的使用场景&am…

深度解读混合专家模型(MoE):算法、演变与原理

假设一个专家团队共同解决复杂问题。每位专家都拥有独特的技能&#xff0c;团队通过高效分配任务实现了前所未有的成功。这就是混合专家&#xff08;Mixture-of-Experts&#xff0c;MoE&#xff09;模型架构背后的基本思想&#xff0c;这种方法允许机器学习系统&#xff0c;特别…

电商微服务项目第一天(品牌管理)

1.BaseTrademarkController&#xff08;品牌管理CRUD&#xff09; /*** 添加品牌* param baseTrademark* return*/PostMapping("baseTrademark/save")public Result<BaseTrademark> save(RequestBody BaseTrademark baseTrademark){baseTrademarkService.save(…

初探Ranking系统的离在线满意度评估

【引子】在上周发布了《大模型应用系列&#xff1a;从Ranking到Reranking》之后&#xff0c; 有AI 产品经理问我&#xff0c;如何评估Ranking 系统的性能呢&#xff1f; 再进一步&#xff0c;如何评估RAG系统的性能呢&#xff1f; 老码农整理了一下在搜索引擎方面的感受&#x…

初识C++ (五)

没事干就学习 auto关键字 auto是C程序设计语言的关键字。自C11以来&#xff0c;auto关键字用于两种情况&#xff1a;声明变量时根据初始化表达式自动推断该变量的类型、声明函数时函数返回值的占位符。C98标准中auto关键字用于自动变量的声明&#xff0c;但由于使用极少且多余…

shell脚本判断nginx安装和运行

shell脚本判断nginx安装和运行 脚本内容&#xff1a; 传入服务名称&#xff1a; read -p "请输入要判断的程序名称:" service_name 查看服务进程&#xff1a; countps -aux | grep -cw $service_name 判断nginx是否安装&#xff08;系统中是否有nginx命令&#xff…

电脑msvcr100.dll丢失的解决方法,详细介绍多个解决方法

由于系统中关键文件msvcr100.dll的缺失&#xff0c;用户可能会遭遇一系列始料未及的困扰与问题。msvcr100.dll是Microsoft Visual C运行库中的一个核心动态链接库文件&#xff0c;对于许多应用程序的正常运行至关重要。当这个特定的dll文件丢失时&#xff0c;可能会导致部分软件…

Windows安装vcpkg教程(VS2022)

内容摘要&#xff1a; 本文详细介绍如何在Windows系统上使用 Git 克隆 vcpkg 仓库来安装vcpkg工具&#xff0c;并链接Visual Studio 2022。 目录 一、关于vcpkg 二、开发环境 三、安装Git 四、使用 Git 克隆 vcpkg 仓库 一、关于vcpkg vcpkg 是一个开源的 C 包管理工具&am…

TypeScript泛型基础知识

1.1 泛型 泛型是可以在保证类型安全的前提下&#xff0c;让函数等与多种类型一起工作&#xff0c;从而实现复用&#xff0c;常用于&#xff1a;函数、接口、class中。 需求&#xff1a;创建一个id函数&#xff0c;传入什么数据就返回该数据本身&#xff08;也就是说&#xff0c…

多线程的安全问题

什么是线程安全问题&#xff1f; 多个线程&#xff0c;访问同一资源&#xff0c;出现了问题&#xff0c;就是线程安全问题&#xff08;数据不准确&#xff0c;或者直接报错&#xff09; 1&#xff09;错误演示&#xff1a; public class Demo04 {static int tickedNum 100;/…

推荐一套相片复原工具:Focus Magic

Focus Magic是一套相片复原工具&#xff0c;能够帮助你修补及强化那些模糊不清楚的影像。其他那些锐利化工具只能够修补那些只有一点模糊的相片&#xff0c;但有了Focus Magic你就可以把那些根本完全没对准焦距的相片重新对准焦距。程序还可以以插件的形式作为其他图形处理工具…

C++:类的继承

在C中&#xff0c;类的继承是一种面向对象编程&#xff08;OOP&#xff09;的重要特性&#xff0c;它允许一个类&#xff08;子类或派生类&#xff09;继承另一个类&#xff08;基类或父类&#xff09;的属性和方法。通过继承&#xff0c;可以重用现有的代码&#xff0c;减少重…

详细解读Gson 、Jackson 、FastJson 三大json序列化工具

一 gson Google提供的用来java对象和JSON数据之间进行映射的JAVA类库 优缺点 优点&#xff1a;快速、高效&#xff0c; 依赖少只有一个jar包&#xff0c;面向对象&#xff0c;数据传输解析方便 缺点&#xff1a;速度较慢 mvn依赖 <dependency><groupId>com.go…

版块控制---git

引入 设想&#xff0c;当我们写论文时&#xff0c;对第一版不够满意时&#xff0c;想做出修改但是又怕修改时回毁掉整个论文版本&#xff0c;所以我们通常会进行备份&#xff0c;以防止数据被修改后的崩毁&#xff0c;版块控制就是这个合理创建管理备份的过程&#xff0c;而且这…

BLE 蓝牙客户端和服务器连接

蓝牙通信在设计小型智能设备时非常普遍&#xff0c;之前一直没有使用过&#xff0c;最近使用ardunio ESP32 做了一些实验&#xff0c;做了一个收听播客的智能旋钮&#xff08;Smart Knob&#xff09;&#xff0c;它带有一个旋转编码器和两个按键。 本文介绍BLE 服务器Server和W…