1. 概述
Spring Integration 使得使用某些企業集成模式變得容易。其中一種方式是通過其 DSL。
在本教程中,我們將探討 DSL 對子流程的支持,以簡化一些配置。
2. 我們的任務
假設我們有一系列整數,我們希望將其分成三個不同的分組。
如果我們想使用 Spring Integration 來完成這項任務,我們可以首先創建三個輸出通道:
- 像 0、3、6 和 9 這樣的數字將進入 multipleOfThreeChannel
- 像 1、4、7 和 10 這樣的數字將進入 remainderIsOneChannel
- 以及像 2、5、8 和 11 這樣的數字將進入 remainderIsTwoChannel
為了展示子流程的實用性,讓我們先看看在不使用子流程的情況下它會是什麼樣子。
然後,我們將使用子流程來簡化我們的配置,包括:
- publishSubscribeChannel
- routeToRecipients
- Filters,用於配置我們的 if-then 邏輯
- Routers,用於配置我們的 switch 邏輯
3. 前提條件
現在在配置我們的子流程之前,我們先創建這些輸出通道。
我們將這些 QueueChannel 創建為隊列通道,因為這樣更容易演示。
@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderIOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
}最終,這些將是分組數字的歸宿地。
請注意,Spring Integration 很容易看起來很複雜,因此我們將添加一些輔助方法以提高可讀性。
4. 不使用子流程解決問題
現在我們需要定義我們的流程。
在不使用子流程的情況下,簡單的思路是定義三個獨立的集成流程,每個流程對應一種數字類型。
我們將向每個 IntegrationFlow 組件發送相同的消息序列,但每個組件的輸出消息將不同。
4.1. 定義 <emIntegrationFlow> 組件
首先,讓我們定義我們在 <emSubflowConfiguration> 類中的每個 <emIntegrationFlow> bean:
@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}我們的流程包含兩個端點——一個分割器,隨後是一個過濾器。
過濾器就像它的名字一樣,會做相應的事情。但為什麼我們還需要一個分割器?稍後我們會看到的,基本上,它會將輸入集合分割成單獨的消息。
當然,我們也可以以相同的方式定義兩個額外的集成流程 Bean。
4.2. 消息網關
對於每個流程,都需要一個 消息網關。
簡單來説,它們將 Spring Integration Messages API 從調用者那裏抽象出來,類似於 REST 服務如何抽象 HTTP:
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleOfThree(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);
}對於每個對象,我們需要使用<em>@Gateway</em>註解,並指定輸入通道的隱式名稱,即為 Bean 的名稱後加上<em>.input</em>。<strong>注意,由於我們使用基於 lambda 的流,因此可以使用這種約定。</strong>
現在,我們進行測試: 請注意,我們已將消息發送為列表,因此我們需要分割器,將單個“列表消息”轉換為多個“數字消息”。 我們調用receive,傳入o,以便在不等待的情況下獲取下一個可用的消息。由於我們的列表中有兩倍於三個的數字,我們預計可以調用它兩次。第三次調用receive返回null。 receive,當然,返回一個Message,因此我們調用getPayload來提取數字。 同樣,我們也可以對另外兩個執行相同的操作。 所以,這就是在不使用子流程的解決方案。我們有三個獨立的流程和三個獨立的網關方法需要維護。 接下來,我們將三個IntegrationFlow Bean 替換為單個 Bean,並將三個網關方法替換為單個方法。 publishSubscribeChannel() 方法將消息廣播到所有訂閲的子流程。 這樣,我們就可以創建一個流程,而不是三個流程。 通過這種方式,子流程是匿名的,這意味着它們不能獨立地進行處理。 現在,我們只有一個流程,所以我們來編輯我們的 NumbersClassifier 吧: 現在,由於我們只有一個 IntegrationFlow Bean 和一個 gateway 方法,因此我們只需要發送一次我們的列表:4.3. 發送消息和檢查輸出
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
@Autowired
private QueueChannel multipleOfThreeChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = multipleOfThreeChannel.receive(0);
assertNull(outMessage);
}
}5. 使用 publishSubscribeChannel
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription ->
subscription
.subscribe(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
// same assertions as before
}請注意,從現在開始,只會更改集成流程的定義,因此我們將不再展示測試結果。
6. 使用<em routeToRecipients
另一種實現相同目的的方法是,因為它具有內置的過濾功能。
使用此方法,我們可以指定頻道和子流程以進行廣播。
6.1. 接收者
在下面的代碼中,我們將根據我們的條件,指定multipleof3Channel、remainderIs1Channel和remainderIsTwoChannel作為接收者:
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.<Integer> recipient("multipleOfThreeChannel",
this::isMultipleOfThree)
.<Integer> recipient("remainderIsOneChannel",
this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",
this::isRemainderTwo));
}我們還可以無條件地調用 recipient,並且 routeToRecipients 將無條件地發佈到該目標目的地。
6.2. recipientFlow
並且請注意,routeToRecipients 允許我們定義完整的流程,就像 publishSubscribeChannel 一樣。
讓我們修改上述代碼,並指定一個 匿名子流程作為第一個接收者:
.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("mutipleOfThreeChannel"))
...);這個子流程將接收整個消息序列,因此我們需要像之前一樣進行過濾,以獲得相同的行為。
再次強調,一個IntegrationFlow Bean 已經足夠滿足我們的需求。
現在我們繼續討論if-else 組件。其中之一是Filter。
7. 使用 if-then 流程
我們之前已經在所有示例中使用了 過濾器 。 令人高興的是,我們可以不僅指定進一步處理的條件,還可以指定丟棄的消息 通道或流程 。
我們可以將丟棄流程和通道視為 else 塊:
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree,
notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow
.discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel"))
.channel("multipleofThreeChannel");
}在這種情況下,我們已實現 if-else 路由邏輯:
- 如果數字不是3的倍數,則將消息流向丟棄流程;我們這裏使用流程,因為需要更多邏輯來確定其目標通道。
- 在丟棄流程中,如果數字的餘數不是1,則將這些消息流向丟棄通道。
8. 計算值的啓用
最後,讓我們嘗試 route 方法,它比 routeToRecipients 提供了更多的控制。 它的優點在於一個 路由器 可以將流程分割成任意數量的部分,而 過濾器 只能進行兩個分割。
8.1. channelMapping
讓我們定義我們的 IntegrationFlow Bean:
@Bean
public IntegrationFlow classify() {
return classify -> classify.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleOfThreeChannel")
.channelMapping(1, "remainderIsOneChannel")
.channelMapping(2, "remainderIsTwoChannel"));
}在上述代碼中,我們通過計算除法來計算路由鍵:
route(p -> p % 3,...根據此密鑰,我們路由消息:
channelMapping(0, "multipleof3Channel")8.2. subFlowMapping
現在,與其它方式類似,我們可以通過指定subFlowMapping,來獲得更大的控制權,將channelMapping替換為subFlowMapping。
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))或者,通過調用handle方法而不是channel方法,可以獲得更多的控制:
.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");在這種情況下,子流程將在 route() 方法後返回到主流程,因此我們需要指定通道 remainderIsTwoChannel。
9. 結論
在本教程中,我們探討了如何使用子流程過濾和路由消息的一些方法。