RabbitMQ 發佈/訂閲模式優化:避免消息丟失的關鍵配置
在 RabbitMQ 發佈/訂閲模式中,消息丟失通常發生在三個環節:生產者到交換機、交換機到隊列、消費者處理消息。以下是關鍵配置優化方案:
一、生產者端配置
- 發佈確認機制
啓用publisher confirms,確保消息成功到達交換機:
channel.confirm_delivery() # 開啓確認模式
channel.basic_publish(
exchange='logs',
routing_key='',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
),
mandatory=True # 確保消息被路由到隊列
)
- 消息持久化
設置delivery_mode=2,防止 RabbitMQ 重啓丟失消息:
properties = pika.BasicProperties(
delivery_mode=2, # 持久化標記
content_type='text/plain'
)
- 失敗重試策略
實現消息重發機制(如指數退避算法):
retry_count = 0
while retry_count < 3:
try:
channel.basic_publish(...)
break
except pika.exceptions.UnroutableError:
retry_count += 1
time.sleep(2 ** retry_count)
二、Broker 端配置
- 隊列持久化
聲明隊列時設置durable=True:
channel.queue_declare(
queue='log_queue',
durable=True, # 持久化隊列
exclusive=False,
auto_delete=False
)
- 交換機持久化
聲明交換機時設置durable=True:
channel.exchange_declare(
exchange='logs',
exchange_type='fanout',
durable=True # 持久化交換機
)
- 鏡像隊列(高可用)
在集羣中啓用鏡像隊列,防止節點故障:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
三、消費者端配置
- 手動消息確認
關閉自動確認,處理完成後手動 ACK:
channel.basic_consume(
queue='log_queue',
on_message_callback=callback,
auto_ack=False # 關閉自動確認
)
def callback(ch, method, properties, body):
process_message(body) # 業務處理
ch.basic_ack(delivery_tag=method.delivery_tag) # 手動確認
- 預取數量控制
設置basic_qos防止消息堆積:
channel.basic_qos(prefetch_count=1) # 每次只處理一條消息
- 死信隊列(DLX)
配置無法處理的消息重定向:
args = {"x-dead-letter-exchange": "dlx_logs"}
channel.queue_declare(
queue='log_queue',
durable=True,
arguments=args # 綁定死信交換機
)
四、監控與災備
- 啓用消息追蹤
使用 Firehose 插件監控消息流:
rabbitmqctl trace_on
- 磁盤警報閾值
防止磁盤寫滿導致消息丟失:
disk_free_limit.absolute = 2GB # 設置最小磁盤空間
- 消息備份
通過 Shovel 插件跨集羣同步消息:
shovel.myshovel = {
src-uri = "amqp://src-server"
dest-uri = "amqp://backup-server"
}
配置驗證步驟
- 重啓 RabbitMQ 服務,驗證持久化隊列/交換機是否保留
- 斷開消費者連接,檢查未確認消息是否重新入隊
- 模擬節點宕機,觀察鏡像隊列故障轉移
- 強制關閉生產者,驗證重試機制是否觸發
關鍵原則:持久化 + 確認機制 + 冗餘備份是防丟失的核心。根據業務場景在可靠性和性能間權衡(如非關鍵日誌可適當降低持久化級別)。
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。