1. 引言
在創建軟件能力時,從不同來源檢索數據並將其聚合到響應中是一個日常活動。在微服務中,這些來源通常是外部 REST API。
在本教程中,我們將使用 Java 的 CompletableFuture 以高效的方式從多個外部 REST API 中並行檢索數據。
2. 使用並行調用 REST API 的原因
設想一個場景,我們需要更新對象中的多個字段,每個字段的值來自外部 REST 調用。一種替代方案是依次調用每個 API 來更新每個字段。
但是,等待一個 REST 調用完成後再啓動另一個調用會增加服務的響應時間。例如,如果我們調用兩個每個持續 5 秒的 API,總時間至少為 10 秒,因為第二個調用需要等待第一個完成。
相反,我們可以並行調用所有 API,這樣總時間將等於最慢的 REST 調用時間。例如,一個調用持續 7 秒,另一個持續 5 秒。在這種情況下,我們將等待 7 秒,因為我們已並行處理所有內容,並且必須等待所有結果完成。
因此,並行調用是一種極好的替代方案,可以減少服務的響應時間,使其更具可擴展性並改善用户體驗。
3. 使用 CompletableFuture 進行並行化
Java 中的 CompletableFuture 類是一個方便的工具,用於組合和運行不同的並行任務,並處理單個任務的錯誤。
在以下部分,我們將使用它來組合和運行每個輸入列表中的三個 REST 調用。
3.1. 創建演示應用程序
首先,我們定義目標 POJO 的更新:
public class Purchase {
String orderDescription;
String paymentDescription;
String buyerName;
String orderId;
String paymentId;
String userId;
// all-arg constructor, getters and setters
}
Purchase 類有三個字段需要更新,每個字段通過一個 ID 查詢一個 REST 調用。
首先,我們創建一個定義 RestTemplate 豆和 REST 調用用於域名的類:
@Component
public class PurchaseRestCallsAsyncExecutor {
RestTemplate restTemplate;
static final String BASE_URL = "https://internal-api.com";
// all-arg constructor
}
現在,我們定義 /orders API 調用:
public String getOrderDescription(String orderId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
String.class);
return result.getBody();
}
然後,我們定義 /payments API 調用:
public String getPaymentDescription(String paymentId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
String.class);
return result.getBody();
}
最後,我們定義 /users API 調用:
public String getUserName(String userId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
String.class);
return result.getBody();
}
所有三個方法都使用 getForEntity() 方法來執行 REST 調用,並將結果包裝在 ResponseEntity 對象中。
然後,我們調用 getBody() 來從 REST 調用中獲取響應正文。
3.2. 使用 CompletableFuture 執行多個 REST 調用
現在,我們創建一個用於構建和運行三個 CompletableFuture 的方法:
public void updatePurchase(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
).join();
}
我們使用了 allOf() 方法來構建我們的 CompletableFuture 的步驟。每個參數是另一個 CompletableFuture 的形式,它構建了另一個 CompletableFuture,它使用 REST 調用和其結果。
為了構建每個並行任務,我們首先使用了 supplyAsync() 方法來提供 Supplier,從中我們檢索數據。然後,我們使用 thenAccept() 來消費 supplyAsync() 從中檢索的結果,並將其設置為 Purchase 類中的相應字段。
在 allOf() 的末尾,我們剛剛構建了任務,沒有采取任何行動。
最後,我們調用 join() 在末尾運行所有任務並行並收集其結果。 由於 join() 是一個線程阻塞操作,因此我們只在末尾調用它,而不是在每個任務步驟中調用它。 這是為了優化應用程序的性能,通過減少線程阻塞次數。
由於我們沒有向 supplyAsync() 方法提供自定義 ExecutorService,因此所有任務都在相同的執行器中運行。 默認情況下,Java 使用 ForkJoinPool.commonPool()。
通常,為了更好地控制線程池參數,最好指定自定義 ExecutorService 到 supplyAsync()。
3.3. 執行每個列表中的多個 REST 調用
為了將 updatePurchase() 方法應用於集合,我們可以簡單地在 forEach() 循環中調用它:
public void updatePurchases(List<Purchase> purchases) {
purchases.forEach(this::updatePurchase);
}
我們的 updatePurchases() 方法接收一個 Purchase 的列表,並將先前創建的 updatePurchase() 方法應用於每個元素。
每次調用 updatePurchases() 時,都會運行三個並行任務,如 CompletableFuture 中定義的那樣。 因此,每個購買都有自己的 CompletableFuture 對象,用於在並行 REST 調用中運行。
4. Handling Errors
在分佈式系統中,服務不可用或網絡故障是很常見的。這些故障可能發生在外部 REST API 中,而作為這些 API 的客户端,我們可能無法感知到。例如,如果應用程序已停止,發送到網絡的請求永遠不會完成。
4.1. 使用 優雅地處理錯誤
在 REST 調用執行過程中可能會發生異常。例如,如果 API 服務已停止或我們輸入了無效參數,我們將會收到錯誤。
因此,我們可以使用 方法分別處理每個 REST 調用異常:
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
該方法參數是一個 ,包含上一個任務的結果和異常作為參數。
為了説明,讓我們將 步驟添加到我們 的一個步驟中:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription)
.handle((result, exception) -> {
if (exception != null) {
// handle exception
return null;
}
return result;
})
).join();
}
在上述示例中, 從 中獲得一個 類型,該類型由 通過調用 調用的。
然後,它將任何在 操作中拋出的錯誤存儲在 中。因此,我們使用它來檢查是否存在錯誤並正確地在 語句中處理它。
最後, 如果沒有拋出異常,則返回作為參數傳遞的值;否則,返回 。
4.2. 處理 REST 調用超時
當我們使用 時,我們可以指定與我們在 REST 調用中定義的類似的任務超時。因此,如果任務在指定時間內未完成,Java 將使用 結束任務執行。
要做到這一點,讓我們修改我們 的一個任務,以處理超時:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription)
.orTimeout(5, TimeUnit.SECONDS)
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// handle exception
return null;
}
return result;
})
).join();
}
我們已經在 構造器中添加了 行,如果任務在 秒內未完成,則強制停止任務執行。
我們還在 方法中添加了一個 語句,以單獨處理 。
向 添加超時可確保任務始終完成。這對於避免線程無限期地等待操作的結果非常重要,而該操作可能永遠不會完成。 這減少了長時間處於 狀態中的線程數量,並提高了應用程序的健康狀況。
5. 結論
在處理分佈式系統時,一個常見的任務是向不同的API發出REST調用,以構建適當的響應。
在本文中,我們看到了如何使用CompletableFuture 構建一個集合中每個對象的並行REST調用任務。
我們還看到了如何優雅地處理超時和一般異常,使用handle()方法。