知識庫 / Spring RSS 訂閱

使用 Spring Integration 中的子流程

Spring
HongKong
5
01:36 PM · Dec 06 ,2025

1. 概述

Spring Integration 使得使用某些企業集成模式變得容易。其中一種方式是通過其 DSL。

在本教程中,我們將探討 DSL 對子流程的支持,以簡化一些配置。

2. 我們的任務

假設我們有一系列整數,我們希望將其分成三個不同的分組。

如果我們想使用 Spring Integration 來完成這項任務,我們可以首先創建三個輸出通道:

  • 像 0、3、6 和 9 這樣的數字將進入 multipleOfThreeChannel
  • 像 1、4、7 和 10 這樣的數字將進入 remainderIsOneChannel
  • 以及像 2、5、8 和 11 這樣的數字將進入 remainderIsTwoChannel

為了展示子流程的實用性,讓我們先看看在不使用子流程的情況下它會是什麼樣子。

然後,我們將使用子流程來簡化我們的配置,包括:

  • publishSubscribeChannel
  • routeToRecipients
  • Filters,用於配置我們的 if-then 邏輯
  • Routers,用於配置我們的 switch 邏輯

3. 前提條件

現在在配置我們的子流程之前,我們先創建這些輸出通道。

我們將這些 QueueChannel 創建為隊列通道,因為這樣更容易演示。

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
 
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

最終,這些將是分組數字的歸宿地。

請注意,Spring Integration 很容易看起來很複雜,因此我們將添加一些輔助方法以提高可讀性。

4. 不使用子流程解決問題

現在我們需要定義我們的流程。

在不使用子流程的情況下,簡單的思路是定義三個獨立的集成流程,每個流程對應一種數字類型。

我們將向每個 IntegrationFlow 組件發送相同的消息序列,但每個組件的輸出消息將不同。

4.1. 定義 <emIntegrationFlow> 組件

首先,讓我們定義我們在 <emSubflowConfiguration> 類中的每個 <emIntegrationFlow> bean:

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

我們的流程包含兩個端點——一個分割器,隨後是一個過濾器

過濾器就像它的名字一樣,會做相應的事情。但為什麼我們還需要一個分割器?稍後我們會看到的,基本上,它會將輸入集合分割成單獨的消息。

當然,我們也可以以相同的方式定義兩個額外的集成流程 Bean。

4.2. 消息網關

對於每個流程,都需要一個 消息網關

簡單來説,它們將 Spring Integration Messages API 從調用者那裏抽象出來,類似於 REST 服務如何抽象 HTTP:

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);

}

對於每個對象,我們需要使用<em>@Gateway</em>註解,並指定輸入通道的隱式名稱,即為 Bean 的名稱後加上<em>.input</em>。<strong>注意,由於我們使用基於 lambda 的流,因此可以使用這種約定。</strong>

4.3. 發送消息和檢查輸出

現在,我們進行測試:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
 
    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;
    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

請注意,我們已將消息發送為列表,因此我們需要分割器,將單個“列表消息”轉換為多個“數字消息”。

我們調用receive,傳入o,以便在不等待的情況下獲取下一個可用的消息。由於我們的列表中有兩倍於三個的數字,我們預計可以調用它兩次。第三次調用receive返回null

receive,當然,返回一個Message,因此我們調用getPayload來提取數字。

同樣,我們也可以對另外兩個執行相同的操作。

所以,這就是在不使用子流程的解決方案。我們有三個獨立的流程和三個獨立的網關方法需要維護。

接下來,我們將三個IntegrationFlow Bean 替換為單個 Bean,並將三個網關方法替換為單個方法。

5. 使用 publishSubscribeChannel

publishSubscribeChannel() 方法將消息廣播到所有訂閲的子流程。 這樣,我們就可以創建一個流程,而不是三個流程。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

通過這種方式,子流程是匿名的,這意味着它們不能獨立地進行處理。

現在,我們只有一個流程,所以我們來編輯我們的 NumbersClassifier 吧:

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

現在,由於我們只有一個 IntegrationFlow Bean 和一個 gateway 方法,因此我們只需要發送一次我們的列表:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

    // same assertions as before
}

請注意,從現在開始,只會更改集成流程的定義,因此我們將不再展示測試結果。

6. 使用<em routeToRecipients

另一種實現相同目的的方法是,因為它具有內置的過濾功能。

使用此方法,我們可以指定頻道和子流程以進行廣播。

6.1. 接收者

在下面的代碼中,我們將根據我們的條件,指定multipleof3ChannelremainderIs1ChannelremainderIsTwoChannel作為接收者:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel", 
            this::isMultipleOfThree)       
          .<Integer> recipient("remainderIsOneChannel", 
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel", 
            this::isRemainderTwo));
}

我們還可以無條件地調用 recipient,並且 routeToRecipients 將無條件地發佈到該目標目的地。

6.2. recipientFlow

並且請注意,routeToRecipients 允許我們定義完整的流程,就像 publishSubscribeChannel 一樣。

讓我們修改上述代碼,並指定一個 匿名子流程作為第一個接收者:

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("mutipleOfThreeChannel"))
  ...);

這個子流程將接收整個消息序列,因此我們需要像之前一樣進行過濾,以獲得相同的行為。

再次強調,一個IntegrationFlow Bean 已經足夠滿足我們的需求。

現在我們繼續討論if-else 組件。其中之一是Filter

7. 使用 if-then 流程

我們之前已經在所有示例中使用了 過濾器 。 令人高興的是,我們可以不僅指定進一步處理的條件,還可以指定丟棄的消息 通道或流程

我們可以將丟棄流程和通道視為 else 塊:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleofThreeChannel");
}

在這種情況下,我們已實現 if-else 路由邏輯:

  • 如果數字不是3的倍數,則將消息流向丟棄流程;我們這裏使用流程,因為需要更多邏輯來確定其目標通道。
  • 在丟棄流程中,如果數字的餘數不是1,將這些消息流向丟棄通道。

8. 計算值的啓用

最後,讓我們嘗試 route 方法,它比 routeToRecipients 提供了更多的控制。 它的優點在於一個 路由器 可以將流程分割成任意數量的部分,而 過濾器 只能進行兩個分割。

8.1. channelMapping

讓我們定義我們的 IntegrationFlow Bean:

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3, 
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

在上述代碼中,我們通過計算除法來計算路由鍵:

route(p -> p % 3,...

根據此密鑰,我們路由消息:

channelMapping(0, "multipleof3Channel")

8.2. subFlowMapping

現在,與其它方式類似,我們可以通過指定subFlowMapping,來獲得更大的控制權,將channelMapping替換為subFlowMapping

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

或者,通過調用handle方法而不是channel方法,可以獲得更多的控制:

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
      // do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

在這種情況下,子流程將在 route() 方法後返回到主流程,因此我們需要指定通道 remainderIsTwoChannel

9. 結論

在本教程中,我們探討了如何使用子流程過濾和路由消息的一些方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.