[图文] Seata AT 模式分布式事务源码分析

AT 模式是 Seata 中最主要的分布式事务解决方案,最早来源于阿里中间件团队发布的 TXC 服务,后来成功上云改名 GTS。本文主要分析 Seata 0.6.1 版本下 AT 模式的实现

Posted by ChenJY on June 29, 2019 | Viewed times

推荐阅读 Seata TCC 分布式事务源码分析

什么是 Seata AT 模式

AT 模式是 Seata 主推的分布式事务解决方案,最早来源于阿里中间件团队发布的 TXC 服务,后来成功上云改名 GTSSeata 官方文档中有关于 AT 模式的详细介绍 —— AT Mode,它使得应用代码可以像使用本地事务一样使用分布式事务,完全屏蔽了底层细节,它和笔者之前介绍过的 Seata TCC 模式的区别有以下几点:

  1. 使用上,TCC 依赖于用户自行实现的三个方法成本较大;AT 依赖全局事务注解和代理数据源,其余代码基本不需要改动,对业务无侵入、接入成本极小
  2. TCC 的作用范围在应用层,本质上是实现针对某种业务逻辑的正向和反向方法;AT 模式的作用范围在于底层数据源,通过保存操作行记录的前后快照和生成反向 SQL 语句进行补偿操作,实现难度较大,优点是对上层应用透明
  3. TCCtry 阶段加锁,后续补偿逻辑事务间各自独立;AT 需要借助于全局锁和 GlobalLock 注解来解决不同全局事务间的写冲突问题,如果一阶段分支事务成功则二阶段一开始全局锁即被释放,否则需要夯住直到分支事务二阶段回滚完成才能释放全局锁

Seata AT 的使用方法

我们先了解一下如何在应用里使用 AT 模式,流程非常简单,Seata 也提供了 Seata-Samples 方便大家了解如何使用该项目。

第一步,增加全局事务注解

首先依赖 Seata 的客户端 SDK,然后在整个分布式事务发起方的业务方法上增加 @GlobalTransactional 注解,下面的例子来源于 Seata-Samples dubbo 案例,purchase 是事务发起方的业务方法,通过 RPC 调用了下游库存服务订单服务提供的接口:

@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
    // RPC 调用库存服务
    storageService.deduct(commodityCode, orderCount);
    // RPC 调用订单服务
    orderService.create(userId, commodityCode, orderCount);
    throw new RuntimeException("xxx");
}

第二步,配置代理数据源

MySQL 为例:

// 配置数据源
<bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        // …… …… 省略数据源配置
</bean>

// 关键步骤,配置 Seata 的代理数据源,代理之前配置的 accountDataSource
<bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
    <constructor-arg ref="accountDataSource" />
</bean>

// 配置 applicationId 和 txServiceGroup,这主要是来标识应用和服务端集群的
<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="dubbo-demo-account-service"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>

// 省略一些 dubbo 服务注册配置和 jdbcTemplate 配置

第三步,新建 undo_log 表

在事务链涉及的服务的数据库中新建 undo_log 表用来存储 UndoLog 信息,用于二阶段回滚操作,表中包含 xidbranchIdrollback_info 等关键字段信息。

Seata AT 的工作流程

工作流程总览

概括来讲,AT 模式的工作流程分为两阶段。一阶段进行业务 SQL 执行,并通过 SQL 拦截、SQL 改写等过程生成修改数据前后的快照(Image),并作为 UndoLog 和业务修改在同一个本地事务中提交

如果一阶段成功那么二阶段仅仅异步删除刚刚插入的 UndoLog;如果二阶段失败则通过 UndoLog 生成反向 SQL 语句回滚一阶段的数据修改。其中关键的 SQL 解析和拼接工作借助了 Druid Parser 中的代码,这部分本文并不涉及,感兴趣的小伙伴可以去翻看源码,并不是很复杂

图解 AT 模式一阶段流程

一阶段中分支事务的具体工作有:

  1. 根据需要执行的 SQLUPDATEINSERTDELETE)类型生成相应的 SqlRecognizer
  2. 进而生成相应的 SqlExecutor
  3. 接着便进入核心逻辑查询数据的前后快照,例如图中标红的部分,拿到修改数据行的前后快照之后,将二者整合生成 UndoLog,并尝试将其和业务修改在同一事务中提交。

整个流程的流程图如下:

值得注意的是,本地事务提交前必须先向服务端注册分支,分支注册信息中包含由表名和行主键组成的全局锁,如果分支注册过程中发现全局锁正在被其他全局事务锁定则抛出全局锁冲突异常,客户端需要循环等待,直到其他全局事务释放锁之后该本地事务才能提交。Seata 以这样的机制保证全局事务间的写隔离。

图解二阶段 Commit 流程

对服务端来说,等到一阶段完成未抛异常,全局事务的发起方会向服务端申请提交这个全局事务,服务端根据 xid 查询出该全局事务后加锁并关闭这个全局事务,目的是防止该事务后续还有分支继续注册上来,同时将其状态从 Begin 修改为 Committing

紧接着,判断该全局事务下的分支类型是否均为 AT 类型,若是则服务端会进行异步提交,因为 AT 模式下一阶段完成数据已经落地。服务端仅仅修改全局事务状态为 AsyncCommitting,然后会有一个定时线程池去存储介质(File 或者 Database)中查询出待提交的全局事务日志进行提交,如果全局事务提交成功则会释放全局锁并删除事务日志。整个流程如下图所示:

对客户端来说,先是接收到服务端发送的 branch commit 请求,然后客户端会根据 resourceId 找到相应的 ResourceManager,接着将分支提交请求封装成 Phase2Context 插入内存队列 ASYNC_COMMIT_BUFFER,客户端会有一个定时线程池去查询该队列进行 UndoLog 的异步删除。

一旦客户端提交失败或者 RPC 超时,则服务端会将该全局事务状态置位 CommitRetrying,之后会由另一个定时线程池去一直重试这些事务直至成功。整个流程如下图所示:

图解二阶段 Rollback 流程

回滚相对复杂一些,如果发起方一阶段抛异常会向服务端请求回滚该全局事务,服务端会根据 xid 查询出这个全局事务,加锁关闭事务使得后续不会再有分支注册上来,并同时更改其状态 BeginRollbacking,接着进行同步回滚以保证数据一致性。除了同步回滚这个点外,其他流程同提交时相似,如果同步回滚成功则释放全局锁并删除事务日志,如果失败则会进行异步重试。整个流程如下图所示:

客户端接收到服务端的 branch rollback 请求,先根据 resourceId 拿到对应的数据源代理,然后根据 xidbranchId 查询出 UndoLog 记录,反序列化其中的 rollback 字段拿到数据的前后快照,我们称该全局事务为 A

根据具体 SQL 类型生成对应的 UndoExecutor,校验一下数据 UndoLog 中的前后快照是否一致或者前置快照和当前数据(这里需要 SELECT 一次)是否一致,如果一致说明不需要做回滚操作,如果不一致则生成反向 SQL 进行补偿,在提交本地事务前会检测获取数据库本地锁是否成功,如果失败则说明存在其他全局事务(假设称之为 B)的一阶段正在修改相同的行,但是由于这些行的主键在服务端已经被当前正在执行二阶段回滚的全局事务 A 锁定,因此事务 B 的一阶段在本地提交前尝试获取全局锁一定是失败的,等到获取全局锁超时后全局事务 B 会释放本地锁,这样全局事务 A 就可以继续进行本地事务的提交,成功之后删除本地 UndoLog 记录。整个流程如下图所示:

本节小结

我们通过流程图分析了一下 Seata AT 模式两阶段的工作流程,这里提一句,官方文档针对 AT 模式的工作流程提供了一个非常易懂的例子 —— AT 模式工作机制

笔者强烈建议感兴趣的同学阅读过后,再看下文的源码分析

Seata AT 模式源码模块拆解

通过上面的文字和图解,相信大家已经了解了 Seata AT 模式的基本工作原理,那么本节开始我们正式进入相关源码的分析阶段。第一步,由于 Seata 模块不算少,我们先对整个 Seata 项目的模块进行拆解,挑出其中需要重点关注的模块,忽略那些次要的。

下文的源码分析均基于 Seata v0.6.1 版本

相比于之前笔者对 Seata TCC 实现的分析,AT 模式的源码就要复杂很多了,基本上大多数模块均有涉及,因此在阅读源码之前,我们先对模块的优先级进行筛选,包括下文会叙述哪些模块和忽略哪些模块

首先,seata-tccAT 的功能无关可以不用看;seata-commonseata-coreseata-configseata-discovery 这些只看名字也能知道大致的功能,后续阅读代码期间经常会看到其中的类,因此都可以暂时忽略seata-tmseata-rm 这两者都是封装的与 seata-server 进行通信的方法和步骤,这部分笔者已经在上一篇关于 TCC 的文章中叙述过了,不再赘述seata-spring 主要是注解、切面织入、方法拦截等功能的实现,关键点包括全局事务的开启,但是由于 ATTCC 在全局事务开启部分的逻辑是一致的,因此本文也不再赘述

一通排查下来,和 AT 核心功能有关的模块仅剩下 seata-rm-datasourceseata-server,仔细一想这也很合理,因为 Seata 中分支事务才是真正执行数据修改和补偿的部分,因此对于 TCC 模式来说,TwoPhaseBusinessAction 注解的实现类是分支事务,对 AT 模式来说,代理数据源正是分支事务,因此核心逻辑必然在 seata-rm-datasource 模块中,而 TC 集群是协调整个全局事务的指挥者,自然 seata-server 模块也是我们需要特别关注的,但是由于服务端逻辑和 TCC 部分高度相似,除了 v0.6.1 中新增了 DB 模式作为日志存储介质外,因此下文先选取客户端 AT 模式相关源码进行深入分析,最后简要分析下与 AT 模式相关的服务端源码

Seata AT 模式客户端部分

数据源代理部分 —— 三类 Proxy

下图来源于 Seata 官方文档:

Seata 中主要针对 java.sql 包下的 DataSourceConnectionStatementPreparedStatement 四个接口进行了再包装,包装类分别为 DataSourceProxyConnectionProxyStatementProxyPreparedStatementProxy,很好一一对印,其功能是在 SQL 语句执行前后、事务 commit 或者 rollbakc 前后进行一些与 Seata 分布式事务相关的操作,例如分支注册、状态回报、全局锁查询、快照存储、反向 SQL 生成等。

ExecuteTemplate.execute

AT 模式下,真正分支事务开始是在 StatementProxyPreparedStatementProxyexecuteexecuteQueryexecuteUpdate具体执行方法中,这些方法均实现自 StatementPreparedStatement 的标准接口,而方法体内调用了 ExecuteTemplate.execute方法拦截,下面我们来看看这个方法的实现:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                    StatementProxy<S> statementProxy,
                                                    StatementCallback<T, S> statementCallback,
                                                    Object... args) throws SQLException {
    
    // 如果不是处于全局事务中,即上游没有 xid 传递下来
    // 或者没有 GlobalLock 修饰,该数据操作不需要纳入 Seata 框架下进行管理
    // 则直接执行这个 SQL                                                        
    if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    if (sqlRecognizer == null) {
        executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
    } else {

        // 通过 SQL 的类型,生成不同的执行器
        switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                break;
        }
    }
    T rs = null;
    try {

        // 调用执行器的 execute 方法,显然这是一个抽象方法,最后会调到三个具体的执行器实现类之一
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException)ex;
    }
    return rs;
}

下面我们看看这个 executor.execute 方法的实现。

执行器接口 execute 的实现

execute 方法的实现位于 BaseTransactionalExecutor 类中:

@Override
public Object execute(Object... args) throws Throwable {
    
    // 如果处于全局事务中,绑定 xid
    if (RootContext.inGlobalTransaction()) {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }

    // 如果被 GlobalLock 修饰,该操作需要设置全局锁标识
    if (RootContext.requireGlobalLock()) {
        statementProxy.getConnectionProxy().setGlobalLockRequire(true);
    } else {
        statementProxy.getConnectionProxy().setGlobalLockRequire(false);
    }

    // 调用抽象方法 doExecute
    return doExecute(args);
}

BaseTransactionalExecutor 类中 execute 方法主要做了一些与全局事务相关的状态值的设定,继续追踪进入 doExecute 方法的实现。

抽象方法 doExecute 的实现

终于进入正题,doExecute 方法位于 AbstractDMLBaseExecutor 类中,该类继承自上文中的 BaseTransactionalExecutor

doExecute 方法体内先拿到具体的连接代理对象 connectionProxy,然后根据 Commit 标识进行不同方法的调用,但翻看代码实现时发现,其实 executeCommitTrue 方法就是先把 Commit 标识改成 false 然后再调用 executeCommitFalse 方法。

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getCommit()) {
        return executeCommitTrue(args);
    } else {
        return executeCommitFalse(args);
    }
}

executeCommitTrue 方法体中有一个无限循环,这么做的意义是,一旦分支注册时抛出锁冲突异常,则需要一直等待直到别的全局事务释放该全局锁之后才能提交自己的修改,否则一直阻塞等待

protected T executeCommitTrue(Object[] args) throws Throwable {
    T result = null;
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    LockRetryController lockRetryController = new LockRetryController();
    try {
        
        // 先将 Commit 标识改成 false,只允许手动提交
        connectionProxy.setCommit(false);
        
        // 进入一个无限循环
        while (true) {
            try {
                
                // 调用 executeCommitFalse 方法
                result = executeCommitFalse(args);
                
                // 如果分支成功,则 commit,提交本地事务,该方法也是代理方法,下文会叙述
                connectionProxy.commit();
                break;
            } catch (LockConflictException lockConflict) {
                
                // 如果全局锁冲突,可能是已经有别的事务拿到了要修改行的全局锁,则回滚
                connectionProxy.getTargetConnection().rollback();

                // 然后 sleep 一段时间,不要立即重试
                lockRetryController.sleep(lockConflict);
            }
        }

    } catch (Exception e) {

        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("exception occur", e);
        throw e;
    } finally {
        connectionProxy.setCommit(true);
    }
    return result;
}

下面我们仔细看一下 executeCommitFalse 方法的逻辑,它是实现 AT 模式的关键步骤。其中,beforeImage 是一个抽象方法,针对 INSERTUPDATEDELETE 有不同的实现,因为需要将这三种不同的 SQL 解析为相应的 SELECT 语句,查询操作前数据的快照;同样的 afterImage 也是一个抽象方法,来查询操作后数据的快照;statementCallback.execute 语句真正执行 SQLprepareUndoLog 整合 beforeImageafterImage 生成 UndoLog 对象。

protected T executeCommitFalse(Object[] args) throws Throwable {

    // beforeImage 是一个抽象方法,针对 INSERT、UPDATE、DELETE 有不同的实现
    TableRecords beforeImage = beforeImage();
    
    // 真正执行 SQL
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    
    // 原理同 beforeImage
    TableRecords afterImage = afterImage(beforeImage);
    
    // 整合 beforeImage 和 afterImage 生成 UndoLog
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

executeCommitFalse 执行过后,会调用 connectionProxy.commit() 做事务提交,我们看看该代理方法的实现。

ConnectionProxy 复写的 commit 方法

commit 方法实现自 Connection 接口的 commit 方法:

@Override
public void commit() throws SQLException {
    
    // 针对分支事务处理
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } 

    // 针对 GlobalLock 的处理
    else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

执行一阶段本地事务提交

如果是分支事务,调用 processGlobalTransactionCommit 方法进行提交

private void processGlobalTransactionCommit() throws SQLException {
    try {

        // 调用 RM 注册分支事务,包括行记录的主键作为全局锁
        register();
    } catch (TransactionException e) {

        // 如果报锁冲突异常,则 executeCommitTrue 会循环等待
        recognizeLockKeyConflictException(e);
    }

    try {

        // 分支注册成功不抛异常,则将 UndoLog 插入数据库
        if (context.hasUndoLog()) {
            UndoLogManager.flushUndoLogs(this);
        }

        // 将业务修改和 UndoLog 一并提交
        targetConnection.commit();
    } catch (Throwable ex) {

        // 汇报分支状态为一阶段失败,默认失败会重试五次
        report(false);
        if (ex instanceof SQLException) {
            throw new SQLException(ex);
        }
    }

    // 汇报分支状态为一阶段成功 
    report(true);
    context.reset();
}

GlobalLock 的具体作用

如果是用 GlobalLock 修饰的业务方法,虽然该方法并非某个全局事务下的分支事务,但是它对数据资源的操作也需要先查询全局锁,如果存在其他 Seata 全局事务正在修改,则该方法也需等待。所以,如果想要 Seata 全局事务执行期间,数据库不会被其他事务修改,则该方法需要强制添加 GlobalLock 注解,来将其纳入 Seata 分布式事务的管理范围。

功能有点类似于 Spring@Transactional 注解,如果你希望开启事务,那么必须添加该注解,如果你没有添加那么事务功能自然不生效,业务可能出 BUGSeata 也一样,如果你希望某个不在全局事务下的 SQL 操作不影响 AT 分布式事务,那么必须添加 GlobalLock 注解。

private void processLocalCommitWithGlobalLocks() throws SQLException {
    
    // 查询这些主键是不是被其他全局事务锁住,如果有就抛出锁冲突异常
    checkLock(context.buildLockKeys());
    try {

        // 否则提交事务,因为该方法的修改并不影响已存在的 Seata 分布式事务
        targetConnection.commit();
    } catch (Throwable ex) {
        throw new SQLException(ex);
    }
    context.reset();
}

二阶段异步删除分支 UndoLog

如果一阶段成功,则 TC 会通知客户端 RM 进行第二阶段的提交工作,这部分代码最终实现位于 AsyncWorker 类中的 branchCommit 方法。

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                    String applicationData) throws TransactionException {
    
    // 将分支提交信息包装成 Phase2Context 插入内存中的异步提交队列
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
            + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

插入 ASYNC_COMMIT_BUFFER 之后,AsyncWorker 类中会有一个定时任务,从队列中取出分支提交信息 Phase2Context,将其中的 xidbranchId 提取出来生成 DELETE SQL 语句,删除本地数据库中存储的相应的 UndoLog。下面是该定时任务的关键方法 doBranchCommits 的实现:

private void doBranchCommits() {
    
    // 如果异步提交队列是空,return
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    
    // 该 map 报错 resourceId 和 commitContext 列表的对应关系
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
    while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
        
        // 取出分支提交信息 commitContext
        Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
        if (contextsGroupedByResourceId == null) {
            contextsGroupedByResourceId = new ArrayList<>();
            
            // 将其放入对应 RM 的提交队列中
            mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
        }
        contextsGroupedByResourceId.add(commitContext);
    }

    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
        Connection conn = null;
        try {
            try {

                // …… …… 省略,这里拿到原始 connection,因为只需做 delete 操作,没有涉及代理数据源的部分
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                continue;
            }

            // …… …… 省略,批量删除 UndoLog
            
            try {
                UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            } catch (Exception ex) {
                LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
            }
        } 
        // …… …… 省略
    }
}

二阶段生成反向 SQL 回滚

如果一阶段失败,则二阶段需要回滚一阶段的数据库更新操作,此时涉及到根据 UndoLog 构造逆向 SQL 进行补偿。这部分逻辑的入口位于 DataSourceManager 类中的 branchRollback 方法:

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {

    // 根据 resourceId 拿到代理数据源
    DataSourceProxy dataSourceProxy = get(resourceId);
    try {

        // 调用 UndoLogManager 的 undo 方法进行补偿,核心逻辑在这里
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } 
    // …… …… 省略
}

UndoLogManager 负责 UndoLog 的插入、删除、补偿等操作,其中核心方法即为 undo,我们可以看到其中有一个无限 for 循环,一旦当前事务进行二阶段回滚时获取本地锁失败,则进入循环等待逻辑,等待本地锁被释放之后自己再提交本地事务:

public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    
    // …… …… 省略
    
    for (; ; ) {
        try {
            // …… …… 省略

            // 构造查询 UndoLog 表的 SELECT 语句,条件为 xid 和 branchId
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            // 进行查询
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;
                
                // 仅允许 normal 状态的 UndoLog 记录进行回顾,主要是防止 
                int state = rs.getInt("log_status");
                if (!canUndo(state)) {
                    return;
                }

                // 取出 rollback_info 
                Blob b = rs.getBlob("rollback_info");
                
                // 转为字节数组
                byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
                
                // JSON 反序列化为对象
                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
                
                // 循环所有的 UndoLog 记录
                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                    sqlUndoLog.setTableMeta(tableMeta);
                    
                    // 构造反向 SQL 语句,三类更新操作会生成三种不同的 UndoExecutor
                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                        dataSourceProxy.getDbType(),
                        sqlUndoLog);
                    
                    // executeOn 内部会调用一个抽象方法 buildUndoSQL
                    undoExecutor.executeOn(conn);
                }
            }

            // 如果该 Undo Log 存在则执行回滚后删除记录
            if (exists) {
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                
                // 如果不存在说明一阶段有异常,可以插入一条终止状态的 UndoLog 防止一阶段因为超时等问题,悬挂的事务提交又在补偿方法之后达到,与 TCC 中 Cancel 方法处理 Try 方法悬挂异曲同工
                insertUndoLogWithGlobalFinished(xid, branchId, conn);
                conn.commit();
            }
            return;
        } 
        // 如果抛出 SQLIntegrityConstraintViolationException 则进入循环,等待本地锁被释放
        // …… …… 省略
    }
}

UndoExecutorFactory 类的 getUndoExecutor 方法会根据 UndoLog 中记录的 SQLType 生成不同的 UndoExecutor 返回:

public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
    if (!dbType.equals(JdbcConstants.MYSQL)) {
        throw new NotSupportYetException(dbType);
    }
    switch (sqlUndoLog.getSqlType()) {
        case INSERT:
            return new MySQLUndoInsertExecutor(sqlUndoLog);
        case UPDATE:
            return new MySQLUndoUpdateExecutor(sqlUndoLog);
        case DELETE:
            return new MySQLUndoDeleteExecutor(sqlUndoLog);
        default:
            throw new ShouldNeverHappenException();
    }
}

UndoExecutor 中的 executeOn 方法 首先会调用一个抽象方法 buildUndoSQL,根据 INSERTUPDATEDELETE 三种不同的 SQL 类型生成相应的反向 SQL 语句。

下面我们以 DELETE 为例分析一下 MySQLUndoDeleteExecutor 类的 buildUndoSQL 方法的实现,如果一阶段已经删除了某行数据,那么二阶段补偿自然需要构造一个 INSERT 语句将被删除的行重新插入。

protected String buildUndoSQL() {
    
    // 首先对于 MySQL 中的一些保留字,如果出现在普通 SQL 中需要增加反引号
    KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
    
    // DELETE 的 beforeImage 快照不可能是空,因为肯定有数据才会执行删除
    TableRecords beforeImage = sqlUndoLog.getBeforeImage();
    List<Row> beforeImageRows = beforeImage.getRows();
    if (beforeImageRows == null || beforeImageRows.size() == 0) {
        throw new ShouldNeverHappenException("Invalid UNDO LOG");
    }
    Row row = beforeImageRows.get(0);
    List<Field> fields = new ArrayList<>(row.nonPrimaryKeys());
    Field pkField = row.primaryKeys().get(0);
    
    // 主键放在列表最后一个
    fields.add(pkField);
    String insertColumns = fields.stream()
        .map(field -> keywordChecker.checkAndReplace(field.getName()))
        .collect(Collectors.joining(", "));
    String insertValues = fields.stream().map(field -> "?")
        .collect(Collectors.joining(", "));
    
    // "INSERT INTO %s (%s) VALUES (%s)" 依次放入表名、列名、数值
    return String.format(INSERT_SQL_TEMPLATE, keywordChecker.checkAndReplace(sqlUndoLog.getTableName()),
                            insertColumns, insertValues);
}

Seata AT 模式服务端部分

AT 模式下,全局事务注册、提交、回滚均和 TCC 模式一模一样,均是根据一阶段调用抛不抛异常决定。

区别在于两点:

  1. 分支事务的注册,TCC 模式下分支事务是在进入参与方 Try 方法之前的切面中注册的,而且分支实现完毕不需要再次汇报分支状态;但 AT 模式不一样,分支事务是在代理数据源提交本地事务之前注册的,注册成功才能提交一阶段本地事务,如果注册失败报锁冲突则一直阻塞等待直到该全局锁被释放,且本地提交之后不论是否成功还需要再次向 TC 汇报一次分支状态。

  2. AT 模式由于一阶段已经完成数据修改,因此二阶段可以异步提交,但回滚是同步的,回滚失败才会异步重试;但是 SeataTCC 模式二阶段 Confirm 是同步提交的,可以最大程度保证 TCC 模式的数据一致性,但是笔者认为在要求性能的场景下,TCC 的二阶段也可以改为异步提交

服务端提交全局事务

核心方法是 DefaultCore 类中的 commit 方法:

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    
    // 查询全局事务
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    
    // 锁住全局事务并关闭它,不让后续可能的分支再注册上来
    boolean shouldCommit = globalSession.lockAndExcute(() -> {
        
        //the lock should release after branch commit
        globalSession
            .closeAndClean(); 
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Committing);
            return true;
        }
        return false;
    });
    if (!shouldCommit) {
        return globalSession.getStatus();
    }
    if (globalSession.canBeCommittedAsync()) {
        
        // 如果是 AT 模式,只改变全局事务状态为 AsyncCommitting 进行异步提交
        asyncCommit(globalSession);
        return GlobalStatus.Committed;
    } else {
        
        // 如果分支里包含 TCC 的分支,进行同步提交
        doGlobalCommit(globalSession, false);
    }
    return globalSession.getStatus();
}

服务端异步提交分支事务

DefaultCoordinator 类中有一个 asyncCommitting 定时线程池,会定时调用 handleAsyncCommitting 方法从存储介质(文件或者数据库)中分批查询出状态为 AsyncCommitting 的全局事务列表,针对每个全局事务调用 doGlobalCommit 方法提交其下所有未提交的分支事务。

asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, asynCommittingRetryDelay, TimeUnit.SECONDS);

protected void handleAsyncCommitting() {
    
    // 查询待提交的全局事务列表
    Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager()
        .allSessions();
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
        try {
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            
            // 调用 doGlobalCommit 进行提交,通过回调每个分支事务的客户端来触发客户端 RM 完成分支事务提交
            // 如果 doGlobalCommit 成功则释放全局锁并删除事务日志
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to async committing [{}] {} {}",
                asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    }
}

服务端同步回滚分支事务

一旦一阶段失败,全局事务发起方通知 TC 回滚全局事务的话,那么二阶段的回滚调用是同步进行的,一旦同步回滚失败才会进入异步重试阶段。核心方法为 DefaultCore 类中的 doGlobalRollback 方法:

public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            globalSession.removeBranch(branchSession);
            continue;
        }
        try {
            
            // 进行回调,通知客户端 RM 回滚分支事务
            BranchStatus branchStatus = resourceManagerInbound.branchRollback(branchSession.getBranchType(),
                branchSession.getXid(), branchSession.getBranchId(),
                branchSession.getResourceId(), branchSession.getApplicationData());

            // …… …… 省略

        } catch (Exception ex) {
            LOGGER.error("Exception rollbacking branch " + branchSession, ex);
            
            // 如果是第一次补偿,retrying 是 false,因此如果失败则进入异步重试
            if (!retrying) {
                queueToRetryRollback(globalSession);
            }
            throw new TransactionException(ex);
        }
    }
    
    // 回滚成功的话,释放全局锁并删除事务日志
    SessionHelper.endRollbacked(globalSession);
}

回滚的异步重试与异步提交相同,都是一个定时线程池去扫描存储介质中尚未完成回滚的全局事务,因此这里不再赘述

Seata AT 模式的全局锁

全局锁的组成和作用

全局锁主要由表名加操作行的主键两个部分组成,Seata AT 模式使用服务端保存全局锁的方法保证:

  1. 全局事务之前的写隔离
  2. 全局事务与被 GlobalLock 修饰方法间的写隔离性

全局锁的注册

当客户端在进行一阶段本地事务提交前,会先向服务端注册分支事务,此时会将修改行的表名、主键信息封装成全局锁一并发送到服务端进行保存,如果服务端保存时发现已经存在其他全局事务锁定了这些行主键,则抛出全局锁冲突异常,客户端循环等待并重试。

全局锁的查询

GlobalLock 修饰的方法虽然不在某个全局事务下,但是其在提交事务前也会进行全局锁查询,如果发现全局锁正在被其他全局事务持有,则自身也会循环等待。

全局锁的释放

由于二阶段提交是异步进行的,当服务端向客户端发送 branch commit 请求后,客户端仅仅是将分支提交信息插入内存队列即返回,服务端只要判断这个流程没有异常就会释放全局锁。因此,可以说如果一阶段成功则在二阶段一开始就会释放全局锁,不会锁定到二阶段提交流程结束。

但是如果一阶段失败二阶段进行回滚,则由于回滚是同步进行的,全局锁直到二阶段回滚完成才会被释放。

Seata AT 模式潜在优化点

Seata AT 模式的源码读下来,其逻辑也存在可以优化的地方:

  1. 针对简单 SQL 语句,其后置数据快照可以直接在内存中计算生成,而无需再走一次 SELECT
  2. 全局锁可以保存在客户端本地数据库中,这样减少与服务端的 RPC 调用次数

全文总结

本文基于 Seata v0.6.1 通过图文结合的方式分析其 AT 模式的工作原理和源码实现,剖析了其中核心的前后快照生成、全局锁机制、服务端协调机制等逻辑,希望对大家有所启发。


这是一个不定时更新的、披着程序员外衣的文青小号。

在这里,既分享极客技术,也记录人间烟火,欢迎关注。

0

Comment