动态

详情 返回 返回

循環插入太慢?試試 C#.NET SqlBulkCopy,一次導入上百萬數據 - 动态 详情

簡介

SqlBulkCopy.NET 中針對 SQL Server 的高性能批量數據導入類,通過最小化網絡往返和利用 SQL Server 的批量加載機制,實現遠超傳統 INSERT 語句的性能(通常快 10-100 倍)。它通過利用 SQL Server 的批量插入機制(BCP,Bulk Copy Protocol),顯著提高了數據導入的效率,特別適合大數據量場景。

背景和作用

.NET 應用中,插入大量數據到 SQL Server 數據庫時,傳統的逐行插入(如通過 EF CoreAddSaveChanges)效率低下,容易導致性能瓶頸。SqlBulkCopy 解決了以下問題:

  • 高性能批量插入:通過批量操作,減少數據庫往返,提升插入速度。
  • 大吞吐量支持:適合導入數千到數百萬行數據。
  • 靈活的數據源:支持從 DataTable、DataReader 或其他實現 IDataReader 的數據源導入。
  • 事務支持:允許在事務中執行批量插入,確保數據一致性。

安裝與配置

SqlBulkCopySQL Server 提供的一個類,位於 System.Data.SqlClient 中,所以需要確保在項目中引用了相應的包。

  • NuGet

如果項目是 .NET Core.NET 5+,可以使用 Microsoft.Data.SqlClient 包。

Install-Package Microsoft.Data.SqlClient
  • 命名空間引用
using Microsoft.Data.SqlClient;
  • SQL Server 環境

需要 SQL Server 數據庫支持 SqlBulkCopy,並且操作需要數據庫連接字符串和相關表結構。

核心功能

功能 描述
SqlBulkCopy.WriteToServer 將數據從內存寫入到 SQL Server 數據庫
SqlBulkCopy.BulkCopyTimeout 設置執行超時。默認 30 秒,超過會拋出異常
SqlBulkCopy.BatchSize 批量插入的行數。控制一次性提交的行數,提升性能
SqlBulkCopy.DestinationTableName 指定目標數據庫表名
SqlBulkCopy.ColumnMappings 設置源列到目標列的映射關係
SqlBulkCopy.NotifyAfter 設置每處理指定行數時觸發 SqlRowsCopied 事件
SqlBulkCopy.SqlRowsCopied 捕獲批量插入的數據統計信息(如行數)

主要 API 用法

基本用法

using Microsoft.Data.SqlClient;

public void BulkInsert()
{
    // 連接到數據庫
    string connectionString = "Your_Connection_String";
    using var connection = new SqlConnection(connectionString);
    connection.Open();

    // 使用 SqlBulkCopy 執行批量插入
    using var bulkCopy = new SqlBulkCopy(connection)
    {
        DestinationTableName = "TargetTable"  // 目標表名
    };

    // 創建一個 DataTable 或 IDataReader
    var dataTable = GetDataTable(); // 從數據庫或其他數據源獲取數據
    bulkCopy.WriteToServer(dataTable); // 執行插入操作

    Console.WriteLine("批量插入成功!");
}

從 IDataReader 插入數據

從另一個數據庫讀取數據並插入:

using Microsoft.Data.SqlClient;

class Program
{
    static void Main()
    {
        string sourceConnString = "Server=source;Database=sourcedb;Trusted_Connection=True;";
        string destConnString = "Server=localhost;Database=testdb;Trusted_Connection=True;";

        using var sourceConn = new SqlConnection(sourceConnString);
        using var destConn = new SqlConnection(destConnString);
        sourceConn.Open();
        destConn.Open();

        using var command = new SqlCommand("SELECT Id, Name FROM SourceUsers", sourceConn);
        using var reader = command.ExecuteReader();

        using var bulkCopy = new SqlBulkCopy(destConn)
        {
            DestinationTableName = "Users",
            BatchSize = 1000
        };

        // 列映射(如果列名不匹配)
        bulkCopy.ColumnMappings.Add("Id", "UserId");
        bulkCopy.ColumnMappings.Add("Name", "UserName");

        bulkCopy.WriteToServer(reader);
        Console.WriteLine("Data copied from source to destination");
    }
}

異步插入

使用異步方法提升性能:

using Microsoft.Data.SqlClient;
using System.Data;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        string connectionString = "Server=localhost;Database=testdb;Trusted_Connection=True;";
        var dataTable = new DataTable("Users");
        dataTable.Columns.Add("Id", typeof(int));
        dataTable.Columns.Add("Name", typeof(string));

        for (int i = 1; i <= 10000; i++)
        {
            dataTable.Rows.Add(i, $"User{i}");
        }

        using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync();

        using var bulkCopy = new SqlBulkCopy(connection)
        {
            DestinationTableName = "Users",
            BatchSize = 1000
        };

        await bulkCopy.WriteToServerAsync(dataTable);
        Console.WriteLine("Inserted 10,000 rows asynchronously");
    }
}

設置映射關係

如果源數據列與目標表列名稱不同,可以通過 ColumnMappings 來映射它們。

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = "TargetTable"
};
bulkCopy.ColumnMappings.Add("SourceColumn1", "DestinationColumn1");
bulkCopy.ColumnMappings.Add("SourceColumn2", "DestinationColumn2");

bulkCopy.WriteToServer(dataTable);

設置批量大小與超時

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = "TargetTable",
    BatchSize = 1000,          // 每次提交 1000 行數據
    BulkCopyTimeout = 600      // 設置超時時間為 600 秒
};
bulkCopy.WriteToServer(dataTable);

使用事件通知批量插入進度

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = "TargetTable",
    NotifyAfter = 1000       // 每處理 1000 行數據時觸發事件
};

bulkCopy.SqlRowsCopied += (sender, e) =>
{
    Console.WriteLine($"已插入 {e.RowsCopied} 行數據");
};

bulkCopy.WriteToServer(dataTable);

事務與錯誤處理

using (var transaction = connection.BeginTransaction())
{
    try
    {
        bulkCopy.BatchSize = 5000;
        bulkCopy.BulkCopyTimeout = 120;
        bulkCopy.WriteToServer(data);
        transaction.Commit();
    }
    catch (SqlException ex)
    {
        transaction.Rollback();
        // 處理錯誤(如違反約束)
        foreach (SqlError error in ex.Errors)
        {
            Console.WriteLine($"錯誤: {error.Message}");
        }
    }
}

性能優化

批量大小(BatchSize)

設置合適的批量大小(BatchSize)可以提高性能。默認情況下,每次插入 10000 行。如果批量大小過大,可能導致內存消耗過高;太小則會影響性能。一般來説,批量大小在 1000-10000 行之間最為合適。

禁用或減少日誌

如果不需要插入時的日誌記錄,可以通過設置 SqlBulkCopyOptions 來禁用:

using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepNulls);

禁用外鍵約束

批量插入時,外鍵約束會導致性能下降。如果可以禁用外鍵約束,插入會更快:

ALTER TABLE TargetTable NOCHECK CONSTRAINT ALL;

在插入後,可以重新啓用約束:

ALTER TABLE TargetTable CHECK CONSTRAINT ALL;

關閉索引

插入大量數據時,可以暫時禁用索引,插入完成後再重建索引,性能將顯著提高。

ALTER INDEX ALL ON TargetTable DISABLE;
-- 插入數據
ALTER INDEX ALL ON TargetTable REBUILD;

使用示例

示例 1:將 List<T> 數據批量插入到數據庫

public class Employee
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string Department { get; set; }
}

public void BulkInsertEmployees(List<Employee> employees)
{
    using var connection = new SqlConnection("Your_Connection_String");
    connection.Open();

    // 將 List 轉為 DataTable
    var dataTable = ConvertToDataTable(employees);

    using var bulkCopy = new SqlBulkCopy(connection)
    {
        DestinationTableName = "Employees",
        BatchSize = 5000,
        NotifyAfter = 1000
    };

    bulkCopy.SqlRowsCopied += (sender, e) =>
    {
        Console.WriteLine($"已插入 {e.RowsCopied} 行數據");
    };

    bulkCopy.WriteToServer(dataTable);
}

public DataTable ConvertToDataTable(List<Employee> employees)
{
    var table = new DataTable();
    table.Columns.Add("Id", typeof(int));
    table.Columns.Add("Name", typeof(string));
    table.Columns.Add("Department", typeof(string));

    foreach (var employee in employees)
    {
        table.Rows.Add(employee.Id, employee.Name, employee.Department);
    }

    return table;
}

示例 2:處理 CSV 文件批量插入

public void BulkInsertFromCsv(string filePath)
{
    using var connection = new SqlConnection("Your_Connection_String");
    connection.Open();

    var dataTable = new DataTable();
    dataTable.Columns.Add("Id", typeof(int));
    dataTable.Columns.Add("Name", typeof(string));
    dataTable.Columns.Add("Age", typeof(int));

    var lines = File.ReadLines(filePath);
    foreach (var line in lines.Skip(1))  // 假設第一行是表頭
    {
        var values = line.Split(',');
        dataTable.Rows.Add(int.Parse(values[0]), values[1], int.Parse(values[2]));
    }

    using var bulkCopy = new SqlBulkCopy(connection)
    {
        DestinationTableName = "People",
        BatchSize = 5000
    };

    bulkCopy.WriteToServer(dataTable);
}

使用場景與限制

理想應用場景

  • 數據遷移:從舊系統導入歷史數據
  • ETL 處理:數據倉庫定期加載
  • 實時數據流:傳感器/IoT 設備批量上傳
  • 報表生成:預計算數據批量存儲
  • 緩存預熱:初始化內存數據庫

功能限制

  • 僅限 SQL Server:不直接支持其他數據庫
  • 無更新能力:僅插入新數據(需配合臨時表實現更新)
  • 觸發器影響:默認觸發插入觸發器(可通過選項控制)
  • 標識列處理:需顯式設置 KeepIdentity 選項

資源和文檔

  • 官方文檔:

    • Microsoft Learn:https://learn.microsoft.com/en-us/dotnet/api/microsoft.data.s...
    • Microsoft.Data.SqlClient:https://learn.microsoft.com/en-us/dotnet/api/microsoft.data.s...
  • NuGet 包:https://www.nuget.org/packages/Microsoft.Data.SqlClient
  • GitHub:https://github.com/dotnet/SqlClient
user avatar Javaer1995 头像 jihu_gitlab 头像 xujianguo_5fd496321fcf9 头像 null_null_null 头像
点赞 4 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.