EventBus是Guava中實現的用於發佈/訂閲模式的事件處理組件。
一、先來要給簡單的Demo
java一切皆對象,肯定有個事件對象。
Event.java
package com.cqsym.lmdw1.testguava;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class Event {
private String message;
public Event(String message){
this.message = message;
}
}
有了事件對象,肯定還有事件監聽或者事件訂閲一説了。
EventListener.java
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.Subscribe;
public class EventListener {
@Subscribe
public void listener(Event event){
System.out.println("Subscribe 接收消息:" + event.getMessage());
}
}
肯定有事件的發送或者發佈者。
MessageEventDemo.java
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.EventBus;
import java.util.concurrent.TimeUnit;
public class MessageEventDemo {
public static void main(String[] args) throws InterruptedException {
EventBus eventBus = new EventBus("testEventBus");
//註冊監聽到eventBus
eventBus.register(new EventListener());
//通過eventBus發送兩筆消息
eventBus.post(new Event("send message1"));
eventBus.post(new Event("send message2"));
Thread.sleep(2000L);
eventBus.post(new Event("send message3"));
TimeUnit.SECONDS.sleep(2L);
eventBus.post(new Event("send message4"));
}
}
簡單説明下他的流程
1:肯定要定義一個事件類型。
2:還需要定義要給事件處理的方法。
3:通過new創建一個EventBus事件總線的對象,在這個對象上註冊要處理事件的方法的對象,然後利用事件總線對象.post()發送事件對象即可。
二、多種類型的事件監聽
一樣定義2個事件對象
EventA.java
package com.cqsym.lmdw1.testguava;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class EventA {
private String message;
public EventA(String message){
this.message = message;
}
}
EventB.java
package com.cqsym.lmdw1.testguava;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class EventB {
private String message;
public EventB(String message){
this.message = message;
}
}
通用定義2個事件處理器
EventListener.java
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.Subscribe;
public class EventListener {
@Subscribe
public void listenerA(EventA event){
System.out.println("EventListener SubscribeA 接收消息:" + event.getMessage());
}
@Subscribe
public void listenerB(EventB event){
System.out.println("EventListener SubscribeB 接收消息:" + event.getMessage());
}
}
MultipleEventListener.java
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.Subscribe;
public class MultipleEventListener {
@Subscribe
public void listenerA(EventA event){
System.out.println("MultipleEventListener SubscribeA 接收消息:" + event.getMessage());
}
@Subscribe
public void listenerB(EventB event){
System.out.println("MultipleEventListener SubscribeB 接收消息:" + event.getMessage());
}
}
MessageEventDemo.java
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.EventBus;
import java.util.concurrent.TimeUnit;
public class MessageEventDemo {
public static void main(String[] args) throws InterruptedException {
EventBus eventBus = new EventBus("testEventBus");
//註冊監聽到eventBus
eventBus.register(new EventListener());
eventBus.register(new MultipleEventListener());
//通過eventBus發送兩筆消息
eventBus.post(new EventA("send message1"));
eventBus.post(new EventA("send message2"));
eventBus.post(new EventB("send message1"));
eventBus.post(new EventB("send message2"));
Thread.sleep(2000L);
eventBus.post(new EventB("send message3"));
eventBus.post(new EventA("send message3"));
TimeUnit.SECONDS.sleep(2L);
eventBus.post(new EventB("send message4"));
eventBus.post(new EventA("send message4"));
}
}
這裏對程序進行簡單的描述下過程,可以簡單理解為註冊了2個事件處理器,所有事件總線沒發送一個事件,都會經理這2個事件處理器來處理。這裏一個是EventListener,一個是MultipleEventListener。也是按照註冊時的先後順序來處理的。EventBus在多種類型的事件中,會根據事件類型來區分,把對應的事件發送給相應的訂閲者。
三、DeadEvent事件
在上個例子中,我們註釋掉EventListener和MultipleEventListener中的 關於事件B的定閲,在MultipleEventListener新增DeadEvent事件訂閲,其他不變。
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.Subscribe;
public class EventListener {
@Subscribe
public void listenerA(EventA event){
System.out.println("EventListener SubscribeA 接收消息:" + event.getMessage());
}
// @Subscribe
// public void listenerB(EventB event){
// System.out.println("EventListener SubscribeB 接收消息:" + event.getMessage());
// }
}
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.Subscribe;
public class MultipleEventListener {
@Subscribe
public void listenerA(EventA event){
System.out.println("MultipleEventListener SubscribeA 接收消息:" + event.getMessage());
}
// @Subscribe
// public void listenerB(EventB event){
// System.out.println("MultipleEventListener SubscribeB 接收消息:" + event.getMessage());
// }
@Subscribe
public void listenerDeadEvent(DeadEvent deadEvent){
System.out.println("deadEvent:" + deadEvent.getEvent());
}
}
可以看出事件B在任何處理器中都沒有對他他這事件進行處理,所以執行結果中只有有事件B的都到了DeadEvent中去了
可以看到EventA可以正常發送到訂閲者,EventB沒有訂閲者,會把消息發給DeadEvent
DeadEvent就是用來接收這種沒有訂閲者的消息。
四、異步
事件類定義
package com.cqsym.lmdw1.testguava;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private String orderId;
private String orderStatus;
}
事件監聽器
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.Subscribe;
public class OrderEventListener {
@Subscribe
public void handleOrderEvent(OrderEvent event) {
System.out.println("處理訂單事件: " + event.getOrderId()
+ ", 狀態: " + event.getOrderStatus());
// 模擬處理時間
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
AsyncEventBus使用示例
package com.cqsym.lmdw1.testguava;
import com.google.common.eventbus.AsyncEventBus;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncEventBusDemo {
public static void main(String[] args) {
// 創建線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 創建AsyncEventBus實例
AsyncEventBus asyncEventBus = new AsyncEventBus("OrderEventBus", executor);
// 註冊監聽器
OrderEventListener listener = new OrderEventListener();
asyncEventBus.register(listener);
// 發佈事件
System.out.println("開始發佈事件...");
asyncEventBus.post(new OrderEvent("ORDER001", "CREATED"));
asyncEventBus.post(new OrderEvent("ORDER002", "PAID"));
asyncEventBus.post(new OrderEvent("ORDER003", "CREATED"));
asyncEventBus.post(new OrderEvent("ORDER004", "CREATED"));
asyncEventBus.post(new OrderEvent("ORDER005", "PAID"));
asyncEventBus.post(new OrderEvent("ORDER006", "PAID"));
asyncEventBus.post(new OrderEvent("ORDER007", "CREATED"));
asyncEventBus.post(new OrderEvent("ORDER008", "CREATED"));
System.out.println("事件發佈完成,繼續執行其他邏輯...");
// 關閉線程池
executor.shutdown();
}
}
運行結果
可以看到事件一下子就發佈完成了(異步的,無需等待其處理)。
SpringBoot集成示例
@Configuration
public class EventBusConfig {
@Bean
public AsyncEventBus asyncEventBus() {
return new AsyncEventBus("SpringBootEventBus",
Executors.newFixedThreadPool(5));
}
}
@Component
public class EventPublisher {
@Autowired
private AsyncEventBus asyncEventBus;
public void publishOrderEvent(String orderId, String status) {
asyncEventBus.post(new OrderEvent(orderId, status));
}
}
@Component
public class EventSubscriber {
@PostConstruct
public void init() {
// 註冊到事件總線
asyncEventBus.register(this);
}
@Subscribe
public void handleOrderEvent(OrderEvent event) {
// 異步處理事件邏輯
System.out.println("異步處理訂單: " + event.getOrderId());
}
}