1、为啥要有数据同步?
比如要做一些推荐或者其他与业务不强依赖的业务,这个时候,又不想直接去业务库取数据或者查数据进行计算,但是又需要业务库的某些数据: 比如用户行为。。。等
2、有很多数据同步,为啥要定制?
如果只抓取部分表的部分数据,这个时候,定制同步服务,肯定是有必要的。
目前有很多中间件 DBSyncer , Canal 等可以选择;
3、介绍下方案
3.1 部署canal deployer ,运行它,相当于:一个自助监听读取 mysql bin log的服务,并可以把监听到的数据,生产到 MQ中
3.2 MQ 中间件,部署和启动。承载Canal deployer生产的消息队列
3.3 自定义微服务-消费者: 消费MQ队列中生产的数据,可做数据统计等再加工处理。
3.4 解耦,削峰填谷
4、重点介绍下 微服务消费者
MQ数据结构: 包括数据库,表,data表字段及数据,type: 新增 | 更新 | 删除
show core code:(原则是不写一行SQL)
Service
@Service
@Slf4j
@SuppressWarnings(value = "unchecked")
public class SyncServiceImpl implements ISyncService {@Overridepublic void processData(MQMessage message) {String tableName = message.getTable();// 构建映射表Map<String, TableMapperEnum> tableMapperEnumMap = TableMapperEnum.buildAllTableInfo();TableMapperEnum tableMapperEnum = MapUtils.getObject(tableMapperEnumMap, tableName);// 验证 tableName,确保它是预期的值if (Objects.isNull(tableMapperEnum)) {log.error("Invalid table name: " + tableName);return;}try {executeByMapper(tableMapperEnum, message);} catch (NoSuchBeanDefinitionException e) {log.error("Bean not found: " + e.getMessage());} catch (Exception e) {e.printStackTrace();log.error("Error processing data: " + e.getMessage());}}private void executeByMapper(TableMapperEnum tableMapperEnum, SyncDataMessage message) throws NoSuchBeanDefinitionException {IService service = null;List<JSONObject> data = message.getData();String type= message.getType();// 假设配置文件中存储了JSON字符串String jsonString = data.toString();switch (tableMapperEnum) { case USER_ACTION:service = SpringUtils.getBean(IUserActionService.class);break; default:log.error("Unsupported table mapper enum: " + tableMapperEnum.name());break;}boolean updateFlag = SYNC_TYPE_INSERT.equals(type) ||SYNC_TYPE_UPDATE.equals(type) ;boolean deleteFlag = SYNC_TYPE_DELETE.equals(type);if (service != null && updateFlag) {service.saveOrUpdateBatch(JSONArray.parseArray(jsonString, tableMapperEnum.getEntityClazz(), JSONReader.Feature.SupportSmartMatch));}if (service != null && deleteFlag) {List<Object> deleteIds = Lists.newArrayList();String pkName = message.getPkNames()[0];message.getData().forEach(item-> deleteIds.add(item.get(pkName)));service.removeByIds(deleteIds);}}
}
枚举类定义: 建立 表名,service ,实体class的关系
import cn.hutool.core.util.ClassUtil;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Getter;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;@Getter
@AllArgsConstructor
public enum TableMapperEnum {USER_ACTION(IUserActionService.class, UserAction.class) ;/*** 要扫描的包名*/private static final String PACKAGE_NAME = "com.xxxxx.xxxx.domain.entity";private final Class<?> clazz;private final Class<?> entityClazz;public static TableMapperEnum getByClazz(Class<?> clazz) {return Arrays.stream(TableMapperEnum.values()).filter(pointRuleEnum -> pointRuleEnum.getEntityClazz().equals(clazz)).findFirst().orElse(null);}/*** 构建表名 与枚举关系** @return*/public static Map<String, TableMapperEnum> buildAllTableInfo() {// 获取指定包下所有类Set<Class<?>> classes = ClassUtil.scanPackage(PACKAGE_NAME);// 遍历类,检查是否有指定注解Map<String, TableMapperEnum> tableMapperEnumMap = new HashMap<>();Set<Class<?>> configClasses = Arrays.stream(TableMapperEnum.values()).map(TableMapperEnum::getEntityClazz).collect(Collectors.toSet());for (Class<?> clazz : classes) {// 在这里可以进一步处理找到的类,比如获取注解实例等TableName clazzAnnotation = clazz.getAnnotation(TableName.class);if (!configClasses.contains(clazz) || Objects.isNull(clazzAnnotation)) {continue;}buildMapperData(tableMapperEnumMap, clazz, clazzAnnotation);}return tableMapperEnumMap;}private static void buildMapperData(Map<String, TableMapperEnum> tableMapperEnumMap, Class<?> clazz, TableName clazzAnnotation) {String tableName = clazzAnnotation.value();TableMapperEnum curEnum = getByClazz(clazz);if (Objects.nonNull(curEnum) && hasCustomAnnotation(clazz)) {tableMapperEnumMap.put(tableName, curEnum);}}/*** 检查类是否包含指定的自定义注解。** @param clazz 类* @return 类上是否存在该自定义注解*/private static boolean hasCustomAnnotation(Class<?> clazz) {// 获取自定义注解的Class对象// 检查类、方法、字段上是否有该注解return clazz.isAnnotationPresent(TableName.class);}
}
其他的就是 Mybatis-plus 常用的写法
service ,mapper,entity等
import com.baomidou.mybatisplus.extension.service.IService;
import com.sikaryofficial.adaptermq.domain.entity.UserAction;/*** 用户行为Service接口** @author qinjinyuan* @date 2023-11-09*/
public interface IUserActionService extends IService<UserAction> {
}import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class UserActionServiceImpl extends ServiceImpl<UserActionMapper, UserAction> implements IUserActionService {@Autowiredprivate UserActionMapper userActionMapper;}
小结:
此方案解决了: 消息字段映射问题(下划线转驼峰),无需编写任何SQL,按需消费数据
缺点:未解决MySQL 表字段是 json 数组等复杂结构数据解析问题
其他文章参看:
canal 增量数据同步es 自定义客户端(1)_linux canal-CSDN博客
canal 增量数据同步es 自定义客户端(2)-CSDN博客