动态

详情 返回 返回

TDMQ CKafka 版客户端實戰指南系列之二:消費消息最佳實踐 - 动态 详情

導語

在數字化時代,消息隊列系統已成為企業架構中不可或缺的一部分,其中,TDMQ CKafka 版作為一種高效、可擴展的分佈式消息系統,廣泛應用於各類業務場景中。上一篇我們深入探討了 TDMQ CKafka 版的生產實踐,從消息發送、分區策略到高可用性保障,全方位解析瞭如何在生產環境中高效利用 TDMQ CKafka 版。本文將接續前文,聚焦於 TDMQ CKafka 版的消費實踐,探討如何穩紮穩打、精準消費,確保消息從生產到消費的完整鏈條順暢無阻。

在消費篇中,我們將詳細闡述消費消息的基本流程、負載均衡機制、應對重平衡的策略、訂閲關係的管理、消費位點的控制、消息重複與消費冪等性的處理、消費失敗的應對、消費延遲與堆積的解決,以及如何通過調整套接字緩衝區、模擬消息廣播、實現消息過濾等高級技巧,優化 TDMQ CKafka 版的消費性能。

接下來,讓我們一同深入探索 TDMQ CKafka 版的消費實踐,解鎖高效消息處理的秘訣。

消費篇:穩紮穩打,精準消費

消費消息流程

消費消息的基本流程並不複雜,首先是 Poll 數據,消費者從消息隊列中拉取消息;接着執行消費邏輯,對拉取到的消息進行處理;處理完成後再次 Poll 數據,如此循環往復。例如,在一個電商訂單處理系統中,消費者從消息隊列中拉取訂單消息,然後根據訂單信息進行庫存扣減、訂單狀態更新等操作,完成後繼續拉取下一批訂單消息。

負載均衡機制

負載均衡

負載均衡在消費過程中起着關鍵作用。每個 Consumer Group 可以包含多個 Consumer ,只要將參數 group.id 設置成相同的值,這些 Consumer 就屬於同一個 Consumer Group,共同負責消費訂閲的 Topic。

例如:Consumer Group A 訂閲了 Topic A,並開啓三個消費實例 C1、C2、C3,則發送到 Topic A 的每條消息最終只會傳給 C1、C2、C3 的某一個。TDMQ CKafka 版默認會均勻地把消息傳給各個消費實例,以做到消費負載均衡。

TDMQ CKafka 版負載均衡的內部原理是:把訂閲的 Topic 的分區,平均分配給各個 Consumer。因此,Consumer 的個數不要大於分區的數量,否則會有消費實例分配不到任何分區而處於空跑狀態,儘量保證消費者數量能被分區總數整除。除了第一次啓動上線之外,後續消費實例發生重啓、增加、減少,分區數發生增加等變更時,都會觸發一次重均衡。

應對重均衡

如果頻繁出現 Rebalance,可能有多種原因。

1、  消費者消費處理耗時很長,比如在處理一些複雜的業務邏輯時,可能需要進行多次數據庫查詢或遠程接口調用,這會導致消費速度變慢;

2、  消費某一個異常消息也可能導致消費者阻塞或者失敗,例如消息格式錯誤,消費者無法解析;

3、  心跳超時同樣會引發 Rebalance 。

4、  在 v0.10.2 之前版本的客户端,Consumer 沒有獨立線程維持心跳,而是把心跳維持與 Poll 接口耦合在一起,若用户消費出現卡頓,就會導致 Consumer 心跳超時,引發 Rebalance;在 v0.10.2 及之後版本的客户端,如果消費時間過慢,超過一定時間(max.poll.interval.ms 設置的值,默認5分鐘)未進行 Poll 拉取消息,則會導致客户端主動離開隊列,而引發 Rebalance。

可以通過優化消費處理提高消費速度和參數調整等方法解決:

1、  消費端需要和 Broker 版本保持一致。

2、  可以參考以下説明調整參數值:

● session.timeout.ms:在 v0.10.2 之前的版本可適當提高該參數值,需要大於消費一批數據的時間,但不要超過 30s,建議設置為25s ,而 v0.10.2 及其之後的版本,保持默認值10s即可;

● max.poll.records:降低該參數值,建議遠遠小於單個線程每秒消費的條數 * 消費線程的個數 * max.poll.interval.ms / 1000 的值;

● max.poll.interval.ms :該值要大於 max.poll.records / (單個線程每秒消費的條數 * 消費線程的個數 ) 的值。

3、  儘量提高客户端的消費速度,將消費邏輯另起線程進行處理,並針對耗時進行監控。

4、  減少 Group 訂閲 Topic 的數量,一個 Group 訂閲的 Topic 最好不要超過5個,建議一個 Group 只訂閲一個 Topic。

主題訂閲關係

在訂閲關係方面,同一個 Consumer Group 內,建議客户端訂閲的 Topic 保持一致,即一個 Consumer Group 訂閲一個 Topic,這樣可以避免給排查問題帶來更多複雜度。

Consumer Group 訂閲多個 Topic

一個 Consumer Group 可以訂閲多個 Topic ,此時多個 Topic 的消息會被 Cosumer Group 中的 Consumer 均勻消費。例如 Consumer Group A 訂閲了 Topic A、Topic B、Topic C,則這三個 Topic 中的消息,被 Consumer Group 中的 Consumer 均勻消費。

Consumer Group 訂閲多個 Topic 的示例代碼如下:

String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
    subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);

Topic 被多個 Consumer Group 訂閲

一個 Topic 可以被多個 Consumer Group 訂閲,且各個 Consumer Group 獨立消費 Topic 下的所有消息。例如 Consumer Group A 訂閲了 Topic A,Consumer Group B 也訂閲了 Topic A,則發送到 Topic A 的每條消息,不僅會傳一份給 Consumer Group A 的消費實例,也會傳一份給 Consumer Group B 的消費實例,且這兩個過程相互獨立,相互沒有任何影響。

一個 Consumer Group 對應一個應用

建議一個 Consumer Group 對應一個應用,即不同的應用對應不同的代碼。如果您需要將不同的代碼寫在同一個應用中,請準備多份不同的 kafka.properties。例如:kafka1.properties、kafka2.properties。

消費位點解析

每個 Topic 會有多個分區,每個分區會統計當前消息的總條數,這個稱為最大位點 MaxOffset。

TDMQ CKafka 版的 Consumer 會按順序依次消費分區內的每條消息,記錄已經消費了的消息條數,稱為消費位點 ConsumerOffset。

剩餘的未消費的條數(也稱為消息堆積量)=MaxOffset-ConsumerOffset。

Offset 提交

TDMQ CKafka 版的 Consumer 有兩個相關參數:

● enable.auto.commit:默認值為 True。

● auto.commit.interval.ms: 默認值為5000,即5s。

這兩個參數組合的結果為:每次 Poll 數據前會先檢查上次提交位點的時間,如果距離當前時間已經超過參數 auto.commit.interval.ms 規定的時長,則客户端會啓動位點提交動作。

因此,如果將 enable.auto.commit 設置為 True,則需要在每次 Poll 數據時,確保前一次 Poll 出來的數據已經消費完畢,否則可能導致位點跳躍。

如果想自己控制位點提交,請把 enable.auto.commit 設為 False,並調用 Commit(Offsets) 函數自行控制位點提交。

注意:

儘量避免提交位點請求過於頻繁,否則容易導致 Broker CPU 很高,影響正常的服務。例如自動提交位點設置 auto.commit.interval.ms 為100ms,手動提交位點,在高吞吐場景下,每消費一條消息提交一個位點。

重置 Offset

以下兩種情況,會發生消費位點重置:

● 當服務端不存在曾經提交過的位點時(例如客户端第一次上線)。

● 當從非法位點拉取消息時(例如某個分區最大位點是10,但客户端卻從11開始拉取消息)。

Java 客户端可以通過 auto.offset.reset 來配置重置策略,主要有三種策略:

● Latest:從最大位點開始消費。

● Earliest:從最小位點開始消費。

● None:不做任何操作,即不重置。

説明:

建議設置成 Latest,而不要設置成 Earliest,避免因位點非法時從頭開始消費,從而造成大量重複。

如果是您自己管理位點,可以設置成 None。

拉取消息優化

拉取消息

消費過程是由客户端主動去服務端拉取消息的,在拉取大消息時需要控制拉取速度,注意以下參數設置:

● max.poll.records:如果單條消息超過1MB,建議設置為1。

● max.partition.fetch.bytes:設置為比單條消息的大小略大一點。

● fetch.max.bytes:設置為比單條消息的大小略大一點。

通過公網消費消息時,通常會因為公網帶寬的限制導致連接被斷開,此時需要注意控制拉取速度,修改配置:

● fetch.max.bytes:建議設置成公網帶寬的一半(注意該參數的單位是 bytes,公網帶寬的單位是 bits)。

● max.partition.fetch.bytes:建議設置成 fetch.max.bytes 的三分之一或者四分之一。

拉取大消息

消費過程是由客户端主動去服務端拉取消息的,在拉取大消息時,需要注意控制拉取速度,注意修改配置:

● max.poll.records:每次 Poll 獲取的最大消息數量。如果單條消息超過1MB,建議設置為1。

● fetch.max.bytes:設置比單條消息的大小略大一點。

● max.partition.fetch.bytes:設置比單條消息的大小略大一點。

拉取大消息的核心是逐條拉取。

消息異常處理

消息重複和消費冪等

TDMQ CKafka 版消費的語義是 at least once, 也就是至少投遞一次,保證消息不丟失,但是無法保證消息不重複。在出現網絡問題、客户端重啓時均有可能造成少量重複消息,此時應用消費端如果對消息重複比較敏感(例如訂單交易類),則應該做消息冪等。

以數據庫類應用為例,常用做法為:

  • 發送消息時,傳入 Key 作為唯一流水號 ID。
  • 消費消息時,判斷 Key 是否已經消費過,如果已經消費過了,則忽略,如果沒消費過,則消費一次。

當然,如果應用本身對少量消息重複不敏感,則不需要做此類冪等檢查。

消費失敗

TDMQ CKafka 版是按分區消息順序逐條向前推進消費的,如果消費端拿到某條消息後執行消費邏輯失敗,例如應用服務器出現了髒數據,導致某條消息處理失敗,等待人工干預,那麼有以下兩種處理方式:

失敗後一直嘗試再次執行消費邏輯。這種方式有可能造成消費線程阻塞在當前消息,無法向前推進,造成消息堆積。

由於 Kafka 沒有處理失敗消息的設計,實踐中通常會打印失敗的消息或者存儲到某個服務(例如創建一個 Topic 專門用來放失敗的消息),然後定時檢查失敗消息的情況,分析失敗原因,根據情況處理。

消費延遲

消費過程是由客户端主動去服務端拉取消息。一般情況下,如果客户端能夠及時消費,則不會產生較大延遲。若產生了較大延遲,請先關注是否有堆積,並注意提高消費速度。

消費堆積

通常造成消息堆積的原因是:

● 消費速度跟不上生產速度,此時應該提高消費速度。

● 消費端產生了阻塞。

● 消費端拿到消息後,執行消費邏輯,通常會執行一些遠程調用,如果這個時候同步等待結果,則有可能造成一直等待,消費進程無法向前推進。

消費端應該儘量避免堵塞消費線程,如果存在等待調用結果的情況,建議設置等待的超時時間,超時後作為消費失敗進行處理。

提高消費速度

方式1:增加 Consumer 實例個數提高並行處理能力,如果消費者和分區數已經1:1,可以考慮增加分區數(注意:對於 Flink 自動維護分區的場景不會自動感知新增分區後可能需要修改相關代碼後重啓)。 可以在進程內直接增加(需要保證每個實例對應一個線程),也可以部署多個消費實例進程。

説明:

實例個數超過分區數量後就不再能提高速度,將會有消費實例不工作。

方式2:增加消費線程。

  1. 定義一個線程池。
  2. Poll 數據。
  3. 把數據提交到線程池進行併發處理。
  4. 等併發結果返回成功後,再次 poll 數據執行。

消費某些分區不消費

消費者在消費過程中,可能遇到消費者在線,但是某些分區的位點一致不前進,可能原因如下:

  1. 遇到一條異常消息,可能是超大消息,格式異常,導致消費者拉取消息時候,轉換成業務位點。
  2. 使用公網帶寬,帶寬較小,拉取大消息時候直接把帶寬打滿,導致在超時時間內拉取不到消息。
  3. 消費者假死,導致不去拉取。

解決方式:

關掉消費者,在 TDMQ CKafka 版控制枱設置位點,跳過某些異常消息,或者優化消費代碼,然後重啓消費者消費。

消息訂閲模式

消息廣播

Kafka 目前沒有消息廣播的語義,可以通過創建不同的 Group 來模擬實現。

消息過濾

Kafka 自身沒有消息過濾的語義。實踐中可以採取以下兩個辦法:

● 如果過濾的種類不多,可以採取多個 Topic 的方式達到過濾的目的。

● 如果過濾的種類多,則最好在客户端業務層面自行過濾。

實踐中請根據業務具體情況進行選擇,也可以綜合運用上面兩種辦法。

總結:回顧要點,展望應用

通過前面的學習,我們系統地瞭解了生產消費的關鍵要點。在生產方面,從 Topic 的使用與創建,到分區數的估算、重試策略的設置,再到發送模式和參數的優化,以及 Key、Value 和分區策略的應用,每一個環節都需要我們精心配置,以確保高效、穩定的生產。在消費方面,消費的基本流程、負載均衡的實現、應對重均衡的策略、訂閲關係的管理、Offset 的管理與拉取策略,以及處理消息重複和消費失敗的方法,這些都是我們在消費過程中需要重點關注的內容。

這些生產消費知識在實際應用中具有巨大的價值。希望大家能將所學的生產消費知識運用到實際工作和生活中,不斷探索和實踐。也期待大家在實踐過程中,能總結出更多寶貴的經驗,歡迎在留言區分享你們的實踐故事和心得,讓我們一起共同成長,共同進步 。

user avatar cherish_5ad82c136df47 头像
点赞 1 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.