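1. Review of Seata's AT mode

1.1 Seata's AT mode

[Image: Distributed Transactions, Part 5: Studying the Seata Source Code (distributed transactions)]

[Image: Distributed Transactions, Part 5: Studying the Seata Source Code (distributed transactions, 02)]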

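  • Service A starts a global transaction and requests a global transaction ID (XID) from the TC.
  • Service A makes RPC calls; the participating services B and C each register a branch transaction with the TC.
  • Each participant commits its local transaction and writes a record to its undo_log table.
  • Each participant reports the execution status of its branch transaction.
  • Based on the status of the branch transactions, the TC notifies the participants whether the global transaction is to be committed or rolled back.
  • Each participant follows the TC's instruction: on a global commit it simply deletes the corresponding records from its local undo_log table; on a rollback it restores the affected data from the before_image stored in undo_log.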
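The last two steps can be sketched with a small in-memory model. This is only an illustration of the idea, not Seata code: the class UndoLogSketch, its two maps, and the update/commit/rollback methods are all hypothetical names, and a real undo_log row holds serialized before and after images rather than a single integer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory illustration of AT-mode phase two; not Seata code.
class UndoLogSketch {
    // Stands in for a business table: primary key -> column value.
    final Map<String, Integer> table = new HashMap<>();
    // Stands in for undo_log: xid -> (primary key -> before image).
    final Map<String, Map<String, Integer>> undoLog = new HashMap<>();

    // Phase one: record the before image, then apply the local update.
    void update(String xid, String key, int newValue) {
        Map<String, Integer> images = undoLog.computeIfAbsent(xid, k -> new HashMap<>());
        if (!images.containsKey(key)) {
            images.put(key, table.get(key)); // keep only the earliest image per key
        }
        table.put(key, newValue);
    }

    // Phase two, commit: the data is already in place, so only drop the undo records.
    void commit(String xid) {
        undoLog.remove(xid);
    }

    // Phase two, rollback: restore every before image, then drop the undo records.
    void rollback(String xid) {
        Map<String, Integer> images = undoLog.remove(xid);
        if (images == null) {
            return; // nothing was recorded for this xid
        }
        images.forEach((key, before) -> {
            if (before == null) {
                table.remove(key); // the row did not exist before the transaction
            } else {
                table.put(key, before);
            }
        });
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.table.put("stock:1", 100);
        db.update("xid-1", "stock:1", 90);
        db.rollback("xid-1");
        System.out.println(db.table); // the value held before xid-1 is restored
    }
}
```

The point of the sketch is that a commit is cheap (the local data was already committed in phase one), while a rollback is a compensating write driven by the stored before image.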

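1.2 Using Seata's AT mode in practice

Using the AT mode in an application takes only a few steps:

  • ① Add the Seata dependency
  • ② Create the local undo_log table
  • ③ Add the Seata settings to the configuration file
  • ④ Start the distributed transaction with the @GlobalTransactional annotation

See the earlier posts in this series for the detailed steps.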

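2. Source code analysis of the core steps (AT)

2.1 How the RM and TM connect to the TC

A distributed transaction can only be started if the RM and TM hold connections to the TC; without them there is no way to communicate. So when are these connections established? Start with the package that contains the @GlobalTransactional annotation:

[Image: Distributed Transactions, Part 5: Studying the Seata Source Code (seata, 03)]

The annotation is defined in the package org.apache.seata.spring.annotation. The same package also defines a scanner class whose name ends in Scanner. Open it: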

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

The scanner implements the InitializingBean interface, so it must implement afterPropertiesSet, which Spring calls once the bean's properties have been set:

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();   // initialize the clients; which clients? see initClient()
    }

    this.findBusinessBeanNamesNeededEnhancement();
}

afterPropertiesSet calls a method that initializes clients. Which clients? Stepping into it shows the code that initializes the TM and the RM:

protected void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
        LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}
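One detail worth isolating is the guard initialized.compareAndSet(false, true) in afterPropertiesSet, which lets initClient run at most once even if the callback fires more than once or from several threads. A minimal sketch of that idiom follows; InitOnceSketch and its initCalls counter are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the "initialize at most once" guard; not Seata code.
class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Counts how often the expensive initialization really ran.
    final AtomicInteger initCalls = new AtomicInteger();

    // Same shape as the guard in afterPropertiesSet(): only the caller that
    // flips the flag from false to true performs the initialization.
    void afterPropertiesSet() {
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCalls.incrementAndGet(); // placeholder for "init TM, then init RM"
    }

    public static void main(String[] args) {
        InitOnceSketch scanner = new InitOnceSketch();
        scanner.afterPropertiesSet();
        scanner.afterPropertiesSet();
        System.out.println("initClient ran " + scanner.initCalls.get() + " time(s)");
    }
}
```

Because compareAndSet is atomic, no lock is needed: every caller after the first sees the flag already set and skips the initialization.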

Taking the RM initialization as the example, step into RMClient.init:

public class RMClient {

    /**
     * Init.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

Stepping into init leads to org.apache.seata.core.rpc.netty.RmNettyRemotingClient#init:

@Override
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();

        // Found one or more resources that were registered before initialization
        if (resourceManager != null
            && !resourceManager.getManagedResources().isEmpty()
            && StringUtils.isNotBlank(transactionServiceGroup)) {
            boolean failFast = ConfigurationFactory.getInstance().getBoolean(
                ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
                DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
            getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
        }
    }
}

Look at super.init() first, which is org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init:

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(() -> {
        try {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        } catch (Exception ex) {
            LOGGER.warn("reconnect server failed. {}", ex.getMessage());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

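This method calls getTransactionServiceGroup(). We set transactionServiceGroup in the configuration file earlier, and as mentioned before, the client uses this service group to locate the corresponding Seata Server. To check whether the connection to the TC really is made from this service group, step into clientChannelManager.reconnect(). After a few overloads it reaches doReconnect: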

void doReconnect(String transactionServiceGroup, boolean failFast) {
    List<String> availList;
    try {
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        throwFailFastException(failFast, "Failed to get available servers");
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                         ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                         transactionServiceGroup);
            throwFailFastException(failFast, "can not get cluster name in registry config.");
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        throwFailFastException(failFast, "no available service found in cluster.");
        return;
    }
    try {
        doReconnect(availList, transactionServiceGroup);
    } catch (Exception e) {
        if (failFast) {
            throw e;
        }
        LOGGER.error("connect server failed. {}", e.getMessage(), e);
    }
}

It does obtain a list of available addresses, availList, and then passes that list to another doReconnect overload:

void doReconnect(List<String> availList, String transactionServiceGroup) {
    Set<String> channelAddress = new HashSet<>(availList.size());
    Map<String, Exception> failedMap = new HashMap<>();
    try {
        for (String serverAddress : availList) {
            try {
                acquireChannel(serverAddress);
                channelAddress.add(serverAddress);
            } catch (Exception e) {
                failedMap.put(serverAddress, e);
            }
        }
        if (failedMap.size() > 0) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                             failedMap.keySet(),
                             failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
            } else if (LOGGER.isDebugEnabled()) {
                failedMap.forEach((key, value) -> {
                    LOGGER.error("{} can not connect to {} cause:{} trace information:",
                                 FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
                });
            }
        }
        if (availList.size() == failedMap.size()) {
            String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", ");
            throw new FrameworkException("can not connect to [" + invalidAddress + "]");
        }
    } finally {
        if (CollectionUtils.isNotEmpty(channelAddress)) {
            List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
            for (String address : channelAddress) {
                String[] array = NetUtil.splitIPPortStr(address);
                aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
            }
            RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
        } else {
            RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
        }
    }
}
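The failure handling in this overload is easy to miss: a single unreachable server is only recorded, and an exception is thrown only when every address in the list failed. A stripped-down sketch of that rule follows. ReconnectAllSketch and the connector callback are hypothetical stand-ins for the channel manager and acquireChannel, and the extra empty-list guard is an addition of this sketch, since in the code above the caller has already handled an empty list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration of "try every server, fail only if all fail"; not Seata code.
class ReconnectAllSketch {
    // Tries every address and returns the ones that connected.
    // Throws only when every address in a non-empty list failed.
    static List<String> reconnect(List<String> availList, Consumer<String> connector) {
        List<String> alive = new ArrayList<>();
        Map<String, Exception> failed = new LinkedHashMap<>();
        for (String address : availList) {
            try {
                connector.accept(address); // stands in for acquireChannel(address)
                alive.add(address);
            } catch (RuntimeException e) {
                failed.put(address, e);    // remember the failure and keep going
            }
        }
        // Guard against an empty list is specific to this sketch.
        if (!availList.isEmpty() && failed.size() == availList.size()) {
            throw new IllegalStateException("can not connect to " + failed.keySet());
        }
        return alive;
    }

    public static void main(String[] args) {
        List<String> alive = reconnect(Arrays.asList("10.0.0.1:8091", "10.0.0.2:8091"), address -> {
            if (address.startsWith("10.0.0.1")) {
                throw new RuntimeException("connection refused");
            }
        });
        System.out.println("alive servers: " + alive);
    }
}
```

In the real method the surviving addresses are then handed to refreshAliveLookup in the finally block, so the alive list is refreshed whether or not the exception is thrown.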

The method iterates over the list and calls acquireChannel for each serverAddress to obtain a Channel. If there is no live connection to reuse, acquireChannel creates one through doConnect(serverAddress):

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to {}", serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
        return doConnect(serverAddress);
    }
}
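acquireChannel combines a lock-free fast path with one lock object per server address, so connecting to one server never blocks a thread that wants a different server. The sketch below shows that pattern in isolation. ChannelCacheSketch is a hypothetical class, a String stands in for a Netty Channel, and the re-check inside the synchronized block is written out here because the excerpt above does not show how doConnect avoids a duplicate connection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a per-address connection cache; not Seata code.
class ChannelCacheSketch {
    private final Map<String, String> channels = new ConcurrentHashMap<>();
    private final Map<String, Object> channelLocks = new ConcurrentHashMap<>();
    // Counts real connection attempts so the caching effect is observable.
    final AtomicInteger connects = new AtomicInteger();

    String acquireChannel(String serverAddress) {
        String channel = channels.get(serverAddress); // fast path, no lock taken
        if (channel != null) {
            return channel;
        }
        // One lock object per address: different servers do not contend.
        Object lock = channelLocks.computeIfAbsent(serverAddress, key -> new Object());
        synchronized (lock) {
            // Re-check under the lock: another thread may have connected meanwhile.
            channel = channels.get(serverAddress);
            if (channel == null) {
                channel = doConnect(serverAddress);
                channels.put(serverAddress, channel);
            }
            return channel;
        }
    }

    private String doConnect(String serverAddress) {
        connects.incrementAndGet();
        return "channel->" + serverAddress; // placeholder for a real connection
    }

    public static void main(String[] args) {
        ChannelCacheSketch manager = new ChannelCacheSketch();
        manager.acquireChannel("127.0.0.1:8091");
        manager.acquireChannel("127.0.0.1:8091");
        System.out.println("connection attempts: " + manager.connects.get());
    }
}
```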

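The TM connection is established in much the same way. The overall flow is roughly as follows:

[Image: Distributed Transactions, Part 5: Studying the Seata Source Code (seata, 04)]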

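2.2 Transaction handling

As shown earlier, a distributed transaction is started by the @GlobalTransactional annotation, so something must detect the annotation and act on it. If we were building this ourselves, AOP would be the first thing to come to mind. In the package-structure screenshot at the top, right next to the GlobalTransactional annotation there is indeed a class named AspectTransactionalInterceptor: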

public class AspectTransactionalInterceptor implements MethodInterceptor {

Since this class implements the MethodInterceptor interface, its invoke method is called whenever a proxied method runs:

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    InvocationWrapper invocationWrapper = new DefaultInvocationWrapper(null, invocation.getThis(), specificMethod, invocation.getArguments());
    return this.globalTransactionalInterceptorHandler.invoke(invocationWrapper);
}

Stepping into globalTransactionalInterceptorHandler.invoke on the last line leads to org.apache.seata.integration.tx.api.interceptor.handler.AbstractProxyInvocationHandler#invoke:

@Override
public Object invoke(InvocationWrapper invocation) throws Throwable {
    if (CollectionUtils.isNotEmpty(getMethodsToProxy()) && !getMethodsToProxy().contains(invocation.getMethod().getName())) {
        return invocation.proceed();
    }
    if (nextInvocationHandlerChain != null) {
        invocation = new NestInterceptorHandlerWrapper(nextInvocationHandlerChain, invocation);
    }
    return doInvoke(invocation);
}

This finally reaches org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#doInvoke:

[Image: Distributed Transactions, Part 5: Studying the Seata Source Code (seata, 05)]

@Override
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
    Class<?> targetClass = invocation.getTarget().getClass();
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            final AspectTransactional globalTransactionalAnnotation = getAspectTransactional(specificMethod, targetClass);
            final GlobalLockConfig globalLockAnnotation = getGlobalLockConfig(specificMethod, targetClass);
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    transactional = globalTransactionalAnnotation;
                } else {
                    transactional = this.aspectTransactional;
                }
                return handleGlobalTransaction(invocation, transactional);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(invocation, globalLockAnnotation);
            }
        }
    }
    return invocation.proceed();
}

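There are two branches here. If globalTransactionalAnnotation or aspectTransactional is not null, handleGlobalTransaction is called; otherwise, if globalLockAnnotation is not null, handleGlobalLock is called. If neither applies, the invocation simply proceeds without Seata's involvement.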

Step into org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#handleGlobalTransaction. It is essentially a single call to transactionalTemplate.execute, with an anonymous inner class passed as the argument. Here is the execute method:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                tx = GlobalTransactionContext.createNew();
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,else create
                tx = GlobalTransactionContext.getCurrentOrCreate();
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                      , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
            LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
        }

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion(tx);
            cleanUp(tx);
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
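The switch at the top of execute is easier to take in as a decision table: for each propagation value, what happens when a global transaction already exists and when it does not. The sketch below encodes exactly the cases visible in the code above. PropagationSketch, the local Propagation enum, and the Action labels are hypothetical names for illustration; they are not Seata types.

```java
// Hypothetical decision table for the propagation switch in execute(); not Seata code.
class PropagationSketch {
    // Local copy of the six values handled by the switch.
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    // Outcome labels invented for this sketch.
    enum Action {
        RUN_WITHOUT_TX, JOIN_CURRENT, BEGIN_NEW,
        SUSPEND_AND_RUN_WITHOUT_TX, SUSPEND_AND_BEGIN_NEW, FAIL
    }

    static Action decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return txExists ? Action.JOIN_CURRENT : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return txExists ? Action.JOIN_CURRENT : Action.BEGIN_NEW;
            case NEVER:
                return txExists ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return txExists ? Action.JOIN_CURRENT : Action.FAIL;
            default:
                throw new IllegalArgumentException("Not Supported Propagation:" + propagation);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": existing tx -> " + decide(p, true)
                    + ", no tx -> " + decide(p, false));
        }
    }
}
```

In the real method, "join" corresponds to the Participant role (beginTransaction and commitTransaction then become no-ops for that caller), and any suspended transaction is resumed in the outer finally block.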

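Two things stand out here:

  • Transaction propagation

Seata defines its own set of transaction propagation behaviors, which largely match Spring's. For example, with NOT_SUPPORTED the current transaction is suspended and the business logic runs without a transaction and returns; with REQUIRES_NEW the current transaction is suspended, a new transaction is created, and the remaining transaction and business logic runs in it.

Six behaviors are defined, one fewer than Spring, which also has NESTED:

  • NOT_SUPPORTED: transactions are not supported; if one exists it is suspended and the code runs without a transaction
  • REQUIRES_NEW: if a transaction exists it is suspended, and a new transaction is started for the execution
  • SUPPORTS: if no transaction exists the code runs without one; otherwise it runs in the current transaction
  • REQUIRED: if no transaction exists a new one is created; otherwise the current transaction is used
  • NEVER: if a transaction exists an error is raised; if not, the code runs normally without a transaction
  • MANDATORY: if no transaction exists an error is raised; if one exists, the code runs in it

  • Transaction processing

The code above clearly shows beginTransaction, commitTransaction, and, in the innermost catch block, completeTransactionAfterThrowing.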

1. The beginTransaction method

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
 if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
     if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Ignore begin: just involved in global transaction [{}]", tx.getXid());
     }
     return;
 }
 try {
     triggerBeforeBegin();
     tx.begin(txInfo.getTimeOut(), txInfo.getName());
     triggerAfterBegin();
 } catch (TransactionException txe) {
     throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                        TransactionalExecutor.Code.BeginFailure);

 }
}

This method triggers the before-begin hooks, begins the transaction, and then triggers the after-begin hooks. The part to look at is tx.begin:

org.apache.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

@Override
public void begin(int timeout, String name) throws TransactionException {
 this.createTime = System.currentTimeMillis();
 if (role != GlobalTransactionRole.Launcher) {
     assertXIDNotNull();
     if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
     }
     return;
 }
 assertXIDNull();
 String currentXid = RootContext.getXID();
 if (currentXid != null) {
     throw new IllegalStateException("Global transaction already exists," +
                                     " can't begin a new global transaction, currentXid = " + currentXid);
 }
 xid = transactionManager.begin(null, null, name, timeout);
 status = GlobalStatus.Begin;
 RootContext.bind(xid);
 if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Begin new global transaction [{}]", xid);
 }
}
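The two context operations in begin, the check through RootContext.getXID() and the final RootContext.bind(xid), can be pictured as a per-thread slot holding the current XID. The sketch below assumes a plain ThreadLocal as the storage; that is an assumption made for illustration, and XidContextSketch with its begin helper is a hypothetical class rather than the real RootContext.

```java
import java.util.function.Supplier;

// Hypothetical illustration of binding an XID to the current thread; not Seata code.
class XidContextSketch {
    // Assumption of this sketch: the XID lives in a plain ThreadLocal.
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static String getXID() {
        return XID.get();
    }

    static void bind(String xid) {
        XID.set(xid);
    }

    // Clears the slot and returns what was bound, or null if nothing was.
    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    // Mirrors the guard in begin(): refuse to start if an XID is already bound,
    // otherwise ask the coordinator for a new XID and bind it.
    static String begin(Supplier<String> coordinatorCall) {
        String currentXid = getXID();
        if (currentXid != null) {
            throw new IllegalStateException(
                "Global transaction already exists, currentXid = " + currentXid);
        }
        String xid = coordinatorCall.get(); // stands in for transactionManager.begin(...)
        bind(xid);
        return xid;
    }

    public static void main(String[] args) {
        begin(() -> "demo-xid-1"); // made-up XID value
        System.out.println("bound xid: " + getXID());
        unbind();
        System.out.println("after unbind: " + getXID());
    }
}
```

Once the XID is bound, any code running later on the same thread can read it and attach it to its own work, which is what makes the downstream branch registration possible.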

This method begins a transaction, obtains an xid, and binds that xid to the context. How is the transaction actually begun? Through a remote call made in org.apache.seata.tm.DefaultTransactionManager#begin:

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
 throws TransactionException {
 GlobalBeginRequest request = new GlobalBeginRequest();
 request.setTransactionName(name);
 request.setTimeout(timeout);
 GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
 if (response.getResultCode() == ResultCode.Failed) {
     throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
 }
 return response.getXid();
}

The remote call is the syncCall here, which returns a response:

public class GlobalBeginResponse extends AbstractTransactionResponse {
  private String xid;
  private String extraData;
}

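2. commitTransaction

This method commits the transaction. If the transaction has already timed out, it rolls back instead. Otherwise it triggers the before-commit hooks, commits, and then triggers the after-commit hooks.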

private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
 throws TransactionalExecutor.ExecutionException, TransactionException {
 if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
     if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Ignore commit: just involved in global transaction [{}]", tx.getXid());
     }
     return;
 }
 if (isTimeout(tx.getCreateTime(), txInfo)) {
     // business execution timeout
     Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                                                String.format("client detected transaction timeout before commit, so change to rollback, xid = %s", tx.getXid()));
     rollbackTransaction(tx, exx);
     return;
 }

 try {
     triggerBeforeCommit();
     tx.commit();
     GlobalStatus afterCommitStatus = tx.getLocalStatus();
     TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;
     switch (afterCommitStatus) {
         case TimeoutRollbacking:
             code = TransactionalExecutor.Code.Rollbacking;
             break;
         case TimeoutRollbacked:
             code = TransactionalExecutor.Code.RollbackDone;
             break;
         case Finished:
             code = TransactionalExecutor.Code.CommitFailure;
             break;
         default:
     }
     Exception statusException = null;
     if (GlobalStatus.isTwoPhaseHeuristic(afterCommitStatus)) {
         statusException = new TmTransactionException(TransactionExceptionCode.CommitHeuristic,
                                                      String.format("Global transaction[%s] not found, may be rollbacked.", tx.getXid()));
     } else if (GlobalStatus.isOnePhaseTimeout(afterCommitStatus)) {
         statusException = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                                                      String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid()));
     }
     if (null != statusException) {
         throw new TransactionalExecutor.ExecutionException(tx, statusException, code);
     }
     triggerAfterCommit();
 } catch (TransactionException txe) {
     // 4.1 Failed to commit
     throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                        TransactionalExecutor.Code.CommitFailure);
 }
}
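The first half of commitTransaction is a three-way decision made before anything is sent to the TC: non-launchers do nothing, a timed-out launcher rolls back, and only otherwise does the commit go ahead. A sketch of that decision follows. CommitDecisionSketch and its Decision enum are hypothetical, and the comparison inside isTimeout (elapsed time strictly greater than the configured timeout) is an assumption, since the body of Seata's isTimeout is not shown in this post.

```java
// Hypothetical illustration of the checks made before committing; not Seata code.
class CommitDecisionSketch {
    enum Decision { IGNORE, ROLLBACK, COMMIT }

    // Assumption: "timed out" means elapsed time is strictly greater than the timeout.
    static boolean isTimeout(long beginTimeMillis, long nowMillis, int timeoutMillis) {
        return nowMillis - beginTimeMillis > timeoutMillis;
    }

    static Decision decide(boolean launcher, long beginTimeMillis, long nowMillis, int timeoutMillis) {
        if (!launcher) {
            return Decision.IGNORE;   // participants leave the commit to the launcher
        }
        if (isTimeout(beginTimeMillis, nowMillis, timeoutMillis)) {
            return Decision.ROLLBACK; // business code ran too long: roll back instead
        }
        return Decision.COMMIT;
    }

    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // A transaction with a 60 second timeout that has just started.
        System.out.println(decide(true, begin, System.currentTimeMillis(), 60_000));
    }
}
```

Checking the timeout on the client side avoids sending a commit that the TC is likely to reject; the switch on afterCommitStatus in the real method then handles the case where the TC reports a timeout rollback anyway.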

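3. completeTransactionAfterThrowing

This method is called from the catch block, that is, when the business code throws. It rolls the transaction back if the transaction info is not null and the thrown exception is one that the transaction is configured to roll back on; otherwise it commits.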

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
 throws TransactionalExecutor.ExecutionException, TransactionException {
 //roll back
 if (txInfo != null && txInfo.rollbackOn(originalException)) {
     rollbackTransaction(tx, originalException);
 } else {
     // not roll back on this exception, so commit
     commitTransaction(tx, txInfo);
 }
}



4. business.execute

This runs the business method itself.


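2.3 Request interaction between the transaction roles

We have now seen the transaction process broken down into steps. How do the RM and TM actually talk to the TC? Broadly, there are asynchronous and synchronous paths. Take the begin method, org.apache.seata.tm.DefaultTransactionManager#begin, as the example: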

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

Next, the syncCall method:

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}

Then org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object):

@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    long timeoutMillis = this.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    if (this.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                                                                           key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                         serverAddress, rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            Object response = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }

}

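The outer if in this method checks whether batched request sending is enabled. If it is, the if branch runs; otherwise the else branch acquires a channel and sends the request synchronously. In the if branch, the request, already wrapped as an org.apache.seata.core.protocol.RpcMessage, is put into a blocking queue for a background task to consume, while the calling thread waits on a MessageFuture for the response. Each queue is stored in a map, basketMap, keyed by serverAddress, and the background task reads its data from that map.

Where is basketMap consumed?

We already met org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init when looking at how the RM and TM connect to the TC: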

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(() -> {
        try {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        } catch (Exception ex) {
            LOGGER.warn("reconnect server failed. {}", ex.getMessage());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

Here a MergedSendRunnable task is submitted to the thread pool, which schedules and runs it. The run method of that class is:

@Override
public void run() {
    while (true) {
        synchronized (mergeLock) {
            try {
                mergeLock.wait(MAX_MERGE_SEND_MILLS);
            } catch (InterruptedException e) {
            }
        }
        isSending = true;
        basketMap.forEach((address, basket) -> {
            if (basket.isEmpty()) {
                return;
            }

            MergedWarpMessage mergeMessage = new MergedWarpMessage();
            while (!basket.isEmpty()) {
                RpcMessage msg = basket.poll();
                mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                mergeMessage.msgIds.add(msg.getId());
            }
            if (mergeMessage.msgIds.size() > 1) {
                printMergeMessageLog(mergeMessage);
            }
            Channel sendChannel = null;
            try {
                // send batch message is sync request, but there is no need to get the return value.
                // Since the messageFuture has been created before the message is placed in basketMap,
                // the return value will be obtained in ClientOnResponseProcessor.
                sendChannel = clientChannelManager.acquireChannel(address);
                AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
            } catch (FrameworkException e) {
                if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                    destroyChannel(address, sendChannel);
                }
                // fast fail
                for (Integer msgId : mergeMessage.msgIds) {
                    MessageFuture messageFuture = futures.remove(msgId);
                    Integer parentId = childToParentMap.remove(msgId);
                    if (parentId != null) {
                        mergeMsgMap.remove(parentId);
                    }
                    if (messageFuture != null) {
                        messageFuture.setResultMessage(
                            new RuntimeException(String.format("%s is unreachable", address), e));
                    }
                }
                LOGGER.error("client merge call failed: {}", e.getMessage(), e);
            }
        });
        isSending = false;
    }
}

Inside the try block of run, the request is actually sent by AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage):

@Override
public void sendAsyncRequest(Channel channel, Object msg) {
    if (channel == null) {
        LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
        return;
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
                                                ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                                                : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
    Object body = rpcMessage.getBody();
    if (body instanceof MergeMessage) {
        Integer parentId = rpcMessage.getId();
        mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
        if (body instanceof MergedWarpMessage) {
            for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
                childToParentMap.put(msgId, parentId);
            }
        }
    }
    super.sendAsync(channel, rpcMessage);
}
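Putting the pieces of this section together: the caller registers a future under the message id and drops the message into a per-server basket, a background pass drains each basket into one merged batch, and a response later completes the matching future. The sketch below models those three roles without any networking or threads. BatchSendSketch, Msg, offer, drainOnce, and onResponse are hypothetical names, and CompletableFuture stands in for Seata's MessageFuture.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of "sync call over batched async send"; not Seata code.
class BatchSendSketch {
    static final class Msg {
        final int id;
        final String body;

        Msg(int id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // Message id -> future the caller waits on.
    final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    // Server address -> queue of messages waiting to be merged and sent.
    final Map<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();

    // Caller side: register the future first, then enqueue the message.
    CompletableFuture<String> offer(String serverAddress, Msg msg) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(msg.id, future);
        basketMap.computeIfAbsent(serverAddress, key -> new LinkedBlockingQueue<>()).offer(msg);
        return future;
    }

    // Sender side: one pass over all baskets, merging each into a batch of ids.
    Map<String, List<Integer>> drainOnce() {
        Map<String, List<Integer>> batches = new HashMap<>();
        basketMap.forEach((address, basket) -> {
            List<Integer> ids = new ArrayList<>();
            Msg msg;
            while ((msg = basket.poll()) != null) {
                ids.add(msg.id);
            }
            if (!ids.isEmpty()) {
                batches.put(address, ids); // one merged message per server
            }
        });
        return batches;
    }

    // Response side: complete the future registered for this message id.
    void onResponse(int msgId, String result) {
        CompletableFuture<String> future = futures.remove(msgId);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) {
        BatchSendSketch client = new BatchSendSketch();
        CompletableFuture<String> reply = client.offer("127.0.0.1:8091", new Msg(1, "begin"));
        client.offer("127.0.0.1:8091", new Msg(2, "register"));
        System.out.println("merged batches: " + client.drainOnce());
        client.onResponse(1, "xid-for-msg-1"); // made-up response payload
        System.out.println("reply for msg 1: " + reply.join());
    }
}
```

This is why the batched path can still look synchronous to the caller: the send itself is fire-and-forget, and the blocking happens only on the future that was registered before the message entered the basket.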