65.9K
CodeProject 正在变化。 阅读更多。
Home

使用SqlBulkCopy和IDataReader进行高性能插入

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.84/5 (16投票s)

2016年4月27日

CPOL

5分钟阅读

viewsIcon

46002

downloadIcon

1429

使用 SqlBulkCopy 和两种 IDataReader 实现,用于将静态和动态数据结构插入 SQL Server 表。

引言

最终,`SqlBulkCopy` 可以有三种使用方式:插入表示为 `DataTable` 对象、`DataRow` 对象数组或 `IDataReader` 实例的数据。在本文中,我将演示两种 `IDataReader` 接口的实现,它们与 `SqlBulkCopy` 结合使用以实现高性能数据库插入。其他两种选项彼此相似,可用于相对少量的数据,因为它们要求在将所有记录交给 `SqlBulkCopy` 之前将其全部加载到内存中。相比之下,`IDataReader` 方法更灵活,允许在“惰性”模式下处理无限数量的记录,这意味着数据可以根据服务器的消耗速度即时提供给 `SqlBulkCopy`。这类似于 `IList` 与 `IEnumerable` 的方法。

使用演示

附加的演示项目包含一个预编译的控制台应用程序,其中包含配置文件和 `Data` 子文件夹,里面有示例 `CSV` 文件。在运行演示之前,请确保调整配置文件,指定名为“`DefaultDb`”的正确连接字符串。另一个设置“`MaxRecordCount`”默认等于 `100,000`,这对于此演示来说应该足够了。请注意,连接字符串可以指向任何现有数据库。所有演示表都将自动创建,因此无需手动设置数据库。

启动演示后,它会在控制台窗口中提示您按 `Enter` 键,然后初始化数据库,并在执行每个演示操作之前再次提示。我的机器上完成的演示如下所示:

首先,应用程序将尝试初始化数据库。它将创建(或重新创建)三个表 - 每个演示操作一个。

  1. `Contacts` 表,包含 `Id`、`FirstName`、`LastName` 和 `BirthDate` 列。
  2. `DynamicData` 表,包含一个 `Id`、10 个 `integer`、10 个 `string`、10 个 `datetime` 和 10 个 `guid` 列。
  3. `CsvData` 表,其结构与 `DynamicData` 相同。

然后应用程序将执行三个演示操作,并测量每个操作的时间。

  1. `Static Dataset Demo` 演示了 `ObjectDataReader`,它允许处理任何 `POCO` 类的实例(在本例中为 `Contact` 类)。
  2. `Dynamic Dataset Demo` 演示了 `DynamicDataReader`,它也实现了 `IDataReader`,但允许用户通过用户定义的 lambda 表达式决定如何从 `T` 的底层对象中提取数据。在此演示中,我使用 `IDictionary` 来表示数据。
  3. `CSV Import Demo` 利用 `CsvParser` 类和上述 `DynamicDataReader` 来高效地将附加的“Data\CsvData.csv”文件加载到数据库中。
`CsvParser` 的实现我在我另一篇文章 这里 中进行了描述。

前两个演示的数据是使用辅助类 `RandomDataGenerator` 动态随机生成的。另一个辅助类 `TableSchemaProvider` 用于从 SQL Server 提取一些元数据并执行一些实用 SQL 命令。

ObjectDataReader<T>

如下所示,`ObjectDataReader` 在其构造函数中接受 `IEnumerable`,它代表由 `SqlBulkCopy` 类使用的实际数据流。重要的是要注意,`GetOrdinal()` 和 `GetValue()` 方法不会在每次需要访问 `T` 的属性时都使用反射。相反,它们使用预编译和缓存的 lambda 表达式,这些表达式充当属性访问器和查找器。这些预编译的 lambda 表达式比使用反射快很多倍。

public sealed class ObjectDataReader<TData> : IDataReader
{
    private class PropertyAccessor
    {
        public List<Func<TData, object>> Accessors { get; set; }
        public Dictionary<string, int> Lookup { get; set; }
    }

    private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
        new Lazy<PropertyAccessor>(() =>
    {
        var propertyAccessors = typeof(TData)
            .GetProperties(BindingFlags.Instance | BindingFlags.Public)
            .Where(p => p.CanRead)
            .Select((p, i) => new
            {
                Index = i,
                Property = p,
                Accessor = CreatePropertyAccessor(p)
            })
            .ToArray();

        return new PropertyAccessor
        {
            Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
            Lookup = propertyAccessors.ToDictionary(
                p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase)
        };
    });

    private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
    {
        var parameter = Expression.Parameter(typeof(TData), "input");
        var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
        var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
        var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
        return lamda.Compile();
    }

    private IEnumerator<TData> m_dataEnumerator;

    public ObjectDataReader(IEnumerable<TData> data)
    {
        m_dataEnumerator = data.GetEnumerator();
    }

    #region IDataReader Members

    public void Close()
    {
        Dispose();
    }

    public int Depth => 1;

    public DataTable GetSchemaTable()
    {
        return null;
    }

    public bool IsClosed => m_dataEnumerator == null;

    public bool NextResult()
    {
        return false;
    }

    public bool Read()
    {
        if (IsClosed)
            throw new ObjectDisposedException(GetType().Name);
        return m_dataEnumerator.MoveNext();
    }

    public int RecordsAffected => -1;

    #endregion

    // IDisposable Members

    #region IDataRecord Members

    public int GetOrdinal(string name)
    {
        int ordinal;
        if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
            throw new InvalidOperationException("Unknown parameter name: " + name);
        return ordinal;
    }

    public object GetValue(int i)
    {
        if (m_dataEnumerator == null)
            throw new ObjectDisposedException(GetType().Name);
        return s_propertyAccessorCache.Value.Accessors[i](m_dataEnumerator.Current);
    }

    public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;

    // Not Implemented IDataRecord Members ...
        
    #endregion
}

一旦实现了 `ObjectDataReader`,我们就可以像这样将其集成到 `SqlBulkCopy` 中:

private static async Task RunStaticDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "Contacts";
        bulkCopy.BatchSize = 1000;
        bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

        bulkCopy.ColumnMappings.Add("Id", "Id");
        bulkCopy.ColumnMappings.Add("FirstName", "FirstName");
        bulkCopy.ColumnMappings.Add("LastName", "LastName");
        bulkCopy.ColumnMappings.Add("BirthDate", "BirthDate");

        using (var reader = new ObjectDataReader<Contact>(new RandomDataGenerator().GetContacts(count)))
            await bulkCopy.WriteToServerAsync(reader, cancellationToken);
    }
}

DynamicDataReader<T>

如果不存在表示数据的静态类,您可以使用 `DynamicDataReader`。最能说明 `DynamicDataReader` 用途的例子是,当您表的每条记录都表示为 `Dictionary` 时,其中键是列名。这样,如果字典中不存在给定列的值,则假定为 `Null` 值。相反,字典中与表中任何列都不关联的所有项都将被忽略。

public sealed class DynamicDataReader<T> : IDataReader
{
    private readonly IList<SchemaFieldDef> m_schema;
    private readonly IDictionary<string, int> m_schemaMapping;
    private readonly Func<T, string, object> m_selector;
    private IEnumerator<T> m_dataEnumerator;

    public DynamicDataReader(IList<SchemaFieldDef> schema, IEnumerable<T> data, 
        Func<T, string, object> selector)
    {
        m_schema = schema;
        m_schemaMapping = m_schema
            .Select((x, i) => new { x.FieldName, Index = i })
            .ToDictionary(x => x.FieldName, x => x.Index);
        m_selector = selector;
        m_dataEnumerator = data.GetEnumerator();
    }

    #region IDataReader Members

    public void Close()
    {
        Dispose();
    }

    public int Depth => 1;

    public DataTable GetSchemaTable()
    {
        return null;
    }

    public bool IsClosed => m_dataEnumerator == null;

    public bool NextResult()
    {
        return false;
    }

    public bool Read()
    {
        if (IsClosed)
            throw new ObjectDisposedException(GetType().Name);
        return m_dataEnumerator.MoveNext();
    }

    public int RecordsAffected => -1;

    #endregion

    // IDisposable Members

    #region IDataRecord Members

    public int FieldCount => m_schema.Count;

    public int GetOrdinal(string name)
    {
        int ordinal;
        if (!m_schemaMapping.TryGetValue(name, out ordinal))
            throw new InvalidOperationException("Unknown parameter name: " + name);
        return ordinal;
    }

    public object GetValue(int i)
    {
        if (m_dataEnumerator == null)
            throw new ObjectDisposedException(GetType().Name);

        var value = m_selector(m_dataEnumerator.Current, m_schema[i].FieldName);

        if (value == null)
            return DBNull.Value;

        var strValue = value as string;
        if (strValue != null)
        {
            if (strValue.Length > m_schema[i].Size && m_schema[i].Size > 0)
                strValue = strValue.Substring(0, m_schema[i].Size);
            if (m_schema[i].DataType == DbType.String)
                return strValue;
            return SchemaFieldDef.StringToTypedValue(strValue, m_schema[i].DataType) ?? DBNull.Value;
        }

        return value;
    }

    // Not Implemented IDataRecord Members

    #endregion
}

`DynamicDataReader` 依赖于 `SchemaFieldDef` 类,该类描述了表列的字段名称、大小和数据库数据类型。只有通过构造函数传递的列(`IList schema`)才会参与数据插入。构造函数的另外两个参数代表数据本身(`IEnumerable data`)和用于访问属性的用户定义的 lambda 表达式(`Func selector`)。如您所见,`selector` 接受 `T` 的实例和 `string` 字段名,并返回代表与该字段名关联的值的 `object`。请注意,`object` 的数据类型可以是与数据库中实际类型(`int`、`numeric`、`datetime`、`uniqueidentifier` 等)对应的非字符串 C# 类型(`int`、`decimal`、`DateTime`、`Guid` 等),或者仅仅是一个 `string`。在后一种情况下,`DynamicDataReader` 将在 `SchemaFieldDef.StringToTypedValue()` 方法的帮助下,尝试自动将字符串值转换为适当的数据类型。此方法仅支持少数数据类型,但如果需要,可以轻松扩展。

以下是 `DynamicDataReader` 与 `SqlBulkCopy` 一起使用的示例:

private static async Task RunDynamicDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    var fields = await new TableSchemaProvider(connection, "DynamicData").GetFieldsAsync();

    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "DynamicData";
        bulkCopy.BatchSize = 1000;
        bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

        foreach (var field in fields)
            bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

        var data = new RandomDataGenerator().GetDynamicData(count);

        using (var reader = new DynamicDataReader<IDictionary<string, object>>
				(fields, data, (x, k) => x.GetValueOrDefault(k)))
            await bulkCopy.WriteToServerAsync(reader, cancellationToken);
    }
}

这与 `ObjectDataReader` 的用法非常相似,只是字段不是静态绑定的。

CSV 文件导入

最后,第三个演示操作结合了 `CsvParser`、`DynamicDataReader` 和 `SqlBulkCopy` 类,以实现 `CSV` 格式的高性能、可扩展的数据导入。

private static async Task RunCsvDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    using (var csvReader = new StreamReader(@"Data\CsvData.csv"))
    {
        var csvData = CsvParser.ParseHeadAndTail(csvReader, ',', '"');

        var csvHeader = csvData.Item1
            .Select((x, i) => new {Index = i, Field = x})
            .ToDictionary(x => x.Field, x => x.Index);

        var csvLines = csvData.Item2;

        var fields = await new TableSchemaProvider(connection, "CsvData").GetFieldsAsync();

        using (var bulkCopy = new SqlBulkCopy(connection))
        {
            bulkCopy.DestinationTableName = "CsvData";
            bulkCopy.BatchSize = 1000;
            bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

            foreach (var field in fields)
                bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

            using (var reader = new DynamicDataReader<IList<string>>(fields, csvLines.Take(count),
                (x, k) => x.GetValueOrDefault(csvHeader.GetValueOrDefault(k, -1))))
            {
                await bulkCopy.WriteToServerAsync(reader, cancellationToken);
            }
        }
    }
}

为了演示目的,*CsvData.csv* 文件中只有 `1,000` 行。然而,这个特定的解决方案能够以相对稳定的性能处理任意数量的行。它会将 `CSV` 文件中的列名与目标表的列名进行匹配。缺失的数据将用 `Null` 填充。目标表中不存在的任何额外列都将被忽略。

摘要

在本文中,我演示了使用托管代码处理高性能数据库插入的一种可能方法。我的目标是构建一个灵活且易于使用的 API,以便将其应用于许多不同的场景。特别是,使用 `ObjectDataReader` 上传表示为静态定义的 `POCO` 类的数据,并使用 `DynamicDataReader` 上传任何结构的数据。

历史

  • 更新了 **CSV 文件导入** 演示,以便值直接从 `IList` 按索引访问。
© . All rights reserved.