EventBus是Guava中實現的用於發佈/訂閲模式的事件處理組件。

一、先來要給簡單的Demo

java一切皆對象,肯定有個事件對象。

Event.java

package com.cqsym.lmdw1.testguava;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Event {
    private String message;
    public Event(String message){
        this.message = message;
    }

}

有了事件對象,肯定還有事件監聽或者事件訂閲一説了。

EventListener.java

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.Subscribe;

public class EventListener {

    @Subscribe
    public void listener(Event event){
        System.out.println("Subscribe 接收消息:" + event.getMessage());
    }

}

肯定有事件的發送或者發佈者。

MessageEventDemo.java

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.EventBus;

import java.util.concurrent.TimeUnit;

public class MessageEventDemo {

    public static void main(String[] args) throws InterruptedException {
        EventBus eventBus = new EventBus("testEventBus");
        //註冊監聽到eventBus
        eventBus.register(new EventListener());
        //通過eventBus發送兩筆消息
        eventBus.post(new Event("send message1"));
        eventBus.post(new Event("send message2"));
        Thread.sleep(2000L);
        eventBus.post(new Event("send message3"));
        TimeUnit.SECONDS.sleep(2L);
        eventBus.post(new Event("send message4"));
    }

}

Guava之EventBus_System

簡單説明下他的流程

1:肯定要定義一個事件類型。

2:還需要定義要給事件處理的方法。

3:通過new創建一個EventBus事件總線的對象,在這個對象上註冊要處理事件的方法的對象,然後利用事件總線對象.post()發送事件對象即可。


二、多種類型的事件監聽

一樣定義2個事件對象

EventA.java

package com.cqsym.lmdw1.testguava;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class EventA {
    private String message;
    public EventA(String message){
        this.message = message;
    }

}

EventB.java

package com.cqsym.lmdw1.testguava;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class EventB {
    private String message;
    public EventB(String message){
        this.message = message;
    }

}


通用定義2個事件處理器

EventListener.java

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.Subscribe;

public class EventListener {

    @Subscribe
    public void listenerA(EventA event){
        System.out.println("EventListener SubscribeA 接收消息:" + event.getMessage());
    }

    @Subscribe
    public void listenerB(EventB event){
        System.out.println("EventListener SubscribeB 接收消息:" + event.getMessage());
    }
}

MultipleEventListener.java

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.Subscribe;

public class MultipleEventListener {

    @Subscribe
    public void listenerA(EventA event){
        System.out.println("MultipleEventListener SubscribeA 接收消息:" + event.getMessage());
    }

    @Subscribe
    public void listenerB(EventB event){
        System.out.println("MultipleEventListener SubscribeB 接收消息:" + event.getMessage());
    }
}

MessageEventDemo.java

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.EventBus;

import java.util.concurrent.TimeUnit;

public class MessageEventDemo {

    public static void main(String[] args) throws InterruptedException {
        EventBus eventBus = new EventBus("testEventBus");
        //註冊監聽到eventBus
        eventBus.register(new EventListener());
        eventBus.register(new MultipleEventListener());
        //通過eventBus發送兩筆消息
        eventBus.post(new EventA("send message1"));
        eventBus.post(new EventA("send message2"));
        eventBus.post(new EventB("send message1"));
        eventBus.post(new EventB("send message2"));
        Thread.sleep(2000L);
        eventBus.post(new EventB("send message3"));
        eventBus.post(new EventA("send message3"));
        TimeUnit.SECONDS.sleep(2L);
        eventBus.post(new EventB("send message4"));
        eventBus.post(new EventA("send message4"));
    }

}

這裏對程序進行簡單的描述下過程,可以簡單理解為註冊了2個事件處理器,所有事件總線沒發送一個事件,都會經理這2個事件處理器來處理。這裏一個是EventListener,一個是MultipleEventListener。也是按照註冊時的先後順序來處理的。EventBus在多種類型的事件中,會根據事件類型來區分,把對應的事件發送給相應的訂閲者。

Guava之EventBus_事件處理_02


三、DeadEvent事件

在上個例子中,我們註釋掉EventListener和MultipleEventListener中的 關於事件B的定閲,在MultipleEventListener新增DeadEvent事件訂閲,其他不變。

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.Subscribe;

public class EventListener {

    @Subscribe
    public void listenerA(EventA event){
        System.out.println("EventListener SubscribeA 接收消息:" + event.getMessage());
    }

//    @Subscribe
//    public void listenerB(EventB event){
//        System.out.println("EventListener SubscribeB 接收消息:" + event.getMessage());
//    }
}
package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.Subscribe;

public class MultipleEventListener {

    @Subscribe
    public void listenerA(EventA event){
        System.out.println("MultipleEventListener SubscribeA 接收消息:" + event.getMessage());
    }

//    @Subscribe
//    public void listenerB(EventB event){
//        System.out.println("MultipleEventListener SubscribeB 接收消息:" + event.getMessage());
//    }

    @Subscribe
    public void listenerDeadEvent(DeadEvent deadEvent){
        System.out.println("deadEvent:" + deadEvent.getEvent());
    }
}

可以看出事件B在任何處理器中都沒有對他他這事件進行處理,所以執行結果中只有有事件B的都到了DeadEvent中去了

Guava之EventBus_System_03

可以看到EventA可以正常發送到訂閲者,EventB沒有訂閲者,會把消息發給DeadEvent

DeadEvent就是用來接收這種沒有訂閲者的消息。


四、異步

事件類定義

package com.cqsym.lmdw1.testguava;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String orderStatus;
}

事件監聽器

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.Subscribe;

public class OrderEventListener {

    @Subscribe
    public void handleOrderEvent(OrderEvent event) {
        System.out.println("處理訂單事件: " + event.getOrderId()
                + ", 狀態: " + event.getOrderStatus());
        // 模擬處理時間
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

}

AsyncEventBus使用示例

package com.cqsym.lmdw1.testguava;

import com.google.common.eventbus.AsyncEventBus;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncEventBusDemo {
    public static void main(String[] args) {
        // 創建線程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 創建AsyncEventBus實例
        AsyncEventBus asyncEventBus = new AsyncEventBus("OrderEventBus", executor);

        // 註冊監聽器
        OrderEventListener listener = new OrderEventListener();
        asyncEventBus.register(listener);

        // 發佈事件
        System.out.println("開始發佈事件...");
        asyncEventBus.post(new OrderEvent("ORDER001", "CREATED"));
        asyncEventBus.post(new OrderEvent("ORDER002", "PAID"));
        asyncEventBus.post(new OrderEvent("ORDER003", "CREATED"));
        asyncEventBus.post(new OrderEvent("ORDER004", "CREATED"));
        asyncEventBus.post(new OrderEvent("ORDER005", "PAID"));
        asyncEventBus.post(new OrderEvent("ORDER006", "PAID"));
        asyncEventBus.post(new OrderEvent("ORDER007", "CREATED"));
        asyncEventBus.post(new OrderEvent("ORDER008", "CREATED"));

        System.out.println("事件發佈完成,繼續執行其他邏輯...");

        // 關閉線程池
        executor.shutdown();
    }
}

運行結果

Guava之EventBus_事件處理_04

可以看到事件一下子就發佈完成了(異步的,無需等待其處理)。


SpringBoot集成示例

@Configuration
public class EventBusConfig {
    
    @Bean
    public AsyncEventBus asyncEventBus() {
        return new AsyncEventBus("SpringBootEventBus", 
                                Executors.newFixedThreadPool(5));
    }
}

@Component
public class EventPublisher {
    
    @Autowired
    private AsyncEventBus asyncEventBus;
    
    public void publishOrderEvent(String orderId, String status) {
        asyncEventBus.post(new OrderEvent(orderId, status));
    }
}

@Component
public class EventSubscriber {
    
    @PostConstruct
    public void init() {
        // 註冊到事件總線
        asyncEventBus.register(this);
    }
    
    @Subscribe
    public void handleOrderEvent(OrderEvent event) {
        // 異步處理事件邏輯
        System.out.println("異步處理訂單: " + event.getOrderId());
    }
}