動態

詳情 返回 返回

為什麼不應該在事務中嵌套發送 MQ 消息和 RPC 調用? - 動態 詳情

引言

或許你曾寫過這樣的代碼:

@Transaction // 開啓事務
public void craeteOrder(Order order) {
    saveOrder(order);
    sendMQ(order); // 或者是發送 rpc
}

在一個事務內,向 MySQL 寫入數據,接下來發送 MQ 或 RPC 調用。在大部分情況下,這樣寫好像沒什麼問題

但如果此時我們下游執行反查操作,會發現找不到數據。更奇怪的是,這在業務的低谷期才會出現,而在高峯期反而不會出現?

存在問題

破壞事務原子性語義

數據庫事務只能保證數據庫操作的原子性(如 MySQL 的 InnoDB 事務),但無法控制外部系統的行為(如 MQ 或 RPC 服務)

  • 事務成功提交,但 MQ 消息發送失敗
  • 事務提交失敗,但 MQ 消息發送成功

我們所期望的事務原子性,就是操作要麼全部執行成功,要麼全部失敗。以上兩種情況,都將導致上下游數據不一致

以一個業務場景為例

電商場景中,用户支付成功後,在事務內:

  1. 更新訂單狀態為“已支付”(數據庫操作)
  2. 發送物流服務的 MQ 消息(外部操作)

假設步驟 1 執行成功,步驟 2 執行失敗,會出現——已支付,卻不發貨

假設步驟 1 執行失敗,步驟 2 執行成功,會出現——未支付,期待收穫

長事務

MQ 和 RPC 通常是網絡 I/O 操作,耗時可能會高於本地數據庫操作。同時,網絡環境是不穩定的,隨時可能會出現延遲、不可用、丟包等等。這些因素將延長事務的執行時間,導致:

  • 數據庫鎖競爭加劇,可能引發死鎖
  • 高併發場景下,RPC 耗時長,將增加 DB 連接池佔用時間,降低系統吞吐量

下游無法反查到數據

現在我們有個業務場景:用户支付後,需要創建訂單,併發送 RPC 請求給權益中心,來加積分

事務提交前,權益中心需要反查數據,但因為事務隔離級別為讀已提交以上,此時無法查詢到還未提交事務的訂單數據。那麼 RPC 返回失敗結果,導致本地事務無法提交。這就出現了個死循環——上游等待下游執行成功後才能提交事務,下游等待上游提交事務後才能返回執行成功

回答開頭的問題,低谷期的 MQ 消息會出現反查無法查詢到數據的情況,正是因為低谷期 MQ 消息能被及時消費,延遲幾乎跟 RPC 請求一樣,導致消費者會在事務提交前執行反查操作,出現和 RPC 請求一樣的問題。而高峯期因為 MQ 消費不及時,使得反查操作被“延後”了,在事務提交後才開始消費,所以可以查詢到數據

這可以看做是上游事務提交和下游消費的時序問題

解決方案

保證事務提交和消息發送的時序問題

讓消費者等一會

依舊是在事務中嵌套發送消息,不過消費者接收到消息時,主動 sleep 一定時間,再進行消費。或者發送延遲消息,保證消費者晚點再消費。目的是通過等待一定時間,保證消費者的消費行為發生在提交事務之後執行

不過缺點也明顯,延遲時間不好把控:

  • 延遲太短,消費者可能會在事務提交之前執行反查,使得延遲時間沒有意義
  • 延遲太長,將加大延遲,降低吞吐量

(有點類似 Redis 延遲雙刪的思想,等一會再執行接下來的操作。它們的缺點也都是一樣的,需要延遲多久不好把控)

在事務提交後再發消息

在事務提交後,再發送 MQ 消息和 RPC 請求,保證事務提交,在發送 MQ 消息和 RPC 請求之前執行,避免它們在事務中嵌套

public void craeteOrder(Order order) {
    saveOrderByTransaction(order); // 通過事務寫入訂單
    sendMQ(order); // 或者是發送 rpc,在事務之外執行
}

@Transaction // 只對 SQL 加事務
public void saveOrderByTransaction(order) {
    saveOrder(order);
}

但以上代碼還存在一個問題:如果事務執行失敗,代碼可能還會繼續向下執行,此時 MQ 消息依舊會被髮送成功。即本地事務提交失敗,但 MQ 消息成功發送,導致上下游狀態不一致

所以我們需要判斷事務是否成功提交,只有成功了,才發送消息。可以加個 if-else 解決,不過 Spring 給我們一個更優雅的解決方案:使用@TransactionalEventListener監聽事務狀態。當事務成功提交後,才執行某些邏輯。保證當事務提交失敗時,不發送 MQ 消息。只有當事務成功提交,才發送 MQ 消息

// OrderService 
@Transactional
public void createOrder(Order order) {         
    // 假設訂單創建成功後,發佈事件
    OrderCreatedEvent event = new OrderCreatedEvent(order);
    eventPublisher.publishEvent(event);
    
    saveOrder(order);   
}

// OrderCreatedEventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
    sendMQ(event);     
}

本地事務和 MQ 消息的原子性

上面的@TransactionalEventListener其實還存在一個問題:事務提交成功,但 MQ 消息發送失敗,無法保證本地事務和消息發送的原子性問題,即要麼都成功,要麼都失敗

分佈式事務

分佈式事務可以解決本地事務和 MQ 消息的原子性問題,但會帶來可靠性、性能、使用成本等問題,給系統帶來額外的複雜性。弊遠大於利

事務消息

在 RocketMQ 中,支持事務消息,可以保證本地事務和 MQ 消息的原子性。執行邏輯如下:

  1. 生產者將消息發送至 Apache RocketMQ 服務端
  2. Apache RocketMQ 服務端將消息持久化成功之後,向生產者返回 Ack 確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息
  3. 生產者開始執行本地事務邏輯
  4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit 或是 Rollback),服務端收到確認結果後處理邏輯如下:

    • 二次確認結果為 Commit:服務端將半事務消息標記為可投遞,並投遞給消費者
    • 二次確認結果為 Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者

本地消息表+定時任務

比較常見的解決方案是使用「本地消息表+定時任務」

在本地事務中,除了寫入業務數據外,還要將要發送的 MQ 消息寫入到 MySQL 的「消息表」中。而發送消息不再由業務代碼決定,而是由後台定時任務來輪詢「消息表」,定時發送消息。從而保證:

  • 本地事務執行失敗,不會發送 MQ 消息。因為消息表不會寫入該消息(回滾),定時任務自然不會發送該消息了
  • 本地事務執行成功,可以保證 MQ 消息一定能發送成功。定時任務查詢到消息表的消息後,發送消息。如果出現失敗,可以繼續重試。當達到一定重試次數還發送失敗,可以發送信息,讓人工介入處理

如果允許數據存在一定的延遲,即不是「強一致」的場景,只需要保證數據的「最終一致性」的話,「本地消息表+定時任務」是一個非常好的選擇,同時它也能解決上面提到的「事務提交和消息發送的時序」問題

// OrderService 
@Transactional
public void createOrder(Order order) {         
    saveOrder(order);
    saveOrderMesaage(order);   // 寫入本地消息表
    // 不需要在代碼寫發送 MQ 消息的邏輯
}

// MessageSendTask
@Scheduled(fixedRate = 1000) // 每隔 1 秒執行一次
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
    List<Message> messages = findMessages();
    sendMQInBatches(messages);
    // 接下來還要更新消息表中消息的發送狀態     
}

監聽 binlog

可以通過 canal 監聽上游數據庫的 binlog 日誌,解析日誌後發送到 MQ 中,由下游自行決定如何消費

優勢
  • 上游提交後才通知到相關係統,下游反查可以查到數據
  • 可以保證本地事務和 MQ 消息的最終一致性。只有事務提交了,才有 binlog,才能發送 MQ 消息,給下游消費
  • 解耦。業務只管正常寫入數據就行,具體的發送 MQ 消息的操作不需要關心
缺點

實現複雜,需要額外維護監聽 binlog 的第三方組件

避免反查

消息中包含了消費者所需要的字段,即通過冗餘字段,避免反查操作。那麼對於下游,就不需要關心上游事務什麼時候提交。不過這樣會帶來額外的問題:

  • 使生產者的邏輯更復雜
  • 增大的消息的體積,對網絡帶寬和 MQ 帶來額外負擔
  • MQ 的引入是為了解耦,即生產者不需要關心消費者是如何去使用數據的。如果生產者需要根據各類消費者定製消息,那麼就會將生產者和消費者耦合在一起。這樣開發上游的同學,還要去梳理整個消費邏輯,開發上游的同學可能就不樂意了,導致上下游的開發很難協調,同時也需要有人去推動這種修改。所以,這不僅僅是個技術上的問題

而且在實際業務中,很多場景反查操作是不可避免的,我們不能假設反查的操作一定不存在

  • 在某些團隊中,會一刀切,即消息只允許攜帶主鍵值,這就導致反查數據庫是必然發生的
  • 下游需要查詢到上游最新的數據

    • 網絡拍賣場景中,加價是依賴當前最新的價格往上加的,此時我們必須拿到最新的數據,而不是用户當前在網頁看到的“舊”的數據,即「當前讀」,此時肯定要反查數據拿到最新值
    • 在電商場景中,我們訂單的支付金額一般是用户創建訂單時“看到的”價格。可能創建訂單後,商品加價了,但我們一般還是以創建訂單時的價格為準,即「快照讀」,此時就不一定需要反查數據了

結語

事務中嵌套發送 MQ 消息和 RPC 調用,會導致:

  • 事務回滾導致上下游數據不一致
  • 增加事務執行時間,加大鎖競爭,導致吞吐量下降(長事務)
  • 下游無法反查到未提交的數據

事務內應該只包含可靠的、可回滾數據。即,不要在事務中嵌套發送 MQ 消息和 RPC 調用

常見的解決方案:

  • 本地消息表+定時任務。由定時任務來發送 MQ 消息。實現簡單,可靠,效果好
  • 事務消息。依賴 RocketMQ 實現
  • 監聽 binlog。實現成本較高

同時因為數據庫主從延遲的存在,反查不保證一定能查到數據,適當的重試也是不可避免的


如果文章對你有幫助,歡迎點贊+收藏+關注,有問題歡迎在評論區評論哦!

公眾號【牛肉燒烤屋】

參考資料

b 站:BV1BtKNeDEkX

b 站:BV1S4woeBEuE

https://rocketmq.apache.org/zh/docs/featureBehavior/04transac...

user avatar vanve 頭像 monkeynik 頭像 sofastack 頭像 ayuan01 頭像 lenve 頭像 lu_lu 頭像 nianqingyouweidenangua 頭像 zbooksea 頭像 jibianoububian 頭像 god23bin 頭像 best_6455a509a2177 頭像 javalover 頭像
點贊 40 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.