Spring Boot 整合 RabbitMQ 大幅簡化了開發流程,核心是通過 spring-boot-starter-amqp 依賴封裝底層細節,通過 RabbitTemplate@RabbitListener 實現消息收發。本文講解的四大模式覆蓋了多數業務場景:

  • 工作隊列:多消費者負載均衡;
  • 發佈訂閲:消息廣播;
  • 路由:精確匹配篩選;
  • 通配符:靈活多維度篩選。

一、整合前置準備:依賴與基礎配置

無論哪種模式,整合的前置步驟一致,核心是引入依賴並配置 RabbitMQ 連接信息。

1.1 引入 Maven 依賴

在 Spring Boot 項目的 pom.xml 中,添加 RabbitMQ 核心依賴與 Web 依賴(用於接口測試):

玩轉Spring Boot 集成篇(RabbitMQ) -_spring

<!-- RabbitMQ 整合依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Web 依賴:用於編寫接口發送消息 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 測試依賴(可選) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

spring-boot-starter-amqp 內置了 RabbitMQ 客户端(默認 Lettuce),並封裝了 RabbitTemplate(消息發送模板)、@RabbitListener(消息監聽註解)等核心組件,無需額外引入客户端依賴。

1.2 配置 RabbitMQ 連接信息

src/main/resources/application.yml 中,配置 RabbitMQ 服務器地址、端口、虛擬主機等信息(與文檔中一致,使用雲服務器映射地址):

spring:
  rabbitmq:
    host: 110.41.51.65    # 服務器IP(文檔中示例地址)
    port: 5672          # 端口(文檔中自定義為15673,默認5672)
    username: study       # 用户名(文檔中創建的專用用户)
    password: study       # 密碼(與用户名對應)
    virtual-host: bite    # 虛擬主機(文檔中創建的隔離空間,默認/)
    # 可選:通過addresses簡化配置(二選一即可)
    # addresses: amqp://study:study@110.41.51.65:15673/bite

配置後,Spring Boot 會自動創建 ConnectionFactoryRabbitTemplate 等 Bean,無需手動初始化。

二、模式一:工作隊列模式(Work Queues)

工作隊列模式通過多消費者競爭同一隊列的消息,實現任務負載均衡,適用於集羣環境下的異步任務處理(如 12306 短信通知)。

2.1 核心原理

  • 生產者發送多條消息到隊列;
  • 多個消費者監聽同一隊列,RabbitMQ 採用“輪詢”策略分配消息,每條消息僅被一個消費者處理;
  • 無需交換機,使用 RabbitMQ 內置的默認交換機(空字符串)。

2.2 代碼實現

步驟1:聲明隊列(配置類)

通過 @Configuration 類聲明持久化隊列,確保服務重啓後隊列不丟失:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// 常量類:存儲隊列名稱(文檔中推薦統一管理)
class Constants {
    public static final String WORK_QUEUE = "work_queue";
}

@Configuration
public class RabbitMQConfig {
    // 聲明工作隊列(durable=true:持久化)
    @Bean("workQueue")
    public Queue workQueue() {
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }
}
步驟2:編寫生產者(接口發送消息)

通過 RabbitTemplate 發送消息,用 HTTP 接口觸發(方便測試):

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/producer")
public class ProducerController {
    // 注入Spring自動創建的RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 工作隊列模式:發送10條消息
    @RequestMapping("/work")
    public String sendWorkMessage() {
        for (int i = 0; i < 10; i++) {
            String msg = "Work Message " + i;
            // 發送消息:默認交換機(空字符串),路由鍵=隊列名
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);
        }
        return "工作隊列消息發送成功!";
    }
}
步驟3:編寫消費者(監聽隊列)

通過 @RabbitListener 註解聲明消費者,多個消費者監聽同一隊列:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkListener {
    // 消費者1:監聽work_queue隊列
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void listenWorkQueue1(Message message) {
        String msg = new String(message.getBody());
        System.out.println("消費者1接收:" + msg);
    }

    // 消費者2:監聽同一隊列(競爭消息)
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void listenWorkQueue2(Message message) {
        String msg = new String(message.getBody());
        System.out.println("消費者2接收:" + msg);
    }
}

@RabbitListener 是 Spring AMQP 的核心註解,支持直接指定隊列名,無需手動綁定。

2.3 運行驗證

  1. 啓動項目:Spring Boot 會自動創建 work_queue 隊列;
  2. 發送消息:訪問接口 http://127.0.0.1:8080/producer/work,返回“工作隊列消息發送成功!”;
  3. 觀察日誌:消費者1接收“0、2、4、6、8”,消費者2接收“1、3、5、7、9”,符合輪詢分配規則。

三、模式二:發佈訂閲模式(Publish/Subscribe)

發佈訂閲模式通過 fanout 類型交換機,將消息廣播到所有綁定的隊列,適用於多系統同步接收消息(如氣象局推送天氣預報)。

3.1 核心原理

  • 生產者發送消息到 fanout 交換機;
  • 交換機將消息複製到所有綁定的隊列;
  • 每個隊列的消費者都能接收完整消息,實現“一條消息多端消費”;
  • fanout 交換機忽略路由鍵(Routing Key),綁定鍵(Binding Key)可設為空。

3.2 代碼實現

步驟1:聲明交換機、隊列與綁定關係

RabbitMQConfig 中添加交換機、隊列聲明,以及兩者的綁定:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

class Constants {
    // 發佈訂閲模式:交換機+隊列名稱
    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String FANOUT_QUEUE1 = "fanout_queue1";
    public static final String FANOUT_QUEUE2 = "fanout_queue2";
}

@Configuration
public class RabbitMQConfig {
    // 1. 聲明fanout交換機(durable=true:持久化)
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(Constants.FANOUT_EXCHANGE, true, false);
    }

    // 2. 聲明兩個隊列
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }

    // 3. 綁定隊列1到交換機
    @Bean
    public Binding bindFanoutQueue1(
            @Qualifier("fanoutExchange") FanoutExchange exchange,
            @Qualifier("fanoutQueue1") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange); // 綁定鍵為空
    }

    // 4. 綁定隊列2到交換機
    @Bean
    public Binding bindFanoutQueue2(
            @Qualifier("fanoutExchange") FanoutExchange exchange,
            @Qualifier("fanoutQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
步驟2:編寫生產者(發送廣播消息)

ProducerController 中添加接口,發送消息到 fanout 交換機:

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 發佈訂閲模式:發送廣播消息
    @RequestMapping("/fanout")
    public String sendFanoutMessage() {
        String msg = "Hello Publish/Subscribe!";
        // 發送消息到fanout交換機,路由鍵為空(fanout忽略路由鍵)
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);
        return "廣播消息發送成功!";
    }
}
步驟3:編寫消費者(監聽不同隊列)

創建兩個消費者,分別監聽 fanout_queue1fanout_queue2

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutListener {
    // 消費者1:監聽fanout_queue1
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void listenFanoutQueue1(String msg) {
        System.out.println("消費者1(fanout_queue1)接收:" + msg);
    }

    // 消費者2:監聽fanout_queue2
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void listenFanoutQueue2(String msg) {
        System.out.println("消費者2(fanout_queue2)接收:" + msg);
    }
}

3.3 運行驗證

  1. 發送消息:訪問 http://127.0.0.1:8080/producer/fanout
  2. 觀察日誌:兩個消費者均打印 Hello Publish/Subscribe!,消息被成功廣播;
  3. 管理界面驗證:進入 RabbitMQ 管理界面(http://110.41.51.65:15672),切換到 bite 虛擬主機,可看到 fanout_exchange 已綁定兩個隊列。

四、模式三:路由模式(Routing)

路由模式通過 direct 類型交換機,按“路由鍵(Routing Key)完全匹配”篩選消息,適用於按類型分發消息(如日誌系統分級別處理)。

4.1 核心原理

  • 生產者發送消息時指定 Routing Key
  • 交換機類型為 direct,僅將消息路由到“綁定鍵(Binding KeyRouting Key 完全匹配)”的隊列;
  • 一個隊列可綁定多個 Binding Key(如隊列綁定 blackgreen,可接收兩種路由鍵的消息)。

4.2 代碼實現

步驟1:聲明交換機、隊列與綁定關係

RabbitMQConfig 中添加 direct 交換機、隊列及綁定(指定綁定鍵):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

class Constants {
    // 路由模式:交換機+隊列名稱
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String DIRECT_QUEUE1 = "direct_queue1"; // 綁定orange
    public static final String DIRECT_QUEUE2 = "direct_queue2"; // 綁定black/green
}

@Configuration
public class RabbitMQConfig {
    // 1. 聲明direct交換機
    @Bean("directExchange")
    public DirectExchange directExchange() {
        return new DirectExchange(Constants.DIRECT_EXCHANGE, true, false);
    }

    // 2. 聲明兩個隊列
    @Bean("directQueue1")
    public Queue directQueue1() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }
    @Bean("directQueue2")
    public Queue directQueue2() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }

    // 3. 綁定隊列1到交換機(綁定鍵=orange)
    @Bean
    public Binding bindDirectQueue1(
            @Qualifier("directExchange") DirectExchange exchange,
            @Qualifier("directQueue1") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("orange");
    }

    // 4. 綁定隊列2到交換機(綁定鍵=black)
    @Bean
    public Binding bindDirectQueue2(
            @Qualifier("directExchange") DirectExchange exchange,
            @Qualifier("directQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("black");
    }

    // 5. 綁定隊列2到交換機(綁定鍵=green)
    @Bean
    public Binding bindDirectQueue3(
            @Qualifier("directExchange") DirectExchange exchange,
            @Qualifier("directQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("green");
    }
}
步驟2:編寫生產者(指定Routing Key)

ProducerController 中添加接口,支持動態傳入 Routing Key

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 路由模式:按Routing Key發送消息
    @RequestMapping("/direct")
    public String sendDirectMessage(String routingKey) {
        String msg = "Hello Routing! Key: " + routingKey;
        // 發送消息到direct交換機,指定Routing Key
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, msg);
        return "路由消息發送成功!Key:" + routingKey;
    }
}
步驟3:編寫消費者(監聽不同隊列)

創建兩個消費者,分別監聽 direct_queue1direct_queue2

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectListener {
    // 消費者1:監聽direct_queue1(僅接收orange消息)
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void listenDirectQueue1(String msg) {
        System.out.println("消費者1(direct_queue1)接收:" + msg);
    }

    // 消費者2:監聽direct_queue2(接收black/green消息)
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void listenDirectQueue2(String msg) {
        System.out.println("消費者2(direct_queue2)接收:" + msg);
    }
}

4.3 運行驗證

  1. 發送 orange 消息:訪問 http://127.0.0.1:8080/producer/direct?routingKey=orange,消費者1打印 Hello Routing! Key: orange
  2. 發送 black 消息:訪問 http://127.0.0.1:8080/producer/direct?routingKey=black,消費者2打印 Hello Routing! Key: black
  3. 發送 green 消息:訪問 http://127.0.0.1:8080/producer/direct?routingKey=green,消費者2打印 Hello Routing! Key: green

五、模式四:通配符模式(Topics)

通配符模式是路由模式的擴展,通過 topic 類型交換機支持“通配符匹配”,適用於複雜的多維度消息篩選(如電商系統按“業務+操作+級別”路由)。

5.1 核心原理

  • Routing KeyBinding Key 為“點分隔的單詞”(如 order.pay.error);
  • 支持兩種通配符:
  • *:匹配一個單詞(如 *.error 匹配 order.error,不匹配 order.pay.error);
  • #:匹配零個或多個單詞(如 #.info 匹配 infoorder.pay.info);
  • 交換機類型為 topic,按通配符規則路由消息。

5.2 代碼實現

步驟1:聲明交換機、隊列與綁定關係

RabbitMQConfig 中添加 topic 交換機、隊列及通配符綁定:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

class Constants {
    // 通配符模式:交換機+隊列名稱
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE1 = "topic_queue1"; // 綁定*.error
    public static final String TOPIC_QUEUE2 = "topic_queue2"; // 綁定#.info/*.error
}

@Configuration
public class RabbitMQConfig {
    // 1. 聲明topic交換機
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(Constants.TOPIC_EXCHANGE, true, false);
    }

    // 2. 聲明兩個隊列
    @Bean("topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }
    @Bean("topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }

    // 3. 綁定隊列1到交換機(綁定鍵=*.error)
    @Bean
    public Binding bindTopicQueue1(
            @Qualifier("topicExchange") TopicExchange exchange,
            @Qualifier("topicQueue1") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("*.error");
    }

    // 4. 綁定隊列2到交換機(綁定鍵=#.info)
    @Bean
    public Binding bindTopicQueue2(
            @Qualifier("topicExchange") TopicExchange exchange,
            @Qualifier("topicQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("#.info");
    }

    // 5. 綁定隊列2到交換機(綁定鍵=*.error)
    @Bean
    public Binding bindTopicQueue3(
            @Qualifier("topicExchange") TopicExchange exchange,
            @Qualifier("topicQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("*.error");
    }
}
步驟2:編寫生產者(動態傳入Routing Key)

ProducerController 中添加接口,支持複雜格式的 Routing Key

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 通配符模式:發送帶複雜Routing Key的消息
    @RequestMapping("/topics")
    public String sendTopicsMessage(String routingKey) {
        String msg = "Hello Topics! Key: " + routingKey;
        // 發送消息到topic交換機,指定Routing Key
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, msg);
        return "通配符消息發送成功!Key:" + routingKey;
    }
}
步驟3:編寫消費者(監聽不同隊列)

創建兩個消費者,分別監聽 topic_queue1topic_queue2

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener {
    // 消費者1:監聽topic_queue1(僅接收*.error消息)
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void listenTopicQueue1(String msg) {
        System.out.println("消費者1(topic_queue1)接收:" + msg);
    }

    // 消費者2:監聽topic_queue2(接收#.info/*.error消息)
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void listenTopicQueue2(String msg) {
        System.out.println("消費者2(topic_queue2)接收:" + msg);
    }
}

5.3 運行驗證

  1. 發送 order.error 消息:訪問 http://127.0.0.1:8080/producer/topics?routingKey=order.error,消費者1和2均接收消息;
  2. 發送 order.pay.info 消息:訪問 http://127.0.0.1:8080/producer/topics?routingKey=order.pay.info,僅消費者2接收消息;
  3. 發送 pay.error 消息:訪問 http://127.0.0.1:8080/producer/topics?routingKey=pay.error,消費者1和2均接收消息。

六、整合核心組件與注意事項

6.1 核心組件總結

組件

作用

關鍵特性

RabbitTemplate

消息發送模板

自動管理連接/通道,支持消息序列化

@RabbitListener

消息監聽註解

可加在類/方法上,支持多參數類型(String、Message等)

QueueBuilder

隊列構建工具

支持鏈式配置持久化、自動刪除等屬性

BindingBuilder

綁定構建工具

支持交換機與隊列的靈活綁定(指定綁定鍵)

6.2 注意事項

  1. 隊列/交換機持久化:生產環境需設置 durable=true,避免 RabbitMQ 重啓後組件丟失;
  2. 消息序列化:若發送對象消息,需配置 Jackson2JsonMessageConverter(文檔中推薦 JSON 序列化);
  3. 消費者啓動順序:建議先啓動消費者再發送消息,避免消息因無消費者監聽而丟失(或配置隊列持久化);
  4. 虛擬主機隔離:不同業務應使用獨立虛擬主機(如 order-vhostlog-vhost),通過配置 virtual-host 實現隔離。