Seata 源码篇之AT模式启动流程 - 中 - 03

Seata 源码篇之AT模式启动流程 - 中 - 03

  • 数据源代理
    • 会话代理
      • 锁定查询执行器
        • 本地事务提交
        • 本地事务回滚
      • 更新执行器
      • 删除执行器
      • 插入执行器
  • 小节


本系列文章:

  • Seata 源码篇之核心思想 - 01
  • Seata 源码篇之AT模式启动流程 - 上 - 02

数据源代理

当我们的数据源被代理后,代理数据源方法调用会走AOP拦截逻辑,也就是被SeataAutoDataSourceProxyAdvice的invoke方法拦截。invoke方法内部会将原本调用DataSource的方法转发给SeataDataSourceProxy执行:

   @Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {// 1. 检查当前是否存在全局事务,或者是否需要获取全局锁if (!inExpectedContext()) {return invocation.proceed();}// 2. 获取当前调用的是数据源的哪个方法Method method = invocation.getMethod();String name = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();Method declared;try {declared = DataSource.class.getDeclaredMethod(name, parameterTypes);} catch (NoSuchMethodException e) {return invocation.proceed();}// 3. 取出当前数据源对应的SeataDataSourceProxy,然后调用代理数据源对应的方法DataSource origin = (DataSource) invocation.getThis();SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);Object[] args = invocation.getArguments();return declared.invoke(proxy, args);}

在这里插入图片描述

SeataDataSourceProxy这里采用装饰器模式实现对DataSource的增强,同时借助动态代理实现对用户的无感装饰。这里有趣的一点在于,为什么不直接在拦截器invoke方法内部实现拦截逻辑,而是借助装饰器倒了一手,大家可以思考一下原因。

我们通常会通过DataSource的getConnection方法从连接池中获取一个空闲连接,然后借助Connection创建一个会话对象Statement,最后利用Statament对象完成SQL语句的执行。Seata需要拦截SQL执行,那么就不仅需要在DataSource层面做装饰增强,还需要在Connection和Statement层面同样进行装饰增强。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {@Overridepublic ConnectionProxy getConnection() throws SQLException {Connection targetConnection = targetDataSource.getConnection();// 返回的Connection对象同样采用装饰器进行增强return new ConnectionProxy(this, targetConnection);}
...
}    

ConnectionProxy 内部会在创建Statement会话对象的时候进行装饰增强:

public abstract class AbstractConnectionProxy implements Connection {@Overridepublic Statement createStatement() throws SQLException {Statement targetStatement = getTargetConnection().createStatement();// 返回的Statement采用装饰器进行增强return new StatementProxy(this, targetStatement);}... 
}    

但是ConnectionProxy不仅仅负责对Statement对象进行装饰,Seata还需要能够在commit和rollback等时间点进行拦截,因此ConnectionProxy的commit和rollback方法就不能只是简单的方法转发了,而是需要增加相关拦截逻辑,这一点后文会讲到。

同时ConnectionProxy内部还需要维护本次连接期间的上下文信息,上下文信息由ConnectionContext保存:

public class ConnectionContext {// 全局事务IDprivate String xid;// 分支事务IDprivate Long branchId;// 是否需要获取全局锁private boolean isGlobalLockRequire;// 自动提交的状态是否变更过private boolean autoCommitChanged;private final Map<String, Object> applicationData = new HashMap<>(2, 1.0001f);private final Map<Savepoint, Set<String>> lockKeysBuffer = new LinkedHashMap<>();private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();private final List<Savepoint> savepoints = new ArrayList<>(8);...
}

StatementProxy 负责拦截本次会话中执行的每条SQL语句,并通过解析,查询前置和后置镜像,组装undo_log日志,最终完成本地事务的提交。

下面我们将来仔细分析一下StatementProxy的模版流程实现。


会话代理

StatementProxy 类中的拦截逻辑也是以模版方法固定下来的,但是由于模版逻辑存在于query,insert,delete 和 update 逻辑中,所以这里将模版逻辑抽取到了ExecuteTemplate类中:

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {@Overridepublic ResultSet executeQuery(String sql) throws SQLException {this.targetSQL = sql;return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);}@Overridepublic int executeUpdate(String sql) throws SQLException {this.targetSQL = sql;return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);}...
}

到此,应该可以猜到,SQL解析和前后镜像组织的核心逻辑都汇聚于ExecteTemplate类的execute方法中,下面我们来详细看看具体实现:

public class ExecuteTemplate {public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {return execute(null, statementProxy, statementCallback, args);}public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {// 1. 当前事务执行无需获取全局锁,直接调用原本的Statement方法if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {return statementCallback.execute(statementProxy.getTargetStatement(), args);}// 2. 根据DB类型,获取对应的SQL解析器String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}Executor<T> executor;// 3. 当前SQL无需执行任何拦截处理,直接调用原本的Statement方法if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {// 4. 如果SQL解析器只存在唯一的一个if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);// 5. 根据当前SQL类型,获取不同类型的执行拦截器switch (sqlRecognizer.getSQLType()) {case INSERT:executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},new Object[]{statementProxy, statementCallback, sqlRecognizer});break;// 下面就是case update , case delete 等同质的选择逻辑了...}} else {// 6. 当存在多个SQL解析器时executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;// 7. 执行器的execute方法中包含SQL解析等逻辑,但是不同操作对于的处理逻辑不太一样,所以需要使用不同的执行器类型rs = executor.execute(args);...return rs;}}

ExecuteTemplate 会根据所执行的SQL语句类型不同,通过SPI加载不同类型的执行器来执行,执行器继承体系如下所示:
在这里插入图片描述
SQLRecognizer 作为SQL解析器,由于内部使用Druid作为最终解析工具,所有看做是Seata与Druid的一层隔离,防止两者直接耦合在一起。不同类型的SQL语句同样对应不同类型的SQLRecognizer实现,具体如下图所示:
在这里插入图片描述
执行器主要负责解析SQL语句来组织回滚日志,执行本地事务,获取全局锁以及提交本地事务。

下面我们来看看不同场景下,执行器执行逻辑的区别。


锁定查询执行器

我们首先来看最简单的SelectForUpdateExecutor实现,针对select … for update 语句进行拦截增强:

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {@Overridepublic T execute(Object... args) throws Throwable {// 1. 获取全局事务ID,并绑定到当前连接上下文中String xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}// 2. 将是否需要全局锁这一标识设置到连接上下文中 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());// 3. 真正执行查询的方法return doExecute(args);}...
}
public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {@Overridepublic T doExecute(Object... args) throws Throwable {// 1. 获取代理数据源连接Connection conn = statementProxy.getConnection();// 2. 从代理数据源连接中获取当前数据库元数据信息DatabaseMetaData dbmd = conn.getMetaData();T rs;Savepoint sp = null;boolean originalAutoCommit = conn.getAutoCommit();try {// 3. 如果当前数据源开启了事务自动提交,则将自动提交暂时关闭if (originalAutoCommit) {conn.setAutoCommit(false);} else if (dbmd.supportsSavepoints()) {// 4. 如果当前数据源关闭了事务自动提交,则在当前数据源支持回滚点的前提下,创建一个回滚点// 如果因为全局锁获取失败,需要执行全局回滚,则可以直接回滚到当前事务执行到此处的状态,而非把之前的操作全部执行回滚sp = conn.setSavepoint();} else {throw new SQLException("not support savepoint. please check your db version");}LockRetryController lockRetryController = new LockRetryController();ArrayList<List<Object>> paramAppenderList = new ArrayList<>();// 5. 构建SQL语句负责查询出本次查询涉及到的所有记录: select 主键 from 表 需要执行的sql的where子句String selectPKSQL = buildSelectSQL(paramAppenderList);// 这里的while循环为的是全局锁获取失败后,进行重试while (true) {try {// 6. 执行目标SQL查询语句rs = statementCallback.execute(statementProxy.getTargetStatement(), args);// 7. 执行before image对应的SQL语句,同时利用返回的记录列表,构建全局锁的key,该全局锁覆盖本次查询得到的记录列表TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);String lockKeys = buildLockKey(selectPKRows);if (StringUtils.isNullOrEmpty(lockKeys)) {break;}// 8. 本地事务执行完毕,提交前,尝试获取全局锁  if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {// Do the same thing under either @GlobalTransactional or @GlobalLock, // that only check the global lock  here.statementProxy.getConnectionProxy().checkLock(lockKeys);} else {throw new RuntimeException("Unknown situation!");}break;} catch (LockConflictException lce) {// 9. 获取全局锁失败,会先回滚当前本地事务,然后休眠指定时间后,再次重试if (sp != null) {conn.rollback(sp);} else {conn.rollback();}// trigger retrylockRetryController.sleep(lce);}}} finally {// 10. 如果有需要,则释放先前临时的创建的回滚点,同时将自动提交设置更改回来if (sp != null) {try {if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {conn.releaseSavepoint(sp);}} catch (SQLException e) {LOGGER.error("{} release save point error.", getDbType(), e);}}// 11. 如果先前开启了自动提交,此处需要提交本地事务,同时将原本的自动提交设置更改回来if (originalAutoCommit) {conn.setAutoCommit(true);}}// 12. 返回执行目标SQL语句得到的结果return rs;}...
}

由于select … for update 锁定读不涉及数据修改,所以也就无需前置和后置镜像了,但是这里有四点需要注意一下:

  1. selectPKSQL 如何构建出来的 ?
// 目标SQL语句
select * from test where id = 1 and name = 'dhy' for update;
// 构建得到的selectPKSQL,组装规则为: select + 主键 + from 表 + 目标SQL语句的where子句
SELECT id FROM test WHERE id = 1 AND name = 'dhy' FOR UPDATE;
  1. 全局锁的key是如何构成的 ?
// 对于只有单个主键的情况,例如上面的SQL语句,则key的模样如下
// 表名:记录1主键值,记录2主键值,记录3主键值
test:1,2
// 对于联合主键的情况,则key的模样如下
// 表名:记录1主键1值_记录1主键2值
test:1_a,2_b
  1. 如何判断全局锁是否获取成功 ?
public class ConnectionProxy extends AbstractConnectionProxy {public void checkLock(String lockKeys) throws SQLException {if (StringUtils.isBlank(lockKeys)) {return;}try {// 全局锁等资源由默认的资源管理器管理boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);// 获取全局锁失败,抛出锁冲突异常if (!lockable) {throw new LockConflictException(String.format("get lock failed, lockKey: %s",lockKeys));}} catch (TransactionException e) {// 识别是否为锁冲突异常,如果是的话,抛出锁冲突异常recognizeLockKeyConflictException(e, lockKeys);}}...
}    

获取全局锁是否成功完全由资源管理器说的算,所以下面我们来看看资源管理器是如何判断全局锁是否获取成功的:

public class DataSourceManager extends AbstractResourceManager {@Overridepublic boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException {// 1. 构建全局锁获取亲戚GlobalLockQueryRequest request = new GlobalLockQueryRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);try {GlobalLockQueryResponse response;// 2. 借助RmNettyRemotingClient发送同步请求if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);} else {throw new RuntimeException("unknow situation!");}// 3. 判断请求是否成功if (response.getResultCode() == ResultCode.Failed) {throw new TransactionException(response.getTransactionExceptionCode(),"Response[" + response.getMsg() + "]");}// 4. 判断全局锁是否获取成功return response.isLockable();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);}}...
}

全局锁资源具体是如何管理的,由server端负责承接逻辑实现,这一点在本系列文章后面会进行详细分析,这里暂时不展开。

最后还有一点,LockRetryController主要负责两件事:

  1. 维护重试次数和重试间隔
    public LockRetryController() {this.lockRetryInterval = getLockRetryInterval();this.lockRetryTimes = getLockRetryTimes();}
  1. 负责休眠等待逻辑实现
    public void sleep(Exception e) throws LockWaitTimeoutException {// prioritize the rollback of other transactionsif (--lockRetryTimes < 0 || (e instanceof LockConflictException&& ((LockConflictException)e).getCode() == TransactionExceptionCode.LockKeyConflictFailFast)) {throw new LockWaitTimeoutException("Global lock wait timeout", e);}try {Thread.sleep(lockRetryInterval);} catch (InterruptedException ignore) {}}

本地事务提交

关于本地事务提交这一点,由于内容较多,我想单独开一节进行讲解。

本地事务提交有两种方式,一种是在设置autoCommit属性为false的前提下,由开发者手动提交;另一种就是在设置autoCommit属性为true的前提下,由框架内部的模版代码先将自动提交关闭,执行完本地SQL语句和附加逻辑后,再最后由框架内部调用commit方法完成提交,同时恢复原先自动提交的属性设置。

要想享受上面框架提供的事务模版服务,我们需要首先把要执行的事务交付于框架托管,比如在事务方法上标注@Transactional注解,表示当前事务执行交由Spring事务模块托管;下面我们复习一下Spring为我们提供的事务执行的整套模版流程:

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {...// 1. 准备平台事务管理器  PlatformTransactionManager ptm = asPlatformTransactionManager(tm);...// 2. 根据事务传播行为,决定是否开启新的本地事务TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);// 3. 执行本地事务Object retVal;try {retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {// 4. 出现异常则回滚completeTransactionAfterThrowing(txInfo, ex);throw ex;}...// 5. 执行正常则提交commitTransactionAfterReturning(txInfo);return retVal;}...}
}    				
  1. 开启新的本地事务过程中,会调用PlatformTransactionManager的doBegin方法真正开启一次新的事务连接
	@Overrideprotected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;...con = txObject.getConnectionHolder().getConnection();// 核心: 如果连接开启了自动提交,则关闭自动提交,并设置标记if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);con.setAutoCommit(false);}...}
  1. 无论我们是设置了手动提交,还是默认的自动提交,只要事务交由了Spring托管,那么Spring便会在目标方法正常执行完毕后,进行commit
	@Overrideprotected void doCommit(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();...con.commit();...}
  1. 在事务提交完毕后的资源清理环节,会将先前更改的自动提交恢复过来
	@Overrideprotected void doCleanupAfterCompletion(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;...// 重新将连接设置回自动提交Connection con = txObject.getConnectionHolder().getConnection();if (txObject.isMustRestoreAutoCommit()) {con.setAutoCommit(true);}...}

Spring声明式事务实现的特点在于,如果当前连接设置的是自动提交,则更改为手动提交,同时目标方法执行完毕后,统一由Spring框架内部帮我们调用commit方法完成本地事务提交。我们下面会看到Seata的实现有所不同。

Seata在遇到设置为自动提交模式的连接处理上,和Spring处理思路一致,但是当连接处于手动提交模式时,Seata不会在目标方法执行完毕后,帮助我们统一调用commit进行提交,而是需要开发者自行调用commit方法进行提交。

对于SelectForUpdateExecutor来说,其execute方法可以简化为如下过程:

    @Overridepublic T doExecute(Object... args) throws Throwable {... // 1. 如果当前连接开启了自动提交,则先关闭自动提交boolean originalAutoCommit = conn.getAutoCommit();if (originalAutoCommit) {conn.setAutoCommit(false);}// 2. 执行业务SQL...// 3. 如果先前为自动提交模式,则在此处提交本地事务,同时恢复自动提交if (originalAutoCommit) {conn.setAutoCommit(true);}return rs;}

Seata只会在自动提交模式下,才会由框架内部在业务SQL执行完毕后,帮助我们调用commit方法完成本地事务提交,这一点需要注意。

下面看看本地事务提交的具体过程:

public class ConnectionProxy extends AbstractConnectionProxy {@Overridepublic void setAutoCommit(boolean autoCommit) throws SQLException {// 1. 在开启了全局事务场景下,并且原先是自动提交模式,则在此处由框架内部手动帮助我们提交事务if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {doCommit();}// 2. 恢复自动提交设置targetConnection.setAutoCommit(autoCommit);}...
}    

真正的本地事务提交逻辑在doCommit方法中,我们下面来看看其具体实现:

    private void doCommit() throws SQLException {// 1. 如果处于全局事务模式下,则走seata拦截逻辑if (context.inGlobalTransaction()) {processGlobalTransactionCommit();} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();} else {// 2. 否则直接走正常事务提交逻辑  targetConnection.commit();}}

下面看看对标注了@GlobalTransactional注解的目标方法的事务提交逻辑实现:

    private void processGlobalTransactionCommit() throws SQLException {try {// 1. 当前分支事务执行本地提交前,先执行分支事务注册(分支注册过程同时包含获取全局锁逻辑)register();} catch (TransactionException e) {// 识别是否为全局锁获取失败抛出的异常recognizeLockKeyConflictException(e, context.buildLockKeys());}try {// 2. 刷新UNDO日志到undo_log表中UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 3. 执行本地事务提前(这里包括原有的业务逻辑和undo_log日志的落盘逻辑)targetConnection.commit();} catch (Throwable ex) {// 4. 上报分支事务执行失败report(false);throw new SQLException(ex);}// 5. 上报分支事务执行成功if (IS_REPORT_SUCCESS_ENABLE) {report(true);}// 6. 清空当前连接上下文context.reset();}private void register() throws TransactionException {if (!context.hasUndoLog() || !context.hasLockKey()) {return;}// 1. 分支事务提交前,到TC中进行注册Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),null, context.getXid(), context.getApplicationData(),// 传入需要获取的全局锁key,不难猜出,分支事务注册的同时,还包括获取全局锁的逻辑context.buildLockKeys());// 2. 保存分支事务IDcontext.setBranchId(branchId);}

如果将连接设置为手动提交模式,则需要开发者手动调用ConnectionProxy的commit方法完成分支事务提交:

    @Overridepublic void commit() throws SQLException {try {// 分支事务一阶段提交包括注册和获取全局锁两个过程,如果全局锁获取失败// 此处由lockRetryPolicy提供的模版方法,完成重试抢锁lockRetryPolicy.execute(() -> {// 同样是调用doCommit方法完成分支事务一阶段提交doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {rollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

SelectForUpdateExecutor 自动提交模式下无需在setAutoCommit方法中处理抢锁失败逻辑,是因为这段逻辑已经存在于了execute方法中,有遗忘的可以回看。而doCommit方法不只SelectForUpdateExecutor 会调用,所以内部需要处理获取全局锁失败逻辑。


本地事务回滚

本地事务回滚会调用ConnectionProxy的rollback方法,回滚逻辑实现比较简单,关键在于会向TC报告自己本地执行失败的状态:

    @Overridepublic void rollback() throws SQLException {// 1. 执行正常的回滚操作targetConnection.rollback();// 2. 向TC报告当前分支事务的状态if (context.inGlobalTransaction() && context.isBranchRegistered()) {report(false);}// 3. 清空连接上下文context.reset();}

大家可以思考一下,当TC收到某个分支事务执行失败的状态后,它又是如何通知其他分支事务完成回滚的呢?这部分内容将在本系列后面揭晓。


更新执行器

UpdateExecutor 的逻辑是其次简单的,我们来看看其实现逻辑:

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {@Overridepublic T execute(Object... args) throws Throwable {// 1. 获取全局事务ID,并绑定到当前连接上下文中String xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}// 2. 将是否需要全局锁这一标识设置到连接上下文中 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());// 3. 真正执行查询的方法return doExecute(args);}...
}
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {@Overridepublic T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();// 如果当前连接本身已经开启了自动提交,则在事务执行前,关闭自动提交,执行结束后,再开启自动提交if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}...
}    

下面我们来看看处理更全面的一种情况,也就是连接本身开启了自动提交的前提下,是如何进行处理的:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {// 1. 关闭自动提交设置connectionProxy.changeAutoCommit();// 2. 分支事务一阶段提交逻辑包含获取全局锁的逻辑,所以需要处理抢锁失败的重试逻辑return new LockRetryPolicy(connectionProxy).execute(() -> {// 执行SQL解析等拦截逻辑,然后执行最终的目标SQL语句T result = executeAutoCommitFalse(args);// 提交事务connectionProxy.commit();return result;});} catch (Exception e) {if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {// 恢复自动提交设置connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}... 
}
    protected T executeAutoCommitFalse(Object[] args) throws Exception {try {// 1. 获取前置镜像TableRecords beforeImage = beforeImage();// 2. 执行正常的SQL语句T result = statementCallback.execute(statementProxy.getTargetStatement(), args);// 3. 获取后置镜像TableRecords afterImage = afterImage(beforeImage);// 4. 准备undo日志prepareUndoLog(beforeImage, afterImage);return result;} catch (TableMetaException e) {...}}

更新过程中的拦截逻辑核心就三步,下面我们来详细看看每一步的具体实现过程:

  1. 准备前置镜像
    @Overrideprotected TableRecords beforeImage() throws SQLException {ArrayList<List<Object>> paramAppenderList = new ArrayList<>();TableMeta tmeta = getTableMeta();// 1. 构建组装前置镜像对应的SQL语句String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);// 2. 执行该SQL语句,然后获取查询出来的记录return buildTableRecords(tmeta, selectSQL, paramAppenderList);}

这里简单讲讲前置镜像SQL语句组装的规则:

// 目标SQL语句
update test set name = 'WILL' where age = 18
// 构建得到的SQL语句
// 规则: select  + 主键,update涉及列 + from 表 + 目标SQL语句的where子句 
SELECT id, name FROM test WHERE age = 18 FOR UPDATE
  1. 准备后置镜像
    @Overrideprotected TableRecords afterImage(TableRecords beforeImage) throws SQLException {TableMeta tmeta = getTableMeta();if (beforeImage == null || beforeImage.size() == 0) {return TableRecords.empty(getTableMeta());}// 1. 构建后置镜像查询SQLString selectSQL = buildAfterImageSQL(tmeta, beforeImage);ResultSet rs = null;// 2. 执行后置镜像查询SQL,返回查询结果try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);rs = pst.executeQuery();return TableRecords.buildRecords(tmeta, rs);} finally {IOUtil.close(rs);}}

后置镜像SQL语句组装是依赖于前置镜像SQL的,而非目标SQL语句,:

// 目标SQL语句
update test set name = 'WILL' where age = 18
// 前置镜像SQL语句
SELECT id, name FROM test WHERE age = 18 FOR UPDATE
// 构建得到的SQL语句
// 规则: select  + 主键,update涉及列 + from 表 + where 主键 in (前置镜像查询出来的记录列表的主键列聚合得到的主键列表)
SELECT id, name FROM test WHERE (id) in ( (?),(?) )
  1. 准备undo_log日志
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {// 0. 健壮性检查if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {return;}if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {if (beforeImage.getRows().size() != afterImage.getRows().size()) {throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");}}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();// 1. 提交本地事务前,需要获取对应的全局锁,如果此处执行的时删除语句,则以前置镜像作为锁记录,否则以后置镜像作为锁记录(删除操作,无需记录后置镜像)TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;// (此处key的组装规则和上文说的一致)String lockKeys = buildLockKey(lockKeyRecords);if (null != lockKeys) {// 2. 向当前连接上下文的lockKeysBuffer中追加需要获取的全局锁keyconnectionProxy.appendLockKey(lockKeys);// 3. 构建undo日志SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);// 4. 像当前连接上下文的sqlUndoItemsBuffer中追加构建好的undo日志connectionProxy.appendUndoLog(sqlUndoLog);}}

构建undo日志的具体过程如下:

    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {SQLType sqlType = sqlRecognizer.getSQLType();String tableName = sqlRecognizer.getTableName();SQLUndoLog sqlUndoLog = new SQLUndoLog();sqlUndoLog.setSqlType(sqlType);sqlUndoLog.setTableName(tableName);sqlUndoLog.setBeforeImage(beforeImage);sqlUndoLog.setAfterImage(afterImage);return sqlUndoLog;}

lockKeysBuffer 和 sqlUndoItemsBuffer 会在当前本地事务提交的时候用到。

分支事务提交会调用ConnectionProxy的doCommit方法,这一点上面已经说过了,而doCommit方法会在注册分支事务的同时,传入需要获取的全局锁的key:
在这里插入图片描述
ConnectionContext 的buildLockKeys 方法中会遍历lockKeysBuffer 集合,对所有需要获取的全局锁key进行拼接:

    public String buildLockKeys() {if (lockKeysBuffer.isEmpty()) {return null;}// 1. 获取所有全局锁keySet<String> lockKeysBufferSet = new HashSet<>();for (Set<String> lockKeys : lockKeysBuffer.values()) {lockKeysBufferSet.addAll(lockKeys);}if (lockKeysBufferSet.isEmpty()) {return null;}// 2. 用;拼接在一起,然后返回StringBuilder appender = new StringBuilder();Iterator<String> iterable = lockKeysBufferSet.iterator();while (iterable.hasNext()) {appender.append(iterable.next());if (iterable.hasNext()) {appender.append(";");}}return appender.toString();}

执行完分支事务注册和全局锁获取后,下一步就是向undo_log表中写入undo日志,然后提交本地事务了; 这里会调用UndoLogManager的flushUndoLogs写入undo日志,下面我们一起来看看:

    @Overridepublic void flushUndoLogs(ConnectionProxy cp) throws SQLException {ConnectionContext connectionContext = cp.getContext();if (!connectionContext.hasUndoLog()) {return;}// 1. 准备分支事务的undo日志String xid = connectionContext.getXid();long branchId = connectionContext.getBranchId();BranchUndoLog branchUndoLog = new BranchUndoLog();branchUndoLog.setXid(xid);branchUndoLog.setBranchId(branchId);branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());// 2. 调用undo日志解析器对undo日志进行编码UndoLogParser parser = UndoLogParserFactory.getInstance();byte[] undoLogContent = parser.encode(branchUndoLog);// 3. 尝试对undo日志进行压缩CompressorType compressorType = CompressorType.NONE;if (needCompress(undoLogContent)) {compressorType = ROLLBACK_INFO_COMPRESS_TYPE;undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);}// 4. 将undo日志插入undo_log表中,如果使用的是Mysql,这里会调用MySQLUndoLogManager的insertUndoLogWithNormal实现insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());}

删除执行器

本节来看看DeleteExecutor的执行流程,由于DeleteExecutor和UpdateExecutor都继承了AbstractDMLBaseExecutor,所以二者主要区别主要集中在前置和后置镜像构建的逻辑上,下面我们一起来看一下:

  1. 构建前置镜像
public class DeleteExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {@Overrideprotected TableRecords beforeImage() throws SQLException {SQLDeleteRecognizer visitor = (SQLDeleteRecognizer) sqlRecognizer;TableMeta tmeta = getTableMeta(visitor.getTableName());ArrayList<List<Object>> paramAppenderList = new ArrayList<>();// 构建前置镜像对应的SQL,然后执行该SQL,返回查询得到的结果String selectSQL = buildBeforeImageSQL(visitor, tmeta, paramAppenderList);return buildTableRecords(tmeta, selectSQL, paramAppenderList);}... 
}

这里还是老规矩,来看看前置镜像SQL组装的逻辑:

// 目标SQL
delete from t where id = 1
// 前置镜像SQL
// 组装规则: select * from 表 + 目标SQL的where子句
SELECT name, id FROM t WHERE id = 1 FOR UPDATE
  1. delete操作无需后置镜像,所以返回结果集合为空
    @Overrideprotected TableRecords afterImage(TableRecords beforeImage) throws SQLException {return TableRecords.empty(getTableMeta());}

插入执行器

本节我们再来看看MySQLInsertExecutor的执行流程,由于MySQLInsertExecutor同样继承了AbstractDMLBaseExecutor,所以这里我们也只对前置和后置镜像构建过程进行分析:

  1. 构建前置镜像
    @Overrideprotected TableRecords beforeImage() throws SQLException {// insert操作无需前置镜像,所以返回的空集合return TableRecords.empty(getTableMeta());}
  1. 构建后置镜像
    @Overrideprotected TableRecords afterImage(TableRecords beforeImage) throws SQLException {// 1. 获取插入记录的主键值Map<String, List<Object>> pkValues = getPkValues();// 2. 正常情况下,此处返回的后置镜像里面的内容也是空的,因为记录插入之前并不存在TableRecords afterImage = buildTableRecords(pkValues);if (afterImage == null) {throw new SQLException("Failed to build after-image for insert");}return afterImage;}

除了简单的增删改查语句外,还有涉及Join的更新操作,InsertOnUpdate等操作,这部分操作对应的执行器实现大家可自行翻阅源码进行学习。


小节

本文和大家一起探索了Seata AT模式的一阶段实现,下篇文章将和大家一起来看看AT模式二阶段的实现和一阶段中漏掉的全局事务提交和回滚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149400.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

.Net开源迁移框架FluentMigrator的使用。

在实际的开发过程中&#xff0c;经常会遇到数据库结构变动&#xff0c;比如新增表、删除表&#xff1b;已有的表新增字段&#xff0c;删除字段&#xff1b;修改字段属性等等。而且需要开发环境、测试环境和生产环境进行同步。如果使用的是EF&#xff0c;还是挺方便的。而非EF环…

PyTorch入门之【tensor】

目录 tensor的创建tensor的相关信息tensor的运算 tensor的创建 1.手动创建 import torch test1torch.tensor([1,2,3])#一维时为向量 test2torch.tensor([[1,2,3]])#二维时为矩阵 test3torch.tensor([[[1,2,3]]])#三维及以上统称为tensor print(test1) print(test2) print(tes…

【Linux】线程详解完结篇——信号量 + 线程池 + 单例模式 + 读写锁

线程详解第四篇 前言正式开始信号量引例信号量的本质信号量相关的四个核心接口生产消费者模型用环形队列实现生产者消费者模型基于环形队列的生产消费模型的原理代码演示单生产者单消费者多生产者多消费者 计数器的意义 线程池基本概念代码 单例模式STL,智能指针和线程安全STL中…

文心一言 VS 讯飞星火 VS chatgpt (106)-- 算法导论10.1 4题

四、用go语言&#xff0c;重写 ENQUEUE 和 DEQUEUE 的代码&#xff0c;使之能处理队列的下溢和上溢。 文心一言&#xff1a; 在 Go 语言中&#xff0c;我们可以使用切片来实现队列。下面是如何实现 ENQUEUE 和 DEQUEUE 操作&#xff0c;同时处理队列的下溢和上溢的情况&#…

git提交代码的流程

1.拉取代码 当你进入了一家公司就需要拉去公司的代码进行开发,此时你的项目小组长会给你个地址拉代码, git clone 公司项目的地址 此时如果不使用了这个方式拉去代码,拉去的是master分支上的代码,但是很多数的情况下&#xff0c;公司的项目可能会在其它的分支上,因此到公…

经典算法-----汉诺塔问题

前言 今天我们学习一个老经典的问题-----汉诺塔问题&#xff0c;可能在学习编程之前我们就听说过这个问题&#xff0c;那这里我们如何去通过编程的方式去解决这么一个问题呢&#xff1f;下面接着看。 汉诺塔问题 问题描述 这里是引用汉诺塔问题源自印度一个古老的传说&#x…

Python3数据科学包系列(一):数据分析实战

Python3中类的高级语法及实战 Python3(基础|高级)语法实战(|多线程|多进程|线程池|进程池技术)|多线程安全问题解决方案 Python3数据科学包系列(一):数据分析实战 Python3数据科学包系列(二):数据分析实战 认识下数据科学中数据处理基础包: (1)NumPy 俗话说: 要学会跑需先…

<C++>类和对象-下

目录 一、构造函数的初始化 1. 构造函数体赋值 2. 初始化列表 2.1 概念 2.2 隐式类型转换式构造 2.3 explicit关键字 二、static静态成员 1. 概念 2. 特性 三、友元 1. 友元函数 2.友元类 四、内部类 1. 概念 五、匿名对象 1. const引用匿名对象 2. 匿名对象的隐式类型转换 总…

postgresql实现单主单从

实现步骤 1.主库创建一个有复制权限的用户 CREATE ROLE 用户名login # 有登录权限的角色即是用户replication #复制权限 encrypted password 密码;2.主库配置开放从库外部访问权限 修改 pg_hba.conf 文件 &#xff08;相当于开放防火墙&#xff09; # 类型 数据库 …

Swing程序设计(5)绝对布局,流布局

文章目录 前言一、布局管理器二、介绍 1.绝对布局2.流布局总结 前言 Swing窗体中&#xff0c;每一个组件都有大小和具体的位置。而在容器中摆放各种组件时&#xff0c;很难判断其组件的具体位置和大小。即一个完整的界面中&#xff0c;往往有多个组件&#xff0c;那么如何将这…

Unity如何实现TreeView

前言 最近有一个需求,需要实现一个TreeView的试图显示,开始我一直觉得这么通用的结构,肯定有现成的UI组件或者插件可以使用,结果,找了好久,都没有找到合适的插件,有两个效果差强人意。 最后在回家的路上突然灵光一闪,想到了一种简单的实现方式,什么插件都不用,仅使用…

基于虚拟同步发电机控制的双机并联Simulink仿真模型

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

1024 科学计数法

一.问题&#xff1a; 科学计数法是科学家用来表示很大或很小的数字的一种方便的方法&#xff0c;其满足正则表达式 [-][1-9].[0-9]E[-][0-9]&#xff0c;即数字的整数部分只有 1 位&#xff0c;小数部分至少有 1 位&#xff0c;该数字及其指数部分的正负号即使对正数也必定明确…

kafka集群工作机制

一、kafka在zookeeper上的元数据解释 kafka中的broker要选举Controller角色来管理整个kafka集群中的分区和副本状态。一个Topic下多个partition要选举Leader角色和客户端进行交互数据 Zookeeper客户端工具&#xff1a; prettyZoo。 下载地址&#xff1a;https://github.com/vr…

2023年R1快开门式压力容器操作证模拟考试题库及R1快开门式压力容器操作理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年R1快开门式压力容器操作证模拟考试题库及R1快开门式压力容器操作理论考试试题是由安全生产模拟考试一点通提供&#xff0c;R1快开门式压力容器操作证模拟考试题库是根据R1快开门式压力容器操作最新版教材&#…

润滑油泵控制(博途SCL源代码)

有关博途PLC定时器的各种使用方法请参考下面文章链接: 博途PLC IEC定时器编程应用(SCL语言)_博图 定时器-CSDN博客博途PLC定时器支持数据类型TIME 类型 ,写法支持T#2M10S 、T#10S等,时基是MS所以如果设置1M用 DINT数据类型就是60000,大部分HMI上数据类型很多不支持IEC的…

buuctf-[GXYCTF2019]禁止套娃 git泄露,无参数rce

用dirsearch扫一下&#xff0c;看到flag.php 访问一下没啥东西&#xff0c;使用githack python2 GitHack.py http://8996e81f-a75c-4180-b0ad-226d97ba61b2.node4.buuoj.cn/.git/查看index.php <?php include "flag.php"; echo "flag在哪里呢&#xff1f;…

【iptables 实战】9 docker网络原理分析

在开始本章阅读之前&#xff0c;需要提前了解以下的知识 阅读本节需要一些docker的基础知识&#xff0c;最好是在linux上安装好docker环境。提前掌握iptables的基础知识&#xff0c;前文参考【iptables 实战】 一、docker网络模型 docker网络模型如下图所示 说明&#xff1…

【算法|动态规划No.9】leetcodeLCR 091. 粉刷房子

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…