使用中介者模式輕鬆實現命令查詢職責分離,構建高內聚、低耦合的應用系統
一、知識點回顧
1. 什麼是CQRS?
CQRS是Command Query Responsibility Segregation的縮寫,一般稱作命令查詢職責分離。從字面意思理解,就是將命令(寫入)和查詢(讀取)的責任劃分到不同的模型中。
對比一下常用的 CRUD 模式(創建-讀取-更新-刪除),通常我們會讓用户界面與負責所有四種操作的數據存儲交互。而 CQRS 則將這些操作分成兩種模式,一種用於查詢(又稱 "R"),另一種用於命令(又稱 "CUD")。
2. CQRS的作用是什麼?
CQRS將系統的寫操作(命令)和讀操作(查詢)分離到不同的模型和數據存儲中,從而實現讀寫分離,提高系統的性能、可擴展性和安全性,並使複雜業務邏輯(寫端)和高效查詢(讀端)各自得到優化,降低系統複雜性。它允許為寫操作設計嚴謹的領域模型,為讀操作設計簡單、只關注查詢效率的數據模型(如專用視圖或報表數據庫),並可通過事件等機制保持最終一致性。
3. CQRS 的優點
- 獨立縮放。 CQRS 使讀取模型和寫入模型能夠獨立縮放。 此方法可幫助最大程度地減少鎖爭用並提高負載下的系統性能。
- 優化的數據架構。 讀取操作可以使用針對查詢進行優化的模式。 寫入操作使用針對更新優化的模式。
- 安全性。 通過分隔讀取和寫入,可以確保只有適當的域實體或操作有權對數據執行寫入操作。
- 關注點分離。 分離讀取和寫入責任會導致更簡潔、更易於維護的模型。 寫入端通常處理複雜的業務邏輯。 讀取端可以保持簡單且專注於查詢效率。
- 更簡單的查詢。 在讀取數據庫中存儲具體化視圖時,應用程序可以在查詢時避免複雜的聯接。
二、關於PipelinR
項目地址
https://github.com/sizovs/PipelinR
項目開發者在Github的介紹不多,關鍵是最後一句話:It's similar to a popular MediatR .NET library. 意思就是這個項目是參考着一個叫MediatR的.net庫寫的。關於MediatR我之前有兩篇文章專門介紹過。
PipelinR(包括MediatR)提供了一種CQRS的實現方式,基於中介者模式實現進程內消息傳遞,用於解耦應用中的各個組件,支持請求/響應(一對一,有返回值)和發佈/訂閲(一對多,無返回值)兩種消息模式。它們在內部提供管道行為 (Pipeline Behaviors),用於在消息處理前後插入自定義邏輯,如日誌、驗證、異常處理等。
需要提醒的是,PipelinR並不是一個完整的CQRS框架,它只是一箇中介者模式的具體實現方式,將調用方和處理方進行了解耦,而這種模式恰好可以用來在一個單體應用(或者是微服務的服務內部)中實現簡單的CQRS。
三、依賴安裝和配置
1. Maven安裝
<dependency>
<groupId>net.sizovs</groupId>
<artifactId>pipelinr</artifactId>
<version>0.11</version>
</dependency>
2. Gradle安裝
dependencies {
compile 'net.sizovs:pipelinr:0.11'
}
在Spring項目中配置PipelinR
@Configuration
public class PipelinrConfiguration {
@Bean
Pipeline pipeline(ObjectProvider<Command.Handler> commandHandlers, ObjectProvider<Notification.Handler> notificationHandlers, ObjectProvider<Command.Middleware> middlewares) {
return new Pipelinr()
.with(commandHandlers::stream)
.with(notificationHandlers::stream)
.with(middlewares::orderedStream);
}
}
四、核心組件
Pipeline/Pipelinr:Pipeline是消息和處理器之間的中介者,調用方向Pipeline發送消息,Pipeline收到消息後通過註冊到Pipeline的中間件進行層層傳遞並最終抵達匹配的消息處理器進行處理。Pipelinr是Pipeline的默認實現。Command<R>:用於約定請求/響應模式的消息類型,泛型參數R是返回值的類型,如果不需要返回值,可以將R指定為Voidy。Notification:用於約定發佈/訂閲模式的消息類型,沒有返回值,消息可以有多個處理器。Middleware:管道中間件,Command和Notification都定義了各自的中間件接口。Pipeline接收到的消息,在到達最終的處理器之前,會經過所有註冊到Pipeline的中間。可以使用Middleware實現諸如日誌記錄、數據驗證、開啓事務等一系列操作。
五、請求/響應模式實現
請求/響應模式需要用到Command接口。
1. 定義Command
Command代表一個請求,需要實現net.sizovs.pipelinr.Command接口。泛型參數指定返回值類型。
// 定義一個創建用户的命令
public class CreateUserCommand implements Command<UserResponse> {
private String username;
private String email;
public CreateUserCommand(String username, String email) {
this.username = username;
this.email = email;
}
public String getUsername() {
return username;
}
public String getEmail() {
return email;
}
}
// 返回值類型
public class UserResponse {
private Long userId;
private String username;
private String email;
public UserResponse(Long userId, String username, String email) {
this.userId = userId;
this.username = username;
this.email = email;
}
// getters
}
2. 定義Command Handler
創建該Command對應的處理器,實現net.sizovs.pipelinr.Command.Handler接口。
@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
@Autowired
private UserRepository userRepository;
@Override
public UserResponse handle(CreateUserCommand command) {
// 業務邏輯處理
User user = new User();
user.setUsername(command.getUsername());
user.setEmail(command.getEmail());
User savedUser = userRepository.save(user);
return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
}
}
3. 在業務代碼中使用
通過注入Pipeline實例,發送Command並獲取響應。
@Service
public class UserService {
@Autowired
private Pipeline pipeline;
public UserResponse createUser(String username, String email) {
CreateUserCommand command = new CreateUserCommand(username, email);
UserResponse response = pipeline.send(command);
return response;
}
}
4. 添加Command中間件
中間件可以在Command處理前後執行一些操作,如驗證、日誌、事務管理等。
@Component
public class LoggingMiddleware implements Command.Middleware {
private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
@Override
public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
logger.info("Executing command: {}", command.getClass().getSimpleName());
try {
R result = chain.proceed(command);
logger.info("Command executed successfully");
return result;
} catch (Exception e) {
logger.error("Command execution failed", e);
throw e;
}
}
}
@Component
public class ValidationMiddleware implements Command.Middleware {
@Autowired
private Validator validator;
@Override
public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
Set<ConstraintViolation<C>> violations = validator.validate(command);
if (!violations.isEmpty()) {
throw new ConstraintViolationException("Validation failed", violations);
}
return chain.proceed(command);
}
}
@Component
@Order(1) // 指定中間件執行順序
public class TransactionMiddleware implements Command.Middleware {
@Autowired
private PlatformTransactionManager transactionManager;
@Override
public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
R result = chain.proceed(command);
transactionManager.commit(status);
return result;
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
}
六、發佈/訂閲模式實現
發佈/訂閲模式使用Notification接口,用於一對多的消息分發,沒有返回值。
1. 定義Notification
Notification代表一個事件通知,需要實現net.sizovs.pipelinr.Notification接口。
// 定義一個用户創建成功的事件通知
public class UserCreatedNotification implements Notification {
private Long userId;
private String username;
private String email;
private LocalDateTime createdTime;
public UserCreatedNotification(Long userId, String username, String email) {
this.userId = userId;
this.username = username;
this.email = email;
this.createdTime = LocalDateTime.now();
}
// getters
}
2. 定義Notification Handler
Notification可以有多個處理器,每個處理器實現net.sizovs.pipelinr.Notification.Handler接口。
@Component
public class SendWelcomeEmailHandler implements Notification.Handler<UserCreatedNotification> {
private static final Logger logger = LoggerFactory.getLogger(SendWelcomeEmailHandler.class);
@Autowired
private EmailService emailService;
@Override
public void handle(UserCreatedNotification notification) {
logger.info("Sending welcome email to user: {}", notification.getUsername());
emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());
}
}
@Component
public class LogUserCreationHandler implements Notification.Handler<UserCreatedNotification> {
private static final Logger logger = LoggerFactory.getLogger(LogUserCreationHandler.class);
@Autowired
private UserAuditLogRepository auditLogRepository;
@Override
public void handle(UserCreatedNotification notification) {
logger.info("Logging user creation: {}", notification.getUsername());
UserAuditLog auditLog = new UserAuditLog();
auditLog.setUserId(notification.getUserId());
auditLog.setOperation("CREATE");
auditLog.setTimestamp(notification.getCreatedTime());
auditLogRepository.save(auditLog);
}
}
@Component
public class UpdateUserStatisticsHandler implements Notification.Handler<UserCreatedNotification> {
private static final Logger logger = LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);
@Autowired
private UserStatisticsRepository statisticsRepository;
@Override
public void handle(UserCreatedNotification notification) {
logger.info("Updating statistics for new user: {}", notification.getUsername());
UserStatistics stats = statisticsRepository.findOrCreate();
stats.incrementTotalUsers();
statisticsRepository.save(stats);
}
}
3. 發送Notification
在Command處理完成後,可以發送Notification通知所有相關的處理器。
@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
@Autowired
private UserRepository userRepository;
@Autowired
private Pipeline pipeline;
@Override
public UserResponse handle(CreateUserCommand command) {
// 業務邏輯處理
User user = new User();
user.setUsername(command.getUsername());
user.setEmail(command.getEmail());
User savedUser = userRepository.save(user);
// 發送事件通知
UserCreatedNotification notification = new UserCreatedNotification(
savedUser.getId(),
savedUser.getUsername(),
savedUser.getEmail()
);
pipeline.send(notification);
return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
}
}
4. 添加Notification中間件
類似Command,Notification也支持中間件。
@Component
public class NotificationLoggingMiddleware implements Notification.Middleware {
private static final Logger logger = LoggerFactory.getLogger(NotificationLoggingMiddleware.class);
@Override
public <N extends Notification> void invoke(N notification, Chain chain) {
logger.info("Publishing notification: {}", notification.getClass().getSimpleName());
try {
chain.proceed(notification);
logger.info("Notification published successfully");
} catch (Exception e) {
logger.error("Notification publishing failed", e);
throw e;
}
}
}
@Component
public class NotificationErrorHandlingMiddleware implements Notification.Middleware {
private static final Logger logger = LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);
@Override
public <N extends Notification> void invoke(N notification, Chain chain) {
try {
chain.proceed(notification);
} catch (Exception e) {
logger.error("Error handling notification: {}", notification.getClass().getSimpleName(), e);
// 可以選擇吞掉異常或重新拋出,取決於業務需求
// throw e;
}
}
}
七、總結
核心收穫
通過本文的介紹,我們瞭解瞭如何在Java應用中使用PipelinR框架實現CQRS模式。核心要點總結如下:
1. CQRS的價值
- 讀寫分離:通過Command處理寫操作,Notification處理事件響應,實現職責的明確劃分
- 獨立優化:讀端和寫端可以獨立優化,不同的數據模型適應不同的場景需求
- 系統解耦:中介者模式解耦了調用方和處理方,提高了系統的可維護性和可擴展性
2. PipelinR的核心特性
- 輕量級實現:相比完整的CQRS框架,PipelinR更輕便,學習成本低
- 靈活的管道機制:通過中間件可以方便地植入橫切關注點(如日誌、驗證、事務等)
- 支持兩種消息模式:Command用於請求/響應,Notification用於發佈/訂閲
3. 最佳實踐建議
- 合理使用中間件:通過@Order註解控制中間件執行順序,但要避免中間件層級過多導致性能問題
- 異常處理:根據場景選擇合適的異常處理策略,Notification可考慮不中斷其他處理器的錯誤隔離
- 事件驅動設計:充分利用Notification實現事件驅動架構,解耦不同的業務流程
- 代碼組織:按照Command、Handler、Middleware的劃分方式組織代碼,保持結構清晰
實施建議
適用場景
- 中等複雜度的業務系統,需要良好的代碼結構和可維護性
- 業務邏輯相對複雜,需要事件驅動的系統設計
- 團隊具備良好的DDD設計理念和架構意識
注意事項
- 學習曲線:雖然PipelinR本身簡單,但要理解CQRS的設計理念需要一定時間
- 適度使用:CQRS不是銀彈,過度設計會增加系統複雜度,要根據實際需求決定是否引入
- 團隊協作:CQRS的有效實施對團隊的整體架構意識和編碼規範要求較高
- 性能考慮:雖然使用了中介者模式會引入少量額外開銷,但對大多數應用來説可以忽略不計
結論
PipelinR提供了一種輕量級、簡潔的CQRS實現方案。它特別適合那些想要在不過度複雜化系統的前提下,引入DDD思想和事件驅動設計的項目。通過合理運用Command和Notification,結合恰當的中間件設計,開發者可以構建出高內聚、低耦合、易於維護和擴展的應用系統。
關鍵是要把握好"度"——既要充分發揮CQRS和PipelinR的優勢,又要避免為了追求"高大上"的架構而過度設計,最終的目標是為業務的快速迭代和長期維護提供支撐。