博客 / 詳情

返回

深入理解 C#.NET Parallel:並行編程的正確打開方式

簡介

Parallel 並行編程是 .NET 中利用多核 CPU 進行併發執行的編程模型,主要通過 System.Threading.Tasks 命名空間中的 Parallel 類實現。它允許將任務分解成多個子任務,在多個線程上同時執行,以加速 CPU 密集型操作(如循環計算、數據處理)。

核心組件:

  • Parallel 類:提供靜態方法如 Parallel.For、Parallel.ForEach、Parallel.Invoke,用於並行執行循環或方法。
  • PLINQ(Parallel LINQ):LINQ 的並行版本,通過 AsParallel() 擴展方法啓用並行查詢。
  • 底層依賴:基於 Task Parallel Library (TPL),利用線程池(ThreadPool)管理線程,避免手動創建線程的開銷。

關鍵概念:

  • 並行度(Degree of Parallelism):控制併發線程數,默認基於 CPU 核心(e.g., 4 核 CPU 可能 4 線程)。
  • 數據並行:將數據分成塊(chunks),每個線程處理一塊(e.g., Parallel.For)。
  • 任務並行:同時執行獨立方法(e.g., Parallel.Invoke)。

Parallel 的核心定位與價值

Parallel 類位於 System.Threading.Tasks 命名空間,是 .NET 提供的 “高層級並行工具”,核心價值:

  • 自動線程調度:無需手動創建 / 管理線程,由 TPL 自動分配線程池線程,實現負載均衡;
  • 簡化並行邏輯:用類似串行循環的語法實現並行執行,降低並行編程門檻;
  • 適配多核 CPU:默認根據 CPU 核心數調整並行度,最大化利用硬件資源;
  • 支持取消 / 超時:可通過 ParallelOptions 控制並行過程,如取消執行、限制並行數。
Parallel僅適合 CPU 密集型任務(如數據計算、圖像處理、複雜邏輯運算);I/O 密集型任務(文件讀寫、網絡請求)應使用async/await,否則線程會阻塞,浪費資源。

核心概念與基礎

並行 vs. 併發 vs. 異步:

  • 併發: 多個任務在重疊的時間段內執行(不一定同時)。單核上通過時間片切換實現。
  • 並行: 多個任務真正同時執行,需要多核或多處理器支持。是併發的一種特例。
  • 異步: 一種編程模式,允許啓動一個操作後不阻塞當前線程,待操作完成後再處理結果。異步操作可以利用併發或並行(通常通過線程池),但其核心目標是非阻塞和響應性。

線程 vs. 任務:

  • 線程: 操作系統調度的基本單位。直接操作 Thread 類相對底層,需要手動管理生命週期、同步等,易出錯且開銷較大。
  • 任務: TPL 引入的核心概念 TaskTask<TResult>。代表一個異步操作單元。任務通常由線程池線程執行,但提供了更高級別的抽象:

    • 任務調度: 由 TaskScheduler 管理,默認使用線程池。
    • 組合與延續: 使用 ContinueWith, WhenAll, WhenAny 等輕鬆組合任務。
    • 異常傳播: 異常被封裝在 AggregateException 中,便於統一處理。
    • 取消支持: 與 CancellationTokenSource/CancellationToken 集成。
    • 狀態跟蹤: 有 Created, Running, RanToCompletion, Canceled, Faulted 等狀態。
  • 線程池:

    • .NET 維護一個全局的工作線程池。
    • TPL 默認使用線程池來執行任務。
    • 優點:避免頻繁創建銷燬線程的開銷,自動管理線程數量(根據負載增減)。
    • 使用 ThreadPool.QueueUserWorkItemTask.Run/Task.Factory.StartNew 提交工作項。

Parallel 核心 API 一覽

API 用途
Parallel.For 並行 for 循環
Parallel.ForEach 並行 foreach
Parallel.Invoke 並行執行多個 Action
ParallelOptions 控制並行度、取消等

Parallel.Invoke:並行執行多個獨立任務

用於一次性並行執行多個無關聯的方法(任務),適合 “多任務並行執行,等待全部完成” 的場景。

語法:

public static void Invoke(params Action[] actions);
public static void Invoke(ParallelOptions options, params Action[] actions);

示例:並行執行三個獨立的 CPU 密集型方法

using System;
using System.Threading.Tasks;

class ParallelInvokeDemo
{
    static void Main()
    {
        // 記錄開始時間
        var watch = System.Diagnostics.Stopwatch.StartNew();

        // 並行執行三個方法
        Parallel.Invoke(
            () => CalculateSum(1, 100000000), // 任務1:計算1~1億的和
            () => CalculatePrimeCount(1, 100000), // 任務2:統計1~10萬的質數數量
            () => GenerateRandomData(1000000) // 任務3:生成100萬條隨機數據
        );

        watch.Stop();
        Console.WriteLine($"並行執行耗時:{watch.ElapsedMilliseconds}ms");

        // 對比:串行執行(耗時遠高於並行)
        watch.Restart();
        CalculateSum(1, 100000000);
        CalculatePrimeCount(1, 100000);
        GenerateRandomData(1000000);
        watch.Stop();
        Console.WriteLine($"串行執行耗時:{watch.ElapsedMilliseconds}ms");
    }

    // 模擬CPU密集型任務1:計算累加和
    static void CalculateSum(int start, int end)
    {
        long sum = 0;
        for (long i = start; i <= end; i++) sum += i;
        Console.WriteLine($"累加和:{sum}");
    }

    // 模擬CPU密集型任務2:統計質數數量
    static void CalculatePrimeCount(int start, int end)
    {
        int count = 0;
        for (int i = start; i <= end; i++)
        {
            if (IsPrime(i)) count++;
        }
        Console.WriteLine($"質數數量:{count}");
    }

    // 模擬CPU密集型任務3:生成隨機數據
    static void GenerateRandomData(int count)
    {
        var random = new Random();
        double[] data = new double[count];
        for (int i = 0; i < count; i++) data[i] = random.NextDouble();
        Console.WriteLine($"隨機數據生成完成,長度:{data.Length}");
    }

    // 輔助方法:判斷是否為質數
    static bool IsPrime(int num)
    {
        if (num < 2) return false;
        for (int i = 2; i <= Math.Sqrt(num); i++)
        {
            if (num % i == 0) return false;
        }
        return true;
    }
}

關鍵説明:

  • Parallel.Invoke 會等待所有傳入的 Action 執行完成後才返回;
  • 方法執行順序不保證(由 TPL 調度),但最終會全部執行;
  • 若其中一個方法拋出異常,其他方法仍會繼續執行,最終所有異常會被包裝為AggregateException 拋出。

Parallel.For:並行執行 for 循環

替代傳統的 for 循環,將循環迭代分配到多個線程並行執行,適合 “固定次數的循環,迭代間無依賴” 的場景。

核心語法:

// 基礎版:從fromInclusive到toExclusive(不包含)的並行循環
public static ParallelLoopResult For(
    int fromInclusive, 
    int toExclusive, 
    Action<int> body
);

// 帶配置版:支持取消、限制並行度
public static ParallelLoopResult For(
    int fromInclusive, 
    int toExclusive, 
    ParallelOptions options, 
    Action<int> body
);

示例:並行累加數組元素(線程安全版)

using System;
using System.Threading;
using System.Threading.Tasks;

class ParallelForDemo
{
    static void Main()
    {
        // 初始化1000萬個元素的數組
        int[] numbers = new int[10_000_000];
        Random random = new Random();
        for (int i = 0; i < numbers.Length; i++) numbers[i] = random.Next(1, 100);

        long total = 0; // 共享累加變量(需保證線程安全)
        var watch = System.Diagnostics.Stopwatch.StartNew();

        // 並行循環累加
        ParallelLoopResult result = Parallel.For(
            0, // 起始索引(包含)
            numbers.Length, // 結束索引(不包含)
            // 循環體:i為當前迭代索引
            (i) => Interlocked.Add(ref total, numbers[i]) // 用Interlocked保證原子累加
        );

        watch.Stop();
        Console.WriteLine($"並行累加結果:{total}");
        Console.WriteLine($"耗時:{watch.ElapsedMilliseconds}ms");
        Console.WriteLine($"循環是否完成:{result.IsCompleted}");
    }
}

關鍵説明:

  • Parallel.For 的迭代索引 i 是線程局部的,無需擔心衝突,但共享變量(如 total )必須保證線程安全(用 Interlocked、lock或ConcurrentBag 等);
  • 返回值 ParallelLoopResult 包含循環執行狀態(IsCompleted:是否全部完成;LowestBreakIteration:是否提前中斷);
  • 迭代間不能有依賴(如第 i 次迭代依賴第 i-1 次的結果),否則會導致結果錯誤。

Parallel.ForEach:並行執行 foreach 循環

替代傳統的 foreach 循環,遍歷 IEnumerable<T> 集合,將元素分配到多個線程並行處理。

核心語法:

// 基礎版:遍歷IEnumerable<T>集合
public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source, 
    Action<TSource> body
);

// 帶索引版:獲取元素的索引
public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source, 
    Action<TSource, ParallelLoopState, long> body
);

示例:並行處理文件列表( CPU 密集型的文件內容解析)

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

class ParallelForEachDemo
{
    static void Main()
    {
        // 獲取指定目錄下的所有文本文件
        string[] files = Directory.GetFiles(@"D:\test", "*.txt");
        // 存儲解析結果(線程安全集合)
        var parseResults = new System.Collections.Concurrent.ConcurrentDictionary<string, int>();

        var watch = System.Diagnostics.Stopwatch.StartNew();

        // 並行遍歷文件列表
        Parallel.ForEach(
            files, // 要遍歷的集合
            (file, state, index) => // 循環體:file=當前文件,state=循環狀態,index=當前索引
            {
                try
                {
                    // 解析文件:統計文件中的數字數量(CPU密集型)
                    int numberCount = CountNumbersInFile(file);
                    // 將結果存入線程安全字典
                    parseResults.TryAdd(file, numberCount);
                    Console.WriteLine($"已處理第{index+1}個文件:{file},數字數量:{numberCount}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"處理文件{file}失敗:{ex.Message}");
                    // 可選:終止所有迭代
                    // state.Stop();
                }
            }
        );

        watch.Stop();
        Console.WriteLine($"\n全部處理完成,共{parseResults.Count}個文件,耗時:{watch.ElapsedMilliseconds}ms");
    }

    // 模擬CPU密集型任務:統計文件中的數字數量
    static int CountNumbersInFile(string filePath)
    {
        string content = File.ReadAllText(filePath);
        int count = 0;
        foreach (char c in content)
        {
            if (char.IsDigit(c)) count++;
        }
        // 模擬複雜計算(放大CPU消耗)
        for (int i = 0; i < 100000; i++) { Math.Sqrt(i); }
        return count;
    }
}

關鍵説明:

  • ParallelLoopState:用於控制循環( Stop() 終止所有迭代、Break() 終止後續迭代、IsStopped 判斷是否終止);
  • 推薦使用線程安全集合(如 ConcurrentDictionary、ConcurrentBag )存儲並行處理的結果,避免共享集合的線程安全問題;
  • 若集合元素數量少、循環體執行時間極短,並行開銷可能超過收益,此時應使用串行循環。

ParallelOptions:配置並行行為

ParallelOptions 用於自定義並行執行的規則,核心屬性:

MaxDegreeOfParallelism 限制最大並行度(線程數),默認值為-1(自動適配 CPU 核心數),可設置為具體數值(如4表示最多 4 個線程並行)
CancellationToken 取消令牌,用於取消並行執行
TaskScheduler 指定任務調度器(默認使用線程池調度器)

示例:限制並行度 + 取消並行執行

using System;
using System.Threading;
using System.Threading.Tasks;

class ParallelOptionsDemo
{
    static void Main()
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        // 5秒後取消執行
        cts.CancelAfter(5000);

        ParallelOptions options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4, // 最多4個線程並行
            CancellationToken = cts.Token // 綁定取消令牌
        };

        try
        {
            Parallel.For(
                0,
                1000000,
                options,
                (i) =>
                {
                    // 模擬耗時操作
                    Thread.Sleep(10);
                    if (i % 100000 == 0) Console.WriteLine($"已處理{i}次");
                    // 檢查取消令牌(可選,TPL會自動檢查,但手動檢查更及時)
                    options.CancellationToken.ThrowIfCancellationRequested();
                }
            );
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("並行執行被取消");
        }
        catch (AggregateException ex)
        {
            foreach (var innerEx in ex.InnerExceptions)
            {
                Console.WriteLine($"異常:{innerEx.Message}");
            }
        }
        finally
        {
            cts.Dispose();
        }
    }
}

併發度説明:

  • 默認:≈ CPU 核心數
  • 並不是越大越好
  • 過大 → 線程切換成本上升
CPU 密集型 ≈ 核心數
輕計算 ≈ 核心數 × 1.5

PLINQ(Parallel LINQ)

並行查詢:

var query = from num in data.AsParallel()  // 啓用並行
            where num % 2 == 0
            select num * 2;

var results = query.ToArray();  // 強制執行
  • 選項:WithDegreeOfParallelism(4) 控制並行度;WithExecutionMode(ParallelExecutionMode.ForceParallelism) 強制並行。
  • 有序 vs. 無序:默認無序;用 AsOrdered() 保持順序(性能稍低)。

底層原理:Parallel 的執行機制

Parallel 的高效性源於 TPL 的核心設計:

  • 分區策略:將循環拆分為多個 “分區”(Partitioner),每個分區由一個線程處理,避免單個線程處理過多迭代;
  • 工作竊取算法(Work-Stealing):若某個線程完成自身分區後,會從其他線程的分區中 “竊取” 未處理的迭代,實現負載均衡;
  • 線程池複用:使用線程池的工作線程,避免頻繁創建 / 銷燬線程的開銷;
  • PLINQ 內部:用 ParallelQuery<T> 包裝,查詢樹上附加並行運算符;合併結果時用 Barrier 同步。
  • 性能開銷:啓動線程 ~1ms;適合 >100ms 任務;小任務可能負優化(開銷 > 收益)。
  • 異常聚合:所有迭代拋出的異常會被包裝為 AggregateException,需遍歷InnerExceptions 處理。

Parallel 與線程安全

錯誤示例:共享變量

int sum = 0;

Parallel.For(0, 1000, i =>
{
    sum += i; // ❌ 線程不安全
});

結果:不確定

正確方式一:Interlocked

int sum = 0;

Parallel.For(0, 1000, i =>
{
    Interlocked.Add(ref sum, i);
});

正確方式二:局部變量 + 聚合(推薦)

int sum = 0;

Parallel.For(0, 1000,
    () => 0,
    (i, state, local) => local + i,
    local => Interlocked.Add(ref sum, local)
);

高性能、低競爭

替代方案:

  • 小任務:用 Task.RunParallel LINQ
  • 數據並行:System.Numerics (SIMD)GPU (CUDA.NET)
  • 高併發:Actor 模型 (Akka.NET) 或 Channels

Parallel.ForEachAsync

在受控併發度下,並行執行異步操作

它本質是:

  • async / await 友好
  • 支持併發限制
  • 內置 CancellationToken
  • 自動調度,不用手寫 SemaphoreSlim

基本用法

最簡單示例
await Parallel.ForEachAsync(items, async (item, ct) =>
{
    await ProcessAsync(item, ct);
});
  • 必須 await
  • lambda 參數裏有 CancellationToken
  • 返回 ValueTask
限制併發度
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 5
};

await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
    await CallApiAsync(item, ct);
});

這相當於:

SemaphoreSlim(5) + Task.WhenAll

但 更簡潔、更安全

Parallel.ForEachAsync vs Task.WhenAll

Task.WhenAll(無併發控制)

await Task.WhenAll(items.Select(item =>
    ProcessAsync(item)
));

特點:

  • 一次性創建所有 Task
  • 不限制併發
  • 數量大 → 容易壓垮資源

Parallel.ForEachAsync(有併發控制)

await Parallel.ForEachAsync(items,
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    async (item, ct) =>
    {
        await ProcessAsync(item, ct);
    });

特點:

  • 滑動窗口式併發
  • 同時最多 N 個任務
  • 更適合:

    • HTTP
    • DB
    • 文件 IO
    • 調用第三方接口

什麼時候選哪個?

場景 推薦
少量任務(<20) Task.WhenAll
大量任務 Parallel.ForEachAsync
需要限流 Parallel.ForEachAsync
CPU 密集 Parallel.For / PLINQ

典型實戰場景

批量調用第三方 API

await Parallel.ForEachAsync(userIds,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async (id, ct) =>
    {
        await apiClient.SyncUserAsync(id, ct);
    });

批量文件處理(IO)

await Parallel.ForEachAsync(files,
    async (file, ct) =>
    {
        var content = await File.ReadAllTextAsync(file, ct);
        await SaveAsync(content, ct);
    });
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.