RabbitMQ 發佈/訂閲模式優化:避免消息丟失的關鍵配置

在 RabbitMQ 發佈/訂閲模式中,消息丟失通常發生在三個環節:生產者到交換機交換機到隊列消費者處理消息。以下是關鍵配置優化方案:


一、生產者端配置
  1. 發佈確認機制
    啓用 publisher confirms,確保消息成功到達交換機:
channel.confirm_delivery()  # 開啓確認模式
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 持久化消息
    ),
    mandatory=True  # 確保消息被路由到隊列
)
  1. 消息持久化
    設置 delivery_mode=2,防止 RabbitMQ 重啓丟失消息:
properties = pika.BasicProperties(
    delivery_mode=2,  # 持久化標記
    content_type='text/plain'
)
  1. 失敗重試策略
    實現消息重發機制(如指數退避算法):
retry_count = 0
while retry_count < 3:
    try:
        channel.basic_publish(...)
        break
    except pika.exceptions.UnroutableError:
        retry_count += 1
        time.sleep(2 ** retry_count)

二、Broker 端配置
  1. 隊列持久化
    聲明隊列時設置 durable=True
channel.queue_declare(
    queue='log_queue',
    durable=True,  # 持久化隊列
    exclusive=False,
    auto_delete=False
)
  1. 交換機持久化
    聲明交換機時設置 durable=True
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout',
    durable=True  # 持久化交換機
)
  1. 鏡像隊列(高可用)
    在集羣中啓用鏡像隊列,防止節點故障:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

三、消費者端配置
  1. 手動消息確認
    關閉自動確認,處理完成後手動 ACK:
channel.basic_consume(
    queue='log_queue',
    on_message_callback=callback,
    auto_ack=False  # 關閉自動確認
)

def callback(ch, method, properties, body):
    process_message(body)  # 業務處理
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動確認
  1. 預取數量控制
    設置 basic_qos 防止消息堆積:
channel.basic_qos(prefetch_count=1)  # 每次只處理一條消息
  1. 死信隊列(DLX)
    配置無法處理的消息重定向:
args = {"x-dead-letter-exchange": "dlx_logs"}
channel.queue_declare(
    queue='log_queue',
    durable=True,
    arguments=args  # 綁定死信交換機
)

四、監控與災備
  1. 啓用消息追蹤
    使用 Firehose 插件監控消息流:
rabbitmqctl trace_on
  1. 磁盤警報閾值
    防止磁盤寫滿導致消息丟失:
disk_free_limit.absolute = 2GB  # 設置最小磁盤空間
  1. 消息備份
    通過 Shovel 插件跨集羣同步消息:
shovel.myshovel = {
  src-uri = "amqp://src-server"
  dest-uri = "amqp://backup-server"
}

配置驗證步驟

  1. 重啓 RabbitMQ 服務,驗證持久化隊列/交換機是否保留
  2. 斷開消費者連接,檢查未確認消息是否重新入隊
  3. 模擬節點宕機,觀察鏡像隊列故障轉移
  4. 強制關閉生產者,驗證重試機制是否觸發

關鍵原則:持久化 + 確認機制 + 冗餘備份是防丟失的核心。根據業務場景在可靠性和性能間權衡(如非關鍵日誌可適當降低持久化級別)。