博客 / 詳情

返回

PipelinR:在Java中實現優雅的CQRS架構

使用中介者模式輕鬆實現命令查詢職責分離,構建高內聚、低耦合的應用系統


一、知識點回顧

1. 什麼是CQRS?

CQRS是Command Query Responsibility Segregation的縮寫,一般稱作命令查詢職責分離。從字面意思理解,就是將命令(寫入)和查詢(讀取)的責任劃分到不同的模型中。

對比一下常用的 CRUD 模式(創建-讀取-更新-刪除),通常我們會讓用户界面與負責所有四種操作的數據存儲交互。而 CQRS 則將這些操作分成兩種模式,一種用於查詢(又稱 "R"),另一種用於命令(又稱 "CUD")。

2. CQRS的作用是什麼?

CQRS將系統的寫操作(命令)和讀操作(查詢)分離到不同的模型和數據存儲中,從而實現讀寫分離,提高系統的性能、可擴展性和安全性,並使複雜業務邏輯(寫端)和高效查詢(讀端)各自得到優化,降低系統複雜性。它允許為寫操作設計嚴謹的領域模型,為讀操作設計簡單、只關注查詢效率的數據模型(如專用視圖或報表數據庫),並可通過事件等機制保持最終一致性。

3. CQRS 的優點

  • 獨立縮放。 CQRS 使讀取模型和寫入模型能夠獨立縮放。 此方法可幫助最大程度地減少鎖爭用並提高負載下的系統性能。
  • 優化的數據架構。 讀取操作可以使用針對查詢進行優化的模式。 寫入操作使用針對更新優化的模式。
  • 安全性。 通過分隔讀取和寫入,可以確保只有適當的域實體或操作有權對數據執行寫入操作。
  • 關注點分離。 分離讀取和寫入責任會導致更簡潔、更易於維護的模型。 寫入端通常處理複雜的業務邏輯。 讀取端可以保持簡單且專注於查詢效率。
  • 更簡單的查詢。 在讀取數據庫中存儲具體化視圖時,應用程序可以在查詢時避免複雜的聯接。

二、關於PipelinR

項目地址

https://github.com/sizovs/PipelinR

項目開發者在Github的介紹不多,關鍵是最後一句話:It's similar to a popular MediatR .NET library. 意思就是這個項目是參考着一個叫MediatR的.net庫寫的。關於MediatR我之前有兩篇文章專門介紹過。

PipelinR(包括MediatR)提供了一種CQRS的實現方式,基於中介者模式實現進程內消息傳遞,用於解耦應用中的各個組件,支持請求/響應(一對一,有返回值)和發佈/訂閲(一對多,無返回值)兩種消息模式。它們在內部提供管道行為 (Pipeline Behaviors),用於在消息處理前後插入自定義邏輯,如日誌、驗證、異常處理等。

需要提醒的是,PipelinR並不是一個完整的CQRS框架,它只是一箇中介者模式的具體實現方式,將調用方和處理方進行了解耦,而這種模式恰好可以用來在一個單體應用(或者是微服務的服務內部)中實現簡單的CQRS。

三、依賴安裝和配置

1. Maven安裝

<dependency>
  <groupId>net.sizovs</groupId>
  <artifactId>pipelinr</artifactId>
  <version>0.11</version>
</dependency>

2. Gradle安裝

dependencies {
    compile 'net.sizovs:pipelinr:0.11'
}

在Spring項目中配置PipelinR

@Configuration
public class PipelinrConfiguration {

    @Bean
    Pipeline pipeline(ObjectProvider<Command.Handler> commandHandlers, ObjectProvider<Notification.Handler> notificationHandlers, ObjectProvider<Command.Middleware> middlewares) {
        return new Pipelinr()
          .with(commandHandlers::stream)
          .with(notificationHandlers::stream)
          .with(middlewares::orderedStream);
    }
}

四、核心組件

  • Pipeline/Pipelinr:Pipeline是消息和處理器之間的中介者,調用方向Pipeline發送消息,Pipeline收到消息後通過註冊到Pipeline的中間件進行層層傳遞並最終抵達匹配的消息處理器進行處理。Pipelinr是Pipeline的默認實現。
  • Command<R>:用於約定請求/響應模式的消息類型,泛型參數R是返回值的類型,如果不需要返回值,可以將R指定為Voidy。
  • Notification:用於約定發佈/訂閲模式的消息類型,沒有返回值,消息可以有多個處理器。
  • Middleware:管道中間件,Command和Notification都定義了各自的中間件接口。Pipeline接收到的消息,在到達最終的處理器之前,會經過所有註冊到Pipeline的中間。可以使用Middleware實現諸如日誌記錄、數據驗證、開啓事務等一系列操作。

五、請求/響應模式實現

請求/響應模式需要用到Command接口。

1. 定義Command

Command代表一個請求,需要實現net.sizovs.pipelinr.Command接口。泛型參數指定返回值類型。

// 定義一個創建用户的命令
public class CreateUserCommand implements Command<UserResponse> {
    private String username;
    private String email;
    
    public CreateUserCommand(String username, String email) {
        this.username = username;
        this.email = email;
    }
    
    public String getUsername() {
        return username;
    }
    
    public String getEmail() {
        return email;
    }
}

// 返回值類型
public class UserResponse {
    private Long userId;
    private String username;
    private String email;
    
    public UserResponse(Long userId, String username, String email) {
        this.userId = userId;
        this.username = username;
        this.email = email;
    }
    
    // getters
}

2. 定義Command Handler

創建該Command對應的處理器,實現net.sizovs.pipelinr.Command.Handler接口。

@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
    
    @Autowired
    private UserRepository userRepository;
    
    @Override
    public UserResponse handle(CreateUserCommand command) {
        // 業務邏輯處理
        User user = new User();
        user.setUsername(command.getUsername());
        user.setEmail(command.getEmail());
        
        User savedUser = userRepository.save(user);
        
        return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}

3. 在業務代碼中使用

通過注入Pipeline實例,發送Command並獲取響應。

@Service
public class UserService {
    
    @Autowired
    private Pipeline pipeline;
    
    public UserResponse createUser(String username, String email) {
        CreateUserCommand command = new CreateUserCommand(username, email);
        UserResponse response = pipeline.send(command);
        return response;
    }
}

4. 添加Command中間件

中間件可以在Command處理前後執行一些操作,如驗證、日誌、事務管理等。

@Component
public class LoggingMiddleware implements Command.Middleware {
    
    private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
    
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        logger.info("Executing command: {}", command.getClass().getSimpleName());
        try {
            R result = chain.proceed(command);
            logger.info("Command executed successfully");
            return result;
        } catch (Exception e) {
            logger.error("Command execution failed", e);
            throw e;
        }
    }
}

@Component
public class ValidationMiddleware implements Command.Middleware {
    
    @Autowired
    private Validator validator;
    
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        Set<ConstraintViolation<C>> violations = validator.validate(command);
        if (!violations.isEmpty()) {
            throw new ConstraintViolationException("Validation failed", violations);
        }
        return chain.proceed(command);
    }
}

@Component
@Order(1) // 指定中間件執行順序
public class TransactionMiddleware implements Command.Middleware {
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            R result = chain.proceed(command);
            transactionManager.commit(status);
            return result;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
}

六、發佈/訂閲模式實現

發佈/訂閲模式使用Notification接口,用於一對多的消息分發,沒有返回值。

1. 定義Notification

Notification代表一個事件通知,需要實現net.sizovs.pipelinr.Notification接口。

// 定義一個用户創建成功的事件通知
public class UserCreatedNotification implements Notification {
    private Long userId;
    private String username;
    private String email;
    private LocalDateTime createdTime;
    
    public UserCreatedNotification(Long userId, String username, String email) {
        this.userId = userId;
        this.username = username;
        this.email = email;
        this.createdTime = LocalDateTime.now();
    }
    
    // getters
}

2. 定義Notification Handler

Notification可以有多個處理器,每個處理器實現net.sizovs.pipelinr.Notification.Handler接口。

@Component
public class SendWelcomeEmailHandler implements Notification.Handler<UserCreatedNotification> {
    
    private static final Logger logger = LoggerFactory.getLogger(SendWelcomeEmailHandler.class);
    
    @Autowired
    private EmailService emailService;
    
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Sending welcome email to user: {}", notification.getUsername());
        emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());
    }
}

@Component
public class LogUserCreationHandler implements Notification.Handler<UserCreatedNotification> {
    
    private static final Logger logger = LoggerFactory.getLogger(LogUserCreationHandler.class);
    
    @Autowired
    private UserAuditLogRepository auditLogRepository;
    
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Logging user creation: {}", notification.getUsername());
        UserAuditLog auditLog = new UserAuditLog();
        auditLog.setUserId(notification.getUserId());
        auditLog.setOperation("CREATE");
        auditLog.setTimestamp(notification.getCreatedTime());
        auditLogRepository.save(auditLog);
    }
}

@Component
public class UpdateUserStatisticsHandler implements Notification.Handler<UserCreatedNotification> {
    
    private static final Logger logger = LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);
    
    @Autowired
    private UserStatisticsRepository statisticsRepository;
    
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Updating statistics for new user: {}", notification.getUsername());
        UserStatistics stats = statisticsRepository.findOrCreate();
        stats.incrementTotalUsers();
        statisticsRepository.save(stats);
    }
}

3. 發送Notification

在Command處理完成後,可以發送Notification通知所有相關的處理器。

@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private Pipeline pipeline;
    
    @Override
    public UserResponse handle(CreateUserCommand command) {
        // 業務邏輯處理
        User user = new User();
        user.setUsername(command.getUsername());
        user.setEmail(command.getEmail());
        
        User savedUser = userRepository.save(user);
        
        // 發送事件通知
        UserCreatedNotification notification = new UserCreatedNotification(
            savedUser.getId(), 
            savedUser.getUsername(), 
            savedUser.getEmail()
        );
        pipeline.send(notification);
        
        return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}

4. 添加Notification中間件

類似Command,Notification也支持中間件。

@Component
public class NotificationLoggingMiddleware implements Notification.Middleware {
    
    private static final Logger logger = LoggerFactory.getLogger(NotificationLoggingMiddleware.class);
    
    @Override
    public <N extends Notification> void invoke(N notification, Chain chain) {
        logger.info("Publishing notification: {}", notification.getClass().getSimpleName());
        try {
            chain.proceed(notification);
            logger.info("Notification published successfully");
        } catch (Exception e) {
            logger.error("Notification publishing failed", e);
            throw e;
        }
    }
}

@Component
public class NotificationErrorHandlingMiddleware implements Notification.Middleware {
    
    private static final Logger logger = LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);
    
    @Override
    public <N extends Notification> void invoke(N notification, Chain chain) {
        try {
            chain.proceed(notification);
        } catch (Exception e) {
            logger.error("Error handling notification: {}", notification.getClass().getSimpleName(), e);
            // 可以選擇吞掉異常或重新拋出,取決於業務需求
            // throw e;
        }
    }
}

七、總結

核心收穫

通過本文的介紹,我們瞭解瞭如何在Java應用中使用PipelinR框架實現CQRS模式。核心要點總結如下:

1. CQRS的價值

  • 讀寫分離:通過Command處理寫操作,Notification處理事件響應,實現職責的明確劃分
  • 獨立優化:讀端和寫端可以獨立優化,不同的數據模型適應不同的場景需求
  • 系統解耦:中介者模式解耦了調用方和處理方,提高了系統的可維護性和可擴展性

2. PipelinR的核心特性

  • 輕量級實現:相比完整的CQRS框架,PipelinR更輕便,學習成本低
  • 靈活的管道機制:通過中間件可以方便地植入橫切關注點(如日誌、驗證、事務等)
  • 支持兩種消息模式:Command用於請求/響應,Notification用於發佈/訂閲

3. 最佳實踐建議

  • 合理使用中間件:通過@Order註解控制中間件執行順序,但要避免中間件層級過多導致性能問題
  • 異常處理:根據場景選擇合適的異常處理策略,Notification可考慮不中斷其他處理器的錯誤隔離
  • 事件驅動設計:充分利用Notification實現事件驅動架構,解耦不同的業務流程
  • 代碼組織:按照Command、Handler、Middleware的劃分方式組織代碼,保持結構清晰

實施建議

適用場景

  • 中等複雜度的業務系統,需要良好的代碼結構和可維護性
  • 業務邏輯相對複雜,需要事件驅動的系統設計
  • 團隊具備良好的DDD設計理念和架構意識

注意事項

  • 學習曲線:雖然PipelinR本身簡單,但要理解CQRS的設計理念需要一定時間
  • 適度使用:CQRS不是銀彈,過度設計會增加系統複雜度,要根據實際需求決定是否引入
  • 團隊協作:CQRS的有效實施對團隊的整體架構意識和編碼規範要求較高
  • 性能考慮:雖然使用了中介者模式會引入少量額外開銷,但對大多數應用來説可以忽略不計

結論

PipelinR提供了一種輕量級、簡潔的CQRS實現方案。它特別適合那些想要在不過度複雜化系統的前提下,引入DDD思想和事件驅動設計的項目。通過合理運用Command和Notification,結合恰當的中間件設計,開發者可以構建出高內聚、低耦合、易於維護和擴展的應用系統。

關鍵是要把握好"度"——既要充分發揮CQRS和PipelinR的優勢,又要避免為了追求"高大上"的架構而過度設計,最終的目標是為業務的快速迭代和長期維護提供支撐。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.