大家好!今天我想和大家聊聊 Java 異步編程中的那些"坑"。如果你正在使用 CompletableFuture,或者打算在項目中引入它,這篇文章絕對不容錯過。我會通過實際案例帶你避開那些我(和許多開發者)曾經踩過的坑。
1. CompletableFuture 簡介
CompletableFuture 是 Java 8 引入的強大異步編程工具,它允許我們通過鏈式調用處理異步操作。但強大的工具往往伴隨着複雜性,使用不當就會引發各種併發問題。
┌─────────────┐
│ 任務A │
└──────┬──────┘
│
▼
┌─────────────────────────────────┐
│ CompletableFuture │
└─────────────┬───────────────────┘
│
┌───────────┴───────────┐
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ thenApply(任務B) │ │ exceptionally(處理)│
└──────────┬──────────┘ └──────────┬──────────┘
│ │
└──────────┬─────────────┘
▼
┌──────────────────────┐
│ 最終結果 │
└──────────────────────┘
2. 陷阱一:忽略異常處理
這是最常見的錯誤,也是最容易被忽視的。
問題案例
看看這段代碼:
CompletableFuture.supplyAsync(() -> {
// 模擬從遠程服務獲取數據
if (new Random().nextBoolean()) {
throw new RuntimeException("遠程服務調用失敗");
}
return "數據";
}).thenApply(data -> {
// 處理數據
return data.toUpperCase();
}).thenAccept(result -> {
System.out.println("處理結果: " + result);
});
問題在哪? 如果supplyAsync拋出異常,整個任務鏈會中斷,但程序不會崩潰。更糟糕的是,異常被"吞掉"了,你可能根本不知道發生了什麼!
解決方案
- 使用 exceptionally 捕獲異常:
CompletableFuture.supplyAsync(() -> {
// 可能拋出異常的代碼
if (new Random().nextBoolean()) {
throw new RuntimeException("遠程服務調用失敗");
}
return "數據";
}).thenApply(data -> {
return data.toUpperCase();
}).exceptionally(ex -> {
System.err.println("處理異常: " + ex.getMessage());
return "默認值"; // 提供一個默認值繼續鏈式調用
}).thenAccept(result -> {
System.out.println("處理結果: " + result);
});
- 使用 handle 同時處理正常結果和異常:
CompletableFuture.supplyAsync(() -> {
// 可能拋出異常的代碼
if (new Random().nextBoolean()) {
throw new RuntimeException("遠程服務調用失敗");
}
return "數據";
}).handle((data, ex) -> {
if (ex != null) {
System.err.println("處理異常: " + ex.getMessage());
return "默認值";
}
return data.toUpperCase();
}).thenAccept(result -> {
System.out.println("處理結果: " + result);
});
- 使用 whenComplete/whenCompleteAsync 記錄日誌但不影響結果:
CompletableFuture.supplyAsync(() -> {
if (new Random().nextBoolean()) {
throw new RuntimeException("遠程服務調用失敗");
}
return "數據";
}).whenComplete((result, ex) -> {
// 不改變結果,只記錄狀態
if (ex != null) {
System.err.println("操作失敗,記錄異常: " + ex.getMessage());
} else {
System.out.println("操作成功,記錄結果: " + result);
}
}).exceptionally(ex -> {
// 仍然需要處理異常
return "默認值";
}).thenAccept(result -> {
System.out.println("最終結果: " + result);
});
whenComplete方法非常適合日誌記錄、監控和度量收集,它不會改變 CompletableFuture 的結果,但可以觀察結果或異常。
真實項目中,你應該根據業務需求決定是否需要默認值,或者進行重試、記錄日誌等操作。
3. 陷阱二:線程池使用不當
問題案例
// 創建固定大小的線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
List<CompletableFuture<String>> futures = new ArrayList<>();
// 提交10個任務
for (int i = 0; i < 10; i++) {
int taskId = i;
futures.add(CompletableFuture.supplyAsync(() -> {
try {
// 每個任務耗時較長
Thread.sleep(1000);
return "任務" + taskId + "完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, executor).thenApplyAsync(result -> {
// 注意這裏也使用了同一個線程池
try {
Thread.sleep(1000);
return result + " 處理完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, executor));
}
// 等待所有任務完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();
問題在哪? 這段代碼存在潛在的死鎖風險。我們使用了大小為 2 的線程池,但每個任務都會通過thenApplyAsync再次使用相同的線程池提交新任務。當所有線程都被佔用,且都在等待隊列中的任務完成時,會發生死鎖。
重要説明:如果不指定線程池,thenApplyAsync默認使用ForkJoinPool.commonPool(),這是一個全局共享的線程池。在高負載系統中,這可能導致資源競爭,影響其他使用相同池的任務。
┌─────────────────────────────────────────┐
│ 線程池(2個線程) │
└───────────────────┬─────────────────────┘
│
┌───────────────────────────┴───────────────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 線程1 │ │ 線程2 │
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 任務A(supplyAsync)│ │ 任務B(supplyAsync)│
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│任務A的thenApplyAsync│◄─── 等待線程 ─── 死鎖! ───► │任務B的thenApplyAsync│
└─────────────────┘ └─────────────────┘
解決方案
- 為不同階段使用不同的線程池:
// 創建兩個線程池
ExecutorService computeExecutor = Executors.newFixedThreadPool(2);
ExecutorService processExecutor = Executors.newFixedThreadPool(2);
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int taskId = i;
futures.add(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "任務" + taskId + "完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, computeExecutor).thenApplyAsync(result -> {
try {
Thread.sleep(1000);
return result + " 處理完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, processExecutor)); // 使用不同的線程池
}
// 等待所有任務完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
computeExecutor.shutdown();
processExecutor.shutdown();
- 使用無界線程池或容量充足的線程池(謹慎使用,資源可能被耗盡):
// 根據預期負載合理設置線程池大小
ExecutorService executor = Executors.newFixedThreadPool(20);
- 使用不帶 Async 後綴的方法在同一個線程中執行後續階段:
// thenApply而不是thenApplyAsync
futures.add(CompletableFuture.supplyAsync(() -> {
// 長時間任務
return "任務完成";
}, executor).thenApply(result -> { // 注意這裏沒有Async
// 這部分會在上一步驟的線程中執行,不需要從線程池獲取新線程
return result + " 處理完成";
}));
記住:在決定使用哪種線程池策略時,考慮任務的特性(CPU 密集型還是 IO 密集型),以及系統的整體資源狀況。過度競爭共享線程池會導致性能下降。
性能對比
【優化前】單線程池 - 10個任務,每個包含兩個階段
┌───────────────────────────────────────────────────────────┐
│ │
│ 執行時間: ~20秒 │
│ │
│ 線程池使用率: │
│ ██████████████████████████████████████████████████████████ │
│ │
│ 吞吐量: 0.5任務/秒 │
│ │
│ 風險: ⚠️ 高死鎖風險 │
└───────────────────────────────────────────────────────────┘
【優化後】雙線程池 - 同樣的10個任務
┌───────────────────────────────────────────────────────────┐
│ │
│ 執行時間: ~10秒 │
│ │
│ 線程池使用率: │
│ 計算線程池: ████████████████████████████ │
│ 處理線程池: ████████████████████████████ │
│ │
│ 吞吐量: 1.0任務/秒 │
│ │
│ 風險: ✓ 無死鎖風險 │
└───────────────────────────────────────────────────────────┘
4. 陷阱三:超時處理不當
問題案例
try {
String result = CompletableFuture.supplyAsync(() -> {
try {
// 模擬長時間運行的任務
Thread.sleep(10000);
return "任務完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("任務被中斷"); // 這行代碼永遠不會執行
return "任務中斷";
}
}).get(5, TimeUnit.SECONDS); // 等待5秒
System.out.println(result);
} catch (TimeoutException e) {
System.out.println("任務超時");
}
問題在哪? 當get()方法超時,會拋出 TimeoutException,但原任務繼續在後台運行,浪費系統資源。實際生產中,如果有大量此類超時任務,可能導致線程池資源耗盡。
解決方案
Java 9+中使用orTimeout和completeOnTimeout:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
return "任務完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "任務中斷";
}
}).orTimeout(5, TimeUnit.SECONDS) // 5秒後拋出異常
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "任務超時,返回默認值";
}
return "其他異常: " + ex.getMessage();
});
String result = future.join();
System.out.println(result);
Java 8 中可以這樣實現超時並取消任務:
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
return "任務完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("任務被中斷了"); // 這次會執行
return "任務中斷";
}
}, executor);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> {
// 5秒後如果任務還沒完成,取消它
boolean canceled = future.cancel(true);
if (canceled) {
System.out.println("任務已取消");
}
}, 5, TimeUnit.SECONDS);
try {
String result = future.join();
System.out.println(result);
} catch (CompletionException e) {
if (e.getCause() instanceof CancellationException) {
System.out.println("任務被取消");
} else {
System.out.println("任務執行異常: " + e.getMessage());
}
}
executor.shutdown();
scheduler.shutdown();
關於 cancel(boolean mayInterruptIfRunning)的説明:
cancel(true): 允許中斷正在執行的任務。適用於可以安全中斷的長時間運行任務。cancel(false): 只取消尚未開始的任務,已經運行的任務會繼續執行。適用於不應中斷的關鍵任務,或資源釋放依賴於任務正常完成的情況。
選擇哪種取消策略取決於你的業務需求和任務特性。
5. 陷阱四:thenApply 與 thenCompose 混淆
問題案例
CompletableFuture<CompletableFuture<String>> nestedFuture =
CompletableFuture.supplyAsync(() -> "第一步")
.thenApply(result ->
CompletableFuture.supplyAsync(() -> result + " -> 第二步")
);
// 嘗試獲取最終結果
CompletableFuture<String> extractedFuture = nestedFuture.join(); // 返回的是CompletableFuture而不是String
String finalResult = extractedFuture.join(); // 需要再次調用join才能獲取結果
問題在哪? 使用thenApply處理返回另一個 CompletableFuture 的函數時,會導致 Future 嵌套(CompletableFuture<CompletableFuture<T>>),使代碼變得複雜且難以理解。
使用thenApply導致的嵌套:
┌─────────────────────────────────────────┐
│ CompletableFuture<CompletableFuture<T>> │
│ │
│ ┌─────────────────────────────┐ │
│ │ CompletableFuture<T> │ │
│ │ │ │
│ │ ┌───────────────┐ │ │
│ │ │ T │ │ │
│ │ └───────────────┘ │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────────┘
解決方案
使用thenCompose方法來平展嵌套的 CompletableFuture:
CompletableFuture<String> flatFuture =
CompletableFuture.supplyAsync(() -> "第一步")
.thenCompose(result ->
CompletableFuture.supplyAsync(() -> result + " -> 第二步")
);
// 直接獲取結果,無需處理嵌套
String finalResult = flatFuture.join();
使用thenCompose平展結果:
┌─────────────────────────────┐
│ CompletableFuture<T> │
│ │
│ ┌───────────────┐ │
│ │ T │ │
│ └───────────────┘ │
└─────────────────────────────┘
規則很簡單:
- 如果你的函數返回值類型 T,使用
thenApply - 如果你的函數返回 CompletableFuture<T>,使用
thenCompose
6. 陷阱五:不當的依賴處理
問題案例
考慮一個獲取用户信息的場景,需要並行調用多個服務:
CompletableFuture<UserProfile> getUserProfile(long userId) {
CompletableFuture<UserBasicInfo> basicInfoFuture =
CompletableFuture.supplyAsync(() -> userService.getBasicInfo(userId));
CompletableFuture<List<Order>> ordersFuture =
basicInfoFuture.thenCompose(basicInfo ->
CompletableFuture.supplyAsync(() -> orderService.getOrders(basicInfo.getUserId()))
);
CompletableFuture<CreditScore> creditScoreFuture =
basicInfoFuture.thenCompose(basicInfo ->
CompletableFuture.supplyAsync(() -> creditService.getScore(basicInfo.getUserId()))
);
// 等待所有數據準備好
return CompletableFuture.allOf(basicInfoFuture, ordersFuture, creditScoreFuture)
.thenApply(v -> {
// 所有Future都完成後合併結果
UserBasicInfo info = basicInfoFuture.join();
List<Order> orders = ordersFuture.join();
CreditScore score = creditScoreFuture.join();
return new UserProfile(info, orders, score);
});
}
問題在哪? 這段代碼看起來合理,但實際上不夠高效。ordersFuture和creditScoreFuture都依賴於basicInfoFuture的結果,但它們本身可以並行執行,而不是一個接一個地執行。
解決方案
- 重構代碼,讓獨立的查詢並行執行:
CompletableFuture<UserProfile> getUserProfile(long userId) {
// 先獲取基本信息
CompletableFuture<UserBasicInfo> basicInfoFuture =
CompletableFuture.supplyAsync(() -> userService.getBasicInfo(userId));
// 一旦有了基本信息,並行啓動其他查詢
CompletableFuture<UserProfile> profileFuture = basicInfoFuture.thenCompose(basicInfo -> {
// 這兩個查詢可以並行執行
CompletableFuture<List<Order>> ordersFuture =
CompletableFuture.supplyAsync(() -> orderService.getOrders(basicInfo.getUserId()));
CompletableFuture<CreditScore> creditScoreFuture =
CompletableFuture.supplyAsync(() -> creditService.getScore(basicInfo.getUserId()));
// 等待兩個並行查詢都完成
return CompletableFuture.allOf(ordersFuture, creditScoreFuture)
.thenApply(v -> {
List<Order> orders = ordersFuture.join();
CreditScore score = creditScoreFuture.join();
return new UserProfile(basicInfo, orders, score);
});
});
return profileFuture;
}
- 使用 thenCombine 合併獨立 Future 的結果:
對於兩個獨立的 Future,可以使用thenCombine方法更優雅地合併結果:
CompletableFuture<UserProfile> getUserProfile(long userId) {
// 獲取基本信息
CompletableFuture<UserBasicInfo> basicInfoFuture =
CompletableFuture.supplyAsync(() -> userService.getBasicInfo(userId));
// 基於基本信息,獲取訂單信息
CompletableFuture<List<Order>> ordersFuture = basicInfoFuture.thenCompose(info ->
CompletableFuture.supplyAsync(() -> orderService.getOrders(info.getUserId())));
// 基於基本信息,獲取信用評分
CompletableFuture<CreditScore> creditScoreFuture = basicInfoFuture.thenCompose(info ->
CompletableFuture.supplyAsync(() -> creditService.getScore(info.getUserId())));
// 使用thenCombine合併訂單和信用評分
CompletableFuture<CombinedData> combinedDataFuture = ordersFuture.thenCombine(
creditScoreFuture, (orders, creditScore) -> new CombinedData(orders, creditScore));
// 最後合併所有數據
return basicInfoFuture.thenCombine(combinedDataFuture, (info, data) ->
new UserProfile(info, data.orders, data.creditScore));
}
// Java 14+ 可以使用record簡化
record CombinedData(List<Order> orders, CreditScore creditScore) {}
// 或者傳統類定義
class CombinedData {
final List<Order> orders;
final CreditScore creditScore;
CombinedData(List<Order> orders, CreditScore creditScore) {
this.orders = orders;
this.creditScore = creditScore;
}
}
優化的執行流程:
┌────────────────┐
│ 獲取基本信息 │
└────────┬───────┘
│
▼
┌────────────────────────────────────┐
│ 獲取到用户基本信息 │
└───────────┬─────────────┬──────────┘
│ │
並行執行▼ ▼並行執行
┌────────────────┐ ┌────────────────┐
│ 獲取訂單信息 │ │ 獲取信用評分 │
└────────┬───────┘ └───────┬────────┘
│ │
└──thenCombine────┘
▼
┌────────────────┐
│ 組裝用户檔案 │
└────────────────┘
性能對比
【優化前】串行依賴處理
┌──────────────────────────────────────────────────────────┐
│ │
│ 執行時間: 基本信息(100ms) + 訂單(200ms) + 信用(150ms) │
│ = 450ms │
│ │
│ API調用時序: │
│ 基本信息: ████████████ │
│ 訂單信息: ████████████████████ │
│ 信用評分: ███████████ │
│ │
│ 總響應時間: ~450ms │
└──────────────────────────────────────────────────────────┘
【優化後】並行依賴處理
┌──────────────────────────────────────────────────────────┐
│ │
│ 執行時間: 基本信息(100ms) + max(訂單(200ms), 信用(150ms)) │
│ = 300ms │
│ │
│ API調用時序: │
│ 基本信息: ████████████ │
│ 訂單信息: ████████████████████ │
│ 信用評分: ███████████████ │
│ [並行執行] │
│ │
│ 總響應時間: ~300ms (節省33%) │
└──────────────────────────────────────────────────────────┘
對於多個獨立的 Future,你也可以考慮使用thenCombine的擴展版本,例如CompletableFuture.allOf(...).thenApply(...)或通過多個thenCombine調用組合結果。
7. 陷阱六:資源泄漏
問題案例
下面的代碼試圖並行處理多個文件:
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (File file : files) {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
return reader.lines().count();
} catch (IOException e) {
// 不必要的包裝,CompletableFuture會自動處理
throw new CompletionException(e);
}
});
futures.add(future);
}
// 等待所有任務完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
問題在哪? 雖然使用了 try-with-resources 確保文件被關閉,但如果任務被取消或超時,無法保證資源被正確釋放。另外,異常處理上有不必要的包裝,因為 CompletableFuture 會自動將未捕獲的異常包裝為 CompletionException。
解決方案
- 結合 AutoCloseable 接口優雅管理資源:
class ResourceManager<R extends AutoCloseable, T> {
private final Function<R, T> processor;
private final Supplier<R> resourceSupplier;
public ResourceManager(Supplier<R> resourceSupplier, Function<R, T> processor) {
this.resourceSupplier = resourceSupplier;
this.processor = processor;
}
public CompletableFuture<T> process() {
return CompletableFuture.supplyAsync(() -> {
try (R resource = resourceSupplier.get()) {
return processor.apply(resource);
} catch (Exception e) {
// 直接拋出原始異常,避免不必要的包裝
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException("處理資源時出錯", e);
}
});
}
}
// 使用示例
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (File file : files) {
ResourceManager<BufferedReader, Long> manager = new ResourceManager<>(
() -> {
try {
return new BufferedReader(new FileReader(file));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
reader -> reader.lines().count()
);
futures.add(manager.process());
}
- 使用框架提供的異步資源管理:
如果你使用的是 Quarkus 等現代 Java 框架,可以利用其提供的異步資源管理功能:
// Quarkus示例
@Asynchronous
public CompletionStage<Long> countLines(File file) {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
return CompletableFuture.completedFuture(reader.lines().count());
} catch (IOException e) {
CompletableFuture<Long> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
// 使用
List<CompletionStage<Long>> futures = files.stream()
.map(this::countLines)
.collect(Collectors.toList());
CompletableFuture.allOf(futures.stream()
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new))
.join();
- 使用專門的資源處理器:
class ResourceHandler<T> implements AutoCloseable {
private final T resource;
private final Consumer<T> cleanup;
public ResourceHandler(T resource, Consumer<T> cleanup) {
this.resource = resource;
this.cleanup = cleanup;
}
public <U> CompletableFuture<U> process(Function<T, U> processor) {
return CompletableFuture.supplyAsync(() -> {
try {
return processor.apply(resource);
} catch (Exception e) {
// 直接拋出異常,避免不必要的包裝
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}
});
}
@Override
public void close() {
cleanup.accept(resource);
}
}
// 使用並註冊shutdown hook
List<ResourceHandler<BufferedReader>> handlers = new ArrayList<>();
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (File file : files) {
try {
BufferedReader reader = new BufferedReader(new FileReader(file));
ResourceHandler<BufferedReader> handler = new ResourceHandler<>(reader, r -> {
try {
r.close();
} catch (IOException e) {
System.err.println("關閉資源失敗: " + e.getMessage());
}
});
handlers.add(handler);
futures.add(handler.process(r -> r.lines().count()));
} catch (IOException e) {
System.err.println("無法打開文件: " + e.getMessage());
}
}
// 添加shutdown hook確保資源正確釋放
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
handlers.forEach(ResourceHandler::close);
}));
通過這些方式,即使在異常情況下,也能確保資源被正確釋放。
總結與落地建議
使用 CompletableFuture 時,請記住以下幾點:
- 始終處理異常 - 使用
exceptionally、handle或whenComplete確保異常不被吞噬 - 合理規劃線程池 - 為不同類型的任務使用不同的線程池,避免死鎖;注意默認線程池的限制
- 正確處理超時 - 實現超時機制並確保任務被適當地取消;理解
cancel(true)和cancel(false)的區別 - 理解 API 差異 - 掌握
thenApply/thenCompose/thenCombine等方法的區別和適用場景 - 並行處理獨立任務 - 分析任務依賴關係,最大化並行執行;使用
thenCombine合併獨立結果 - 確保資源釋放 - 結合
AutoCloseable和 shutdown hook 確保資源在各種情況下都能正確釋放 - 區分同步和異步變體 - 明確什麼時候使用帶 Async 後綴的方法
- 考慮測試場景 - 編寫單元測試驗證異步邏輯,包括異常和超時情況
- 避免不必要的異常包裝 - 瞭解 CompletableFuture 本身會處理異常包裝,避免重複包裝
希望這篇文章對你有所幫助!異步編程雖然強大,但需要謹慎使用。通過避開這些常見陷阱,你可以構建更穩定、高效的 Java 應用程序。
感謝您耐心閲讀到這裏!如果覺得本文對您有幫助,歡迎點贊 👍、收藏 ⭐、分享給需要的朋友,您的支持是我持續輸出技術乾貨的最大動力!
如果想獲取更多 Java 技術深度解析,歡迎點擊頭像關注我,後續會每日更新高質量技術文章,陪您一起進階成長~