RocketMQ概念介紹
RocketMQ 概念介紹
1、消息隊列使用場景介紹
- 解耦:如果服務 A 調用服務 B 時是同步依賴,那麼 B 服務壓力過大可能導致整個系統鏈路阻塞。
- 流量削峯填谷:高併發場景下(如電商秒殺),直接將請求打到數據庫或下游服務會導致瞬時壓力過大。消息隊列可以充當緩衝,異步處理峯值流量。
- 異步處理:對於非實時業務(日誌處理、統計計算、推薦系統)可異步處理,提高整體吞吐和響應速度。
2、為什麼選擇RocketMQ?
Kafka: 雖然吞吐量驚人,但在企業場景中有明顯短板: - 缺少事務消息、延時消息等關鍵特性 - 運維依賴ZooKeeper,故障排查困難
RabbitMQ:功能全面,但性能瓶頸明顯: - Erlang VM的GC問題在高負載時暴露 - 鏡像隊列的網絡開銷過大 - 消息堆積時內存壓力巨大。
RocketMQ: 在各維度都做到了很好的平衡: - 單機10萬TPS對大部分業務夠用 - 事務消息、順序消息、延時消息等特性齊全 - NameServer去中心化設計,運維相對簡單 - 阿里雙11驗證,穩定性有保障。
3、RocketMQ核心概念
核心組件
- NameServer:去中心化的路由註冊中心,提供輕量級的服務發現和路由功能。
- Broker:消息存儲和中轉的核心組件,負責消息的接收、存儲和轉發。
- Producer:消息生產者,負責向Broker發送消息。
- Consumer:消息消費者,負責從Broker消費消息。
- Topic:消息的邏輯分類,一個Topic可以有多個隊列。
- Message Queue:消息的物理存儲單元,一個Topic可以有多個隊列。
- Message:消息的載體,包含消息體、消息屬性等。
消息類型
- 普通消息:最基本的消息類型。
- 順序消息:保證消息的消費順序。
- 延時消息:消息延遲消費。
- 批量消息:將多條消息批量發送。
- 事務消息:保證分佈式事務的一致性。
4、消息流轉過程:
1. 啓動初始化:
Broker註冊:Broker啓動時,會向所有NameServer註冊自身信息(包括IP地址、端口、存儲的Topic-Queue列表,如“pay_topic的Queue0-3在Broker-A”)。
路由表生成:NameServer彙總所有Broker的信息,形成全局路由表(記錄“哪個Topic的消息在哪些Broker的哪些MessageQueue上”)。
2. 生產者發送消息:從業務系統到Broker存儲
步驟1:獲取路由
生產者啓動時,從NameServer拉取“pay_topic”對應的Broker地址(假設返回Broker-A和Broker-B)。
步驟2:選擇MessageQueue
生產者按負載均衡策略(默認輪詢,或按業務ID哈希,如訂單ID%Queue數)選擇一個MessageQueue(如Broker-A的Queue0)。
MessageQueue消息的“物理存儲單元”,每個Topic包含多個MessageQueue(默認4個),可以分散在不同Broker上(比如Topic有8個MessageQueue,分佈在2個Broker上,每個Broker存4個)。
核心作用:MessageQueue是實現“並行處理”的關鍵——生產者可向多個Queue並行發送消息,消費者可從多個Queue並行拉取消息,大幅提升吞吐量(類似“多個貨架同時存/取包裹”)。
步驟3:發送消息
生產者通過網絡將消息發送到Broker-A的Queue0,支持三種發送方式(默認超時時間3S):
- 同步發送:等Broker返回“成功”後再繼續(適合核心消息,如支付通知);
- 異步發送:發送後立即返回,Broker處理完通過回調通知結果(適合非核心但需結果的場景);
- 單向發送:只發不關心結果(適合日誌、監控等消息)。
步驟4:Broker存儲消息
Broker-A收到消息後,執行兩步存儲:
- 寫入CommitLog(全局日誌文件,所有消息混存,按時間順序寫入);單個文件默認1G。
- 同步到ConsumeQueue(消息索引文件,按Topic-Queue劃分,記錄消息在CommitLog中的位置,便於消費者快速查詢)。每個文件默認5.72M。
3. 消費者消費消息:從Broker拉取到業務處理
步驟1:獲取路由
消費者啓動時,同樣從NameServer獲取“pay_topic”的Broker地址(Broker-A和Broker-B)。
步驟2:拉取消息
消費者向Broker-A發起拉取請求,根據Offset(消費進度,記錄“已消費到MessageQueue0的第100條消息”)拉取未消費的消息(如從第101條開始)。
步驟3:處理消息
消費者接收消息後,執行業務邏輯(如向用户發送支付短信、更新訂單狀態)。
步驟4:提交Offset
在實現消費者的業務邏輯時,應該要儘量使⽤同步實現⽅式,保證在⾃⼰業務處理完成之後再向Broker端返回狀態。處理成功後,返回值是⼀個枚舉值,有兩個選項 CONSUME_SUCCESS和RECONSUME_LATER。如果消費者返回CONSUME_SUCCESS,那麼消息⾃ 然就處理結束了。但是如果消費者沒有處理成功,返回的是RECONSUME_LATER,Broker就會過⼀段時間再發起消息重試。消費者向Broker提交新的Offset(如“已消費到第150條”),Broker記錄該進度(集羣消費時Offset存在Broker,廣播消費時存在本地)。
異常處理:
- 若消費失敗(如業務邏輯拋異常),消息不會提交Offset,RocketMQ的做法是給每個消費者組⾃動⽣成⼀個對應的重試Topic。RocketMQ 會自動將消息推送到對應的重試隊列中。可以通過配置定時重試時間間隔、最大重試次數等。
- 重試超過16次(默認)後,消息進入死信隊列(DLQ),需人工排查處理(如“pay_topic%DLQ”)。
對於順序消息和併發消息處理策略:
消費者消費消息時也可以指定消費的偏移量
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET, //從對列的最後⼀條消息開始消費
CONSUME_FROM_FIRST_OFFSET, //從對列的第⼀條消息開始消費
CONSUME_FROM_TIMESTAMP; //從某⼀個時間點開始重新消費
}
4、核心特性:保障高可用與可靠性的關鍵設計
RocketMQ能在電商、金融等核心場景立足,依賴五大核心特性,從根本上解決“消息丟失、服務中斷、事務一致性”等問題。
1. 持久化機制:宕機不丟消息
消息寫入Broker後,並非只存在內存,而是通過“刷盤”寫入磁盤文件(CommitLog),確保Broker宕機後重啓可恢復消息。支持兩種刷盤策略:
- 同步刷盤:消息寫入磁盤後才返回生產者“成功”(核心消息必用,如支付消息,零丟失但性能略低);
- 異步刷盤:消息先存內存,定時(默認500ms)批量刷盤(非核心消息用,如日誌,性能高但極端情況可能丟消息)。
2. 主從架構:故障自動恢復
每個Broker可配置1個從節點(Slave),形成“主從備份”:
- 主節點(Master):負責接收生產者消息、處理消費者請求(讀寫都走主節點);
- 從節點(Slave):實時同步主節點的CommitLog,僅提供讀能力(分擔主節點讀壓力)。
核心能力:主節點宕機後,從節點通過DLedger協議(基於Raft算法)自動競選為新主(約10秒內完成),生產者/消費者通過NameServer感知新主地址,無縫切換,服務不中斷。
3. 事務消息:解決分佈式事務一致性
在分佈式系統中,“訂單創建”和“庫存扣減”需保證原子性(要麼都成功,要麼都失敗),RocketMQ通過“半消息+確認/回滾”實現:
- 步驟1:生產者發送“半消息”(暫存,消費者不可見);
- 步驟2:執行本地事務(如創建訂單);
- 步驟3:若事務成功,發送“確認”指令(半消息變為可見,消費者處理);若失敗,發送“回滾”指令(半消息刪除)。
兜底機制:若步驟3超時,Broker會主動查詢生產者事務狀態(回調check方法),避免消息長期處於半消息狀態。
4. 延遲消息:支持定時投遞
業務中常需“訂單15分鐘未支付自動取消”“30天后自動確認收貨”,RocketMQ通過“延遲隊列+定時調度”實現:
生產者發送消息時指定延遲級別(如message.setDelayTimeLevel(3),對應10秒,默認支持18個級別:1s/5s/10s/30s/1m…2h);
消息先存到“延遲隊列”(系統內置Topic:SCHEDULE_TOPIC_XXXX);
定時任務(每隔1s)掃描延遲隊列,將到期消息投遞到目標Topic,消費者即可接收。
5. 重試與死信:確保消息不“失聯”
消費失敗的消息不會被丟棄,而是進入重試機制:
- 重試隊列:消費失敗後,消息被放入“%RETRY%消費組名”隊列,默認重試16次(間隔從1s遞增到2h);
- 死信隊列:超過重試次數後,消息進入“%DLQ%消費組名”隊列,可人工查看日誌、修復問題後重新投遞,避免消息丟失且便於排查。
5、RocketMQ怎麼對文件進行讀寫
RocketMQ對文件的讀寫巧妙地利用了操作系統的一些高效文件讀寫方式——PageCache、順序讀寫、零拷貝。
1、傳統文件傳輸步驟:
read()系統調用:
- 上下文切換:程序從用户態切換到內核態。
- DMA 拷貝:DMA 引擎將文件數據從磁盤拷貝到內核空間的頁緩存。
- CPU 拷貝:CPU 將數據從內核頁緩存拷貝到用户空間的緩衝區。
write()系統調用:
- 上下文切換:程序再次從用户態切換到內核態。
- CPU 拷貝:CPU 將數據從用户空間的緩衝區拷貝到內核空間的 Socket 緩衝區。
- DMA 拷貝:DMA 引擎將數據從 Socket 緩衝區拷貝到網卡緩衝區,最終通過網絡發送出去。
2、RocketMQ使用零拷貝
1、DMA 拷貝:磁盤上的 CommitLog 文件數據被 DMA 引擎加載到內核的 Page Cache。(這一步與傳統方式相同,是必需的物理讀取)。
2、CPU 拷貝(僅描述符拷貝):
- Broker 進程調用
sendfile()系統調用。 - CPU 不再拷貝數據本身,而是將 Page Cache 中對應數據塊的內存地址和偏移量(文件描述符) 等信息,填充到 Socket 緩衝區。
3、DMA 拷貝
網卡驅動根據 Socket 緩衝區 中的描述符,直接從 Page Cache 的多個位置 將數據包的所有部分“收集”起來,一併發送到網絡。
這是關鍵:數據直接從內核緩衝區流向網卡,再也沒有經過用户態。
RocketMQ 獨特的存儲設計使得零拷貝能發揮最大效用:
- 所有消息順序寫入一個巨大的 CommitLog 文件。這意味着在磁盤上,數據是緊湊、連續的,非常適合順序讀和批量傳輸。
- 當 Consumer 拉取消息時,Broker 根據
ConsumeQueue(索引文件)找到一批消息在CommitLog中的物理偏移量。 - 這些消息在
CommitLog中很可能是連續存儲的(因為是順序追加寫入)。這樣,Broker 可以通過一次sendfile調用,將多個連續的消息內容(可能幾十KB)作為一個整體發送出去。 - 即使消息在
CommitLog中不絕對連續,sendfile也支持多次 Gather 操作,將多個不連續的數據塊一併發送,效率依然很高。