目录
背景
问题
思路
方案
后记
背景
最近一直在忙一个能源类项目的物料管理平台,前一段算是正式上线了。 这个系统本身的业务不算复杂,难点在于新系统是对已经运行了5年的旧系统重构,并且还要将旧系统的存量数据都承接过来,旧系统的核心业务数据已经上亿,很大一部分表的数据都是千万,百万级别,这对于新系统的挑战很大。 之所以要重构,也是因为数据量太大,旧系统有一些核心功能无法再正常使用,像一些查询、物料提报等功能都不能很好的运行,加上现在都在说大模型以及智能化,我们的新平台也从这个角度有一些新的能力提供。
考虑到数据量的级别,对于常见的查询等功能,在满足现有功能要求的前提下,肯定是不能像旧系统一样完全靠Mysql的引擎去查询,所以我们引入的Elasticsearch作为查询引擎。这就延伸出Mysql中的数据和Elasticsearch中的数据一致性的挑战,从功能层面,就是有物料新增或者编辑的时候,要同步的查询或者更新ES中的数据,保证数据的一致性。 比较直观的一种方式就是通过同步程序,消费binlog保证两边数据一致,这种方式很多,例如这个这篇博文就讲的很好。但是我们的场景又有不同,ES中的索引并不是完全对应某一张表,索引中的字段来源于Mysql中一张主表和多张表的融合,不能单纯通过同步写入,所以入ES或者更新ES的动作是和对Mysql的操作耦合在一块的。
这就有分布式事务的问题,如何解决呢?
实际写代码的时候,给具体的方法加上事务注解,将所有Mysql的操作都放到方法的最前面,当所有Sql操作完了,最后再调用操作ES的代码,这样如果Mysql操作有异常,就回滚,此时还未写入ES;如果ES写入或者更新有异常,就会整体回退,这样就能保证数据的一致性了。
问题
前几天业务上有一批物料启用操作,物料启用会将状态是限制的物料启用,同时将使用了这个物料的分配关系也启用。 这个功能开发和测试的时候,考虑的批量导入的数据量不会超过1000,所以代码层面都是直接将这一批数据拿过来,批量去做查询和更新。但是业务上这次操作的时候,直接导入了1w+的记录,这就造成当时执行这个异步导入任务的节点出现CPU 700%的,并且很多服务出现502的情况。
经过紧急排查发现是直接拿1w+的code编码查询ES的时候出的问题,这对ES的压力和当前节点都有很大的压力,基本上把当前节点的CPU以及内存都吃完,当前节点基本卡死。后来对代码做了优化,改成超过500条,就分配次查询和处理,该问题解决。
但是后来业务上发现一些很奇怪的数据,这些数据在ES中存在,但是在Mysql中不存在。 这就很诡异了,正常来说,如果物料提报能写入ES,就说明mysql的执行操作肯定都正常,因为都在一个事务内,ES中的数据也是查询Mysql中得到的,不应该出现Mysql没有ES有的情况,除非有人手工删除Mysql数据,但是经过排查确认不存在这样的行为。 后来又对比发现这一批异常数据的时间都是前面导入数据时,服务出现502的那个时间段出现的。
经过排查和思考,感觉只能是Mysql相关操作都正常执行,但是最后事务提交失败了才会出现这样的情况。这就又回到分布式事务上来了!!但是为什么SQL都执行成功了,事务最后commit会失败呢?
思路
尝试在本地写了一个简单的逻辑代码,如下:
@Transactional(rollbackFor = Exception.class)public void testTransactional(){logger.info("新增配置数据");SysCommPara sysCommPara = new SysCommPara();sysCommPara.setpKey("EMOTION_FEAR_TT");sysCommPara.setpValue("哈哈");sysCommPara.setpRemark("哈哈");sysCommParaMapper.insert(sysCommPara);logger.info("更新机器人信息!");BotInfo botInfo = new BotInfo();botInfo.setBotId(5651401290434658952L);botInfo.setBotName("测试机器人");botInfo.setTenantId(10001L);botInfoMapper.updateByPrimaryKey(botInfo);System.out.println("虽然报异常,但是还是可以正常执行导这里的");logger.info("新增配置数据");SysCommPara sysCommPara1 = new SysCommPara();sysCommPara1.setpKey("EMOTION_FEAR_TT_tt");sysCommPara1.setpValue("哈哈");sysCommPara1.setpRemark("哈哈");sysCommParaMapper.insert(sysCommPara1);System.out.println("ok,function end~");//下面这段模拟ES操作for (int i=0;i<1000;i++){System.out.println("hahah");}System.out.println("OVER");}
在这段代码中,我尝试模拟以下几种场景,查看错误异常以及数据一致性
(1)Mysql的某个更新直接失败
这是比较常见也比较容易理解的场景,系统会直接报SQL异常,并且做回退,这种没什么问题。
(2)更新操作对应表本身被锁
这种其实和(1)是一样的,也是正常回退,并且报的是有事务未提交,无法更新。也就是说,会在对应Mapper这一行代码直接报异常,走正常的事务回滚。
(3)SQL都可以正常执行成功,在执行到for循环这个地方时,断开数据库连接
这种是不常见的,也是上面提到出问题的主要原因。 正常来说是不会出现这种情况的,这种异常信息如下:
om.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_101]at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_101]at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_101]at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_101]at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.Util.getInstance(Util.java:387) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:917) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:896) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:885) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1618) ~[mysql-connector-java-5.1.39.jar:5.1.39]at com.alibaba.druid.pool.DruidPooledConnection.commit(DruidPooledConnection.java:748) ~[druid-1.1.14.jar:1.1.14]
org.springframework.dao.DataAccessResourceFailureException: JDBC commit; Communications link failure during commit(). Transaction resolution unknown.; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:81)at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:79)at org.springframework.jdbc.support.JdbcTransactionManager.translateException(JdbcTransactionManager.java:184)at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:336)at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654)at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407)at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:698)at com.jingu.ai.robot.tool.SyncUploadOrderToCrmService$$EnhancerBySpringCGLIB$$b590bc55.testTransactional(<generated>)
可以看到在commit操作的时候失败了,到数据库中去看,发现数据是没提交的,也就是说Mysql自己做数据回滚。 但是ES其实是已经操作成功了。
注:从错误日志可以看到,这个异常是在上层调用的那个方法地方捕获的。
什么场景下会出现这种情况?简单整理,有以下三种场景:
① 当前节点fullgc了,也就是有高并发,或者高IO,系统资源不足了
② 网络出问题,到mysql不通了
③ Mysql负载压力太大,commit提交的命令没发送过去,或者发过去了,没执行,失败了
方案
针对这个问题,我们可以考虑在上层方法这里加一个try catch,在catch中讲写入ES的数据再删除掉做回退,从而保证数据的一致性!
try{//物料信息入库po = transactionService.saveWzbmRecord(reqBo, validationRspBO);
}
catch (Exception e)
{/*** 事务失败有几种情况:* 1、某个表被锁,这种情况,在内部方法执行到对这个表操作的时候,就往下走不动* 2、指定到最后,sql都正常执行了,但是事务失败了,这个会在上层调用这个方法的地方抛异常* 故,在上层这里做容错,避免出现数据不一致的情况,这种情况一般比较少见,主要的场景包括:* 1、当前节点fullgc了,也就是有高并发,或者高IO,系统资源不足了* 2、网络出问题,到mysql不通了* 3、Mysql负载压力太大,commit提交的命令没发送过去,或者发过去了,没执行,失败了*/logger.error("保存或者新增物料失败,异常出错,回退ES数据,失败信息为:{}",e.getMessage());//回退ES中数据List<Long> codeIdList = new ArrayList<>();codeIdList.add(po.getCodeId());syncDataService.deleteMaterialToEs(codeIdList,true, reqBo.getAddType() == 2 || reqBo.getAddType() == 3);rspBO.setCode(CONST.FAILED_CODE);rspBO.setMessage("操作失败,请联系运维核查数据库状态及应用负载情况,或者提交重试!");return rspBO;
}
后记
① 没有调用commit的Mysql操作,或者调用commit失败,到底是谁操作回退的呢? 这是Mysql自身的机制,如果在调用commit之前,mysql挂了,那么数据是会自动回滚;如果最后没有调用commit,事务也会自动回滚。
② 之所以记录这个问题,主要还是在于对事物的理解以及对框架处理事务的机制了解不够清晰,我最开始想的是直接将ES的操作放到事物外面,当Mysql操作成功事务提交了,再去处理ES,这就不就刚好解决问题,但是这又回到原点了,因为ES出错的可能性更大。所以还是要放到一块;但是在什么地方捕获这个commit的失败异常呢? 所以才有了上面的尝试,最终发现是在调用加了事务的方法那一层去捕获,这样就可以在catch中对数据做回退,从而解决问题。