動態

詳情 返回 返回

單體架構中的事件驅動架構:Java應用程序的漸進式重構 - 動態 詳情

傳統觀點認為事件驅動架構屬於微服務架構範疇,服務通過消息代理進行異步通信。然而,事件驅動模式一些最具價值的應用恰恰發生在單體應用程序內部——在這些地方,緊密耦合已造成維護噩夢,而漸進式重構則提供了一條通往更好架構的路徑,且無需分佈式系統的運維複雜性。

為何在單體應用中使用事件有意義

傳統的分層單體應用存在一個特定問題:直接的方法調用在組件之間創建了僵化的依賴關係。您的訂單處理代碼直接調用庫存管理,庫存管理又調用倉庫系統,繼而觸發電子郵件通知。每個組件都瞭解其他幾個組件,從而形成一個糾纏不清的網,更改其中一部分需要理解並測試它所觸及的所有內容。

事件驅動模式引入了間接性。當下單時,訂單服務發佈一個"OrderPlaced"事件。其他對訂單感興趣的組件——庫存、發貨、通知——訂閲此事件並獨立響應。訂單服務不知道也不關心誰在監聽。即使這些組件存在於同一個代碼庫並共享同一個數據庫,它們也變得鬆散耦合。

這種方法提供了立竿見影的好處,而無需將應用程序拆分為微服務。您在保持單體應用運維簡單性的同時,獲得了可測試性、靈活性和更清晰的邊界。當您最終需要提取服務時,事件驅動的結構使得過渡更加平滑,因為組件已經通過定義良好的消息進行通信,而不是直接的方法調用。

起點:一個緊密耦合的訂單系統

考慮一個使用 Spring Boot 構建的典型電子商務單體應用。訂單創建流程如下所示:

@Service
@Transactional
public class OrderService {
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ShippingService shippingService;
    private final LoyaltyService loyaltyService;
    private final EmailService emailService;
    private final AnalyticsService analyticsService;
     
    public OrderService(
        OrderRepository orderRepository,
        InventoryService inventoryService,
        PaymentService paymentService,
        ShippingService shippingService,
        LoyaltyService loyaltyService,
        EmailService emailService,
        AnalyticsService analyticsService
    ) {
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.shippingService = shippingService;
        this.loyaltyService = loyaltyService;
        this.emailService = emailService;
        this.analyticsService = analyticsService;
    }
     
    public Order createOrder(CreateOrderRequest request) {
        // 驗證庫存
        for (OrderItem item : request.getItems()) {
            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
                throw new InsufficientInventoryException(item.getProductId());
            }
        }
         
        // 處理支付
        PaymentResult payment = paymentService.processPayment(
            request.getCustomerId(),
            calculateTotal(request.getItems()),
            request.getPaymentDetails()
        );
         
        if (!payment.isSuccessful()) {
            throw new PaymentFailedException(payment.getErrorMessage());
        }
         
        // 創建訂單
        Order order = new Order(
            request.getCustomerId(),
            request.getItems(),
            payment.getTransactionId()
        );
        order.setStatus(OrderStatus.CONFIRMED);
        Order savedOrder = orderRepository.save(order);
         
        // 預留庫存
        for (OrderItem item : request.getItems()) {
            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
        }
         
        // 創建發貨單
        shippingService.createShipment(savedOrder);
         
        // 更新忠誠度積分
        loyaltyService.addPoints(
            request.getCustomerId(),
            calculateLoyaltyPoints(savedOrder)
        );
         
        // 發送確認郵件
        emailService.sendOrderConfirmation(savedOrder);
         
        // 跟蹤分析
        analyticsService.trackOrderPlaced(savedOrder);
         
        return savedOrder;
    }
}

這段代碼可以工作,但存在嚴重問題。OrderService 知道七個不同的服務。測試需要模擬所有這些服務。添加新的訂單後操作意味着要修改此方法。如果電子郵件服務緩慢,訂單創建就會變慢。如果分析跟蹤失敗,整個訂單就會失敗並回滾。

事務邊界也是錯誤的。所有操作都在單個數據庫事務中發生,這意味着即使電子郵件服務臨時停機也會阻止訂單創建。庫存預留和發貨單創建在事務上耦合,儘管它們在邏輯上是獨立的操作。

引入 Spring 應用事件

Spring Framework 提供了一個內置的事件系統,在單個 JVM 內工作。默認情況下它是同步的,這使得它易於推理和調試。首先定義領域事件:

public abstract class DomainEvent {
    private final Instant occurredAt;
    private final String eventId;
     
    protected DomainEvent() {
        this.occurredAt = Instant.now();
        this.eventId = UUID.randomUUID().toString();
    }
     
    public Instant getOccurredAt() {
        return occurredAt;
    }
     
    public String getEventId() {
        return eventId;
    }
}
 
public class OrderPlacedEvent extends DomainEvent {
    private final Long orderId;
    private final Long customerId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
     
    public OrderPlacedEvent(Order order) {
        super();
        this.orderId = order.getId();
        this.customerId = order.getCustomerId();
        this.items = new ArrayList<>(order.getItems());
        this.totalAmount = order.getTotalAmount();
    }
     
    // Getters
}

事件應該是不可變的,幷包含訂閲者需要的所有信息。避免直接傳遞實體——而是複製相關數據。這可以防止訂閲者意外修改共享狀態。

重構 OrderService 以發佈事件,而不是直接調用服務:

@Service
@Transactional
public class OrderService {
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ApplicationEventPublisher eventPublisher;
     
    public OrderService(
        OrderRepository orderRepository,
        InventoryService inventoryService,
        PaymentService paymentService,
        ApplicationEventPublisher eventPublisher
    ) {
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.eventPublisher = eventPublisher;
    }
     
    public Order createOrder(CreateOrderRequest request) {
        // 驗證庫存
        for (OrderItem item : request.getItems()) {
            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
                throw new InsufficientInventoryException(item.getProductId());
            }
        }
         
        // 處理支付
        PaymentResult payment = paymentService.processPayment(
            request.getCustomerId(),
            calculateTotal(request.getItems()),
            request.getPaymentDetails()
        );
         
        if (!payment.isSuccessful()) {
            throw new PaymentFailedException(payment.getErrorMessage());
        }
         
        // 創建並保存訂單
        Order order = new Order(
            request.getCustomerId(),
            request.getItems(),
            payment.getTransactionId()
        );
        order.setStatus(OrderStatus.CONFIRMED);
        Order savedOrder = orderRepository.save(order);
         
        // 同步預留庫存(仍在關鍵路徑上)
        for (OrderItem item : request.getItems()) {
            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
        }
         
        // 為非關鍵操作發佈事件
        eventPublisher.publishEvent(new OrderPlacedEvent(savedOrder));
         
        return savedOrder;
    }
}

現在 OrderService 僅依賴四個組件,而不是八個。更重要的是,它只瞭解對訂單創建至關重要的操作——庫存驗證、支付處理和庫存預留。其他所有操作都通過事件發生。

為解耦的操作創建事件監聽器:

@Component
public class OrderEventListeners {
    private static final Logger logger = LoggerFactory.getLogger(OrderEventListeners.class);
     
    private final ShippingService shippingService;
    private final LoyaltyService loyaltyService;
    private final EmailService emailService;
    private final AnalyticsService analyticsService;
     
    public OrderEventListeners(
        ShippingService shippingService,
        LoyaltyService loyaltyService,
        EmailService emailService,
        AnalyticsService analyticsService
    ) {
        this.shippingService = shippingService;
        this.loyaltyService = loyaltyService;
        this.emailService = emailService;
        this.analyticsService = analyticsService;
    }
     
    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            shippingService.createShipment(event.getOrderId());
        } catch (Exception e) {
            logger.error("Failed to create shipment for order {}", event.getOrderId(), e);
            // 不要重新拋出 - 其他監聽器仍應執行
        }
    }
     
    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void updateLoyaltyPoints(OrderPlacedEvent event) {
        try {
            int points = calculatePoints(event.getTotalAmount());
            loyaltyService.addPoints(event.getCustomerId(), points);
        } catch (Exception e) {
            logger.error("Failed to update loyalty points for order {}", event.getOrderId(), e);
        }
    }
     
    @EventListener
    public void sendConfirmationEmail(OrderPlacedEvent event) {
        try {
            emailService.sendOrderConfirmation(event.getOrderId());
        } catch (Exception e) {
            logger.error("Failed to send confirmation email for order {}", event.getOrderId(), e);
        }
    }
     
    @EventListener
    public void trackAnalytics(OrderPlacedEvent event) {
        try {
            analyticsService.trackOrderPlaced(event.getOrderId(), event.getTotalAmount());
        } catch (Exception e) {
            logger.error("Failed to track analytics for order {}", event.getOrderId(), e);
        }
    }
}

每個監聽器在它自己的事務中運行(在適當的時候)並獨立處理故障。如果發送電子郵件失敗,發貨單創建仍然會發生。即使分析跟蹤拋出異常,訂單創建事務也會成功提交。

理解事務邊界

@Transactional(propagation = Propagation.REQUIRES_NEW) 註解至關重要。沒有它,所有監聽器都會參與訂單創建事務。如果任何監聽器失敗,整個訂單都會回滾——這正是我們試圖避免的情況。

使用 REQUIRES_NEW,每個監聽器都會啓動一個新的事務。當監聽器運行時,訂單已經提交。這意味着:

  • 監聽器無法阻止訂單創建
  • 監聽器故障不會回滾訂單
  • 每個監聽器的工作是獨立原子性的

但這有一個權衡。如果監聽器失敗,訂單存在但某些後處理沒有發生。您需要處理這些部分故障的策略:

@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderPlaced(OrderPlacedEvent event) {
    try {
        shippingService.createShipment(event.getOrderId());
    } catch (Exception e) {
        logger.error("Failed to create shipment for order {}", event.getOrderId(), e);
         
        // 記錄失敗以便重試
        failedEventRepository.save(new FailedEvent(
            event.getClass().getSimpleName(),
            event.getEventId(),
            "handleOrderPlaced",
            e.getMessage()
        ));
    }
}

一個單獨的後台作業可以重試失敗的事件:

@Component
public class FailedEventRetryJob {
    private final FailedEventRepository failedEventRepository;
    private final ApplicationEventPublisher eventPublisher;
     
    @Scheduled(fixedDelay = 60000) // 每分鐘
    public void retryFailedEvents() {
        List failures = failedEventRepository.findRetryable();
         
        for (FailedEvent failure : failures) {
            try {
                // 重建並重新發布事件
                DomainEvent event = reconstructEvent(failure);
                eventPublisher.publishEvent(event);
                 
                failure.markRetried();
                failedEventRepository.save(failure);
            } catch (Exception e) {
                logger.warn("Retry failed for event {}", failure.getEventId(), e);
                failure.incrementRetryCount();
                failedEventRepository.save(failure);
            }
        }
    }
}

這種模式提供了最終一致性——系統可能暫時不一致,但通過重試自行恢復。

轉向異步事件

Spring 的 @EventListener 默認是同步的。事件處理髮生在發佈事件的同一線程中,發佈者等待所有監聽器完成。這提供了強有力的保證,但限制了可擴展性。

通過啓用異步支持並註解監聽器來使監聽器異步:

@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean(name = "eventExecutor")
    public Executor eventExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("event-");
        executor.initialize();
        return executor;
    }
}
 
@Component
public class OrderEventListeners {
    // ... 依賴 ...
     
    @Async("eventExecutor")
    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        shippingService.createShipment(event.getOrderId());
    }
     
    @Async("eventExecutor")
    @EventListener
    public void sendConfirmationEmail(OrderPlacedEvent event) {
        emailService.sendOrderConfirmation(event.getOrderId());
    }
}

使用 @AsynccreateOrder() 方法在發佈事件後立即返回。監聽器在線程池中併發執行。這顯著提高了響應時間——訂單創建不再等待電子郵件發送或分析跟蹤。

但異步事件引入了新的複雜性。當監聽器執行時,訂單創建事務可能尚未提交。監聽器可能嘗試從數據庫加載訂單,但由於事務仍在進行中而找不到它。

Spring 提供了 @TransactionalEventListener 來處理這種情況:

@Component
public class OrderEventListeners {
    @Async("eventExecutor")
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // 這僅在訂單創建事務成功提交後運行
        shippingService.createShipment(event.getOrderId());
    }
}

AFTER_COMMIT 階段確保監聽器僅在發佈事務成功提交後運行。如果訂單創建失敗並回滾,監聽器永遠不會執行。這可以防止處理實際上不存在的訂單的事件。

實現事件存儲

隨着事件驅動架構的成熟,存儲事件變得有價值。事件存儲提供了審計日誌,支持調試,並支持更復雜的模式,如事件溯源。

創建一個簡單的事件存儲:

@Entity
@Table(name = "domain_events")
public class StoredEvent {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
     
    @Column(nullable = false)
    private String eventId;
     
    @Column(nullable = false)
    private String eventType;
     
    @Column(nullable = false, columnDefinition = "TEXT")
    private String payload;
     
    @Column(nullable = false)
    private Instant occurredAt;
     
    @Column(nullable = false)
    private Instant storedAt;
     
    @Column
    private String aggregateId;
     
    @Column
    private String aggregateType;
     
    // 構造器、getter、setter
}
 
@Repository
public interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {
    List<StoredEvent> findByAggregateIdOrderByOccurredAt(String aggregateId);
    List<StoredEvent> findByEventType(String eventType);
}

攔截並存儲所有領域事件:

@Component
public class EventStoreListener {
    private final StoredEventRepository repository;
    private final ObjectMapper objectMapper;
     
    public EventStoreListener(StoredEventRepository repository, ObjectMapper objectMapper) {
        this.repository = repository;
        this.objectMapper = objectMapper;
    }
     
    @EventListener
    @Order(Ordered.HIGHEST_PRECEDENCE) // 在其他監聽器之前存儲
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void storeEvent(DomainEvent event) {
        try {
            StoredEvent stored = new StoredEvent();
            stored.setEventId(event.getEventId());
            stored.setEventType(event.getClass().getSimpleName());
            stored.setPayload(objectMapper.writeValueAsString(event));
            stored.setOccurredAt(event.getOccurredAt());
            stored.setStoredAt(Instant.now());
             
            // 如果可用,提取聚合信息
            if (event instanceof OrderPlacedEvent) {
                OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
                stored.setAggregateId(orderEvent.getOrderId().toString());
                stored.setAggregateType("Order");
            }
             
            repository.save(stored);
        } catch (JsonProcessingException e) {
            throw new EventStoreException("Failed to serialize event", e);
        }
    }
}

現在,每個領域事件在業務邏輯處理之前都會持久化。您可以通過重放事件來重建系統中發生的情況:

@Service
public class OrderHistoryService {
    private final StoredEventRepository eventRepository;
     
    public List<OrderEvent> getOrderHistory(Long orderId) {
        List<StoredEvent> events = eventRepository.findByAggregateIdOrderByOccurredAt(
            orderId.toString()
        );
         
        return events.stream()
            .map(this::deserializeEvent)
            .collect(Collectors.toList());
    }
     
    private OrderEvent deserializeEvent(StoredEvent stored) {
        // 根據事件類型反序列化
        try {
            Class<?> eventClass = Class.forName("com.example.events." + stored.getEventType());
            return (OrderEvent) objectMapper.readValue(stored.getPayload(), eventClass);
        } catch (Exception e) {
            throw new EventStoreException("Failed to deserialize event", e);
        }
    }
}

這實現了強大的調試能力。當客户報告其訂單問題時,您可以準確看到發生了什麼事件以及發生的順序。

Saga 和補償操作

某些工作流需要跨多個步驟進行協調,其中每個步驟都可能失敗。傳統方法使用分佈式事務,但這些方法擴展性不佳且增加了複雜性。Saga 使用編排事件和補償操作提供了一種替代方案。

考慮一個更復雜的訂單流程,您需要:

  1. 預留庫存
  2. 處理支付
  3. 創建發貨單

如果在預留庫存後支付失敗,您需要釋放預留。通過補償事件實現這一點:

public class InventoryReservedEvent extends DomainEvent {
    private final Long orderId;
    private final List<ReservationDetail> reservations;
     
    // 構造器、getter
}
 
public class PaymentFailedEvent extends DomainEvent {
    private final Long orderId;
    private final String reason;
     
    // 構造器、getter
}
 
@Component
public class InventorySagaHandler {
    private final InventoryService inventoryService;
     
    @EventListener
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // 補償操作:釋放預留庫存
        inventoryService.releaseReservation(event.getOrderId());
    }
}

Saga 通過事件而不是中央協調器進行協調:

@Service
public class OrderSagaService {
    private final ApplicationEventPublisher eventPublisher;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
     
    public void processOrder(Order order) {
        // 步驟 1: 預留庫存
        List<ReservationDetail> reservations = inventoryService.reserve(order.getItems());
        eventPublisher.publishEvent(new InventoryReservedEvent(order.getId(), reservations));
         
        try {
            // 步驟 2: 處理支付
            PaymentResult payment = paymentService.processPayment(order);
             
            if (payment.isSuccessful()) {
                eventPublisher.publishEvent(new PaymentSucceededEvent(order.getId(), payment));
            } else {
                // 觸發補償
                eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), payment.getReason()));
                throw new PaymentException(payment.getReason());
            }
        } catch (Exception e) {
            // 觸發補償
            eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), e.getMessage()));
            throw e;
        }
    }
}

這種模式在沒有分佈式事務的情況下保持了一致性。每個步驟發佈記錄所發生事件的事件。當發生故障時,補償事件會觸發撤銷先前步驟的操作。

橋接到外部消息代理

隨着單體應用的增長,您可能希望與外部系統集成或為最終的服務提取做準備。Spring Cloud Stream 提供了對 RabbitMQ 或 Kafka 等消息代理的抽象,同時保持相同的事件驅動模式:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

application.yml 中配置綁定:

spring:
  cloud:
    stream:
      bindings:
        orderPlaced-out-0:
          destination: order.placed
        orderPlaced-in-0:
          destination: order.placed
          group: order-processors
      kafka:
        binder:
          brokers: localhost:9092

創建內部事件和外部消息之間的橋接:

@Component
public class EventPublisher {
    private final StreamBridge streamBridge;
     
    public EventPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
     
    @EventListener
    public void publishToExternalBroker(OrderPlacedEvent event) {
        // 將內部事件發佈到外部消息代理
        streamBridge.send("orderPlaced-out-0", event);
    }
}
 
@Component
public class ExternalEventConsumer {
    private final ApplicationEventPublisher eventPublisher;
     
    public ExternalEventConsumer(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
     
    @Bean
    public Consumer<OrderPlacedEvent> orderPlaced() {
        return event -> {
            // 將外部事件重新發布為內部事件
            eventPublisher.publishEvent(event);
        };
    }
}

這種模式讓您可以選擇性地將事件發佈到外部,同時將內部事件保留在本地。關鍵的實時操作使用內部事件以實現低延遲。跨服務通信使用消息代理以實現可靠性和可擴展性。

監控與可觀測性

事件驅動系統引入了新的可觀測性挑戰。理解正在發生的情況需要跨多個異步處理步驟跟蹤事件。實施全面的日誌記錄和指標:

@Aspect
@Component
public class EventMonitoringAspect {
    private static final Logger logger = LoggerFactory.getLogger(EventMonitoringAspect.class);
    private final MeterRegistry meterRegistry;
     
    public EventMonitoringAspect(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
     
    @Around("@annotation(org.springframework.context.event.EventListener)")
    public Object monitorEventListener(ProceedingJoinPoint joinPoint) throws Throwable {
        String listenerName = joinPoint.getSignature().getName();
        Object[] args = joinPoint.getArgs();
        DomainEvent event = (DomainEvent) args[0];
         
        Timer.Sample sample = Timer.start(meterRegistry);
         
        try {
            logger.info("Processing event {} in listener {}", 
                event.getEventId(), listenerName);
             
            Object result = joinPoint.proceed();
             
            sample.stop(Timer.builder("event.listener.duration")
                .tag("listener", listenerName)
                .tag("event_type", event.getClass().getSimpleName())
                .tag("status", "success")
                .register(meterRegistry));
             
            meterRegistry.counter("event.listener.processed",
                "listener", listenerName,
                "event_type", event.getClass().getSimpleName(),
                "status", "success"
            ).increment();
             
            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder("event.listener.duration")
                .tag("listener", listenerName)
                .tag("event_type", event.getClass().getSimpleName())
                .tag("status", "failure")
                .register(meterRegistry));
             
            meterRegistry.counter("event.listener.processed",
                "listener", listenerName,
                "event_type", event.getClass().getSimpleName(),
                "status", "failure"
            ).increment();
             
            logger.error("Error processing event {} in listener {}", 
                event.getEventId(), listenerName, e);
             
            throw e;
        }
    }
}

這個切面自動跟蹤每個事件監聽器的執行時間和成功率。結合 Prometheus 和 Grafana 等工具,您可以可視化事件處理模式並識別瓶頸。

添加關聯 ID 以跟蹤系統中的事件:

public abstract class DomainEvent {
    private final Instant occurredAt;
    private final String eventId;
    private final String correlationId;
     
    protected DomainEvent(String correlationId) {
        this.occurredAt = Instant.now();
        this.eventId = UUID.randomUUID().toString();
        this.correlationId = correlationId != null ? correlationId : UUID.randomUUID().toString();
    }
     
    // Getters
}

通過事件鏈傳播關聯 ID:

@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
    MDC.put("correlationId", event.getCorrelationId());
     
    try {
        // 執行工作
         
        // 發佈具有相同關聯 ID 的後續事件
        eventPublisher.publishEvent(new ShipmentCreatedEvent(
            event.getOrderId(),
            event.getCorrelationId()
        ));
    } finally {
        MDC.clear();
    }
}

現在,與單個訂單流相關的所有日誌消息共享一個關聯 ID,使得跨多個異步操作跟蹤整個工作流變得微不足道。

測試事件驅動代碼

事件驅動架構需要不同的測試策略。傳統的單元測試適用於單個監聽器,但集成測試對於驗證事件流變得更加重要:

@SpringBootTest
@TestConfiguration
public class OrderEventIntegrationTest {
    @Autowired
    private ApplicationEventPublisher eventPublisher;
     
    @Autowired
    private ShippingService shippingService;
     
    @Autowired
    private EmailService emailService;
     
    @Test
    public void shouldProcessOrderPlacedEventCompletely() throws Exception {
        // 給定
        Order order = createTestOrder();
        OrderPlacedEvent event = new OrderPlacedEvent(order);
         
        // 當
        eventPublisher.publishEvent(event);
         
        // 等待異步處理
        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            // 然後
            verify(shippingService).createShipment(order.getId());
            verify(emailService).sendOrderConfirmation(order.getId());
        });
    }
}

對於單元測試,注入一個間諜事件發佈器以驗證事件是否正確發佈:

@ExtendWith(MockitoExtension.class)
public class OrderServiceTest {
    @Mock
    private OrderRepository orderRepository;
     
    @Mock
    private InventoryService inventoryService;
     
    @Mock
    private PaymentService paymentService;
     
    @Spy
    private ApplicationEventPublisher eventPublisher = new SimpleApplicationEventPublisher();
     
    @InjectMocks
    private OrderService orderService;
     
    @Test
    public void shouldPublishOrderPlacedEventAfterCreatingOrder() {
        // 給定
        CreateOrderRequest request = createValidRequest();
         
        when(inventoryService.checkAvailability(any(), anyInt())).thenReturn(true);
        when(paymentService.processPayment(any(), any(), any()))
            .thenReturn(PaymentResult.successful("txn-123"));
        when(orderRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
         
        // 當
        orderService.createOrder(request);
         
        // 然後
        verify(eventPublisher).publishEvent(argThat(event -> 
            event instanceof OrderPlacedEvent
        ));
    }
}

遷移之旅

將單體應用重構為使用事件驅動架構並非全有或全無的命題。從一個工作流開始——通常是造成最多痛苦的那個。識別可以事件驅動的直接服務調用,並逐步引入事件。

從同步事件開始,以最小化行為變更。一旦事件正確流動,為非關鍵監聽器切換到異步處理。當您需要審計跟蹤或調試能力時,添加事件存儲。僅當您需要跨服務通信或準備提取微服務時,才集成外部消息代理。

目標不是實現完美的事件驅動架構。而是減少耦合、提高可測試性,並在組件之間創建更清晰的邊界。即使是部分採用也能提供價值——具有一些事件驅動模式的單體應用比完全沒有的模式更易於維護。

這種漸進式方法使您能夠持續交付價值,而不是投入一個需要數月時間、直到完全結束時才能交付任何成果的重構項目。您能夠了解在特定領域和團隊中哪些方法有效,根據實際經驗而非理論理想來調整實施策略。

有用的資源

  • Spring 應用事件:https://docs.spring.io/spring-framework/reference/core/beans/...
    關於應用事件系統以及如何有效使用它的官方 Spring 文檔。
  • Spring Cloud Stream:https://spring.io/projects/spring-cloud-stream
    用於構建事件驅動微服務的框架,在橋接到外部消息代理時很有用。
  • Eric Evans 的《領域驅動設計》:https://www.domainlanguage.com/ddd/
    理解事件驅動系統中的領域事件和限界上下文的必讀材料。
  • 企業集成模式:https://www.enterpriseintegrationpatterns.com/
    適用於事件驅動架構的消息傳遞模式的全面目錄。
  • Micrometer:https://micrometer.io/
    用於監控事件處理和系統行為的應用指標庫。
  • Awaitility:https://github.com/awaitility/awaitility
    用於異步系統的測試庫,對於事件驅動代碼的集成測試至關重要。

【注】本文譯自: Event-Driven Architecture in Monoliths: Incremental Refactoring for Java Apps - Java Code Geeks

user avatar xiaoniuhululu 頭像 lenglingx 頭像 jiangyi 頭像 chuanghongdengdeqingwa_eoxet2 頭像 yizhidanshendetielian 頭像 lu_lu 頭像 flydean 頭像 changqingdezi 頭像 xiaoxiansheng_5e75673e1ae30 頭像 pottercoding 頭像 junxiudedoujiang 頭像 litongjava 頭像
點贊 15 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.