Seata 源码篇之AT模式启动流程 - 下 - 04

Seata 源码篇之AT模式启动流程 - 下 - 04

  • 全局事务提交
  • 分支事务全局提交
  • 全局事务回滚
  • 分支事务全局回滚
  • 小结


本系列文章:

  • Seata 源码篇之核心思想 - 01
  • Seata 源码篇之AT模式启动流程 - 上 - 02
  • Seata 源码篇之AT模式启动流程 - 中 - 03

上一篇文章,我们看了Seata AT模式一阶段提交流程,本文我们来看看AT模式的二阶段流程和全局事务提交回滚逻辑的实现。


全局事务提交

当某个分支事务执行完本地业务SQL语句后,下一步就进入全局事务提交环节了,此处我们可以回顾TransactionalTemplate模版类的execute方法,如下所示:

    public Object execute(TransactionalExecutor business) throws Throwable {// 1. 获取当前全局事务的上下文信息TransactionInfo txInfo = business.getTransactionInfo();...// 1.1 判断当前是否已经存在一个全局事务GlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 根据不同的全局事务传播行为进行处理Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {...}// 1.3 将当前全局锁配置设置到本地线程缓存中,然后返回先前的配置GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. 如果当前线程是全局事务的发起者,即TM,则给TC发送一个开启全局事务的请求,否则只是简单回调相关钩子方法beginTransaction(txInfo, tx);Object rs;try {// 3. 执行当前分支事务对应的本地事务rs = business.execute();} catch (Throwable ex) {// 4. 分支事务执行发生异常,判断对应异常是否需要回滚,如果需要则回滚当前全局事务completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 5. 当前分支事务执行正常,由TM发送提交全局事务的请求commitTransaction(tx, txInfo);return rs;} finally {// 6. 资源清理和恢复,同时触发钩子回调resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// 7. 如果存在被挂起的全局事务,则进行恢复if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}}

本节我们来看一下全局事务提交的commitTransaction方法实现:

    private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)throws TransactionalExecutor.ExecutionException, TransactionException {// 1. 判断全局事务是否超时if (isTimeout(tx.getCreateTime(), txInfo)) {// business execution timeoutException exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,String.format("client detected transaction timeout before commit, so change to rollback, xid = %s", tx.getXid()));rollbackTransaction(tx, exx);return;}try {// 2. 触发回调埋点,同时指向事务提交动作triggerBeforeCommit();tx.commit();// 3. 记录事务执行提交动作后的状态GlobalStatus afterCommitStatus = tx.getLocalStatus();TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;switch (afterCommitStatus) {case TimeoutRollbacking:code = TransactionalExecutor.Code.Rollbacking;break;case TimeoutRollbacked:code = TransactionalExecutor.Code.RollbackDone;break;case Finished:code = TransactionalExecutor.Code.CommitFailure;break;default:}// 4. 如果事务提交失败或者超时,则抛出对应的异常信息Exception statusException = null;if (GlobalStatus.isTwoPhaseHeuristic(afterCommitStatus)) {statusException = new TmTransactionException(TransactionExceptionCode.CommitHeuristic,String.format("Global transaction[%s] not found, may be rollbacked.", tx.getXid()));} else if (GlobalStatus.isOnePhaseTimeout(afterCommitStatus)) {statusException = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid()));}if (null != statusException) {throw new TransactionalExecutor.ExecutionException(tx, statusException, code);}//  5. 正常提交后,触发对应的回调埋点triggerAfterCommit();} catch (TransactionException txe) {// 4.1 Failed to committhrow new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);}}

DefaultGlobalTransaction 的 commit 方法负责完成全局事务的提交,当然全局事务提交由TM执行,如果当前分支事务角色是RM,这里直接返回,啥也不干:

    @Overridepublic void commit() throws TransactionException {// 1. 如果是RM角色,直接返回if (role == GlobalTransactionRole.Participant) {return;}// 2. 如果是TM角色,则尝试执行全局事务提交,如果提交失败了,则进行多轮尝试int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {while (retry > 0) {try {retry--;status = transactionManager.commit(xid);break;} catch (Throwable ex) {if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);}}}} finally {// 3. TM全局事务提交成功后,执行XID解绑if (xid.equals(RootContext.getXID())) {suspend(true);}}...}

DefaultTransactionManager 在Seata中的职责主要作为防腐层存在,负责屏蔽与TC的通信过程,下面我们看看其commit方法实现:

    @Overridepublic GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);return response.getGlobalStatus();}

当TM完成全局事务提交后,下面便是由TC通知其他分支事务执行全局提交了,也就是二阶段提交,二阶段提交的主要任务就是异步删除本地的undo日志。


分支事务全局提交

各个分支事务的全局提交由TC异步回调通知完成,如下图所示:

在这里插入图片描述

具体代码存在于DefaultRMHandler的handle方法中:

    @Overridepublic BranchCommitResponse handle(BranchCommitRequest request) {MDC.put(RootContext.MDC_KEY_XID, request.getXid());MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));// 利用分支事务类型,获取对应的处理器,然后调用处理器的handle方法,处理分支事务提交请求return getRMHandler(request.getBranchType()).handle(request);}

这里获取的处理器类型为RMHandlerAT,所以最终会调用RMHandlerAT的handle方法处理分支事务提交请求:

public abstract class AbstractRMHandler extends AbstractExceptionHandlerimplements RMInboundHandler, TransactionMessageHandler {@Overridepublic BranchCommitResponse handle(BranchCommitRequest request) {BranchCommitResponse response = new BranchCommitResponse();exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {@Overridepublic void execute(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {// 真正执行分支事务提交的方法doBranchCommit(request, response);}}, request, response);return response;}...
}
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();// 调用资源管理器的branchCommit方法,完成分支事务提交BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);}

由于这里我们使用的是AT模式,所以最终会调用DataSourceManager的branchCommit方法完成分支事务的提交:

    @Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {return asyncWorker.branchCommit(xid, branchId, resourceId);}

asyncWorker 通过名字可以猜到,此处采用的是异步提交方式,所以下面我们来看看asyncWorker是如何进行异步提交的:

    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {// 准备两阶段提交上下文信息,然后加入分支事务提交队列中Phase2Context context = new Phase2Context(xid, branchId, resourceId);addToCommitQueue(context);return BranchStatus.PhaseTwo_Committed;}

不难看出,此处采用的是典型的生产者-消费者模式,下面看看具体是谁会从提交队列中取出任务执行:

    private void addToCommitQueue(Phase2Context context) {// 直接将任务加入队列中去,然后返回if (commitQueue.offer(context)) {return;}// 如果队列满了,则立即让线程池处理一波任务,然后再尝试将当前任务加入队列// 此处的thenRun方法是异步执行的CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));}

队列和定时任务线程池初始化过程可以在AsyncWorker类的构造函数中寻见:

    public AsyncWorker(DataSourceManager dataSourceManager) {this.dataSourceManager = dataSourceManager;// 默认队列大小为10000 commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);// 启动定时任务线程池,每秒执行一次任务ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);}

下面可以来看看具体执行的是什么样的任务:

    void doBranchCommitSafely() {doBranchCommit();}private void doBranchCommit() {if (commitQueue.isEmpty()) {return;}// 1. 取出队列中所有任务List<Phase2Context> allContexts = new LinkedList<>();commitQueue.drainTo(allContexts);// 2. 按照资源ID对分支事务提交任务进行分组Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);// 3. 依次处理每个任务groupedContexts.forEach(this::dealWithGroupedContexts);}  private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {...// 1. 通过资源ID获取对应的数据源代理器DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);// 2. 删除undo日志Connection conn = null;conn = dataSourceProxy.getPlainConnection();...UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);for (List<Phase2Context> partition : splitByLimit) {deleteUndoLog(conn, undoLogManager, partition);}...} private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {Set<String> xids = new LinkedHashSet<>(contexts.size());Set<Long> branchIds = new LinkedHashSet<>(contexts.size());contexts.forEach(context -> {xids.add(context.xid);branchIds.add(context.branchId);});...// 删除undo_log表中的undo_log日志// 这里提交的事务是为了确保批量删除undo_log日志这一过程的原子性 undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);if (!conn.getAutoCommit()) {conn.commit();}...} 

可以看到分支事务全局提交逻辑很简单,就是借助asyncWorker完成undo日志的异步删除。


全局事务回滚

全局事务回滚逻辑存在于TransactionalTemplate的completeTransactionAfterThrowing方法中:

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)throws TransactionalExecutor.ExecutionException, TransactionException {// 如果需要对当前异常执行回滚,则执行全局事务回滚操作,否则还是执行全局事务提交if (txInfo != null && txInfo.rollbackOn(originalException)) {rollbackTransaction(tx, originalException);} else {// not roll back on this exception, so commitcommitTransaction(tx, txInfo);}}
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {try {// 执行全局事务回滚逻辑和前后回调埋点triggerBeforeRollback();tx.rollback();triggerAfterRollback();}...}

DefaultGlobalTransaction 的 rollback 方法负责完成全局事务的回滚,当然全局事务回滚也由TM执行,如果当前分支事务角色是RM,这里直接返回,啥也不干:

   @Overridepublic void rollback() throws TransactionException {// 1. 如果当前分支事务的角色是RM,则直接返回if (role == GlobalTransactionRole.Participant) {...return;}...// 2. 如果当前分支事务角色是TM,则通知TC负责发起全局事务回滚请求int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;try {while (retry > 0) {try {retry--;status = transactionManager.rollback(xid);break;} catch (Throwable ex) {...}}} finally {if (xid.equals(RootContext.getXID())) {suspend(true);}}...}

DefaultTransactionManager 在Seata中的职责主要作为防腐层存在,负责屏蔽与TC的通信过程,下面我们看看其rollback方法实现:

    @Overridepublic GlobalStatus rollback(String xid) throws TransactionException {GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);return response.getGlobalStatus();}

分支事务全局回滚

各个分支事务的本地由TC异步回调通知完成,如下图所示:

在这里插入图片描述

具体代码存在于DefaultRMHandler的handle方法中:

    @Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) {MDC.put(RootContext.MDC_KEY_XID, request.getXid());MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));// 利用分支事务类型,获取对应的处理器,然后调用处理器的handle方法,处理分支事务提交请求return getRMHandler(request.getBranchType()).handle(request);}

这里获取的处理器类型为RMHandlerAT,所以最终会调用RMHandlerAT的handle方法处理分支事务提交请求:

public abstract class AbstractRMHandler extends AbstractExceptionHandlerimplements RMInboundHandler, TransactionMessageHandler {@Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) {BranchRollbackResponse response = new BranchRollbackResponse();exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {@Overridepublic void execute(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {// 执行全局回滚doBranchRollback(request, response);}}, request, response);return response;}...
}
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();// 调用资源管理器的branchRollback方法,完成分支事务回滚BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);}

由于这里我们使用的是AT模式,所以最终会调用DataSourceManager的branchRollback方法完成分支事务的回滚:

    @Overridepublic BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = get(resourceId);...// 利用undo日志完成回滚UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);...return BranchStatus.PhaseTwo_Rollbacked;}

关于如何利用undo日志完成回滚,这块内容将在本系列后面进行讲解,本文暂时不做深究。


小结

到目前为止,我们大体浏览了Seata AT模式整体实现流程。后面,我们将深入Server模块进行研究,以及Seata的RPC模块,感兴趣的童鞋可以持续关注。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149643.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

maven 初学

1. maven 安装 配置安装 路径 maven 下载位置: D:\software\apache-maven-3.8.6 默认仓库位置: C:\Users\star-dream\.m2\repository 【已更改】 本地仓库设置为&#xff1a;D:\software\apache-maven-3.8.6\.m2\repository 镜像已更改为阿里云中央镜像仓库 <mirrors>…

数据结构与算法(一):概述与复杂度分析

参考引用 Hello 算法 Github 仓库&#xff1a;hello-algo 1. 初识算法 1.1 算法无处不在 1.1.1 二分查找&#xff1a;查阅字典 在字典里&#xff0c;每个汉字都对应一个拼音&#xff0c;而字典是按照拼音字母顺序排列的。假设我们需要查找一个拼音首字母为 r 的字&#xff0…

常见的软件脱壳思路

单步跟踪法 1.本方法采用OD载入。 2.跟踪F8&#xff0c;实现向下的跳。 3.遇到程序回跳按F4。 4.绿色线条表示跳转没实现&#xff0c;不用理会&#xff0c;红色线条表示跳转已经实现&#xff01; 5.刚载入程序有一个CALL的&#xff0c;我们就F7跟进去&#xff0c;不然程序很容…

AUTOSAR通信篇 - CAN网络通信(六:CanNm)

文章目录 功能介绍协调算法工作模式网络模式Repeat Message State&#xff08;重复消息状态&#xff09;Normal Operation State&#xff08;正常运行/工作状态&#xff09;Ready Sleep State&#xff08;就绪睡眠状态&#xff09; Prepare Bus Sleep Mode&#xff08;预休眠模…

新款UI动态壁纸头像潮图小程序源码

新款UI动态壁纸头像潮图小程序源码&#xff0c;不需要域名服务器&#xff0c;直接添加合法域名&#xff0c;上传发布就能使用。 可以对接开通流量主&#xff0c;个人也能运营&#xff0c;不需要服务器源码完整。整合头像&#xff0c;动态壁纸&#xff0c;文案功能齐全。 源码…

H5移动端购物商城系统源码 小型商城全新简洁风格全新UI 支持易支付接口

一款比较简单的 H5 移动端购物商城系统源码&#xff0c;比较适合单品商城、小型商城使用。带有易支付接口。 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/88391704 源码下载2&#xff1a;评论留言或私信留言

微服务的初步使用

环境说明 jdk1.8 maven3.6.3 mysql8 idea2022 spring cloud2022.0.8 微服务案例的搭建 新建父工程 打开IDEA&#xff0c;File->New ->Project&#xff0c;填写Name&#xff08;工程名称&#xff09;和Location&#xff08;工程存储位置&#xff09;&#xff0c;选…

arm代码

RISC精简指令集 长度和执行周期固定 长度为一条机器指令在计算机占用的内存大小 指令周期为CPU执行一条机器指令所发费的时间(时钟周期由CPU工作频率决定) CISC复杂指令集 其架构一般用于PC端 X86和X64都是负载指令集CPU 更注重指令的功能性 指令周期和长度都不固定 ar…

斯坦福数据挖掘教程·第三版》读书笔记(英文版)Chapter 10 Mining Social-Network Graphs

来源&#xff1a;《斯坦福数据挖掘教程第三版》对应的公开英文书和PPT。 Chapter 10 Mining Social-Network Graphs The essential characteristics of a social network are: There is a collection of entities that participate in the network. Typically, these entiti…

Scala第十七章节

Scala第十七章节 scala总目录 文档资料下载 章节目标 了解集合的相关概念掌握Traversable集合的用法掌握随机学生序列案例 1. 集合 1.1 概述 但凡了解过编程的人都知道程序 算法 数据结构这句话, 它是由著名的瑞士计算机科学家尼古拉斯沃斯提出来的, 而他也是1984年图灵…

Linux环境搭建SVN服务器并实现公网访问 - cpolar端口映射

文章目录 前言1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件 3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口 5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6…

@ConfigurationProperties配置绑定~

ConfigurationProperties注解是Spring Boot中的一个注解&#xff0c;用于将配置文件中的属性值绑定到Java类中的字段上。 ConfigurationProperties注解的作用包括&#xff1a; 实现配置文件属性和Java类字段的映射&#xff0c;简化了读取配置文件的操作。 可以指定配置文件中…

Elasticsearch:什么时候应该考虑在 Elasticsearch 中添加协调节点?

仅协调节点&#xff08;coordinating only nodes&#xff09;充当智能负载均衡器。 仅协调节点的这种特殊角色通过减轻数据和主节点的协调责任&#xff0c;为广泛的集群提供了优势。 加入集群后&#xff0c;这些节点与任何其他节点类似&#xff0c;都会获取完整的集群状态&…

全志ARM926 Melis2.0系统的开发指引⑤

全志ARM926 Melis2.0系统的开发指引⑤ 编写目的8. 固件修改工具(ImageModify)使用8.1.界面说明8.2.操作步骤8.2.1. 配置平台8.2.2. 选择固件8.2.3. 选择要替换的文件8.2.4. 替换文件8.2.5. 保存固件 8.3.注意事项8.4.增加固件修改权限设置8.4.1. 概述8.4.2. 操作说明8.4.2.1.打…

【C语言经典100例题-70】求一个字符串的长度(指针)

代码 使用指针来遍历字符串&#xff0c;直到遇到字符串结尾的空字符\0为止&#xff0c;统计字符数量即为字符串长度。 #include<stdio.h> #define n 20 int getlength(char *a) {int len 0;while(*a!\0){len;a;}return len; } int main() {char *arr[n] { 0 };int l…

Stable diffusion的架构解读(本博客还是以unet架构为主)

博客只是简单的记录一下自己学的&#xff0c;基于自己的一些情况&#xff0c;所以简单了一些只是将来忘记&#xff0c;用来回顾用。 论文的大体框架 unet结构位于 unet会接受prompt特征、latent特征、和t时间步特征&#xff0c;最后生成新一轮的特征 可以参考知乎大佬htt…

计算机竞赛 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的行人重识别算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c…

全志ARM926 Melis2.0系统的开发指引⑦

全志ARM926 Melis2.0系统的开发指引⑦ 编写目的11. 调屏11.1. 调屏步骤简介11.1.1. 判断屏接口。11.1.2. 确定硬件连接。11.1.3. 配置显示部分 sys_config.fex11.1.3.1. 配置屏相关 IO 11.1.4. Lcd_panel_cfg.c 初始化文件中配置屏参数11.1.4.1. LCD_cfg_panel_info11.1.4.2. L…

虚拟机联网

桥接 桥接模式就是虚拟机与你的电脑平起平做&#xff0c;都有同样的IP&#xff0c;且与你的电脑在同一网段下&#xff0c;就能够上网。 电脑的IP的地址 虚拟机的ip地址 设置vm1的ip地址与网关与电脑相同 如果出现ssh连接虚拟机不成功的问题&#xff0c;无其他问题时&#xff0…

Next.js 入门笔记

前言 之前初步体验了 React 的魅力, 又看文档理解了一下 useState 和 useEffect, 目前初步理解的概念是: useState 用来声明在组件中使用并且需要修改的变量 useEffect 用来对 useState 声明的变量进行初始化赋值 可能理解的不太准确, 不过大概差不多是这么个意思. 但是再往后…