canal 自定义客户端 优雅实现 (3)

1、为啥要有数据同步?

比如要做一些推荐或者其他与业务不强依赖的业务,这个时候,又不想直接去业务库取数据或者查数据进行计算,但是又需要业务库的某些数据: 比如用户行为。。。等

2、有很多数据同步,为啥要定制?

如果只抓取部分表的部分数据,这个时候,定制同步服务,肯定是有必要的。

目前有很多中间件 DBSyncer , Canal 等可以选择;

3、介绍下方案

3.1 部署canal deployer ,运行它,相当于:一个自助监听读取 mysql bin log的服务,并可以把监听到的数据,生产到 MQ中

3.2 MQ 中间件,部署和启动。承载Canal deployer生产的消息队列

3.3 自定义微服务-消费者: 消费MQ队列中生产的数据,可做数据统计等再加工处理。

3.4 解耦,削峰填谷

4、重点介绍下 微服务消费者

 MQ数据结构: 包括数据库,表,data表字段及数据,type: 新增 | 更新 | 删除

show core code:(原则是不写一行SQL)

Service

@Service
@Slf4j
@SuppressWarnings(value = "unchecked")
public class SyncServiceImpl implements ISyncService {@Overridepublic void processData(MQMessage message) {String tableName = message.getTable();// 构建映射表Map<String, TableMapperEnum> tableMapperEnumMap = TableMapperEnum.buildAllTableInfo();TableMapperEnum tableMapperEnum = MapUtils.getObject(tableMapperEnumMap, tableName);// 验证 tableName,确保它是预期的值if (Objects.isNull(tableMapperEnum)) {log.error("Invalid table name: " + tableName);return;}try {executeByMapper(tableMapperEnum, message);} catch (NoSuchBeanDefinitionException e) {log.error("Bean not found: " + e.getMessage());} catch (Exception e) {e.printStackTrace();log.error("Error processing data: " + e.getMessage());}}private void executeByMapper(TableMapperEnum tableMapperEnum, SyncDataMessage message) throws NoSuchBeanDefinitionException {IService service = null;List<JSONObject> data = message.getData();String type= message.getType();// 假设配置文件中存储了JSON字符串String jsonString = data.toString();switch (tableMapperEnum) {           case USER_ACTION:service = SpringUtils.getBean(IUserActionService.class);break;  default:log.error("Unsupported table mapper enum: " + tableMapperEnum.name());break;}boolean updateFlag = SYNC_TYPE_INSERT.equals(type) ||SYNC_TYPE_UPDATE.equals(type) ;boolean deleteFlag = SYNC_TYPE_DELETE.equals(type);if (service != null && updateFlag) {service.saveOrUpdateBatch(JSONArray.parseArray(jsonString, tableMapperEnum.getEntityClazz(), JSONReader.Feature.SupportSmartMatch));}if (service != null && deleteFlag) {List<Object> deleteIds = Lists.newArrayList();String pkName = message.getPkNames()[0];message.getData().forEach(item-> deleteIds.add(item.get(pkName)));service.removeByIds(deleteIds);}}
}

枚举类定义: 建立 表名,service ,实体class的关系

import cn.hutool.core.util.ClassUtil;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Getter;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;@Getter
@AllArgsConstructor
public enum TableMapperEnum {USER_ACTION(IUserActionService.class, UserAction.class)  ;/*** 要扫描的包名*/private static final String PACKAGE_NAME = "com.xxxxx.xxxx.domain.entity";private final Class<?> clazz;private final Class<?> entityClazz;public static TableMapperEnum getByClazz(Class<?> clazz) {return Arrays.stream(TableMapperEnum.values()).filter(pointRuleEnum -> pointRuleEnum.getEntityClazz().equals(clazz)).findFirst().orElse(null);}/*** 构建表名 与枚举关系** @return*/public static Map<String, TableMapperEnum> buildAllTableInfo() {// 获取指定包下所有类Set<Class<?>> classes = ClassUtil.scanPackage(PACKAGE_NAME);// 遍历类,检查是否有指定注解Map<String, TableMapperEnum> tableMapperEnumMap = new HashMap<>();Set<Class<?>> configClasses = Arrays.stream(TableMapperEnum.values()).map(TableMapperEnum::getEntityClazz).collect(Collectors.toSet());for (Class<?> clazz : classes) {// 在这里可以进一步处理找到的类,比如获取注解实例等TableName clazzAnnotation = clazz.getAnnotation(TableName.class);if (!configClasses.contains(clazz) || Objects.isNull(clazzAnnotation)) {continue;}buildMapperData(tableMapperEnumMap, clazz, clazzAnnotation);}return tableMapperEnumMap;}private static void buildMapperData(Map<String, TableMapperEnum> tableMapperEnumMap, Class<?> clazz, TableName clazzAnnotation) {String tableName = clazzAnnotation.value();TableMapperEnum curEnum = getByClazz(clazz);if (Objects.nonNull(curEnum) && hasCustomAnnotation(clazz)) {tableMapperEnumMap.put(tableName, curEnum);}}/*** 检查类是否包含指定的自定义注解。** @param clazz 类* @return 类上是否存在该自定义注解*/private static boolean hasCustomAnnotation(Class<?> clazz) {// 获取自定义注解的Class对象// 检查类、方法、字段上是否有该注解return clazz.isAnnotationPresent(TableName.class);}
}

其他的就是 Mybatis-plus 常用的写法

service ,mapper,entity等

import com.baomidou.mybatisplus.extension.service.IService;
import com.sikaryofficial.adaptermq.domain.entity.UserAction;/*** 用户行为Service接口** @author qinjinyuan* @date 2023-11-09*/
public interface IUserActionService extends IService<UserAction> {
}import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class UserActionServiceImpl extends ServiceImpl<UserActionMapper, UserAction> implements IUserActionService {@Autowiredprivate UserActionMapper userActionMapper;}

小结:

此方案解决了: 消息字段映射问题(下划线转驼峰),无需编写任何SQL,按需消费数据

缺点:未解决MySQL 表字段是 json 数组等复杂结构数据解析问题

其他文章参看:

canal 增量数据同步es 自定义客户端(1)_linux canal-CSDN博客

canal 增量数据同步es 自定义客户端(2)-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1410024.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【RAG 论文】Selfmem:使用 LLM 自己的输出来作为下一轮的 context 从而提升自己的生成效果

论文&#xff1a;Lift Yourself Up: Retrieval-augmented Text Generation with Self Memory ⭐⭐⭐⭐ NeurIPS 2023&#xff0c;北大 文章目录 一、论文速读二、实现细节2.1 检索增强的 Generator2.2 Memory Selector2.3 Generator 的两种 mode 总结 一、论文速读 在以为 RAG…

LLM系列(4):通义千问7B在Swift/DeepSpeed上微调秘诀与实战陷阱避坑指南

LLM系列(4):通义千问7B在Swift/DeepSpeed上微调秘诀与实战陷阱避坑指南 阿里云于 2023年8 月 3 日开源通义千问 70 亿参数模型,包括通用模型 Qwen-7B 以及对话模型 Qwen-7B-Chat,这也是国内首个开源自家大模型的大厂。在诸多权威大模型能力测评基准上,如 MMLU、C-Eval、…

规控不分家,实际岗位职责是如何划分的

1. 实践和演练 2. 自动驾驶技术分类 3. 自动驾驶关键技术 4. 自动驾驶架构 5. 感知perception

前端高频算法

分析算法排序&#xff1a; 时间复杂度: 一个算法执行所耗费的时间。 空间复杂度: 运行完一个程序所需内存的大小。 执行效率、内存消耗、稳定性 三方面入手。 1. 排序 1.1 冒泡排序 冒泡的过程只涉及相邻数据的交换操作&#xff0c;所以它的空间复杂度为 O(1)。 为了保证…

solidworks出现slderrresu.dll错误如何解决?亲测有效

通过近来给客户安装SolidWorks发现&#xff0c;SolidWorks2010、SolidWorks2012、SolidWorks2014、SolidWorks2015、SolidWorks2016、SolidWorks2017都会出现这个slderrresu.dll安装错误问题&#xff1a; 其实这个错误很好解决,主要是因為安裝中文版solidworks沒有選擇安裝中文…

社交媒体数据恢复:Tandem

Tandem数据恢复方法 1. 概述 Tandem 是致力於提供語言學習者和母語者交流的語言交換app&#xff0c;已發行iOS及Android版本。 使用者可以透過文字或者語音對談找到語言交換對象。 該應用程序於2020年4月支援超過160種語言&#xff0c;其中包含12種手語。 2. 操作步骤 2.1.…

linux 光驱(光盘)安装

文章目录 选择光盘自带 YUM 库创建 repo创建文件夹挂载光驱开机自启动挂载安装软件YUM 安装RPM 安装源码包安装 选择光盘 vmware 选择光盘 自带 YUM 库 ls /etc/yum.repos.d创建 repo vim /etc/yum.repo.d/demo.repo // 编写 repo 相关配置 [demo] namedemo baseurlfile://…

预训练模型介绍

一、什么是GPT GPT 是由人工智能研究实验室 OpenAI 在2022年11月30日发布的全新聊天机器人模型, 一款人工智能技术驱动的自然语言处理工具 它能够通过学习和理解人类的语言来进行对话, 还能根据聊天的上下文进行互动,能完成撰写邮件、视频脚本、文案、翻译、代码等任务 二、 为…

【Canvas技法】流星雨的实现

【关键点】 流星的绘制&#xff0c;本质上还是绘制一条直线&#xff0c;但在渲染上有差别。 通常绘制直线都是给的固定颜色&#xff0c;绘制流星给的是渐变色&#xff0c;渐变色的开头是与背景色对比度明显的亮色&#xff0c;结尾是与背景色相同的暗色&#xff0c;中间渐变过…

基于SSM的“一汽租车辆共享平台”的设计与实现(源码+数据库+文档+PPT)

基于SSM的“一汽租车辆共享平台”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 登录界面 租车界面 订单管理界面 财务报表界面 理赔界面 …

链表(数组实现的伟大二踢脚)

一.链表与数组 链表作为 C 语言中一种基础的数据结构&#xff0c;在平时写程序的时候用的并不多&#xff0c;但在操作系统里面使用的非常多。不管是RTOS还是Linux等使用非常广泛&#xff0c;所以必须要搞懂链表&#xff0c;链表分为单向链表和双向链表&#xff0c;单向链表很少…

c++大湾区模拟题4

一、单项选择题(共 15 题&#xff0c;每题 2 分&#xff0c;共计 30 分&#xff1b;每题有且仅有一个正确选项) 1. 以下哪些不是属于国家顶级域名的是&#xff08;&#xff09; A..au B..cn C.com D..jp 2. 一棵完全二叉树&#xff0c;共有 1234 个节点&#xff0c;其叶子…

2024年教你怎么将学浪视频保存到本地

你是否曾为无法将学浪视频保存到本地而烦恼&#xff1f;现在&#xff0c;我们将在2024年教给你如何解决这个问题&#xff01;只需简单几步操作&#xff0c;即可轻松将学浪视频保存到您的本地设备&#xff0c;随时随地想看就看&#xff01; 我已经将下载学浪的工具打包好了&…

与Apollo共创生态:探索自动驾驶的未来蓝图

目录 引言Apollo开放平台Apollo开放平台企业生态计划Apollo X 企业自动驾驶解决方案&#xff1a;加速企业场景应用落地Apollo开放平台携手伙伴共创生态生态共创会员权益 个人心得与展望技术的多元化应用数据驱动的智能化安全与可靠性的重视 结语 引言 就在2024年4月19日&#x…

解码Starknet Verifier:深入逆向工程之旅

1. 引言 Sandstorm为&#xff1a; 能提交独立proof给StarkWare的Ethereum Verifier&#xff0c;的首个开源的STARK prover。 开源代码见&#xff1a; https://github.com/andrewmilson/sandstorm&#xff08;Rust&#xff09; L2Beat 提供了以太坊上Starknet的合约架构图&…

一探究竟轻松畅玩:我独自升级崛起怎么玩 怎么快速上手的教程

一探究竟轻松畅玩&#xff1a;我独自升级崛起怎么玩 怎么快速上手的教程 最近一款漫改的MMORPG游戏《我独自升级&#xff1a;崛起》给玩家们带来了不少惊喜。在刚进入游戏时&#xff0c;玩家们需要从E级猎人开始玩起&#xff0c;逐步成长为S级猎人&#xff0c;通过升级学习新技…

ngrinder项目-本地调试遇到的坑

前提-maven mirrors配置 <mirrors><!--阿里公有仓库--><mirror><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</ur…

在龙梦迷你电脑福珑2.0上试了三款操作系统

最近抽时间在龙梦迷你电脑上试了三款操作系统。 这几款操作系统以前都下载过。试用速度会快很多。 试用第一款&#xff1a;统信操作系统龙芯版。能正常安装。安装好了以后&#xff0c;下载了一个软件&#xff1a;龙芯游览器。修改该游览器的界面&#xff0c;不能实现所有页面…

C语言----贪吃蛇(补充)

各位看官好&#xff0c;我想大家应该已经看过鄙人的上一篇博客贪吃蛇了吧。鄙人在上一篇博客中只是着重的写了贪吃蛇的实现代码&#xff0c;但是前期的一些知识还没有具体的介绍&#xff0c;比如确认光标位置&#xff0c;句柄等。那么我这一篇博客就来补充上一篇博客所留下来的…

nacos2.3.x 修改登陆密钥

在使用nacos2.3.x的时候&#xff0c;启动之后&#xff0c;可以不用登陆&#xff0c;直接进入nacos的控制台&#xff0c;但是会提示去开启鉴权&#xff0c;开启的方式如下&#xff1a; 重启nacos之后&#xff0c;再次访问nacos时&#xff0c;就会跳到登陆页面&#xff0c;默认登…