BulkInsert with Entity Framework






How to extend Entity Framework's DbContext with bulk insert functionality
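Introduction
The latest version of this project is on GitHub (https://github.com/mtanneryd/ef6-bulk-operations), and there is also a NuGet package (https://nuget.net.cn/packages/Tanneryd.BulkOperations.EF6).
Entity Framework (EF) is great in many ways, but when you need to insert large amounts of data it is not what you want, unless you have plenty of time to spend waiting. In this article, I present a way to combine EF with bulk insert functionality without leaving the comfort of EF. The code is based on an idea I had a few years ago, but I have added some features since then, and I thought it was time to write a complete article with sample code.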
The latest version of the code is distributed as the NuGet package linked above. An updated version of this article is also available.
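Background
Inserting with EF takes so long because every single insert statement is wrapped in a call to the stored procedure sp_executesql. If you need to insert a million rows, you make a million round trips to the database, each one calling that stored procedure to execute a single insert statement. No wonder it takes a long time to finish.
A typical call looks like this: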
exec sp_executesql N'INSERT [dbo].[Prime]([Number], [CompositeId])
VALUES (@0, @1)
SELECT [Id]
FROM [dbo].[Prime]
WHERE @@ROWCOUNT > 0 AND [Id] = scope_identity()',N'@0 bigint,@1 bigint',@0=7,@1=198
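When I set out to find a way to speed this up, I had the following requirements:
- Adding the feature to any EF6 project must be extremely simple
- Minimal impact on existing code
- Support for complete object graphs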
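Challenges
Extending the DbContext
To integrate seamlessly with the derived DbContext class, I decided to add the bulk insert method in a separate partial class declaration. BulkInsertAll is the only public method, but there are several private methods as well, so putting them all in a separate file and using the partial class pattern seemed like a practical solution.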
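As a minimal sketch of the partial class pattern described above: the class name DemoContext and its members are hypothetical stand-ins, and the class does not derive from DbContext so that the example runs without EF. The point is only that two declarations in separate files compile into one class, so a single public entry point and its private helpers can be added without touching the existing file.

```csharp
using System;
using System.Collections.Generic;

// "File 1": the context as it might already exist in a project.
// Hypothetical stand-in; a real context would derive from DbContext.
public partial class DemoContext
{
    public List<string> Log { get; } = new List<string>();

    public void SaveOne(object entity)
    {
        Log.Add("single insert: " + entity);
    }
}

// "File 2": the bulk extension lives in its own partial declaration.
public partial class DemoContext
{
    // The only public member added by this file.
    public void BulkInsertAll(IList<object> entities)
    {
        if (entities.Count == 0) return;
        WriteBatch(entities);
    }

    // Private helpers stay hidden from callers but can use members from "file 1".
    private void WriteBatch(IList<object> entities)
    {
        Log.Add("bulk insert: " + entities.Count + " rows");
    }
}

public static class Program
{
    public static void Main()
    {
        var ctx = new DemoContext();
        ctx.SaveOne("a");
        ctx.BulkInsertAll(new List<object> { "b", "c" });
        foreach (var line in ctx.Log) Console.WriteLine(line);
    }
}
```

Both declarations must use the same class name and namespace, which is why the listing later in the article has to be renamed to match your own context class.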
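Table Mappings
I wanted to be able to infer everything I needed directly from the context and the entity objects themselves: table names, column names, foreign keys, and so on. As far as I know, however, there is no documented way to access Entity Framework's mapping data, so I had to resort to reflection. This works, but it also means that the code may stop working with a newer version of EF. The following code uses reflection to extract the initial mapping data.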
var objectContext = ((IObjectContextAdapter)this).ObjectContext;
var workspace = objectContext.MetadataWorkspace;
var containerName = objectContext.DefaultContainerName;
Type t = entities[0].GetType();
var entityName = t.Name;
var storageMapping = workspace.GetItem<GlobalItem>(containerName, DataSpace.CSSpace);
dynamic temp = storageMapping.GetType().InvokeMember(
"EntitySetMappings",
BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance,
null, storageMapping, null);
The temp variable now holds an enumeration of EntitySetMapping objects. We copy them into a list so that we can use Single() to pick out the mapping for our entity type.
var entitySetMaps = new List<EntitySetMapping>();
// Named "mapping" rather than "t" to avoid clashing with the Type t declared above.
foreach (var mapping in temp)
{
entitySetMaps.Add((EntitySetMapping)mapping);
}
var entitySetMap = entitySetMaps.Single(m => m.EntitySet.ElementType.Name == entityName);
The next step is to find the mappings between CLR property names and table column names.
var typeMappings = entitySetMap.EntityTypeMappings;
EntityTypeMapping typeMapping = typeMappings[0];
var fragments = typeMapping.Fragments;
var fragment = fragments[0];
var properties = fragment.PropertyMappings;
foreach (var property in properties.Where(p => p is ScalarPropertyMapping).Cast<ScalarPropertyMapping>())
{
var clrProperty = property.Property;
var columnProperty = property.Column;
columnMappings.Add(clrProperty.Name, new CLR2ColumnMapping
{
CLRProperty = clrProperty,
ColumnProperty = columnProperty,
});
}
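To illustrate what the property-to-column mappings are used for, here is a minimal sketch that builds a DataTable from a list of entities and collects the (source, destination) name pairs a bulk copy would need. It is an illustration under assumptions, not the article's code: the Prime class, the column names prime_number and composite_id, and the hard-coded dictionary are hypothetical, and the dictionary stands in for the metadata that the real code reads from EF.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

// Hypothetical entity; the names echo the Prime table in the SQL shown earlier.
public class Prime
{
    public long Id { get; set; }
    public long Number { get; set; }
    public long? CompositeId { get; set; }
}

public static class MappingSketch
{
    // Builds a DataTable whose columns are named after the CLR properties and
    // returns (CLR property, table column) pairs for the bulk copy to match on.
    public static DataTable ToDataTable<T>(
        IEnumerable<T> entities,
        IDictionary<string, string> clrToColumn,
        out List<KeyValuePair<string, string>> columnPairs)
    {
        // Only properties that have a column mapping are copied.
        var properties = typeof(T).GetProperties()
            .Where(p => clrToColumn.ContainsKey(p.Name))
            .ToArray();

        var table = new DataTable();
        columnPairs = new List<KeyValuePair<string, string>>();
        foreach (var property in properties)
        {
            // A DataColumn cannot be typed as Nullable<T>; use the underlying type.
            var type = Nullable.GetUnderlyingType(property.PropertyType)
                       ?? property.PropertyType;
            table.Columns.Add(new DataColumn(property.Name, type));
            columnPairs.Add(new KeyValuePair<string, string>(
                property.Name, clrToColumn[property.Name]));
        }

        foreach (var entity in entities)
        {
            // Null property values are stored as DBNull in a DataRow.
            table.Rows.Add(properties
                .Select(p => p.GetValue(entity, null) ?? DBNull.Value)
                .ToArray());
        }
        return table;
    }
}

public static class Program
{
    public static void Main()
    {
        var map = new Dictionary<string, string>
        {
            { "Id", "Id" },
            { "Number", "prime_number" },       // hypothetical column name
            { "CompositeId", "composite_id" },  // hypothetical column name
        };
        var primes = new[]
        {
            new Prime { Number = 7, CompositeId = 198 },
            new Prime { Number = 11, CompositeId = null },
        };

        List<KeyValuePair<string, string>> pairs;
        var table = MappingSketch.ToDataTable(primes, map, out pairs);
        Console.WriteLine(table.Rows.Count + " rows, " + table.Columns.Count + " columns");
        foreach (var pair in pairs) Console.WriteLine(pair.Key + " -> " + pair.Value);
    }
}
```

With a real SqlBulkCopy, each pair would be passed to `bulkCopy.ColumnMappings.Add(source, destination)`. Mapping by name matters because the order of the CLR properties is not guaranteed to match the order of the table columns.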
Finally, we look for any foreign key relationships.
var foreignKeyMappings = new List<ForeignKeyMapping>();
var navigationProperties =
typeMapping.EntityType.DeclaredMembers.Where(
m => m.BuiltInTypeKind == BuiltInTypeKind.NavigationProperty)
.Cast<NavigationProperty>()
.Where(p => p.RelationshipType is AssociationType)
.ToArray();
foreach (var navigationProperty in navigationProperties)
{
var relType = (AssociationType)navigationProperty.RelationshipType;
if (foreignKeyMappings.All(m => m.Name != relType.Name))
{
var fkMapping = new ForeignKeyMapping
{
NavigationPropertyName = navigationProperty.Name,
BuiltInTypeKind = navigationProperty.TypeUsage.EdmType.BuiltInTypeKind,
Name = relType.Name,
FromType = relType.Constraint.FromProperties.Single().DeclaringType.Name,
FromProperty = relType.Constraint.FromProperties.Single().Name,
ToType = relType.Constraint.ToProperties.Single().DeclaringType.Name,
ToProperty = relType.Constraint.ToProperties.Single().Name,
};
foreignKeyMappings.Add(fkMapping);
}
}
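Identity Columns
Entity Framework sets any generated identity columns when it inserts a new row and, as you can see in the sp_executesql snippet above, it uses scope_identity() to do so. That works well as long as rows are inserted one at a time, as EF does, but now that we want to use bulk insert we need a slightly more elaborate approach. The following code takes care of this when the primary key column is an identity column.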
var pkColumnName = pkColumn.Name;
var pkColumnType = Type.GetType(pkColumn.PrimitiveType.ClrEquivalentType.FullName);
// Get the number of existing rows in the table.
cmd.CommandText = $@"SELECT COUNT(*) FROM {tableName}";
var result = cmd.ExecuteScalar();
var count = Convert.ToInt64(result);
// Get the identity increment value
cmd.CommandText = $"SELECT IDENT_INCR('{tableName}')";
result = cmd.ExecuteScalar();
dynamic identIncrement = Convert.ChangeType(result, pkColumnType);
// Get the last identity value generated for our table
cmd.CommandText = $"SELECT IDENT_CURRENT('{tableName}')";
result = cmd.ExecuteScalar();
dynamic identcurrent = Convert.ChangeType(result, pkColumnType);
var nextId = identcurrent + (count > 0 ? identIncrement : 0);
bulkCopy.BulkCopyTimeout = 5 * 60;
bulkCopy.WriteToServer(table);
cmd.CommandText = $"SELECT SCOPE_IDENTITY()";
result = cmd.ExecuteScalar();
dynamic lastId = Convert.ChangeType(result, pkColumnType);
// A verbatim interpolated string ($@) is required for a literal that spans two lines.
cmd.CommandText = $@"SELECT {pkColumnName} From {tableName}
WHERE {pkColumnName} >= {nextId} and {pkColumnName} <= {lastId}";
var reader = cmd.ExecuteReader();
var ids = (from IDataRecord r in reader
let pk = r[pkColumnName]
select pk)
.OrderBy(i => i)
.ToArray();
if (ids.Length != entities.Count) throw new ArgumentException
("More id values generated than we had entities. Something went wrong, try again.");
for (int i = 0; i < entities.Count; i++)
{
SetProperty(pkColumnName, entities[i], ids[i]);
}
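The arithmetic behind the snippet above can be tried out without a database. The following is a minimal sketch in which the numbers stand in for the results of COUNT(*), IDENT_INCR, and IDENT_CURRENT. The class and method names are hypothetical. It also assumes that no other session inserts into the table at the same time and that the rows receive their identity values in list order, which the approach relies on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IdentityRangeSketch
{
    // First identity value the upcoming bulk insert is expected to receive.
    // Mirrors the snippet above: the increment is added only when the table
    // already contains rows (presumably because IDENT_CURRENT reports the seed
    // for a table that has never held any rows).
    public static long NextId(long identCurrent, long identIncrement, long existingRowCount)
    {
        return identCurrent + (existingRowCount > 0 ? identIncrement : 0);
    }

    // Hands out the ids read back from the table to the entities, in ascending order.
    public static void AssignIds<T>(
        IList<T> entities, IEnumerable<long> idsReadBack, Action<T, long> setId)
    {
        var ids = idsReadBack.OrderBy(id => id).ToArray();
        if (ids.Length != entities.Count)
            throw new InvalidOperationException("Id count does not match entity count.");
        for (int i = 0; i < entities.Count; i++)
            setId(entities[i], ids[i]);
    }
}

// Hypothetical entity used only for this sketch.
public class Item
{
    public long Id { get; set; }
    public string Name { get; set; }
}

public static class Program
{
    public static void Main()
    {
        // Pretend the table holds 100 rows, IDENT_CURRENT is 100 and IDENT_INCR is 1.
        long nextId = IdentityRangeSketch.NextId(100, 1, 100);

        var items = new List<Item>
        {
            new Item { Name = "a" }, new Item { Name = "b" }, new Item { Name = "c" },
        };

        // Stand-in for the SELECT that reads the new keys between nextId and lastId.
        var idsReadBack = Enumerable.Range(0, items.Count).Select(i => nextId + i);

        IdentityRangeSketch.AssignIds(items, idsReadBack, (item, id) => item.Id = id);
        foreach (var item in items) Console.WriteLine(item.Name + " -> " + item.Id);
    }
}
```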
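Foreign Key Relationships
Our search through the entity mappings gave us information about foreign keys, and with that we can implement a recursive pattern that lets the bulk insert method save complete entity graphs and set the identity columns of all saved objects. The BulkInsertAll method takes three parameters.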
public void BulkInsertAll(IList entities, SqlTransaction transaction = null, bool recursive = false)
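All entities in the list must be of the same type. The transaction is optional, and the recursive flag tells us whether to save only the entities in the list (false) or also the entities in any navigation properties (true). There are two kinds of navigation properties to deal with:
- Single properties, the result of a foreign key in this entity pointing to another entity
- Collection properties, the result of another entity having a foreign key pointing to this entity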
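We first find all single navigation properties and save them (if they are new), then update our entities with the resulting foreign key values. This must be done one navigation property at a time. Once that is done, we can safely save all the entities themselves. Finally, we go through all collection navigation properties, set the foreign key of each contained entity to the primary key we now have, and call BulkInsertAll recursively for each collection navigation property.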
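The three steps above can be sketched with an in-memory stand-in for the database. This is an illustration under assumptions rather than the article's implementation: Composite, Prime, and FakeDb are hypothetical, an "insert" merely assigns the next counter value as the key, and a HashSet plays the role of the record of already saved entities.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Composite
{
    public long Id;
    public List<Prime> Primes = new List<Prime>();   // collection navigation property
}

public class Prime
{
    public long Id;
    public long CompositeId;      // foreign key
    public Composite Composite;   // single navigation property
}

public class FakeDb
{
    private long _lastCompositeId, _lastPrimeId;

    // Neither class overrides Equals, so membership is by reference identity.
    private readonly HashSet<object> _saved = new HashSet<object>();
    public List<string> Log { get; } = new List<string>();

    public void InsertPrimes(IList<Prime> primes)
    {
        // Step 1: save parents that are new, then copy their keys into the foreign key.
        var newParents = primes
            .Where(p => p.Composite != null && p.CompositeId == 0 && p.Composite.Id == 0)
            .Select(p => p.Composite)
            .Distinct()
            .ToList();
        if (newParents.Count > 0) InsertComposites(newParents);
        foreach (var p in primes)
            if (p.Composite != null) p.CompositeId = p.Composite.Id;

        // Step 2: save the entities themselves, skipping any that were already saved.
        int written = 0;
        foreach (var p in primes)
        {
            if (!_saved.Add(p)) continue;
            p.Id = ++_lastPrimeId;
            written++;
        }
        if (written > 0) Log.Add("primes: " + written);
    }

    public void InsertComposites(IList<Composite> composites)
    {
        int written = 0;
        foreach (var c in composites)
        {
            if (!_saved.Add(c)) continue;
            c.Id = ++_lastCompositeId;
            written++;
        }
        if (written > 0) Log.Add("composites: " + written);

        // Step 3: children in collection properties get the new key and are saved recursively.
        var children = new List<Prime>();
        foreach (var c in composites)
            foreach (var p in c.Primes)
            {
                p.CompositeId = c.Id;
                children.Add(p);
            }
        if (children.Count > 0) InsertPrimes(children);
    }
}

public static class Program
{
    public static void Main()
    {
        var composite = new Composite();
        var p1 = new Prime { Composite = composite };
        var p2 = new Prime { Composite = composite };
        composite.Primes.Add(p1);
        composite.Primes.Add(p2);

        var db = new FakeDb();
        db.InsertPrimes(new List<Prime> { p1, p2 });
        foreach (var line in db.Log) Console.WriteLine(line);
    }
}
```

Even though the call starts from the primes, the shared parent is written first and only once, and each prime is written once even though it is reachable both from the original list and from the parent's collection.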
Using the Code
All you have to do is rename the partial class below and put it in the same namespace as your derived DbContext class.
/*
* Copyright © 2017 Tånneryd IT AB
*
* This file is part of the tutorial application BulkInsert.App.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://apache.ac.cn/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Entity.Core.Mapping;
using System.Data.Entity.Infrastructure;
using System.Data.Entity.Core.Metadata.Edm;
using System.Data.SqlClient;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
namespace BulkInsert.App
{
public partial class NumberContext
{
public NumberContext(string nameOrConnectionString)
: base($"name={nameOrConnectionString}")
{
}
public void BulkInsertAll(IList entities, SqlTransaction transaction = null,
bool recursive = false)
{
BulkInsertAll(entities, transaction, recursive,
new Dictionary<object, object>(new IdentityEqualityComparer<object>()));
}
private void BulkInsertAll(IList entities, SqlTransaction transaction,
bool recursive, Dictionary<object, object> savedEntities)
{
if (entities.Count == 0) return;
var objectContext = ((IObjectContextAdapter)this).ObjectContext;
var workspace = objectContext.MetadataWorkspace;
Type t = entities[0].GetType();
var mappings = GetMappings(workspace, objectContext.DefaultContainerName, t.Name);
if (recursive)
{
foreach (var fkMapping in mappings.ToForeignKeyMappings)
{
var navProperties = new HashSet<object>();
var modifiedEntities = new List<object[]>();
foreach (var entity in entities)
{
var navProperty = GetProperty(fkMapping.NavigationPropertyName, entity);
var navPropertyKey = GetProperty(fkMapping.ToProperty, entity);
if (navProperty != null && navPropertyKey == 0)
{
var currentValue = GetProperty(fkMapping.FromProperty, navProperty);
if (currentValue > 0)
{
SetProperty(fkMapping.ToProperty, entity, currentValue);
}
else
{
navProperties.Add(navProperty);
modifiedEntities.Add(new object[] { entity, navProperty });
}
}
}
if (navProperties.Any())
{
BulkInsertAll(navProperties.ToArray(), transaction, true, savedEntities);
foreach (var modifiedEntity in modifiedEntities)
{
var e = modifiedEntity[0];
var p = modifiedEntity[1];
SetProperty(fkMapping.ToProperty, e,
GetProperty(fkMapping.FromProperty, p));
}
}
}
}
var validEntities = new ArrayList();
var ignoredEntities = new ArrayList();
foreach (dynamic entity in entities)
{
if (savedEntities.ContainsKey(entity))
{
ignoredEntities.Add(entity);
continue;
}
validEntities.Add(entity);
savedEntities.Add(entity, entity);
}
BulkInsertAll(validEntities, t, mappings, transaction);
if (recursive)
{
foreach (var fkMapping in mappings.FromForeignKeyMappings)
{
var navigationPropertyName = fkMapping.NavigationPropertyName;
var navPropertyEntities = new List<dynamic>();
foreach (var entity in entities)
{
if (fkMapping.BuiltInTypeKind == BuiltInTypeKind.CollectionType ||
fkMapping.BuiltInTypeKind ==
BuiltInTypeKind.CollectionKind)
{
var navProperties = GetProperty(navigationPropertyName, entity);
foreach (var navProperty in navProperties)
{
SetProperty(fkMapping.ToProperty, navProperty,
GetProperty(fkMapping.FromProperty, entity));
navPropertyEntities.Add(navProperty);
}
}
else
{
var navProperty = GetProperty(navigationPropertyName, entity);
if (navProperty != null)
{
SetProperty(fkMapping.ToProperty, navProperty,
GetProperty(fkMapping.FromProperty, entity));
navPropertyEntities.Add(navProperty);
}
}
}
if (navPropertyEntities.Any())
{
BulkInsertAll(navPropertyEntities.ToArray(), transaction, true, savedEntities);
}
}
}
}
private void BulkInsertAll(IList entities, Type t, Mappings mappings,
SqlTransaction transaction = null)
{
Set(t).ToString();
var tableName = GetTableName(t);
var columnMappings = mappings.ColumnMappings;
var conn = (SqlConnection)Database.Connection;
if (conn.State == ConnectionState.Closed)
conn.Open();
var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, transaction);
var properties = t.GetProperties().Where
(p => columnMappings.ContainsKey(p.Name)).ToArray();
var table = new DataTable();
foreach (var property in properties)
{
Type propertyType = property.PropertyType;
// Nullable properties need special treatment.
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
// Ignore all properties that we have no mappings for.
if (columnMappings.ContainsKey(property.Name))
{
// Since we cannot trust the CLR type properties to be in the same order as
// the table columns we use the SqlBulkCopy column mappings.
table.Columns.Add(new DataColumn(property.Name, propertyType));
var clrPropertyName = property.Name;
var tableColumnName = columnMappings[property.Name].ColumnProperty.Name;
bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping
(clrPropertyName, tableColumnName));
}
}
// Add all our entities to our data table
foreach (var entity in entities)
{
var e = entity;
table.Rows.Add(properties.Select(property => GetPropertyValue
(property.GetValue(e, null))).ToArray());
}
var cmd = conn.CreateCommand();
cmd.Transaction = transaction;
// Check to see if the table has a primary key with auto identity set. If so
// set the generated primary key values on the entities.
var pkColumn = columnMappings.Values.Where(m =>
m.ColumnProperty.IsStoreGeneratedIdentity).Select(m => m.ColumnProperty).SingleOrDefault();
if (pkColumn != null)
{
var pkColumnName = pkColumn.Name;
var pkColumnType = Type.GetType(pkColumn.PrimitiveType.ClrEquivalentType.FullName);
// Get the number of existing rows in the table.
cmd.CommandText = $@"SELECT COUNT(*) FROM {tableName}";
var result = cmd.ExecuteScalar();
var count = Convert.ToInt64(result);
// Get the identity increment value
cmd.CommandText = $"SELECT IDENT_INCR('{tableName}')";
result = cmd.ExecuteScalar();
dynamic identIncrement = Convert.ChangeType(result, pkColumnType);
// Get the last identity value generated for our table
cmd.CommandText = $"SELECT IDENT_CURRENT('{tableName}')";
result = cmd.ExecuteScalar();
dynamic identcurrent = Convert.ChangeType(result, pkColumnType);
var nextId = identcurrent + (count > 0 ? identIncrement : 0);
bulkCopy.BulkCopyTimeout = 5 * 60;
bulkCopy.WriteToServer(table);
cmd.CommandText = $"SELECT SCOPE_IDENTITY()";
result = cmd.ExecuteScalar();
dynamic lastId = Convert.ChangeType(result, pkColumnType);
// A verbatim interpolated string ($@) is required for a literal that spans two lines.
cmd.CommandText = $@"SELECT {pkColumnName} From {tableName}
WHERE {pkColumnName} >= {nextId} and {pkColumnName} <= {lastId}";
var reader = cmd.ExecuteReader();
var ids = (from IDataRecord r in reader
let pk = r[pkColumnName]
select pk)
.OrderBy(i => i)
.ToArray();
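if (ids.Length != entities.Count) throw new ArgumentException
("More id values generated than we had entities. Something went wrong, try again.");
for (int i = 0; i < entities.Count; i++)
{
SetProperty(pkColumnName, entities[i], ids[i]);
}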
}
else
{
bulkCopy.BulkCopyTimeout = 5 * 60;
bulkCopy.WriteToServer(table);
}
}
private string GetTableName(Type t)
{
var dbSet = Set(t);
var sql = dbSet.ToString();
var regex = new Regex(@"FROM (?<table>.*) AS");
var match = regex.Match(sql);
return match.Groups["table"].Value;
}
private object GetPropertyValue(object o)
{
if (o == null)
return DBNull.Value;
return o;
}
private Mappings GetMappings(MetadataWorkspace workspace,
string containerName, string entityName)
{
var columnMappings = new Dictionary<string, CLR2ColumnMapping>();
var storageMapping = workspace.GetItem<GlobalItem>(containerName, DataSpace.CSSpace);
dynamic temp = storageMapping.GetType().InvokeMember(
"EntitySetMappings",
BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance,
null, storageMapping, null);
var entitySetMaps = new List<EntitySetMapping>();
foreach (var t in temp)
{
entitySetMaps.Add((EntitySetMapping)t);
}
var entitySetMap = entitySetMaps.Single(m => m.EntitySet.ElementType.Name == entityName);
var typeMappings = entitySetMap.EntityTypeMappings;
EntityTypeMapping typeMapping = typeMappings[0];
var fragments = typeMapping.Fragments;
var fragment = fragments[0];
var properties = fragment.PropertyMappings;
foreach (var property in properties.Where
(p => p is ScalarPropertyMapping).Cast<ScalarPropertyMapping>())
{
var clrProperty = property.Property;
var columnProperty = property.Column;
columnMappings.Add(clrProperty.Name, new CLR2ColumnMapping
{
CLRProperty = clrProperty,
ColumnProperty = columnProperty,
});
}
var foreignKeyMappings = new List<ForeignKeyMapping>();
var navigationProperties =
typeMapping.EntityType.DeclaredMembers.Where
(m => m.BuiltInTypeKind == BuiltInTypeKind.NavigationProperty)
.Cast<NavigationProperty>()
.Where(p => p.RelationshipType is AssociationType)
.ToArray();
foreach (var navigationProperty in navigationProperties)
{
var relType = (AssociationType)navigationProperty.RelationshipType;
if (foreignKeyMappings.All(m => m.Name != relType.Name))
{
var fkMapping = new ForeignKeyMapping
{
NavigationPropertyName = navigationProperty.Name,
BuiltInTypeKind = navigationProperty.TypeUsage.EdmType.BuiltInTypeKind,
Name = relType.Name,
FromType = relType.Constraint.FromProperties.Single().DeclaringType.Name,
FromProperty = relType.Constraint.FromProperties.Single().Name,
ToType = relType.Constraint.ToProperties.Single().DeclaringType.Name,
ToProperty = relType.Constraint.ToProperties.Single().Name,
};
foreignKeyMappings.Add(fkMapping);
}
}
return new Mappings
{
ColumnMappings = columnMappings,
ToForeignKeyMappings = foreignKeyMappings.Where(m => m.ToType == entityName).ToArray(),
FromForeignKeyMappings = foreignKeyMappings.Where
(m => m.FromType == entityName).ToArray()
};
}
private dynamic GetProperty(string property, object instance)
{
var type = instance.GetType();
return type.InvokeMember(property, BindingFlags.GetProperty | BindingFlags.Public |
BindingFlags.Instance, Type.DefaultBinder, instance, null);
}
private void SetProperty(string property, object instance, object value)
{
var type = instance.GetType();
type.InvokeMember(property, BindingFlags.SetProperty | BindingFlags.Public |
BindingFlags.Instance, Type.DefaultBinder, instance, new[] { value });
}
}
class IdentityEqualityComparer<T> : IEqualityComparer<T> where T : class
{
public int GetHashCode(T value)
{
return RuntimeHelpers.GetHashCode(value);
}
public bool Equals(T left, T right)
{
return left == right; // Reference identity comparison
}
}
class Mappings
{
public Dictionary<string, CLR2ColumnMapping> ColumnMappings { get; set; }
public ForeignKeyMapping[] ToForeignKeyMappings { get; set; }
public ForeignKeyMapping[] FromForeignKeyMappings { get; set; }
}
class CLR2ColumnMapping
{
public EdmProperty CLRProperty { get; set; }
public EdmProperty ColumnProperty { get; set; }
}
class ForeignKeyMapping
{
public BuiltInTypeKind BuiltInTypeKind { get; set; }
public string NavigationPropertyName { get; set; }
public string Name { get; set; }
public string FromType { get; set; }
public string FromProperty { get; set; }
public string ToType { get; set; }
public string ToProperty { get; set; }
}
}
The BulkInsert.App tutorial application is also available for download.
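History
- 2017-04-15
- Fixed another minor bug related to complex entity graphs
- 2017-04-07
- Changed the software license to Apache License 2.0
- 2017-03-29
- Fixed some minor bugs and added support for complex entity graphs in which the same entity object appears in several places. Each entity is now guaranteed to be saved only once.
- 2017-03-17
- Uploaded updated sample code. Saving complex entity graphs is now significantly faster
- 2017-03-10
- Uploaded the initial version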