簡介
SqlBulkCopy 是 .NET 中針對 SQL Server 的高性能批量數據導入類,通過最小化網絡往返和利用 SQL Server 的批量加載機制,實現遠超傳統 INSERT 語句的性能(通常快 10-100 倍)。它通過利用 SQL Server 的批量插入機制(BCP,Bulk Copy Protocol),顯著提高了數據導入的效率,特別適合大數據量場景。
背景和作用
在 .NET 應用中,插入大量數據到 SQL Server 數據庫時,傳統的逐行插入(如通過 EF Core 的 Add 和 SaveChanges)效率低下,容易導致性能瓶頸。SqlBulkCopy 解決了以下問題:
- 高性能批量插入:通過批量操作,減少數據庫往返,提升插入速度。
- 大吞吐量支持:適合導入數千到數百萬行數據。
- 靈活的數據源:支持從
DataTable、DataReader或其他實現IDataReader的數據源導入。 - 事務支持:允許在事務中執行批量插入,確保數據一致性。
安裝與配置
SqlBulkCopy 是 SQL Server 提供的一個類,位於 System.Data.SqlClient 中,所以需要確保在項目中引用了相應的包。
NuGet包
如果項目是 .NET Core 或 .NET 5+,可以使用 Microsoft.Data.SqlClient 包。
Install-Package Microsoft.Data.SqlClient
- 命名空間引用
using Microsoft.Data.SqlClient;
SQL Server環境
需要 SQL Server 數據庫支持 SqlBulkCopy,並且操作需要數據庫連接字符串和相關表結構。
核心功能
| 功能 | 描述 |
|---|---|
SqlBulkCopy.WriteToServer |
將數據從內存寫入到 SQL Server 數據庫 |
SqlBulkCopy.BulkCopyTimeout |
設置執行超時。默認 30 秒,超過會拋出異常 |
SqlBulkCopy.BatchSize |
批量插入的行數。控制一次性提交的行數,提升性能 |
SqlBulkCopy.DestinationTableName |
指定目標數據庫表名 |
SqlBulkCopy.ColumnMappings |
設置源列到目標列的映射關係 |
SqlBulkCopy.NotifyAfter |
設置每處理指定行數時觸發 SqlRowsCopied 事件 |
SqlBulkCopy.SqlRowsCopied |
捕獲批量插入的數據統計信息(如行數) |
主要 API 用法
基本用法
using Microsoft.Data.SqlClient;
public void BulkInsert()
{
// 連接到數據庫
string connectionString = "Your_Connection_String";
using var connection = new SqlConnection(connectionString);
connection.Open();
// 使用 SqlBulkCopy 執行批量插入
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "TargetTable" // 目標表名
};
// 創建一個 DataTable 或 IDataReader
var dataTable = GetDataTable(); // 從數據庫或其他數據源獲取數據
bulkCopy.WriteToServer(dataTable); // 執行插入操作
Console.WriteLine("批量插入成功!");
}
從 IDataReader 插入數據
從另一個數據庫讀取數據並插入:
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string sourceConnString = "Server=source;Database=sourcedb;Trusted_Connection=True;";
string destConnString = "Server=localhost;Database=testdb;Trusted_Connection=True;";
using var sourceConn = new SqlConnection(sourceConnString);
using var destConn = new SqlConnection(destConnString);
sourceConn.Open();
destConn.Open();
using var command = new SqlCommand("SELECT Id, Name FROM SourceUsers", sourceConn);
using var reader = command.ExecuteReader();
using var bulkCopy = new SqlBulkCopy(destConn)
{
DestinationTableName = "Users",
BatchSize = 1000
};
// 列映射(如果列名不匹配)
bulkCopy.ColumnMappings.Add("Id", "UserId");
bulkCopy.ColumnMappings.Add("Name", "UserName");
bulkCopy.WriteToServer(reader);
Console.WriteLine("Data copied from source to destination");
}
}
異步插入
使用異步方法提升性能:
using Microsoft.Data.SqlClient;
using System.Data;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
string connectionString = "Server=localhost;Database=testdb;Trusted_Connection=True;";
var dataTable = new DataTable("Users");
dataTable.Columns.Add("Id", typeof(int));
dataTable.Columns.Add("Name", typeof(string));
for (int i = 1; i <= 10000; i++)
{
dataTable.Rows.Add(i, $"User{i}");
}
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "Users",
BatchSize = 1000
};
await bulkCopy.WriteToServerAsync(dataTable);
Console.WriteLine("Inserted 10,000 rows asynchronously");
}
}
設置映射關係
如果源數據列與目標表列名稱不同,可以通過 ColumnMappings 來映射它們。
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "TargetTable"
};
bulkCopy.ColumnMappings.Add("SourceColumn1", "DestinationColumn1");
bulkCopy.ColumnMappings.Add("SourceColumn2", "DestinationColumn2");
bulkCopy.WriteToServer(dataTable);
設置批量大小與超時
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "TargetTable",
BatchSize = 1000, // 每次提交 1000 行數據
BulkCopyTimeout = 600 // 設置超時時間為 600 秒
};
bulkCopy.WriteToServer(dataTable);
使用事件通知批量插入進度
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "TargetTable",
NotifyAfter = 1000 // 每處理 1000 行數據時觸發事件
};
bulkCopy.SqlRowsCopied += (sender, e) =>
{
Console.WriteLine($"已插入 {e.RowsCopied} 行數據");
};
bulkCopy.WriteToServer(dataTable);
事務與錯誤處理
using (var transaction = connection.BeginTransaction())
{
try
{
bulkCopy.BatchSize = 5000;
bulkCopy.BulkCopyTimeout = 120;
bulkCopy.WriteToServer(data);
transaction.Commit();
}
catch (SqlException ex)
{
transaction.Rollback();
// 處理錯誤(如違反約束)
foreach (SqlError error in ex.Errors)
{
Console.WriteLine($"錯誤: {error.Message}");
}
}
}
性能優化
批量大小(BatchSize)
設置合適的批量大小(BatchSize)可以提高性能。默認情況下,每次插入 10000 行。如果批量大小過大,可能導致內存消耗過高;太小則會影響性能。一般來説,批量大小在 1000-10000 行之間最為合適。
禁用或減少日誌
如果不需要插入時的日誌記錄,可以通過設置 SqlBulkCopyOptions 來禁用:
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepNulls);
禁用外鍵約束
批量插入時,外鍵約束會導致性能下降。如果可以禁用外鍵約束,插入會更快:
ALTER TABLE TargetTable NOCHECK CONSTRAINT ALL;
在插入後,可以重新啓用約束:
ALTER TABLE TargetTable CHECK CONSTRAINT ALL;
關閉索引
插入大量數據時,可以暫時禁用索引,插入完成後再重建索引,性能將顯著提高。
ALTER INDEX ALL ON TargetTable DISABLE;
-- 插入數據
ALTER INDEX ALL ON TargetTable REBUILD;
使用示例
示例 1:將 List<T> 數據批量插入到數據庫
public class Employee
{
public int Id { get; set; }
public string Name { get; set; }
public string Department { get; set; }
}
public void BulkInsertEmployees(List<Employee> employees)
{
using var connection = new SqlConnection("Your_Connection_String");
connection.Open();
// 將 List 轉為 DataTable
var dataTable = ConvertToDataTable(employees);
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "Employees",
BatchSize = 5000,
NotifyAfter = 1000
};
bulkCopy.SqlRowsCopied += (sender, e) =>
{
Console.WriteLine($"已插入 {e.RowsCopied} 行數據");
};
bulkCopy.WriteToServer(dataTable);
}
public DataTable ConvertToDataTable(List<Employee> employees)
{
var table = new DataTable();
table.Columns.Add("Id", typeof(int));
table.Columns.Add("Name", typeof(string));
table.Columns.Add("Department", typeof(string));
foreach (var employee in employees)
{
table.Rows.Add(employee.Id, employee.Name, employee.Department);
}
return table;
}
示例 2:處理 CSV 文件批量插入
public void BulkInsertFromCsv(string filePath)
{
using var connection = new SqlConnection("Your_Connection_String");
connection.Open();
var dataTable = new DataTable();
dataTable.Columns.Add("Id", typeof(int));
dataTable.Columns.Add("Name", typeof(string));
dataTable.Columns.Add("Age", typeof(int));
var lines = File.ReadLines(filePath);
foreach (var line in lines.Skip(1)) // 假設第一行是表頭
{
var values = line.Split(',');
dataTable.Rows.Add(int.Parse(values[0]), values[1], int.Parse(values[2]));
}
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = "People",
BatchSize = 5000
};
bulkCopy.WriteToServer(dataTable);
}
使用場景與限制
理想應用場景
- 數據遷移:從舊系統導入歷史數據
ETL處理:數據倉庫定期加載- 實時數據流:傳感器/
IoT設備批量上傳 - 報表生成:預計算數據批量存儲
- 緩存預熱:初始化內存數據庫
功能限制
- 僅限
SQL Server:不直接支持其他數據庫 - 無更新能力:僅插入新數據(需配合臨時表實現更新)
- 觸發器影響:默認觸發插入觸發器(可通過選項控制)
- 標識列處理:需顯式設置
KeepIdentity選項
資源和文檔
-
官方文檔:
Microsoft Learn:https://learn.microsoft.com/en-us/dotnet/api/microsoft.data.s...Microsoft.Data.SqlClient:https://learn.microsoft.com/en-us/dotnet/api/microsoft.data.s...
NuGet包:https://www.nuget.org/packages/Microsoft.Data.SqlClientGitHub:https://github.com/dotnet/SqlClient