簡介
Parallel 並行編程是 .NET 中利用多核 CPU 進行併發執行的編程模型,主要通過 System.Threading.Tasks 命名空間中的 Parallel 類實現。它允許將任務分解成多個子任務,在多個線程上同時執行,以加速 CPU 密集型操作(如循環計算、數據處理)。
核心組件:
Parallel類:提供靜態方法如Parallel.For、Parallel.ForEach、Parallel.Invoke,用於並行執行循環或方法。PLINQ(Parallel LINQ):LINQ的並行版本,通過AsParallel()擴展方法啓用並行查詢。- 底層依賴:基於 Task Parallel Library (TPL),利用線程池(
ThreadPool)管理線程,避免手動創建線程的開銷。
關鍵概念:
- 並行度(
Degree of Parallelism):控制併發線程數,默認基於 CPU 核心(e.g., 4 核 CPU 可能 4 線程)。 - 數據並行:將數據分成塊(
chunks),每個線程處理一塊(e.g.,Parallel.For)。 - 任務並行:同時執行獨立方法(e.g.,
Parallel.Invoke)。
Parallel 的核心定位與價值
Parallel 類位於 System.Threading.Tasks 命名空間,是 .NET 提供的 “高層級並行工具”,核心價值:
- 自動線程調度:無需手動創建 / 管理線程,由
TPL自動分配線程池線程,實現負載均衡; - 簡化並行邏輯:用類似串行循環的語法實現並行執行,降低並行編程門檻;
- 適配多核
CPU:默認根據CPU核心數調整並行度,最大化利用硬件資源; - 支持取消 / 超時:可通過
ParallelOptions控制並行過程,如取消執行、限制並行數。
Parallel僅適合 CPU 密集型任務(如數據計算、圖像處理、複雜邏輯運算);I/O 密集型任務(文件讀寫、網絡請求)應使用async/await,否則線程會阻塞,浪費資源。
核心概念與基礎
並行 vs. 併發 vs. 異步:
- 併發: 多個任務在重疊的時間段內執行(不一定同時)。單核上通過時間片切換實現。
- 並行: 多個任務真正同時執行,需要多核或多處理器支持。是併發的一種特例。
- 異步: 一種編程模式,允許啓動一個操作後不阻塞當前線程,待操作完成後再處理結果。異步操作可以利用併發或並行(通常通過線程池),但其核心目標是非阻塞和響應性。
線程 vs. 任務:
- 線程: 操作系統調度的基本單位。直接操作
Thread類相對底層,需要手動管理生命週期、同步等,易出錯且開銷較大。 -
任務:
TPL引入的核心概念Task和Task<TResult>。代表一個異步操作單元。任務通常由線程池線程執行,但提供了更高級別的抽象:- 任務調度: 由
TaskScheduler管理,默認使用線程池。 - 組合與延續: 使用
ContinueWith, WhenAll, WhenAny等輕鬆組合任務。 - 異常傳播: 異常被封裝在
AggregateException中,便於統一處理。 - 取消支持: 與
CancellationTokenSource/CancellationToken集成。 - 狀態跟蹤: 有
Created, Running, RanToCompletion, Canceled, Faulted等狀態。
- 任務調度: 由
-
線程池:
.NET維護一個全局的工作線程池。TPL默認使用線程池來執行任務。- 優點:避免頻繁創建銷燬線程的開銷,自動管理線程數量(根據負載增減)。
- 使用
ThreadPool.QueueUserWorkItem或Task.Run/Task.Factory.StartNew提交工作項。
Parallel 核心 API 一覽
| API | 用途 |
|---|---|
| Parallel.For | 並行 for 循環 |
| Parallel.ForEach | 並行 foreach |
| Parallel.Invoke | 並行執行多個 Action |
| ParallelOptions | 控制並行度、取消等 |
Parallel.Invoke:並行執行多個獨立任務
用於一次性並行執行多個無關聯的方法(任務),適合 “多任務並行執行,等待全部完成” 的場景。
語法:
public static void Invoke(params Action[] actions);
public static void Invoke(ParallelOptions options, params Action[] actions);
示例:並行執行三個獨立的 CPU 密集型方法
using System;
using System.Threading.Tasks;
class ParallelInvokeDemo
{
static void Main()
{
// 記錄開始時間
var watch = System.Diagnostics.Stopwatch.StartNew();
// 並行執行三個方法
Parallel.Invoke(
() => CalculateSum(1, 100000000), // 任務1:計算1~1億的和
() => CalculatePrimeCount(1, 100000), // 任務2:統計1~10萬的質數數量
() => GenerateRandomData(1000000) // 任務3:生成100萬條隨機數據
);
watch.Stop();
Console.WriteLine($"並行執行耗時:{watch.ElapsedMilliseconds}ms");
// 對比:串行執行(耗時遠高於並行)
watch.Restart();
CalculateSum(1, 100000000);
CalculatePrimeCount(1, 100000);
GenerateRandomData(1000000);
watch.Stop();
Console.WriteLine($"串行執行耗時:{watch.ElapsedMilliseconds}ms");
}
// 模擬CPU密集型任務1:計算累加和
static void CalculateSum(int start, int end)
{
long sum = 0;
for (long i = start; i <= end; i++) sum += i;
Console.WriteLine($"累加和:{sum}");
}
// 模擬CPU密集型任務2:統計質數數量
static void CalculatePrimeCount(int start, int end)
{
int count = 0;
for (int i = start; i <= end; i++)
{
if (IsPrime(i)) count++;
}
Console.WriteLine($"質數數量:{count}");
}
// 模擬CPU密集型任務3:生成隨機數據
static void GenerateRandomData(int count)
{
var random = new Random();
double[] data = new double[count];
for (int i = 0; i < count; i++) data[i] = random.NextDouble();
Console.WriteLine($"隨機數據生成完成,長度:{data.Length}");
}
// 輔助方法:判斷是否為質數
static bool IsPrime(int num)
{
if (num < 2) return false;
for (int i = 2; i <= Math.Sqrt(num); i++)
{
if (num % i == 0) return false;
}
return true;
}
}
關鍵説明:
Parallel.Invoke會等待所有傳入的Action執行完成後才返回;- 方法執行順序不保證(由
TPL調度),但最終會全部執行; - 若其中一個方法拋出異常,其他方法仍會繼續執行,最終所有異常會被包裝為
AggregateException拋出。
Parallel.For:並行執行 for 循環
替代傳統的 for 循環,將循環迭代分配到多個線程並行執行,適合 “固定次數的循環,迭代間無依賴” 的場景。
核心語法:
// 基礎版:從fromInclusive到toExclusive(不包含)的並行循環
public static ParallelLoopResult For(
int fromInclusive,
int toExclusive,
Action<int> body
);
// 帶配置版:支持取消、限制並行度
public static ParallelLoopResult For(
int fromInclusive,
int toExclusive,
ParallelOptions options,
Action<int> body
);
示例:並行累加數組元素(線程安全版)
using System;
using System.Threading;
using System.Threading.Tasks;
class ParallelForDemo
{
static void Main()
{
// 初始化1000萬個元素的數組
int[] numbers = new int[10_000_000];
Random random = new Random();
for (int i = 0; i < numbers.Length; i++) numbers[i] = random.Next(1, 100);
long total = 0; // 共享累加變量(需保證線程安全)
var watch = System.Diagnostics.Stopwatch.StartNew();
// 並行循環累加
ParallelLoopResult result = Parallel.For(
0, // 起始索引(包含)
numbers.Length, // 結束索引(不包含)
// 循環體:i為當前迭代索引
(i) => Interlocked.Add(ref total, numbers[i]) // 用Interlocked保證原子累加
);
watch.Stop();
Console.WriteLine($"並行累加結果:{total}");
Console.WriteLine($"耗時:{watch.ElapsedMilliseconds}ms");
Console.WriteLine($"循環是否完成:{result.IsCompleted}");
}
}
關鍵説明:
Parallel.For的迭代索引i是線程局部的,無需擔心衝突,但共享變量(如total)必須保證線程安全(用Interlocked、lock或ConcurrentBag等);- 返回值
ParallelLoopResult包含循環執行狀態(IsCompleted:是否全部完成;LowestBreakIteration:是否提前中斷); - 迭代間不能有依賴(如第
i次迭代依賴第i-1次的結果),否則會導致結果錯誤。
Parallel.ForEach:並行執行 foreach 循環
替代傳統的 foreach 循環,遍歷 IEnumerable<T> 集合,將元素分配到多個線程並行處理。
核心語法:
// 基礎版:遍歷IEnumerable<T>集合
public static ParallelLoopResult ForEach<TSource>(
IEnumerable<TSource> source,
Action<TSource> body
);
// 帶索引版:獲取元素的索引
public static ParallelLoopResult ForEach<TSource>(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState, long> body
);
示例:並行處理文件列表( CPU 密集型的文件內容解析)
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
class ParallelForEachDemo
{
static void Main()
{
// 獲取指定目錄下的所有文本文件
string[] files = Directory.GetFiles(@"D:\test", "*.txt");
// 存儲解析結果(線程安全集合)
var parseResults = new System.Collections.Concurrent.ConcurrentDictionary<string, int>();
var watch = System.Diagnostics.Stopwatch.StartNew();
// 並行遍歷文件列表
Parallel.ForEach(
files, // 要遍歷的集合
(file, state, index) => // 循環體:file=當前文件,state=循環狀態,index=當前索引
{
try
{
// 解析文件:統計文件中的數字數量(CPU密集型)
int numberCount = CountNumbersInFile(file);
// 將結果存入線程安全字典
parseResults.TryAdd(file, numberCount);
Console.WriteLine($"已處理第{index+1}個文件:{file},數字數量:{numberCount}");
}
catch (Exception ex)
{
Console.WriteLine($"處理文件{file}失敗:{ex.Message}");
// 可選:終止所有迭代
// state.Stop();
}
}
);
watch.Stop();
Console.WriteLine($"\n全部處理完成,共{parseResults.Count}個文件,耗時:{watch.ElapsedMilliseconds}ms");
}
// 模擬CPU密集型任務:統計文件中的數字數量
static int CountNumbersInFile(string filePath)
{
string content = File.ReadAllText(filePath);
int count = 0;
foreach (char c in content)
{
if (char.IsDigit(c)) count++;
}
// 模擬複雜計算(放大CPU消耗)
for (int i = 0; i < 100000; i++) { Math.Sqrt(i); }
return count;
}
}
關鍵説明:
ParallelLoopState:用於控制循環(Stop()終止所有迭代、Break()終止後續迭代、IsStopped判斷是否終止);- 推薦使用線程安全集合(如
ConcurrentDictionary、ConcurrentBag)存儲並行處理的結果,避免共享集合的線程安全問題; - 若集合元素數量少、循環體執行時間極短,並行開銷可能超過收益,此時應使用串行循環。
ParallelOptions:配置並行行為
ParallelOptions 用於自定義並行執行的規則,核心屬性:
MaxDegreeOfParallelism |
限制最大並行度(線程數),默認值為-1(自動適配 CPU 核心數),可設置為具體數值(如4表示最多 4 個線程並行) |
|---|---|
CancellationToken |
取消令牌,用於取消並行執行 |
TaskScheduler |
指定任務調度器(默認使用線程池調度器) |
示例:限制並行度 + 取消並行執行
using System;
using System.Threading;
using System.Threading.Tasks;
class ParallelOptionsDemo
{
static void Main()
{
CancellationTokenSource cts = new CancellationTokenSource();
// 5秒後取消執行
cts.CancelAfter(5000);
ParallelOptions options = new ParallelOptions
{
MaxDegreeOfParallelism = 4, // 最多4個線程並行
CancellationToken = cts.Token // 綁定取消令牌
};
try
{
Parallel.For(
0,
1000000,
options,
(i) =>
{
// 模擬耗時操作
Thread.Sleep(10);
if (i % 100000 == 0) Console.WriteLine($"已處理{i}次");
// 檢查取消令牌(可選,TPL會自動檢查,但手動檢查更及時)
options.CancellationToken.ThrowIfCancellationRequested();
}
);
}
catch (OperationCanceledException)
{
Console.WriteLine("並行執行被取消");
}
catch (AggregateException ex)
{
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"異常:{innerEx.Message}");
}
}
finally
{
cts.Dispose();
}
}
}
併發度説明:
- 默認:≈ CPU 核心數
- 並不是越大越好
- 過大 → 線程切換成本上升
CPU 密集型 ≈ 核心數
輕計算 ≈ 核心數 × 1.5
PLINQ(Parallel LINQ)
並行查詢:
var query = from num in data.AsParallel() // 啓用並行
where num % 2 == 0
select num * 2;
var results = query.ToArray(); // 強制執行
- 選項:
WithDegreeOfParallelism(4)控制並行度;WithExecutionMode(ParallelExecutionMode.ForceParallelism)強制並行。 - 有序 vs. 無序:默認無序;用
AsOrdered()保持順序(性能稍低)。
底層原理:Parallel 的執行機制
Parallel 的高效性源於 TPL 的核心設計:
- 分區策略:將循環拆分為多個 “分區”(
Partitioner),每個分區由一個線程處理,避免單個線程處理過多迭代; - 工作竊取算法(
Work-Stealing):若某個線程完成自身分區後,會從其他線程的分區中 “竊取” 未處理的迭代,實現負載均衡; - 線程池複用:使用線程池的工作線程,避免頻繁創建 / 銷燬線程的開銷;
PLINQ內部:用ParallelQuery<T>包裝,查詢樹上附加並行運算符;合併結果時用Barrier同步。- 性能開銷:啓動線程 ~1ms;適合 >100ms 任務;小任務可能負優化(開銷 > 收益)。
- 異常聚合:所有迭代拋出的異常會被包裝為
AggregateException,需遍歷InnerExceptions處理。
Parallel 與線程安全
錯誤示例:共享變量
int sum = 0;
Parallel.For(0, 1000, i =>
{
sum += i; // ❌ 線程不安全
});
結果:不確定
正確方式一:Interlocked
int sum = 0;
Parallel.For(0, 1000, i =>
{
Interlocked.Add(ref sum, i);
});
正確方式二:局部變量 + 聚合(推薦)
int sum = 0;
Parallel.For(0, 1000,
() => 0,
(i, state, local) => local + i,
local => Interlocked.Add(ref sum, local)
);
高性能、低競爭
替代方案:
- 小任務:用
Task.Run或Parallel LINQ。 - 數據並行:
System.Numerics (SIMD)或GPU (CUDA.NET)。 - 高併發:
Actor模型 (Akka.NET) 或Channels。
Parallel.ForEachAsync
在受控併發度下,並行執行異步操作
它本質是:
async / await友好- 支持併發限制
- 內置
CancellationToken - 自動調度,不用手寫
SemaphoreSlim
基本用法
最簡單示例
await Parallel.ForEachAsync(items, async (item, ct) =>
{
await ProcessAsync(item, ct);
});
- 必須
await lambda參數裏有CancellationToken- 返回
ValueTask
限制併發度
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 5
};
await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
await CallApiAsync(item, ct);
});
這相當於:
SemaphoreSlim(5) + Task.WhenAll
但 更簡潔、更安全
Parallel.ForEachAsync vs Task.WhenAll
Task.WhenAll(無併發控制)
await Task.WhenAll(items.Select(item =>
ProcessAsync(item)
));
特點:
- 一次性創建所有
Task - 不限制併發
- 數量大 → 容易壓垮資源
Parallel.ForEachAsync(有併發控制)
await Parallel.ForEachAsync(items,
new ParallelOptions { MaxDegreeOfParallelism = 5 },
async (item, ct) =>
{
await ProcessAsync(item, ct);
});
特點:
- 滑動窗口式併發
- 同時最多
N個任務 -
更適合:
- HTTP
- DB
- 文件 IO
- 調用第三方接口
什麼時候選哪個?
| 場景 | 推薦 |
|---|---|
| 少量任務(<20) | Task.WhenAll |
| 大量任務 | Parallel.ForEachAsync |
| 需要限流 | Parallel.ForEachAsync |
| CPU 密集 | Parallel.For / PLINQ |
典型實戰場景
批量調用第三方 API
await Parallel.ForEachAsync(userIds,
new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (id, ct) =>
{
await apiClient.SyncUserAsync(id, ct);
});
批量文件處理(IO)
await Parallel.ForEachAsync(files,
async (file, ct) =>
{
var content = await File.ReadAllTextAsync(file, ct);
await SaveAsync(content, ct);
});