簡介
┌──────────────┐
│ 數據源枚舉器 │ IEnumerable / IAsyncEnumerable
└──────┬───────┘
↓
┌────────────────────┐
│ 併發調度器(Pump) │ ← 控制最多 N 個任務
└──────┬─────────────┘
↓
┌────────────────────┐
│ async Body(item) │ ← 異步邏輯
└──────┬─────────────┘
↓
┌────────────────────┐
│ 完成 / 異常 / 取消 │
└────────────────────┘
- 不是一次性啓動所有任務
- 是一個 “邊消費、邊執行、邊補位” 的模型
核心設計目標
在異步場景下,維持固定併發度,持續消耗數據源,直到完成
| 痛點 | ForEachAsync 的解法 |
|---|---|
| Task.WhenAll 不限流 | MaxDegreeOfParallelism |
| SemaphoreSlim 模板繁瑣 | 內建 |
| async foreach 調度複雜 | 自動處理 |
調度模型核心:滑動窗口(Sliding Window)
併發度不是“一次性分配”
假設:
MaxDegreeOfParallelism = 3
items = [A, B, C, D, E, F]
執行順序是這樣的:
啓動 A B C (佔滿 3 個槽位)
│ │ │
│ │ └─ C 完成 → 啓動 D
│ └──── B 完成 → 啓動 E
└─────── A 完成 → 啓動 F
這就是滑動窗口
任何時刻:
- 運行中的任務 ≤
MaxDegreeOfParallelism - 永遠“有空位就補”
內部不是 Parallel.For,而是 Task 泵
關鍵認知
Parallel.ForEachAsync 並沒有複用 Parallel.For 的線程切分模型
原因很簡單:
Parallel.For→ 同步代碼 + 線程ForEachAsync→ 異步代碼 +continuation
內部本質是一個 Task Pump(任務泵)
偽代碼級理解(高度簡化)
async Task RunAsync()
{
using var enumerator = source.GetEnumerator();
var runningTasks = new List<Task>();
while (true)
{
while (runningTasks.Count < maxDegree && enumerator.MoveNext())
{
var item = enumerator.Current;
runningTasks.Add(ProcessAsync(item));
}
if (runningTasks.Count == 0)
break;
var finished = await Task.WhenAny(runningTasks);
runningTasks.Remove(finished);
}
}
真實實現更復雜(異常、取消、ValueTask、ExecutionContext),
為什麼它天然適合 async,而 Parallel.For 不行?
對比一下兩者的“調度單位”
| API | 調度單位 |
|---|---|
| Parallel.For | 線程 + 同步委託 |
| ForEachAsync | Task / ValueTask |
async 的關鍵特性:
await會 釋放線程- 繼續執行靠
Continuation - 不綁定固定線程
所以 ForEachAsync:
- 不關心“用哪個線程”
- 只關心“同時有多少個未完成任務”
枚舉器訪問是串行的
數據源的枚舉(MoveNext)是串行的
也就是説:
items.GetEnumerator().MoveNext()
只會在 一個調度上下文 中執行,不會併發訪問枚舉器。
為什麼?
IEnumerable<T>默認 不是線程安全的- 併發枚舉會直接炸
所以 ForEachAsync 的並行點在:
Body執行- 不是枚舉階段
異常與取消的調度策略
異常模型
- 任意一個
Body拋異常 -
會:
- 請求取消
- 等待已啓動任務結束
- 最終聚合拋出異常
行為類似:
await Task.WhenAll(...)
CancellationToken 不是“硬中斷”
Token 被取消後:
- 不再啓動新任務
- 已啓動任務 需要自己響應
ct
await Parallel.ForEachAsync(items, async (item, ct) =>
{
ct.ThrowIfCancellationRequested();
await DoAsync(item, ct);
});
為什麼返回 ValueTask 而不是 Task?
原因只有一個:性能
-
Body 很可能:
- 同步完成
- 快速失敗
-
ValueTask:- 避免不必要的
Task分配 - 降低
GC壓力
- 避免不必要的
和 SemaphoreSlim 手寫模型的本質對比
手寫版本
var sem = new SemaphoreSlim(5);
var tasks = items.Select(async item =>
{
await sem.WaitAsync();
try
{
await ProcessAsync(item);
}
finally
{
sem.Release();
}
});
await Task.WhenAll(tasks);
ForEachAsync 內部其實就是:
SemaphoreSlim+Task.WhenAny-
加上:
- 枚舉安全
- 異常聚合
- 取消傳播
ExecutionContext管理
什麼時候不該用 Parallel.ForEachAsync?
- 強順序依賴
- 需要複雜生產者-消費者關係
- 需要背壓、緩衝區
- 多階段流水線
這些場景用:
ChannelTPL Dataflow
總結
Parallel.ForEachAsync = 一個為 async 設計的、滑動窗口式的併發任務調度器
它不是魔法,也不是線程並行,而是:
- 控併發
- 自動補位
- 資源友好
- 工程可控