傳統觀點認為事件驅動架構屬於微服務架構範疇,服務通過消息代理進行異步通信。然而,事件驅動模式一些最具價值的應用恰恰發生在單體應用程序內部——在這些地方,緊密耦合已造成維護噩夢,而漸進式重構則提供了一條通往更好架構的路徑,且無需分佈式系統的運維複雜性。
為何在單體應用中使用事件有意義
傳統的分層單體應用存在一個特定問題:直接的方法調用在組件之間創建了僵化的依賴關係。您的訂單處理代碼直接調用庫存管理,庫存管理又調用倉庫系統,繼而觸發電子郵件通知。每個組件都瞭解其他幾個組件,從而形成一個糾纏不清的網,更改其中一部分需要理解並測試它所觸及的所有內容。
事件驅動模式引入了間接性。當下單時,訂單服務發佈一個"OrderPlaced"事件。其他對訂單感興趣的組件——庫存、發貨、通知——訂閲此事件並獨立響應。訂單服務不知道也不關心誰在監聽。即使這些組件存在於同一個代碼庫並共享同一個數據庫,它們也變得鬆散耦合。
這種方法提供了立竿見影的好處,而無需將應用程序拆分為微服務。您在保持單體應用運維簡單性的同時,獲得了可測試性、靈活性和更清晰的邊界。當您最終需要提取服務時,事件驅動的結構使得過渡更加平滑,因為組件已經通過定義良好的消息進行通信,而不是直接的方法調用。
起點:一個緊密耦合的訂單系統
考慮一個使用 Spring Boot 構建的典型電子商務單體應用。訂單創建流程如下所示:
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final ShippingService shippingService;
private final LoyaltyService loyaltyService;
private final EmailService emailService;
private final AnalyticsService analyticsService;
public OrderService(
OrderRepository orderRepository,
InventoryService inventoryService,
PaymentService paymentService,
ShippingService shippingService,
LoyaltyService loyaltyService,
EmailService emailService,
AnalyticsService analyticsService
) {
this.orderRepository = orderRepository;
this.inventoryService = inventoryService;
this.paymentService = paymentService;
this.shippingService = shippingService;
this.loyaltyService = loyaltyService;
this.emailService = emailService;
this.analyticsService = analyticsService;
}
public Order createOrder(CreateOrderRequest request) {
// 驗證庫存
for (OrderItem item : request.getItems()) {
if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
throw new InsufficientInventoryException(item.getProductId());
}
}
// 處理支付
PaymentResult payment = paymentService.processPayment(
request.getCustomerId(),
calculateTotal(request.getItems()),
request.getPaymentDetails()
);
if (!payment.isSuccessful()) {
throw new PaymentFailedException(payment.getErrorMessage());
}
// 創建訂單
Order order = new Order(
request.getCustomerId(),
request.getItems(),
payment.getTransactionId()
);
order.setStatus(OrderStatus.CONFIRMED);
Order savedOrder = orderRepository.save(order);
// 預留庫存
for (OrderItem item : request.getItems()) {
inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
}
// 創建發貨單
shippingService.createShipment(savedOrder);
// 更新忠誠度積分
loyaltyService.addPoints(
request.getCustomerId(),
calculateLoyaltyPoints(savedOrder)
);
// 發送確認郵件
emailService.sendOrderConfirmation(savedOrder);
// 跟蹤分析
analyticsService.trackOrderPlaced(savedOrder);
return savedOrder;
}
}
這段代碼可以工作,但存在嚴重問題。OrderService 知道七個不同的服務。測試需要模擬所有這些服務。添加新的訂單後操作意味着要修改此方法。如果電子郵件服務緩慢,訂單創建就會變慢。如果分析跟蹤失敗,整個訂單就會失敗並回滾。
事務邊界也是錯誤的。所有操作都在單個數據庫事務中發生,這意味着即使電子郵件服務臨時停機也會阻止訂單創建。庫存預留和發貨單創建在事務上耦合,儘管它們在邏輯上是獨立的操作。
引入 Spring 應用事件
Spring Framework 提供了一個內置的事件系統,在單個 JVM 內工作。默認情況下它是同步的,這使得它易於推理和調試。首先定義領域事件:
public abstract class DomainEvent {
private final Instant occurredAt;
private final String eventId;
protected DomainEvent() {
this.occurredAt = Instant.now();
this.eventId = UUID.randomUUID().toString();
}
public Instant getOccurredAt() {
return occurredAt;
}
public String getEventId() {
return eventId;
}
}
public class OrderPlacedEvent extends DomainEvent {
private final Long orderId;
private final Long customerId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
public OrderPlacedEvent(Order order) {
super();
this.orderId = order.getId();
this.customerId = order.getCustomerId();
this.items = new ArrayList<>(order.getItems());
this.totalAmount = order.getTotalAmount();
}
// Getters
}
事件應該是不可變的,幷包含訂閲者需要的所有信息。避免直接傳遞實體——而是複製相關數據。這可以防止訂閲者意外修改共享狀態。
重構 OrderService 以發佈事件,而不是直接調用服務:
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final ApplicationEventPublisher eventPublisher;
public OrderService(
OrderRepository orderRepository,
InventoryService inventoryService,
PaymentService paymentService,
ApplicationEventPublisher eventPublisher
) {
this.orderRepository = orderRepository;
this.inventoryService = inventoryService;
this.paymentService = paymentService;
this.eventPublisher = eventPublisher;
}
public Order createOrder(CreateOrderRequest request) {
// 驗證庫存
for (OrderItem item : request.getItems()) {
if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
throw new InsufficientInventoryException(item.getProductId());
}
}
// 處理支付
PaymentResult payment = paymentService.processPayment(
request.getCustomerId(),
calculateTotal(request.getItems()),
request.getPaymentDetails()
);
if (!payment.isSuccessful()) {
throw new PaymentFailedException(payment.getErrorMessage());
}
// 創建並保存訂單
Order order = new Order(
request.getCustomerId(),
request.getItems(),
payment.getTransactionId()
);
order.setStatus(OrderStatus.CONFIRMED);
Order savedOrder = orderRepository.save(order);
// 同步預留庫存(仍在關鍵路徑上)
for (OrderItem item : request.getItems()) {
inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
}
// 為非關鍵操作發佈事件
eventPublisher.publishEvent(new OrderPlacedEvent(savedOrder));
return savedOrder;
}
}
現在 OrderService 僅依賴四個組件,而不是八個。更重要的是,它只瞭解對訂單創建至關重要的操作——庫存驗證、支付處理和庫存預留。其他所有操作都通過事件發生。
為解耦的操作創建事件監聽器:
@Component
public class OrderEventListeners {
private static final Logger logger = LoggerFactory.getLogger(OrderEventListeners.class);
private final ShippingService shippingService;
private final LoyaltyService loyaltyService;
private final EmailService emailService;
private final AnalyticsService analyticsService;
public OrderEventListeners(
ShippingService shippingService,
LoyaltyService loyaltyService,
EmailService emailService,
AnalyticsService analyticsService
) {
this.shippingService = shippingService;
this.loyaltyService = loyaltyService;
this.emailService = emailService;
this.analyticsService = analyticsService;
}
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
shippingService.createShipment(event.getOrderId());
} catch (Exception e) {
logger.error("Failed to create shipment for order {}", event.getOrderId(), e);
// 不要重新拋出 - 其他監聽器仍應執行
}
}
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void updateLoyaltyPoints(OrderPlacedEvent event) {
try {
int points = calculatePoints(event.getTotalAmount());
loyaltyService.addPoints(event.getCustomerId(), points);
} catch (Exception e) {
logger.error("Failed to update loyalty points for order {}", event.getOrderId(), e);
}
}
@EventListener
public void sendConfirmationEmail(OrderPlacedEvent event) {
try {
emailService.sendOrderConfirmation(event.getOrderId());
} catch (Exception e) {
logger.error("Failed to send confirmation email for order {}", event.getOrderId(), e);
}
}
@EventListener
public void trackAnalytics(OrderPlacedEvent event) {
try {
analyticsService.trackOrderPlaced(event.getOrderId(), event.getTotalAmount());
} catch (Exception e) {
logger.error("Failed to track analytics for order {}", event.getOrderId(), e);
}
}
}
每個監聽器在它自己的事務中運行(在適當的時候)並獨立處理故障。如果發送電子郵件失敗,發貨單創建仍然會發生。即使分析跟蹤拋出異常,訂單創建事務也會成功提交。
理解事務邊界
@Transactional(propagation = Propagation.REQUIRES_NEW) 註解至關重要。沒有它,所有監聽器都會參與訂單創建事務。如果任何監聽器失敗,整個訂單都會回滾——這正是我們試圖避免的情況。
使用 REQUIRES_NEW,每個監聽器都會啓動一個新的事務。當監聽器運行時,訂單已經提交。這意味着:
- 監聽器無法阻止訂單創建
- 監聽器故障不會回滾訂單
- 每個監聽器的工作是獨立原子性的
但這有一個權衡。如果監聽器失敗,訂單存在但某些後處理沒有發生。您需要處理這些部分故障的策略:
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
shippingService.createShipment(event.getOrderId());
} catch (Exception e) {
logger.error("Failed to create shipment for order {}", event.getOrderId(), e);
// 記錄失敗以便重試
failedEventRepository.save(new FailedEvent(
event.getClass().getSimpleName(),
event.getEventId(),
"handleOrderPlaced",
e.getMessage()
));
}
}
一個單獨的後台作業可以重試失敗的事件:
@Component
public class FailedEventRetryJob {
private final FailedEventRepository failedEventRepository;
private final ApplicationEventPublisher eventPublisher;
@Scheduled(fixedDelay = 60000) // 每分鐘
public void retryFailedEvents() {
List failures = failedEventRepository.findRetryable();
for (FailedEvent failure : failures) {
try {
// 重建並重新發布事件
DomainEvent event = reconstructEvent(failure);
eventPublisher.publishEvent(event);
failure.markRetried();
failedEventRepository.save(failure);
} catch (Exception e) {
logger.warn("Retry failed for event {}", failure.getEventId(), e);
failure.incrementRetryCount();
failedEventRepository.save(failure);
}
}
}
}
這種模式提供了最終一致性——系統可能暫時不一致,但通過重試自行恢復。
轉向異步事件
Spring 的 @EventListener 默認是同步的。事件處理髮生在發佈事件的同一線程中,發佈者等待所有監聽器完成。這提供了強有力的保證,但限制了可擴展性。
通過啓用異步支持並註解監聽器來使監聽器異步:
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "eventExecutor")
public Executor eventExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("event-");
executor.initialize();
return executor;
}
}
@Component
public class OrderEventListeners {
// ... 依賴 ...
@Async("eventExecutor")
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderPlaced(OrderPlacedEvent event) {
shippingService.createShipment(event.getOrderId());
}
@Async("eventExecutor")
@EventListener
public void sendConfirmationEmail(OrderPlacedEvent event) {
emailService.sendOrderConfirmation(event.getOrderId());
}
}
使用 @Async,createOrder() 方法在發佈事件後立即返回。監聽器在線程池中併發執行。這顯著提高了響應時間——訂單創建不再等待電子郵件發送或分析跟蹤。
但異步事件引入了新的複雜性。當監聽器執行時,訂單創建事務可能尚未提交。監聽器可能嘗試從數據庫加載訂單,但由於事務仍在進行中而找不到它。
Spring 提供了 @TransactionalEventListener 來處理這種情況:
@Component
public class OrderEventListeners {
@Async("eventExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderPlaced(OrderPlacedEvent event) {
// 這僅在訂單創建事務成功提交後運行
shippingService.createShipment(event.getOrderId());
}
}
AFTER_COMMIT 階段確保監聽器僅在發佈事務成功提交後運行。如果訂單創建失敗並回滾,監聽器永遠不會執行。這可以防止處理實際上不存在的訂單的事件。
實現事件存儲
隨着事件驅動架構的成熟,存儲事件變得有價值。事件存儲提供了審計日誌,支持調試,並支持更復雜的模式,如事件溯源。
創建一個簡單的事件存儲:
@Entity
@Table(name = "domain_events")
public class StoredEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String eventId;
@Column(nullable = false)
private String eventType;
@Column(nullable = false, columnDefinition = "TEXT")
private String payload;
@Column(nullable = false)
private Instant occurredAt;
@Column(nullable = false)
private Instant storedAt;
@Column
private String aggregateId;
@Column
private String aggregateType;
// 構造器、getter、setter
}
@Repository
public interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {
List<StoredEvent> findByAggregateIdOrderByOccurredAt(String aggregateId);
List<StoredEvent> findByEventType(String eventType);
}
攔截並存儲所有領域事件:
@Component
public class EventStoreListener {
private final StoredEventRepository repository;
private final ObjectMapper objectMapper;
public EventStoreListener(StoredEventRepository repository, ObjectMapper objectMapper) {
this.repository = repository;
this.objectMapper = objectMapper;
}
@EventListener
@Order(Ordered.HIGHEST_PRECEDENCE) // 在其他監聽器之前存儲
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void storeEvent(DomainEvent event) {
try {
StoredEvent stored = new StoredEvent();
stored.setEventId(event.getEventId());
stored.setEventType(event.getClass().getSimpleName());
stored.setPayload(objectMapper.writeValueAsString(event));
stored.setOccurredAt(event.getOccurredAt());
stored.setStoredAt(Instant.now());
// 如果可用,提取聚合信息
if (event instanceof OrderPlacedEvent) {
OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
stored.setAggregateId(orderEvent.getOrderId().toString());
stored.setAggregateType("Order");
}
repository.save(stored);
} catch (JsonProcessingException e) {
throw new EventStoreException("Failed to serialize event", e);
}
}
}
現在,每個領域事件在業務邏輯處理之前都會持久化。您可以通過重放事件來重建系統中發生的情況:
@Service
public class OrderHistoryService {
private final StoredEventRepository eventRepository;
public List<OrderEvent> getOrderHistory(Long orderId) {
List<StoredEvent> events = eventRepository.findByAggregateIdOrderByOccurredAt(
orderId.toString()
);
return events.stream()
.map(this::deserializeEvent)
.collect(Collectors.toList());
}
private OrderEvent deserializeEvent(StoredEvent stored) {
// 根據事件類型反序列化
try {
Class<?> eventClass = Class.forName("com.example.events." + stored.getEventType());
return (OrderEvent) objectMapper.readValue(stored.getPayload(), eventClass);
} catch (Exception e) {
throw new EventStoreException("Failed to deserialize event", e);
}
}
}
這實現了強大的調試能力。當客户報告其訂單問題時,您可以準確看到發生了什麼事件以及發生的順序。
Saga 和補償操作
某些工作流需要跨多個步驟進行協調,其中每個步驟都可能失敗。傳統方法使用分佈式事務,但這些方法擴展性不佳且增加了複雜性。Saga 使用編排事件和補償操作提供了一種替代方案。
考慮一個更復雜的訂單流程,您需要:
- 預留庫存
- 處理支付
- 創建發貨單
如果在預留庫存後支付失敗,您需要釋放預留。通過補償事件實現這一點:
public class InventoryReservedEvent extends DomainEvent {
private final Long orderId;
private final List<ReservationDetail> reservations;
// 構造器、getter
}
public class PaymentFailedEvent extends DomainEvent {
private final Long orderId;
private final String reason;
// 構造器、getter
}
@Component
public class InventorySagaHandler {
private final InventoryService inventoryService;
@EventListener
public void handlePaymentFailed(PaymentFailedEvent event) {
// 補償操作:釋放預留庫存
inventoryService.releaseReservation(event.getOrderId());
}
}
Saga 通過事件而不是中央協調器進行協調:
@Service
public class OrderSagaService {
private final ApplicationEventPublisher eventPublisher;
private final InventoryService inventoryService;
private final PaymentService paymentService;
public void processOrder(Order order) {
// 步驟 1: 預留庫存
List<ReservationDetail> reservations = inventoryService.reserve(order.getItems());
eventPublisher.publishEvent(new InventoryReservedEvent(order.getId(), reservations));
try {
// 步驟 2: 處理支付
PaymentResult payment = paymentService.processPayment(order);
if (payment.isSuccessful()) {
eventPublisher.publishEvent(new PaymentSucceededEvent(order.getId(), payment));
} else {
// 觸發補償
eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), payment.getReason()));
throw new PaymentException(payment.getReason());
}
} catch (Exception e) {
// 觸發補償
eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), e.getMessage()));
throw e;
}
}
}
這種模式在沒有分佈式事務的情況下保持了一致性。每個步驟發佈記錄所發生事件的事件。當發生故障時,補償事件會觸發撤銷先前步驟的操作。
橋接到外部消息代理
隨着單體應用的增長,您可能希望與外部系統集成或為最終的服務提取做準備。Spring Cloud Stream 提供了對 RabbitMQ 或 Kafka 等消息代理的抽象,同時保持相同的事件驅動模式:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
在 application.yml 中配置綁定:
spring:
cloud:
stream:
bindings:
orderPlaced-out-0:
destination: order.placed
orderPlaced-in-0:
destination: order.placed
group: order-processors
kafka:
binder:
brokers: localhost:9092
創建內部事件和外部消息之間的橋接:
@Component
public class EventPublisher {
private final StreamBridge streamBridge;
public EventPublisher(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@EventListener
public void publishToExternalBroker(OrderPlacedEvent event) {
// 將內部事件發佈到外部消息代理
streamBridge.send("orderPlaced-out-0", event);
}
}
@Component
public class ExternalEventConsumer {
private final ApplicationEventPublisher eventPublisher;
public ExternalEventConsumer(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Bean
public Consumer<OrderPlacedEvent> orderPlaced() {
return event -> {
// 將外部事件重新發布為內部事件
eventPublisher.publishEvent(event);
};
}
}
這種模式讓您可以選擇性地將事件發佈到外部,同時將內部事件保留在本地。關鍵的實時操作使用內部事件以實現低延遲。跨服務通信使用消息代理以實現可靠性和可擴展性。
監控與可觀測性
事件驅動系統引入了新的可觀測性挑戰。理解正在發生的情況需要跨多個異步處理步驟跟蹤事件。實施全面的日誌記錄和指標:
@Aspect
@Component
public class EventMonitoringAspect {
private static final Logger logger = LoggerFactory.getLogger(EventMonitoringAspect.class);
private final MeterRegistry meterRegistry;
public EventMonitoringAspect(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Around("@annotation(org.springframework.context.event.EventListener)")
public Object monitorEventListener(ProceedingJoinPoint joinPoint) throws Throwable {
String listenerName = joinPoint.getSignature().getName();
Object[] args = joinPoint.getArgs();
DomainEvent event = (DomainEvent) args[0];
Timer.Sample sample = Timer.start(meterRegistry);
try {
logger.info("Processing event {} in listener {}",
event.getEventId(), listenerName);
Object result = joinPoint.proceed();
sample.stop(Timer.builder("event.listener.duration")
.tag("listener", listenerName)
.tag("event_type", event.getClass().getSimpleName())
.tag("status", "success")
.register(meterRegistry));
meterRegistry.counter("event.listener.processed",
"listener", listenerName,
"event_type", event.getClass().getSimpleName(),
"status", "success"
).increment();
return result;
} catch (Exception e) {
sample.stop(Timer.builder("event.listener.duration")
.tag("listener", listenerName)
.tag("event_type", event.getClass().getSimpleName())
.tag("status", "failure")
.register(meterRegistry));
meterRegistry.counter("event.listener.processed",
"listener", listenerName,
"event_type", event.getClass().getSimpleName(),
"status", "failure"
).increment();
logger.error("Error processing event {} in listener {}",
event.getEventId(), listenerName, e);
throw e;
}
}
}
這個切面自動跟蹤每個事件監聽器的執行時間和成功率。結合 Prometheus 和 Grafana 等工具,您可以可視化事件處理模式並識別瓶頸。
添加關聯 ID 以跟蹤系統中的事件:
public abstract class DomainEvent {
private final Instant occurredAt;
private final String eventId;
private final String correlationId;
protected DomainEvent(String correlationId) {
this.occurredAt = Instant.now();
this.eventId = UUID.randomUUID().toString();
this.correlationId = correlationId != null ? correlationId : UUID.randomUUID().toString();
}
// Getters
}
通過事件鏈傳播關聯 ID:
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
MDC.put("correlationId", event.getCorrelationId());
try {
// 執行工作
// 發佈具有相同關聯 ID 的後續事件
eventPublisher.publishEvent(new ShipmentCreatedEvent(
event.getOrderId(),
event.getCorrelationId()
));
} finally {
MDC.clear();
}
}
現在,與單個訂單流相關的所有日誌消息共享一個關聯 ID,使得跨多個異步操作跟蹤整個工作流變得微不足道。
測試事件驅動代碼
事件驅動架構需要不同的測試策略。傳統的單元測試適用於單個監聽器,但集成測試對於驗證事件流變得更加重要:
@SpringBootTest
@TestConfiguration
public class OrderEventIntegrationTest {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private ShippingService shippingService;
@Autowired
private EmailService emailService;
@Test
public void shouldProcessOrderPlacedEventCompletely() throws Exception {
// 給定
Order order = createTestOrder();
OrderPlacedEvent event = new OrderPlacedEvent(order);
// 當
eventPublisher.publishEvent(event);
// 等待異步處理
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
// 然後
verify(shippingService).createShipment(order.getId());
verify(emailService).sendOrderConfirmation(order.getId());
});
}
}
對於單元測試,注入一個間諜事件發佈器以驗證事件是否正確發佈:
@ExtendWith(MockitoExtension.class)
public class OrderServiceTest {
@Mock
private OrderRepository orderRepository;
@Mock
private InventoryService inventoryService;
@Mock
private PaymentService paymentService;
@Spy
private ApplicationEventPublisher eventPublisher = new SimpleApplicationEventPublisher();
@InjectMocks
private OrderService orderService;
@Test
public void shouldPublishOrderPlacedEventAfterCreatingOrder() {
// 給定
CreateOrderRequest request = createValidRequest();
when(inventoryService.checkAvailability(any(), anyInt())).thenReturn(true);
when(paymentService.processPayment(any(), any(), any()))
.thenReturn(PaymentResult.successful("txn-123"));
when(orderRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
// 當
orderService.createOrder(request);
// 然後
verify(eventPublisher).publishEvent(argThat(event ->
event instanceof OrderPlacedEvent
));
}
}
遷移之旅
將單體應用重構為使用事件驅動架構並非全有或全無的命題。從一個工作流開始——通常是造成最多痛苦的那個。識別可以事件驅動的直接服務調用,並逐步引入事件。
從同步事件開始,以最小化行為變更。一旦事件正確流動,為非關鍵監聽器切換到異步處理。當您需要審計跟蹤或調試能力時,添加事件存儲。僅當您需要跨服務通信或準備提取微服務時,才集成外部消息代理。
目標不是實現完美的事件驅動架構。而是減少耦合、提高可測試性,並在組件之間創建更清晰的邊界。即使是部分採用也能提供價值——具有一些事件驅動模式的單體應用比完全沒有的模式更易於維護。
這種漸進式方法使您能夠持續交付價值,而不是投入一個需要數月時間、直到完全結束時才能交付任何成果的重構項目。您能夠了解在特定領域和團隊中哪些方法有效,根據實際經驗而非理論理想來調整實施策略。
有用的資源
- Spring 應用事件:https://docs.spring.io/spring-framework/reference/core/beans/...
關於應用事件系統以及如何有效使用它的官方 Spring 文檔。 - Spring Cloud Stream:https://spring.io/projects/spring-cloud-stream
用於構建事件驅動微服務的框架,在橋接到外部消息代理時很有用。 - Eric Evans 的《領域驅動設計》:https://www.domainlanguage.com/ddd/
理解事件驅動系統中的領域事件和限界上下文的必讀材料。 - 企業集成模式:https://www.enterpriseintegrationpatterns.com/
適用於事件驅動架構的消息傳遞模式的全面目錄。 - Micrometer:https://micrometer.io/
用於監控事件處理和系統行為的應用指標庫。 - Awaitility:https://github.com/awaitility/awaitility
用於異步系統的測試庫,對於事件驅動代碼的集成測試至關重要。
【注】本文譯自: Event-Driven Architecture in Monoliths: Incremental Refactoring for Java Apps - Java Code Geeks