博客 / 詳情

返回

深入理解 Parallel.ForEachAsync:C#.NET 並行調度模型揭秘

簡介

┌──────────────┐
│ 數據源枚舉器 │  IEnumerable / IAsyncEnumerable
└──────┬───────┘
       ↓
┌────────────────────┐
│ 併發調度器(Pump) │  ← 控制最多 N 個任務
└──────┬─────────────┘
       ↓
┌────────────────────┐
│ async Body(item)   │  ← 異步邏輯
└──────┬─────────────┘
       ↓
┌────────────────────┐
│ 完成 / 異常 / 取消 │
└────────────────────┘
  • 不是一次性啓動所有任務
  • 是一個 “邊消費、邊執行、邊補位” 的模型

核心設計目標

在異步場景下,維持固定併發度,持續消耗數據源,直到完成
痛點 ForEachAsync 的解法
Task.WhenAll 不限流 MaxDegreeOfParallelism
SemaphoreSlim 模板繁瑣 內建
async foreach 調度複雜 自動處理

調度模型核心:滑動窗口(Sliding Window)

併發度不是“一次性分配”

假設:

MaxDegreeOfParallelism = 3
items = [A, B, C, D, E, F]

執行順序是這樣的:

啓動 A  B  C   (佔滿 3 個槽位)
      │  │  │
      │  │  └─ C 完成 → 啓動 D
      │  └──── B 完成 → 啓動 E
      └─────── A 完成 → 啓動 F

這就是滑動窗口

任何時刻:

  • 運行中的任務 ≤ MaxDegreeOfParallelism
  • 永遠“有空位就補”

內部不是 Parallel.For,而是 Task 泵

關鍵認知

Parallel.ForEachAsync 並沒有複用 Parallel.For 的線程切分模型

原因很簡單:

  • Parallel.For → 同步代碼 + 線程
  • ForEachAsync → 異步代碼 + continuation

內部本質是一個 Task Pump(任務泵)

偽代碼級理解(高度簡化)

async Task RunAsync()
{
    using var enumerator = source.GetEnumerator();
    var runningTasks = new List<Task>();

    while (true)
    {
        while (runningTasks.Count < maxDegree && enumerator.MoveNext())
        {
            var item = enumerator.Current;
            runningTasks.Add(ProcessAsync(item));
        }

        if (runningTasks.Count == 0)
            break;

        var finished = await Task.WhenAny(runningTasks);
        runningTasks.Remove(finished);
    }
}

真實實現更復雜(異常、取消、ValueTaskExecutionContext),

為什麼它天然適合 async,而 Parallel.For 不行?

對比一下兩者的“調度單位”

API 調度單位
Parallel.For 線程 + 同步委託
ForEachAsync Task / ValueTask

async 的關鍵特性:

  • await 會 釋放線程
  • 繼續執行靠 Continuation
  • 不綁定固定線程

所以 ForEachAsync

  • 不關心“用哪個線程”
  • 只關心“同時有多少個未完成任務”

枚舉器訪問是串行的

數據源的枚舉(MoveNext)是串行的

也就是説:

items.GetEnumerator().MoveNext()

只會在 一個調度上下文 中執行,不會併發訪問枚舉器。

為什麼?

  • IEnumerable<T> 默認 不是線程安全的
  • 併發枚舉會直接炸

所以 ForEachAsync 的並行點在:

  • Body 執行
  • 不是枚舉階段

異常與取消的調度策略

異常模型

  • 任意一個 Body 拋異常
  • 會:

    • 請求取消
    • 等待已啓動任務結束
    • 最終聚合拋出異常

行為類似:

await Task.WhenAll(...)

CancellationToken 不是“硬中斷”

Token 被取消後:

  • 不再啓動新任務
  • 已啓動任務 需要自己響應 ct
await Parallel.ForEachAsync(items, async (item, ct) =>
{
    ct.ThrowIfCancellationRequested();
    await DoAsync(item, ct);
});

為什麼返回 ValueTask 而不是 Task?

原因只有一個:性能

  • Body 很可能:

    • 同步完成
    • 快速失敗
  • ValueTask

    • 避免不必要的 Task 分配
    • 降低 GC 壓力

和 SemaphoreSlim 手寫模型的本質對比

手寫版本

var sem = new SemaphoreSlim(5);

var tasks = items.Select(async item =>
{
    await sem.WaitAsync();
    try
    {
        await ProcessAsync(item);
    }
    finally
    {
        sem.Release();
    }
});

await Task.WhenAll(tasks);

ForEachAsync 內部其實就是:

  • SemaphoreSlim + Task.WhenAny
  • 加上:

    • 枚舉安全
    • 異常聚合
    • 取消傳播
    • ExecutionContext 管理

什麼時候不該用 Parallel.ForEachAsync?

  • 強順序依賴
  • 需要複雜生產者-消費者關係
  • 需要背壓、緩衝區
  • 多階段流水線

這些場景用:

  • Channel
  • TPL Dataflow

總結

Parallel.ForEachAsync = 一個為 async 設計的、滑動窗口式的併發任務調度器

它不是魔法,也不是線程並行,而是:

  • 控併發
  • 自動補位
  • 資源友好
  • 工程可控
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.