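1. Review of Seata's AT mode
1.1 Seata's AT mode
- Service A opens a global transaction and requests a global transaction ID (XID) from the TC
- Service A makes RPC calls; the participants, service B and service C, each register a branch transaction with the TC
- Each participant commits its local transaction and writes a record to the undo_log table
- Each participant reports the execution status of its branch transaction
- Based on the status of every branch, the TC decides whether the global transaction commits or rolls back and notifies the participants
- Each participant follows the TC's instruction: on commit it simply deletes the matching records from its local undo_log table; on rollback it restores the affected data from the before_image stored in undo_log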
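The two-phase behaviour in the list above can be sketched with in-memory maps. This is only a toy model under stated assumptions: AtBranchSketch, its fields, and its methods are hypothetical names, the "table" and "undo_log" are plain HashMaps, and real AT mode does more than this (for example it also stores an after image and takes global locks).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of one AT-mode participant; all names here are hypothetical. */
class AtBranchSketch {
    // Business table: primary key -> value.
    final Map<String, Integer> table = new HashMap<>();
    // undo_log: xid -> before image (key -> value before the first change).
    final Map<String, Map<String, Integer>> undoLog = new HashMap<>();

    /** Phase one: record the before image, then apply the local update. */
    void update(String xid, String key, int newValue) {
        Map<String, Integer> images = undoLog.computeIfAbsent(xid, k -> new HashMap<>());
        if (!images.containsKey(key)) {
            images.put(key, table.get(key)); // null means "row did not exist"
        }
        table.put(key, newValue);
    }

    /** Phase two, commit: the data is already in place, so only drop the undo log. */
    void commit(String xid) {
        undoLog.remove(xid);
    }

    /** Phase two, rollback: restore every before image, then drop the undo log. */
    void rollback(String xid) {
        Map<String, Integer> before = undoLog.remove(xid);
        if (before == null) {
            return; // nothing recorded for this xid
        }
        before.forEach((key, old) -> {
            if (old == null) {
                table.remove(key);
            } else {
                table.put(key, old);
            }
        });
    }
}
```

For example, after table.put("stock", 10), calling update("xid-1", "stock", 8) and then rollback("xid-1") leaves the stock at 10 again, while commit("xid-1") would have kept 8 and only cleared the undo record.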
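1.2 Using Seata's AT mode in practice
Using AT mode in an application takes only a few steps:
- ① Add the seata dependency
- ② Create the local undo_log table
- ③ Add the Seata settings to the configuration file
- ④ Start the distributed transaction with the @GlobalTransactional annotation
See the earlier posts on this blog for the detailed steps.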
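2. Source analysis of the core steps (AT)
2.1 How RM and TM connect to the TC
Before a distributed transaction can start, the RM and the TM must each hold a connection to the TC so that they can communicate. When is that connection established? Start with the package that holds the @GlobalTransactional annotation.
The annotation is defined in the org.apache.seata.spring.annotation package, and the same package also defines a class whose name ends in Scanner. Open that scanner: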
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
The scanner implements the InitializingBean interface, so it must implement afterPropertiesSet:
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient(); // initialize the clients, but which clients?
}
this.findBusinessBeanNamesNeededEnhancement();
}
The method calls initClient(). Which clients does it initialize? Stepping in shows the code that initializes the TM and the RM:
protected void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
Take the RM initialization as the example and keep going:
public class RMClient {
/**
* Init.
*
* @param applicationId the application id
* @param transactionServiceGroup the transaction service group
*/
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}
Step into init to reach org.apache.seata.core.rpc.netty.RmNettyRemotingClient#init
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
// Found one or more resources that were registered before initialization
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}
}
}
Look at super.init() first, which leads to org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init
@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
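This code calls getTransactionServiceGroup(). The transactionServiceGroup is the value we set in the configuration file, and as mentioned earlier it is used to locate the matching Seata Server. To check whether the connection to the TC really is made from this service group, step into clientChannelManager.reconnect(). After a few overloads it arrives at doReconnect: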
void doReconnect(String transactionServiceGroup, boolean failFast) {
List<String> availList;
try {
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
throwFailFastException(failFast, "Failed to get available servers");
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);
if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
throwFailFastException(failFast, "can not get cluster name in registry config.");
return;
}
if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
throwFailFastException(failFast, "no available service found in cluster.");
return;
}
try {
doReconnect(availList, transactionServiceGroup);
} catch (Exception e) {
if (failFast) {
throw e;
}
LOGGER.error("connect server failed. {}", e.getMessage(), e);
}
}
It does obtain a list of available addresses, availList, and then passes that list to another doReconnect overload:
void doReconnect(List<String> availList, String transactionServiceGroup) {
Set<String> channelAddress = new HashSet<>(availList.size());
Map<String, Exception> failedMap = new HashMap<>();
try {
for (String serverAddress : availList) {
try {
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
} catch (Exception e) {
failedMap.put(serverAddress, e);
}
}
if (failedMap.size() > 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
failedMap.keySet(),
failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
} else if (LOGGER.isDebugEnabled()) {
failedMap.forEach((key, value) -> {
LOGGER.error("{} can not connect to {} cause:{} trace information:",
FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
});
}
}
if (availList.size() == failedMap.size()) {
String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", ");
throw new FrameworkException("can not connect to [" + invalidAddress + "]");
}
} finally {
if (CollectionUtils.isNotEmpty(channelAddress)) {
List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
for (String address : channelAddress) {
String[] array = NetUtil.splitIPPortStr(address);
aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
}
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
} else {
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
}
}
}
The method iterates over the list and calls acquireChannel for each serverAddress to obtain a Channel. If no live connection exists yet, acquireChannel creates one through doConnect(serverAddress):
Channel acquireChannel(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null) {
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (channelToServer != null) {
return channelToServer;
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will connect to {}", serverAddress);
}
Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
synchronized (lockObj) {
return doConnect(serverAddress);
}
}
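The "reuse a live channel, otherwise connect under a per-address lock" idea, together with the "try every address and fail only if all fail" loop, can be sketched without Netty. This is a simplified illustration, not Seata's implementation: ChannelCacheSketch, FakeChannel, and the connector function are hypothetical, and the re-check inside the lock is an assumption about a sensible design rather than a statement about what doConnect does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of a channel cache with one lock object per server address. */
class ChannelCacheSketch {
    /** Stand-in for a Netty Channel (hypothetical). */
    static final class FakeChannel {
        final String address;
        volatile boolean active = true;
        FakeChannel(String address) { this.address = address; }
    }

    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    private final Map<String, Object> channelLocks = new ConcurrentHashMap<>();
    private final Function<String, FakeChannel> connector;

    ChannelCacheSketch(Function<String, FakeChannel> connector) {
        this.connector = connector;
    }

    FakeChannel acquireChannel(String serverAddress) {
        FakeChannel existing = channels.get(serverAddress);
        if (existing != null && existing.active) {
            return existing; // fast path: reuse without locking
        }
        Object lock = channelLocks.computeIfAbsent(serverAddress, k -> new Object());
        synchronized (lock) {
            // Re-check inside the lock so concurrent callers do not connect twice.
            FakeChannel current = channels.get(serverAddress);
            if (current != null && current.active) {
                return current;
            }
            FakeChannel created = connector.apply(serverAddress);
            channels.put(serverAddress, created);
            return created;
        }
    }

    /** Tries every address; returns the failures and throws only if all failed. */
    Map<String, Exception> reconnect(List<String> availList) {
        Map<String, Exception> failed = new LinkedHashMap<>();
        for (String address : availList) {
            try {
                acquireChannel(address);
            } catch (Exception e) {
                failed.put(address, e);
            }
        }
        if (!availList.isEmpty() && failed.size() == availList.size()) {
            throw new IllegalStateException("can not connect to " + failed.keySet());
        }
        return failed;
    }
}
```

Locking per address rather than on the whole map means a slow connect to one server does not block callers that need a channel to a different server.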
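The TM connection is established in much the same way. Overall, the flow traced above is: GlobalTransactionScanner#afterPropertiesSet -> initClient -> TMClient.init / RMClient.init -> RmNettyRemotingClient#init -> AbstractNettyRemotingClient#init (schedules clientChannelManager.reconnect) -> doReconnect -> acquireChannel -> doConnect.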
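2.2 Transaction handling
As we saw, a distributed transaction is started by the @GlobalTransactional annotation, so something must detect the annotation and act on it. If we were building this ourselves, AOP would be the first thing to reach for. In the package examined in section 2.1, right next to the GlobalTransactional annotation, there is indeed a class named AspectTransactionalInterceptor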
public class AspectTransactionalInterceptor implements MethodInterceptor {
Since this class implements the MethodInterceptor interface, its invoke method is called whenever a proxied method runs
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
InvocationWrapper invocationWrapper = new DefaultInvocationWrapper(null, invocation.getThis(), specificMethod, invocation.getArguments());
return this.globalTransactionalInterceptorHandler.invoke(invocationWrapper);
}
Step into globalTransactionalInterceptorHandler.invoke on the last line to reach org.apache.seata.integration.tx.api.interceptor.handler.AbstractProxyInvocationHandler#invoke
@Override
public Object invoke(InvocationWrapper invocation) throws Throwable {
if (CollectionUtils.isNotEmpty(getMethodsToProxy()) && !getMethodsToProxy().contains(invocation.getMethod().getName())) {
return invocation.proceed();
}
if (nextInvocationHandlerChain != null) {
invocation = new NestInterceptorHandlerWrapper(nextInvocationHandlerChain, invocation);
}
return doInvoke(invocation);
}
That finally leads to org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#doInvoke
@Override
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
Class<?> targetClass = invocation.getTarget().getClass();
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
final AspectTransactional globalTransactionalAnnotation = getAspectTransactional(specificMethod, targetClass);
final GlobalLockConfig globalLockAnnotation = getGlobalLockConfig(specificMethod, targetClass);
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = globalTransactionalAnnotation;
} else {
transactional = this.aspectTransactional;
}
return handleGlobalTransaction(invocation, transactional);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(invocation, globalLockAnnotation);
}
}
}
return invocation.proceed();
}
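There are two branches here. If globalTransactionalAnnotation or aspectTransactional is non-null, the call goes to handleGlobalTransaction; otherwise, if globalLockAnnotation is non-null, it goes to handleGlobalLock. When neither applies, or when global transactions are disabled or degraded, the method just proceeds with the original invocation.
Step into org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#handleGlobalTransaction. It is essentially a single call to transactionalTemplate.execute, with an anonymous inner class passed in as the argument. Here is the execute method: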
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend(false);
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend(false);
}
tx = GlobalTransactionContext.createNew();
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,else create
tx = GlobalTransactionContext.getCurrentOrCreate();
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
}
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx, txInfo);
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion(tx);
cleanUp(tx);
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
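Two things stand out here:
- Transaction propagation
Seata defines its own transaction propagation behaviors, which largely mirror Spring's. With NOT_SUPPORTED, for example, the current transaction is suspended and the business code runs without a transaction and returns; with REQUIRES_NEW, the current transaction is suspended and a new one is created for the rest of the transaction and business logic.
Six behaviors are defined, one fewer than Spring, which additionally has NESTED:
- NOT_SUPPORTED: transactions are not supported; if one exists it is suspended and the code runs without a transaction
- REQUIRES_NEW: if a transaction exists it is suspended, and a new transaction is started for the execution
- SUPPORTS: if no transaction exists the code runs without one; otherwise it runs in the current transaction
- REQUIRED: if no transaction exists a new one is created; otherwise the current transaction is used
- NEVER: if a transaction exists an exception is thrown; if none exists the code runs normally without a transaction
- MANDATORY: if no transaction exists an exception is thrown; if one exists the code runs in the current transaction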
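The six rules above reduce to a small decision table over two inputs: the propagation mode and whether a global transaction already exists. The sketch below restates that table; PropagationSketch and the Action labels are hypothetical names made up for illustration, and only the six propagation names come from the code above.

```java
/** Decision table for the propagation switch; Action labels are hypothetical. */
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    enum Action {
        JOIN_EXISTING,          // run inside the current global transaction
        BEGIN_NEW,              // start a new global transaction
        SUSPEND_AND_BEGIN_NEW,  // suspend the current one, then start a new one
        SUSPEND_AND_RUN_PLAIN,  // suspend the current one, run without a transaction
        RUN_PLAIN,              // run without a transaction
        FAIL                    // throw an exception
    }

    static Action decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_PLAIN : Action.RUN_PLAIN;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_PLAIN;
            case NEVER:
                return txExists ? Action.FAIL : Action.RUN_PLAIN;
            case MANDATORY:
                return txExists ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                throw new IllegalArgumentException("Not supported propagation: " + propagation);
        }
    }
}
```

Whatever was suspended is resumed afterwards; in the execute method that happens in the outermost finally block.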
- Transaction handling
The code above clearly shows beginTransaction, business.execute, commitTransaction, and, in the innermost catch, completeTransactionAfterThrowing
1. The beginTransaction method
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore begin: just involved in global transaction [{}]", tx.getXid());
}
return;
}
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
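The method does three things: it triggers the before-begin hooks, begins the transaction, and triggers the after-begin hooks. The part to look at is tx.begin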
org.apache.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
@Override
public void begin(int timeout, String name) throws TransactionException {
this.createTime = System.currentTimeMillis();
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
This method starts a transaction, obtains an xid, and binds that xid to the context. How is the transaction actually started? By making a remote call
org.apache.seata.tm.DefaultTransactionManager#begin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
That is the syncCall here, which returns a response
public class GlobalBeginResponse extends AbstractTransactionResponse {
private String xid;
private String extraData;
}
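The begin step boils down to "refuse if an xid is already bound, ask the TC for a new xid, bind it to the current context". A minimal sketch of that sequence follows. It assumes the context is a per-thread holder, which the code above does not show; XidContextSketch, fakeTcBegin, and the xid format are all made up for illustration, and fakeTcBegin merely stands in for the remote call.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of binding a global transaction id to the current thread. */
class XidContextSketch {
    // Assumption: the context is stored per thread.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();
    private static final AtomicLong SEQUENCE = new AtomicLong();

    /** Stand-in for the remote call to the TC; the id format is invented. */
    static String fakeTcBegin(String name) {
        return "tc-host:8091:" + SEQUENCE.incrementAndGet();
    }

    /** Begins a new global transaction unless one is already bound. */
    static String begin(String name) {
        String current = CURRENT_XID.get();
        if (current != null) {
            throw new IllegalStateException(
                "Global transaction already exists, currentXid = " + current);
        }
        String xid = fakeTcBegin(name);
        CURRENT_XID.set(xid); // later calls on this thread can read it
        return xid;
    }

    static String currentXid() {
        return CURRENT_XID.get();
    }

    /** Clears the binding, as a cleanup step would after the transaction ends. */
    static void unbind() {
        CURRENT_XID.remove();
    }
}
```

Binding the xid is what lets code further down the call chain discover that it is running inside a global transaction without the xid being passed as a parameter.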
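2. commitTransaction
This method commits the transaction. If the client detects that the transaction has already timed out, it rolls back instead. On a normal commit, the before-commit hooks are triggered first and the after-commit hooks are triggered afterwards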
private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
throws TransactionalExecutor.ExecutionException, TransactionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore commit: just involved in global transaction [{}]", tx.getXid());
}
return;
}
if (isTimeout(tx.getCreateTime(), txInfo)) {
// business execution timeout
Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
String.format("client detected transaction timeout before commit, so change to rollback, xid = %s", tx.getXid()));
rollbackTransaction(tx, exx);
return;
}
try {
triggerBeforeCommit();
tx.commit();
GlobalStatus afterCommitStatus = tx.getLocalStatus();
TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;
switch (afterCommitStatus) {
case TimeoutRollbacking:
code = TransactionalExecutor.Code.Rollbacking;
break;
case TimeoutRollbacked:
code = TransactionalExecutor.Code.RollbackDone;
break;
case Finished:
code = TransactionalExecutor.Code.CommitFailure;
break;
default:
}
Exception statusException = null;
if (GlobalStatus.isTwoPhaseHeuristic(afterCommitStatus)) {
statusException = new TmTransactionException(TransactionExceptionCode.CommitHeuristic,
String.format("Global transaction[%s] not found, may be rollbacked.", tx.getXid()));
} else if (GlobalStatus.isOnePhaseTimeout(afterCommitStatus)) {
statusException = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid()));
}
if (null != statusException) {
throw new TransactionalExecutor.ExecutionException(tx, statusException, code);
}
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}
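3. completeTransactionAfterThrowing
This method is called from the catch block, so it runs when the business code throws. It rolls the transaction back when the transaction info is non-null and the thrown exception is one the transaction is configured to roll back on; otherwise it commits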
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
throws TransactionalExecutor.ExecutionException, TransactionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
rollbackTransaction(tx, originalException);
} else {
// not roll back on this exception, so commit
commitTransaction(tx, txInfo);
}
}
4. business.execute
Runs the business method
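Put together, the four steps form a classic template: begin, run the business code, roll back or commit depending on the exception, and always clean up. The sketch below records the order of those steps in a list so the control flow is easy to see. TxTemplateSketch and its rollbackOn predicate are hypothetical simplifications; the real template also handles roles, hooks, timeouts, and propagation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch of the begin / execute / commit-or-rollback / cleanup template. */
class TxTemplateSketch {
    /** Ordered record of the steps taken, used here instead of real TC calls. */
    final List<String> events = new ArrayList<>();
    // Hypothetical rule: decides whether an exception triggers a rollback.
    private final Predicate<Throwable> rollbackOn;

    TxTemplateSketch(Predicate<Throwable> rollbackOn) {
        this.rollbackOn = rollbackOn;
    }

    <T> T execute(Supplier<T> business) {
        events.add("begin");
        try {
            T result;
            try {
                result = business.get(); // the business method
            } catch (RuntimeException ex) {
                // Roll back only for matching exceptions, otherwise commit.
                events.add(rollbackOn.test(ex) ? "rollback" : "commit");
                throw ex; // the caller still sees the original exception
            }
            events.add("commit");
            return result;
        } finally {
            events.add("cleanup"); // runs on success and on failure
        }
    }
}
```

A successful call records begin, commit, cleanup; a call that throws a matching exception records begin, rollback, cleanup and rethrows the exception.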
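2.3 Request interaction between the transaction roles
The previous section broke transaction execution down into steps. So how do the RM and TM actually interact with the TC? Broadly, requests are sent either synchronously or asynchronously. Take the method that begins a transaction, org.apache.seata.tm.DefaultTransactionManager#begin, as the example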
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
Next, look at the syncCall method
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
Then continue into org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)
@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
Object response = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
return response;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException)exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
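In this method the outer if checks whether batched request sending is enabled. If it is, the if branch runs; otherwise the else branch acquires a channel and sends the request synchronously. In the if branch, Seata wraps the request in an org.apache.seata.core.protocol.RpcMessage object, puts it on a blocking queue, and leaves it to a background task to consume. That queue is itself stored in a map, basketMap, keyed by serverAddress, and the background task reads its data from this map. The calling thread meanwhile blocks on messageFuture.get until the response arrives or the timeout expires.
Where is basketMap consumed?
We already saw the place when reading the code that connects RM and TM to the TC: org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init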
@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
Here a MergedSendRunnable task is submitted to the thread pool, which schedules and runs it. The run method of that class is:
@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
isSending = true;
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
Integer parentId = childToParentMap.remove(msgId);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
Inside the try block of this run method is the call that actually sends the request, AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
@Override
public void sendAsyncRequest(Channel channel, Object msg) {
if (channel == null) {
LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
return;
}
RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
Object body = rpcMessage.getBody();
if (body instanceof MergeMessage) {
Integer parentId = rpcMessage.getId();
mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
if (body instanceof MergedWarpMessage) {
for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
childToParentMap.put(msgId, parentId);
}
}
}
super.sendAsync(channel, rpcMessage);
}
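The batching path traced in this section, where the caller registers a future, drops the message into a per-address basket, and blocks while a background thread merges and sends, can be reproduced in a few lines of plain Java. The following is only a sketch under stated assumptions: MergedSendSketch, Msg, and fakeServer are hypothetical, a CompletableFuture stands in for MessageFuture, the wait interval is an arbitrary value, and the "server" simply acknowledges each message in the same JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a synchronous call implemented on top of merged, asynchronous sending. */
class MergedSendSketch {
    static final class Msg {
        final int id;
        final String body;
        Msg(int id, String body) { this.id = id; this.body = body; }
    }

    private static final long MERGE_WAIT_MILLIS = 1; // arbitrary value for the sketch
    private final AtomicInteger idGenerator = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    private final Map<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();
    private final Object mergeLock = new Object();
    /** Size of every batch sent so far, kept only for inspection. */
    final List<Integer> batchSizes = Collections.synchronizedList(new ArrayList<>());

    MergedSendSketch() {
        Thread merger = new Thread(this::mergeLoop, "merge-send-sketch");
        merger.setDaemon(true);
        merger.start();
    }

    /** Blocks until the response for this message arrives or the timeout expires. */
    String sendSync(String serverAddress, String body, long timeoutMillis) {
        Msg msg = new Msg(idGenerator.incrementAndGet(), body);
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(msg.id, future); // register before enqueueing
        basketMap.computeIfAbsent(serverAddress, k -> new LinkedBlockingQueue<>()).offer(msg);
        synchronized (mergeLock) {
            mergeLock.notifyAll(); // wake the merger early
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            futures.remove(msg.id);
            throw new RuntimeException("no response for message " + msg.id, e);
        }
    }

    private void mergeLoop() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MERGE_WAIT_MILLIS);
                } catch (InterruptedException e) {
                    return; // stop the loop when interrupted
                }
            }
            basketMap.forEach((address, basket) -> {
                List<Msg> batch = new ArrayList<>();
                basket.drainTo(batch); // merge everything queued for this address
                if (batch.isEmpty()) {
                    return;
                }
                batchSizes.add(batch.size());
                fakeServer(address, batch);
            });
        }
    }

    /** Stand-in for the network round trip: completes each waiting future by id. */
    private void fakeServer(String address, List<Msg> batch) {
        for (Msg msg : batch) {
            CompletableFuture<String> future = futures.remove(msg.id);
            if (future != null) {
                future.complete("ack:" + msg.body + "@" + address);
            }
        }
    }
}
```

The point of the design is that the caller keeps a simple blocking API while requests issued by many threads to the same server can travel together in one merged message; the message id is what ties each response back to the future its caller is waiting on.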