使用SqlBulkCopy和IDataReader进行高性能插入






4.84/5 (16投票s)
使用 SqlBulkCopy 和两种 IDataReader 实现,用于将静态和动态数据结构插入 SQL Server 表。
引言
最终,`SqlBulkCopy` 可以有三种使用方式:插入表示为 `DataTable` 对象、`DataRow` 对象数组或 `IDataReader` 实例的数据。在本文中,我将演示两种 `IDataReader` 接口的实现,它们与 `SqlBulkCopy` 结合使用以实现高性能数据库插入。其他两种选项彼此相似,可用于相对少量的数据,因为它们要求在将所有记录交给 `SqlBulkCopy` 之前将其全部加载到内存中。相比之下,`IDataReader` 方法更灵活,允许在“惰性”模式下处理无限数量的记录,这意味着数据可以根据服务器的消耗速度即时提供给 `SqlBulkCopy`。这类似于 `IList
使用演示
附加的演示项目包含一个预编译的控制台应用程序,其中包含配置文件和 `Data` 子文件夹,里面有示例 `CSV` 文件。在运行演示之前,请确保调整配置文件,指定名为“`DefaultDb`”的正确连接字符串。另一个设置“`MaxRecordCount`”默认等于 `100,000`,这对于此演示来说应该足够了。请注意,连接字符串可以指向任何现有数据库。所有演示表都将自动创建,因此无需手动设置数据库。
启动演示后,它会在控制台窗口中提示您按 `Enter` 键,然后初始化数据库,并在执行每个演示操作之前再次提示。我的机器上完成的演示如下所示:
首先,应用程序将尝试初始化数据库。它将创建(或重新创建)三个表 - 每个演示操作一个。
- `Contacts` 表,包含 `Id`、`FirstName`、`LastName` 和 `BirthDate` 列。
- `DynamicData` 表,包含一个 `Id`、10 个 `integer`、10 个 `string`、10 个 `datetime` 和 10 个 `guid` 列。
- `CsvData` 表,其结构与 `DynamicData` 相同。
然后应用程序将执行三个演示操作,并测量每个操作的时间。
- `Static Dataset Demo` 演示了 `ObjectDataReader
`,它允许处理任何 `POCO` 类的实例(在本例中为 `Contact` 类)。 - `Dynamic Dataset Demo` 演示了 `DynamicDataReader
`,它也实现了 `IDataReader`,但允许用户通过用户定义的 lambda 表达式决定如何从 `T` 的底层对象中提取数据。在此演示中,我使用 `IDictionary ` 来表示数据。 - `CSV Import Demo` 利用 `CsvParser` 类和上述 `DynamicDataReader
` 来高效地将附加的“Data\CsvData.csv”文件加载到数据库中。
前两个演示的数据是使用辅助类 `RandomDataGenerator` 动态随机生成的。另一个辅助类 `TableSchemaProvider` 用于从 SQL Server 提取一些元数据并执行一些实用 SQL 命令。
ObjectDataReader<T>
如下所示,`ObjectDataReader
public sealed class ObjectDataReader<TData> : IDataReader
{
private class PropertyAccessor
{
public List<Func<TData, object>> Accessors { get; set; }
public Dictionary<string, int> Lookup { get; set; }
}
private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
new Lazy<PropertyAccessor>(() =>
{
var propertyAccessors = typeof(TData)
.GetProperties(BindingFlags.Instance | BindingFlags.Public)
.Where(p => p.CanRead)
.Select((p, i) => new
{
Index = i,
Property = p,
Accessor = CreatePropertyAccessor(p)
})
.ToArray();
return new PropertyAccessor
{
Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
Lookup = propertyAccessors.ToDictionary(
p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase)
};
});
private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
{
var parameter = Expression.Parameter(typeof(TData), "input");
var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
return lamda.Compile();
}
private IEnumerator<TData> m_dataEnumerator;
public ObjectDataReader(IEnumerable<TData> data)
{
m_dataEnumerator = data.GetEnumerator();
}
#region IDataReader Members
public void Close()
{
Dispose();
}
public int Depth => 1;
public DataTable GetSchemaTable()
{
return null;
}
public bool IsClosed => m_dataEnumerator == null;
public bool NextResult()
{
return false;
}
public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}
public int RecordsAffected => -1;
#endregion
// IDisposable Members
#region IDataRecord Members
public int GetOrdinal(string name)
{
int ordinal;
if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}
public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);
return s_propertyAccessorCache.Value.Accessors[i](m_dataEnumerator.Current);
}
public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;
// Not Implemented IDataRecord Members ...
#endregion
}
一旦实现了 `ObjectDataReader
private static async Task RunStaticDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "Contacts";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
bulkCopy.ColumnMappings.Add("Id", "Id");
bulkCopy.ColumnMappings.Add("FirstName", "FirstName");
bulkCopy.ColumnMappings.Add("LastName", "LastName");
bulkCopy.ColumnMappings.Add("BirthDate", "BirthDate");
using (var reader = new ObjectDataReader<Contact>(new RandomDataGenerator().GetContacts(count)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
DynamicDataReader<T>
如果不存在表示数据的静态类,您可以使用 `DynamicDataReader
public sealed class DynamicDataReader<T> : IDataReader
{
private readonly IList<SchemaFieldDef> m_schema;
private readonly IDictionary<string, int> m_schemaMapping;
private readonly Func<T, string, object> m_selector;
private IEnumerator<T> m_dataEnumerator;
public DynamicDataReader(IList<SchemaFieldDef> schema, IEnumerable<T> data,
Func<T, string, object> selector)
{
m_schema = schema;
m_schemaMapping = m_schema
.Select((x, i) => new { x.FieldName, Index = i })
.ToDictionary(x => x.FieldName, x => x.Index);
m_selector = selector;
m_dataEnumerator = data.GetEnumerator();
}
#region IDataReader Members
public void Close()
{
Dispose();
}
public int Depth => 1;
public DataTable GetSchemaTable()
{
return null;
}
public bool IsClosed => m_dataEnumerator == null;
public bool NextResult()
{
return false;
}
public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}
public int RecordsAffected => -1;
#endregion
// IDisposable Members
#region IDataRecord Members
public int FieldCount => m_schema.Count;
public int GetOrdinal(string name)
{
int ordinal;
if (!m_schemaMapping.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}
public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);
var value = m_selector(m_dataEnumerator.Current, m_schema[i].FieldName);
if (value == null)
return DBNull.Value;
var strValue = value as string;
if (strValue != null)
{
if (strValue.Length > m_schema[i].Size && m_schema[i].Size > 0)
strValue = strValue.Substring(0, m_schema[i].Size);
if (m_schema[i].DataType == DbType.String)
return strValue;
return SchemaFieldDef.StringToTypedValue(strValue, m_schema[i].DataType) ?? DBNull.Value;
}
return value;
}
// Not Implemented IDataRecord Members
#endregion
}
`DynamicDataReader
以下是 `DynamicDataReader
private static async Task RunDynamicDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
var fields = await new TableSchemaProvider(connection, "DynamicData").GetFieldsAsync();
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "DynamicData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);
var data = new RandomDataGenerator().GetDynamicData(count);
using (var reader = new DynamicDataReader<IDictionary<string, object>>
(fields, data, (x, k) => x.GetValueOrDefault(k)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
这与 `ObjectDataReader` 的用法非常相似,只是字段不是静态绑定的。
CSV 文件导入
最后,第三个演示操作结合了 `CsvParser`、`DynamicDataReader` 和 `SqlBulkCopy` 类,以实现 `CSV` 格式的高性能、可扩展的数据导入。
private static async Task RunCsvDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var csvReader = new StreamReader(@"Data\CsvData.csv"))
{
var csvData = CsvParser.ParseHeadAndTail(csvReader, ',', '"');
var csvHeader = csvData.Item1
.Select((x, i) => new {Index = i, Field = x})
.ToDictionary(x => x.Field, x => x.Index);
var csvLines = csvData.Item2;
var fields = await new TableSchemaProvider(connection, "CsvData").GetFieldsAsync();
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "CsvData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);
using (var reader = new DynamicDataReader<IList<string>>(fields, csvLines.Take(count),
(x, k) => x.GetValueOrDefault(csvHeader.GetValueOrDefault(k, -1))))
{
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
}
}
为了演示目的,*CsvData.csv* 文件中只有 `1,000` 行。然而,这个特定的解决方案能够以相对稳定的性能处理任意数量的行。它会将 `CSV` 文件中的列名与目标表的列名进行匹配。缺失的数据将用 `Null` 填充。目标表中不存在的任何额外列都将被忽略。
摘要
在本文中,我演示了使用托管代码处理高性能数据库插入的一种可能方法。我的目标是构建一个灵活且易于使用的 API,以便将其应用于许多不同的场景。特别是,使用 `ObjectDataReader
历史
- 更新了 **CSV 文件导入** 演示,以便值直接从 `IList
` 按索引访问。