1. 概述
Axon 框架幫助我們構建事件驅動的微服務系統。在《Axon 框架指南》中,我們通過一個簡單的 Axon Spring Boot 應用程序來了解 Axon。該應用程序可以創建和更新訂單,還可以確認和發貨這些訂單。
在《Axon 框架中查詢 Dispatching》中,我們添加了更多查詢到 OrderQueryService。
查詢通常用於 UI,這些 UI 通常調用 REST 端點。
在本教程中,我們將 創建所有查詢的 REST 端點。我們還將從集成測試中使用這些端點。
2. 使用查詢在 REST 端點中
我們可以通過向一個標註了 @RestController 的類添加函數來添加 REST 端點。 我們將使用 OrderRestEndpoint 類來實現這一點。 之前我們直接使用了 QueryGateway 在控制器中,現在我們將替換注入的 QueryGateway 用於 OrderQueryService,該服務我們已經在 Axon Framework 中通過 Dispatching Queries 實現。 這樣,控制器函數的關注點就只在於將行為綁定到 REST 路徑。
所有端點都在項目中的 order-api.http 文件中列出。 感謝這個文件,我們使用 IntelliJ 作為 IDE 時可以調用端點。
2.1. 點對點查詢
點對點查詢只返回一個響應,因此易於實現:
@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return orderQueryService.findAllOrders();
}
Spring 等待 CompletableFuture 解決,並返回 JSON 格式的 payload。 我們可以通過調用 localhost:8080/all-orders 來測試,以獲得所有訂單在一個數組中。
在乾淨的設置下,如果我們首先使用 post 調用 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768 和 http://localhost:8080/ship-order 添加兩個訂單,我們應該在調用 http://localhost:8080/all-orders 時看到如下內容:
[
{
"orderId": "72d67527-a27c-416e-a904-396ebf222344",
"products": {
"Deluxe Chair": 1
},
"orderStatus": "SHIPPED"
},
{
"orderId": "666a1661-474d-4046-8b12-8b5896312768",
"products": {},
"orderStatus": "CREATED"
}
]
2.2. 流式查詢
流式查詢將返回一個事件流並最終關閉。 我們可以等待流完成併發送響應,但更有效率的是直接流式傳輸。 我們通過使用 Server-Send 事件來實現這一點:
@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
return orderQueryService.allOrdersStreaming();
}
通過添加 media type,Spring 瞭解我們希望響應作為 server-send events。 這意味着每個訂單都單獨發送。 如果客户端支持 server-send events,localhost:8080/all-orders-streaming 將返回所有訂單一個一個。
與點對點查詢一樣,如果我們在數據庫中添加了兩個訂單,我們將看到如下結果:
data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}
這都是獨立的 server send events。
2.3. 散裝收集查詢
將 Axon 查詢返回的響應合併的邏輯已經在 OrderQueryService 中實現。 這使得實現與點對點查詢非常相似,因為只有一個響應。 例如,要使用散裝收集查詢添加一個端點:
@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId);
}
調用 http://localhost:8080/total-shipped/Deluxe Chair 將返回 Deluxe Chair 已發送的總數,包括 LegacyQueryHandler 中的 234。 如果來自 ship-order 的調用仍然存在於數據庫中,它將返回 235。
2.4. 訂閲查詢
與流式查詢不同,訂閲查詢可能永遠不會結束。 因此,對於訂閲查詢,等待其完成不可取。 我們再次利用 server-send events 來添加端點:
@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
return orderQueryService.orderUpdates(orderId);
}
對 http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768 進行調用將為該訂單提供更新流。 通過對 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3dd 進行 post 調用,我們觸發更新。 該更新將作為 server-send event 發送。
我們將看到初始狀態和更新後的狀態。 連接保持打開狀態以接收進一步的更新。
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}
正如我們所看到的,更新包含我們添加的產品。
3. Integration Tests
For the integration tests, let’s use WebClient.
For these tests, we’ll run the whole application using@SpringBootTest, first changing the state using the other REST endpoints. These other REST endpoints trigger one or multiple commands to create one or numerous events. To create orders, we’ll use the endpoints that were added in A Guide to the Axon Framework. We use the @DirtiesContext annotation on each test, so events created in one test don’t affect another.
Instead of running Axon Server during the integration tests, we set axon.axonserver.enabled=false in application.properties in our src/test/resources. This way, we’ll use the non-distributed gateways, which run faster and don’t require Axon Server. The gateways are the instances handling the three different kinds of messages.
We can create some helper methods to make our tests more readable. These helper functions provide the correct types and set HTTP headers when needed. For example:
private void verifyVoidPost(WebClient client, String uri) {
StepVerifier.create(retrieveResponse(client.post()
.uri(uri)))
.verifyComplete();
}
This is useful for calling post endpoints with a void return type. It will use the retrieveResponse() helper function to do the call and verify it’s complete. Such things are used often and take a couple of lines of code. We make the tests more readable and maintainable by putting them in helper functions.
3.1. Testing the Point-to-Point Query
To test the /all-orders REST endpoint, let’s create one order and then validate if we can retrieve the created order. To be able to do this, we first need to create a WebClient. The web client is a reactive instance we can use to make HTTP calls. After the call to create an order, we fetch all orders and verify the result:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
.uri("http://localhost:" + port + "/all-orders")))
.expectNextMatches(list -> 1 == list.size() && list.get(0)
.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();
Since it’s reactive, we can use a StepVerifier from reactor-test to validate the response.
We expect just one Order in the list, the one we just created. Furthermore, we expect the Order to have the CREATED order status.
3.2. Testing the Streaming Query
The streaming query might return multiple orders. We also want to test if the stream completes. For the test, we’ll create three new random orders and then test the streaming query response:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
for (int i = 0; i < 3; i++) {
createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
.uri("http://localhost:" + port + "/all-orders-streaming")))
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();
With the verifyComplete() at the end, we ensure the stream is closed. We should note that it’s possible to implement a streaming query in such a way that it doesn’t complete. In this case, it does, and it’s important to verify it.
3.3. Testing the Scatter-Gather Query
To test the scatter-gather query, we need to ensure the result from multiple handlers is combined. We ship one chair using an endpoint. We then retrieve all the shipped chairs. As theLegacyQueryHandler returns 234 for the chair, the result should be 235.
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
.uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
.assertNext(r -> assertEquals(235, r))
.verifyComplete();
The retrieveIntegerResponse() helper function returns an integer from the response body.
3.4. Testing the Subscription Query
The subscription query will stay active as long as we don’t close the connection. We’d like to test both the initial result and the updates. Therefore, we use a ScheduledExecutorService so we can use multiple threads in the test. The service allows updating an order from one Thread while validating the returned orders in another. To make it a bit more readable, we use a different method to do updates:
private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
String base = "http://localhost:" + port + "/order/" + orderId;
verifyVoidPost(client, base + "/product/" + productId);
verifyVoidPost(client, base + "/product/" + productId + "/increment");
// and some more
}
The method creates and uses its own instance of a web client to not interfere with the one used to verify the response.
The actual test will call this from the executor and validate the updates:
//Create two webclients, creating the id's for the test, and create an order.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
.uri("http://localhost:" + port + "/order-updates/" + orderId)))
.assertNext(p -> assertTrue(p.getProducts().isEmpty()))
//Some more assertions.
.assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
.thenCancel()
.verify();
} finally {
executor.shutdown();
}
We should note that we wait one second before the updates to be sure we don’t miss the first update. We use a random UUID to generate the productId, which is used both for updating and verifying the results. Each change should trigger an update.
Depending on the expected state after the update, we add an assertion. We need to call thenCancel() to end the test, as the subscription would stay open without it. A finally block is used to ensure we always close the executor.
4. 結論
在本文中,我們學習瞭如何為查詢添加 REST 端點。這些端點可用於構建 UI。
我們還學習瞭如何使用 WebClient 測試這些端點。
有關此主題的任何其他問題,請查看 Discuss AxonIQ。