Spring Boot 整合 RabbitMQ 大幅簡化了開發流程,核心是通過
spring-boot-starter-amqp依賴封裝底層細節,通過RabbitTemplate和@RabbitListener實現消息收發。本文講解的四大模式覆蓋了多數業務場景:
- 工作隊列:多消費者負載均衡;
- 發佈訂閲:消息廣播;
- 路由:精確匹配篩選;
- 通配符:靈活多維度篩選。
一、整合前置準備:依賴與基礎配置
無論哪種模式,整合的前置步驟一致,核心是引入依賴並配置 RabbitMQ 連接信息。
1.1 引入 Maven 依賴
在 Spring Boot 項目的 pom.xml 中,添加 RabbitMQ 核心依賴與 Web 依賴(用於接口測試):
<!-- RabbitMQ 整合依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Web 依賴:用於編寫接口發送消息 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 測試依賴(可選) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
spring-boot-starter-amqp 內置了 RabbitMQ 客户端(默認 Lettuce),並封裝了 RabbitTemplate(消息發送模板)、@RabbitListener(消息監聽註解)等核心組件,無需額外引入客户端依賴。
1.2 配置 RabbitMQ 連接信息
在 src/main/resources/application.yml 中,配置 RabbitMQ 服務器地址、端口、虛擬主機等信息(與文檔中一致,使用雲服務器映射地址):
spring:
rabbitmq:
host: 110.41.51.65 # 服務器IP(文檔中示例地址)
port: 5672 # 端口(文檔中自定義為15673,默認5672)
username: study # 用户名(文檔中創建的專用用户)
password: study # 密碼(與用户名對應)
virtual-host: bite # 虛擬主機(文檔中創建的隔離空間,默認/)
# 可選:通過addresses簡化配置(二選一即可)
# addresses: amqp://study:study@110.41.51.65:15673/bite
配置後,Spring Boot 會自動創建 ConnectionFactory、RabbitTemplate 等 Bean,無需手動初始化。
二、模式一:工作隊列模式(Work Queues)
工作隊列模式通過多消費者競爭同一隊列的消息,實現任務負載均衡,適用於集羣環境下的異步任務處理(如 12306 短信通知)。
2.1 核心原理
- 生產者發送多條消息到隊列;
- 多個消費者監聽同一隊列,RabbitMQ 採用“輪詢”策略分配消息,每條消息僅被一個消費者處理;
- 無需交換機,使用 RabbitMQ 內置的默認交換機(空字符串)。
2.2 代碼實現
步驟1:聲明隊列(配置類)
通過 @Configuration 類聲明持久化隊列,確保服務重啓後隊列不丟失:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// 常量類:存儲隊列名稱(文檔中推薦統一管理)
class Constants {
public static final String WORK_QUEUE = "work_queue";
}
@Configuration
public class RabbitMQConfig {
// 聲明工作隊列(durable=true:持久化)
@Bean("workQueue")
public Queue workQueue() {
return QueueBuilder.durable(Constants.WORK_QUEUE).build();
}
}
步驟2:編寫生產者(接口發送消息)
通過 RabbitTemplate 發送消息,用 HTTP 接口觸發(方便測試):
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
// 注入Spring自動創建的RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 工作隊列模式:發送10條消息
@RequestMapping("/work")
public String sendWorkMessage() {
for (int i = 0; i < 10; i++) {
String msg = "Work Message " + i;
// 發送消息:默認交換機(空字符串),路由鍵=隊列名
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);
}
return "工作隊列消息發送成功!";
}
}
步驟3:編寫消費者(監聽隊列)
通過 @RabbitListener 註解聲明消費者,多個消費者監聽同一隊列:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
// 消費者1:監聽work_queue隊列
@RabbitListener(queues = Constants.WORK_QUEUE)
public void listenWorkQueue1(Message message) {
String msg = new String(message.getBody());
System.out.println("消費者1接收:" + msg);
}
// 消費者2:監聽同一隊列(競爭消息)
@RabbitListener(queues = Constants.WORK_QUEUE)
public void listenWorkQueue2(Message message) {
String msg = new String(message.getBody());
System.out.println("消費者2接收:" + msg);
}
}
@RabbitListener 是 Spring AMQP 的核心註解,支持直接指定隊列名,無需手動綁定。
2.3 運行驗證
- 啓動項目:Spring Boot 會自動創建
work_queue隊列; - 發送消息:訪問接口
http://127.0.0.1:8080/producer/work,返回“工作隊列消息發送成功!”; - 觀察日誌:消費者1接收“0、2、4、6、8”,消費者2接收“1、3、5、7、9”,符合輪詢分配規則。
三、模式二:發佈訂閲模式(Publish/Subscribe)
發佈訂閲模式通過
fanout類型交換機,將消息廣播到所有綁定的隊列,適用於多系統同步接收消息(如氣象局推送天氣預報)。
3.1 核心原理
- 生產者發送消息到
fanout交換機; - 交換機將消息複製到所有綁定的隊列;
- 每個隊列的消費者都能接收完整消息,實現“一條消息多端消費”;
fanout交換機忽略路由鍵(Routing Key),綁定鍵(Binding Key)可設為空。
3.2 代碼實現
步驟1:聲明交換機、隊列與綁定關係
在 RabbitMQConfig 中添加交換機、隊列聲明,以及兩者的綁定:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
class Constants {
// 發佈訂閲模式:交換機+隊列名稱
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
}
@Configuration
public class RabbitMQConfig {
// 1. 聲明fanout交換機(durable=true:持久化)
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange(Constants.FANOUT_EXCHANGE, true, false);
}
// 2. 聲明兩個隊列
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
// 3. 綁定隊列1到交換機
@Bean
public Binding bindFanoutQueue1(
@Qualifier("fanoutExchange") FanoutExchange exchange,
@Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange); // 綁定鍵為空
}
// 4. 綁定隊列2到交換機
@Bean
public Binding bindFanoutQueue2(
@Qualifier("fanoutExchange") FanoutExchange exchange,
@Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
}
步驟2:編寫生產者(發送廣播消息)
在 ProducerController 中添加接口,發送消息到 fanout 交換機:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 發佈訂閲模式:發送廣播消息
@RequestMapping("/fanout")
public String sendFanoutMessage() {
String msg = "Hello Publish/Subscribe!";
// 發送消息到fanout交換機,路由鍵為空(fanout忽略路由鍵)
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);
return "廣播消息發送成功!";
}
}
步驟3:編寫消費者(監聽不同隊列)
創建兩個消費者,分別監聽 fanout_queue1 和 fanout_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
// 消費者1:監聽fanout_queue1
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void listenFanoutQueue1(String msg) {
System.out.println("消費者1(fanout_queue1)接收:" + msg);
}
// 消費者2:監聽fanout_queue2
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void listenFanoutQueue2(String msg) {
System.out.println("消費者2(fanout_queue2)接收:" + msg);
}
}
3.3 運行驗證
- 發送消息:訪問
http://127.0.0.1:8080/producer/fanout; - 觀察日誌:兩個消費者均打印
Hello Publish/Subscribe!,消息被成功廣播; - 管理界面驗證:進入 RabbitMQ 管理界面(
http://110.41.51.65:15672),切換到bite虛擬主機,可看到fanout_exchange已綁定兩個隊列。
四、模式三:路由模式(Routing)
路由模式通過
direct類型交換機,按“路由鍵(Routing Key)完全匹配”篩選消息,適用於按類型分發消息(如日誌系統分級別處理)。
4.1 核心原理
- 生產者發送消息時指定
Routing Key; - 交換機類型為
direct,僅將消息路由到“綁定鍵(Binding Key與Routing Key完全匹配)”的隊列; - 一個隊列可綁定多個
Binding Key(如隊列綁定black和green,可接收兩種路由鍵的消息)。
4.2 代碼實現
步驟1:聲明交換機、隊列與綁定關係
在 RabbitMQConfig 中添加 direct 交換機、隊列及綁定(指定綁定鍵):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
class Constants {
// 路由模式:交換機+隊列名稱
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE1 = "direct_queue1"; // 綁定orange
public static final String DIRECT_QUEUE2 = "direct_queue2"; // 綁定black/green
}
@Configuration
public class RabbitMQConfig {
// 1. 聲明direct交換機
@Bean("directExchange")
public DirectExchange directExchange() {
return new DirectExchange(Constants.DIRECT_EXCHANGE, true, false);
}
// 2. 聲明兩個隊列
@Bean("directQueue1")
public Queue directQueue1() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
// 3. 綁定隊列1到交換機(綁定鍵=orange)
@Bean
public Binding bindDirectQueue1(
@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("orange");
}
// 4. 綁定隊列2到交換機(綁定鍵=black)
@Bean
public Binding bindDirectQueue2(
@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("black");
}
// 5. 綁定隊列2到交換機(綁定鍵=green)
@Bean
public Binding bindDirectQueue3(
@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("green");
}
}
步驟2:編寫生產者(指定Routing Key)
在 ProducerController 中添加接口,支持動態傳入 Routing Key:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 路由模式:按Routing Key發送消息
@RequestMapping("/direct")
public String sendDirectMessage(String routingKey) {
String msg = "Hello Routing! Key: " + routingKey;
// 發送消息到direct交換機,指定Routing Key
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, msg);
return "路由消息發送成功!Key:" + routingKey;
}
}
步驟3:編寫消費者(監聽不同隊列)
創建兩個消費者,分別監聽 direct_queue1 和 direct_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectListener {
// 消費者1:監聽direct_queue1(僅接收orange消息)
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void listenDirectQueue1(String msg) {
System.out.println("消費者1(direct_queue1)接收:" + msg);
}
// 消費者2:監聽direct_queue2(接收black/green消息)
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void listenDirectQueue2(String msg) {
System.out.println("消費者2(direct_queue2)接收:" + msg);
}
}
4.3 運行驗證
- 發送 orange 消息:訪問
http://127.0.0.1:8080/producer/direct?routingKey=orange,消費者1打印Hello Routing! Key: orange; - 發送 black 消息:訪問
http://127.0.0.1:8080/producer/direct?routingKey=black,消費者2打印Hello Routing! Key: black; - 發送 green 消息:訪問
http://127.0.0.1:8080/producer/direct?routingKey=green,消費者2打印Hello Routing! Key: green。
五、模式四:通配符模式(Topics)
通配符模式是路由模式的擴展,通過
topic類型交換機支持“通配符匹配”,適用於複雜的多維度消息篩選(如電商系統按“業務+操作+級別”路由)。
5.1 核心原理
Routing Key和Binding Key為“點分隔的單詞”(如order.pay.error);- 支持兩種通配符:
*:匹配一個單詞(如*.error匹配order.error,不匹配order.pay.error);#:匹配零個或多個單詞(如#.info匹配info、order.pay.info);
- 交換機類型為
topic,按通配符規則路由消息。
5.2 代碼實現
步驟1:聲明交換機、隊列與綁定關係
在 RabbitMQConfig 中添加 topic 交換機、隊列及通配符綁定:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
class Constants {
// 通配符模式:交換機+隊列名稱
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE1 = "topic_queue1"; // 綁定*.error
public static final String TOPIC_QUEUE2 = "topic_queue2"; // 綁定#.info/*.error
}
@Configuration
public class RabbitMQConfig {
// 1. 聲明topic交換機
@Bean("topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange(Constants.TOPIC_EXCHANGE, true, false);
}
// 2. 聲明兩個隊列
@Bean("topicQueue1")
public Queue topicQueue1() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
// 3. 綁定隊列1到交換機(綁定鍵=*.error)
@Bean
public Binding bindTopicQueue1(
@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("*.error");
}
// 4. 綁定隊列2到交換機(綁定鍵=#.info)
@Bean
public Binding bindTopicQueue2(
@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("#.info");
}
// 5. 綁定隊列2到交換機(綁定鍵=*.error)
@Bean
public Binding bindTopicQueue3(
@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("*.error");
}
}
步驟2:編寫生產者(動態傳入Routing Key)
在 ProducerController 中添加接口,支持複雜格式的 Routing Key:
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 通配符模式:發送帶複雜Routing Key的消息
@RequestMapping("/topics")
public String sendTopicsMessage(String routingKey) {
String msg = "Hello Topics! Key: " + routingKey;
// 發送消息到topic交換機,指定Routing Key
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, msg);
return "通配符消息發送成功!Key:" + routingKey;
}
}
步驟3:編寫消費者(監聽不同隊列)
創建兩個消費者,分別監聽 topic_queue1 和 topic_queue2:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicListener {
// 消費者1:監聽topic_queue1(僅接收*.error消息)
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void listenTopicQueue1(String msg) {
System.out.println("消費者1(topic_queue1)接收:" + msg);
}
// 消費者2:監聽topic_queue2(接收#.info/*.error消息)
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void listenTopicQueue2(String msg) {
System.out.println("消費者2(topic_queue2)接收:" + msg);
}
}
5.3 運行驗證
- 發送 order.error 消息:訪問
http://127.0.0.1:8080/producer/topics?routingKey=order.error,消費者1和2均接收消息; - 發送 order.pay.info 消息:訪問
http://127.0.0.1:8080/producer/topics?routingKey=order.pay.info,僅消費者2接收消息; - 發送 pay.error 消息:訪問
http://127.0.0.1:8080/producer/topics?routingKey=pay.error,消費者1和2均接收消息。
六、整合核心組件與注意事項
6.1 核心組件總結
|
組件
|
作用
|
關鍵特性
|
|
|
消息發送模板
|
自動管理連接/通道,支持消息序列化
|
|
|
消息監聽註解
|
可加在類/方法上,支持多參數類型(String、Message等)
|
|
|
隊列構建工具
|
支持鏈式配置持久化、自動刪除等屬性
|
|
|
綁定構建工具
|
支持交換機與隊列的靈活綁定(指定綁定鍵)
|
6.2 注意事項
- 隊列/交換機持久化:生產環境需設置
durable=true,避免 RabbitMQ 重啓後組件丟失; - 消息序列化:若發送對象消息,需配置
Jackson2JsonMessageConverter(文檔中推薦 JSON 序列化); - 消費者啓動順序:建議先啓動消費者再發送消息,避免消息因無消費者監聽而丟失(或配置隊列持久化);
- 虛擬主機隔離:不同業務應使用獨立虛擬主機(如
order-vhost、log-vhost),通過配置virtual-host實現隔離。