使用 Entity Framework 进行批量操作






4.67/5 (9投票s)
使用 Entity Framework 扩展批量插入和更新操作
引言
该项目的当前状态可以在 GitHub 这里 找到。还有一个可用的 nuget 包 在这里。
为 EF 上下文扩展批量操作的想法起源于 2011 年为 LINQ-to-SQL 编写的一个 小型部分类。大约一年后,我还为 EF 发布了两个版本,分别在 这里 和 这里。它们在一定程度上解决了问题,但仅适用于相对简单的表。2017 年,我做了一些努力来处理更复杂的数据库设计,并在 CodeProject 上 发表了一篇文章,并附带了一些示例代码。在我自己的项目中使用的越多,我发现我需要添加的东西就越多,而且我也认为将其发布为一个 nuget 包会很棒。于是,我这样做了。在本文中,我将重新审视我之前文章中提到的一些内容,但也会讨论如何处理批量更新和更复杂的外键设置。希望你会觉得它有用。nuget 包的名称是 Tanneryd.BulkOperations.EF6。
背景
使用 EF 进行插入,当插入数量达到数十万甚至数百万时,之所以需要花费很长时间才能完成,主要原因是每一个插入操作都被封装并通过存储过程 sp_executesql
执行。因此,你将与数据库进行相同数量的往返操作,因为你有多少个 insert
,就有多少个往返,每个往返都会执行这个存储过程,并在其中包含你的 insert
语句。
通常,这可能看起来像这样
exec sp_executesql N'INSERT [dbo].[Prime]([Number], [CompositeId])
VALUES (@0, @1)
SELECT [Id]
FROM [dbo].[Prime]
WHERE @@ROWCOUNT > 0 AND [Id] = scope_identity()',N'@0 bigint,@1 bigint',@0=7,@1=198
更新的处理方式也类似,所以我们在那里也有同样的问题。
Using the Code
批量操作是以 DbContext
类的扩展形式提供的。所以,你只需要安装 nuget 包。
BulkInsertAll
/// <summary>
/// Insert all entities using System.Data.SqlClient.SqlBulkCopy.
/// </summary>
/// <param name="ctx"></param>
/// <param name="entities"></param>
/// <param name="transaction"></param>
/// <param name="recursive">True if the entire entity graph should be inserted,
/// false otherwise.</param>
public static void BulkInsertAll(
this DbContext ctx,
IList entities,
SqlTransaction transaction = null,
bool recursive = false)
如果你将 recursive
设置为 true
,BulkInsertAll
将递归地跟踪所有导航属性,并插入找到的所有新实体。如果你使用事务,你应该从上下文中获取 SqlTransaction
并设置事务参数。
using(var db = new MyContext())
using(var transaction = db.Database.BeginTransaction())
{
try
{
var sqlTransaction = (SqlTransaction)transaction.UnderlyingTransaction;
db.BulkInsertAll(entities, sqlTransaction, true);
transaction.Commit();
}
catch(Exception e)
{
transaction.Rollback();
}
}
BulkUpdateAll
/// <summary>
/// Update all entities using a temp table and System.Data.SqlClient.SqlBulkCopy.
/// Only tables with primary keys will be updated.
/// </summary>
/// <param name="ctx"></param>
/// <param name="entities"></param>
/// <param name="updatedColumns">If defined, only these columns will be updated.</param>
/// <param name="transaction"></param>
public static void BulkUpdateAll(
this DbContext ctx,
IList entities,
string[] updatedColumns = null,
SqlTransaction transaction = null)
BulkUpdateAll
方法将更新映射到实体属性的所有列,或 updatedColumns
参数中指定的列。它只适用于定义了主键的表。
工作原理
表映射
一切皆有关联
Entity Framework 上下文包含我们确定如何将实体映射到表所需的所有信息。然而,这些信息不一定可以通过 public
API 获得,所以我们需要进行一些反射才能获得所需的信息。
首先,我们提取一个称为存储映射的内容,从中获得两样东西:EntitySetMappings
和 AssociationSetMappings
。EntitySetMappings
包含了我们所需的一切,除了关于多对多关系的信息。这些信息我们在 AssociationSetMappings
中找到。
var objectContext = ((IObjectContextAdapter)ctx).ObjectContext;
var workspace = objectContext.MetadataWorkspace;
var containerName = objectContext.DefaultContainerName;
var entityName = t.Name;
var storageMapping = (EntityContainerMapping)workspace.GetItem<GlobalItem>
(containerName, DataSpace.CSSpace);
var entitySetMaps = storageMapping.EntitySetMappings.ToList();
var associationSetMaps = storageMapping.AssociationSetMappings.ToList();
标量属性
弄清楚标量属性如何与表列连接相对简单。标量属性是指除表示外键指向内容之外的所有属性。那些被称为导航属性。但是,持有实际外键的属性仍然是标量属性。
var entitySetMap = entitySetMaps.Single(m => m.EntitySet.ElementType.Name == entityName);
var typeMappings = entitySetMap.EntityTypeMappings;
EntityTypeMapping typeMapping = typeMappings[0];
var fragments = typeMapping.Fragments;
var fragment = fragments[0];
var properties = fragment.PropertyMappings;
var columnMappings = new Dictionary<string, CLR2ColumnMapping>();
foreach (var property in properties.Where
(p => p is ScalarPropertyMapping).Cast<ScalarPropertyMapping>())
{
var clrProperty = property.Property;
var columnProperty = property.Column;
columnMappings.Add(clrProperty.Name, new CLR2ColumnMapping
{
CLRProperty = clrProperty,
ColumnProperty = columnProperty,
});
}
导航属性
首先,我们需要找到所有的导航属性。
var navigationProperties =
typeMapping.EntityType.DeclaredMembers
.Where(m => m.BuiltInTypeKind == BuiltInTypeKind.NavigationProperty)
.Cast<NavigationProperty>()
.Where(p => p.RelationshipType is AssociationType)
.ToArray();
我们将映射数据存储在 ForeignKeyMapping
类中。
var fkMapping = new ForeignKeyMapping
{
NavigationPropertyName = navigationProperty.Name,
BuiltInTypeKind = navigationProperty.TypeUsage.EdmType.BuiltInTypeKind,
Name = relType.Name,
};
一对一和一对多映射并没有太大的麻烦。
fkMapping.FromType = relType.Constraint.FromProperties.First().DeclaringType.Name;
fkMapping.ToType = relType.Constraint.ToProperties.First().DeclaringType.Name;
var foreignKeyRelations = new List<ForeignKeyRelation>();
for (int i = 0; i < relType.Constraint.FromProperties.Count; i++)
{
foreignKeyRelations.Add(new ForeignKeyRelation
{
FromProperty = relType.Constraint.FromProperties[i].Name,
ToProperty = relType.Constraint.ToProperties[i].Name,
});
}
fkMapping.ForeignKeyRelations = foreignKeyRelations.ToArray();
多对多关系则麻烦一些。这些关系是通过一个单独的数据库表实现的,该表连接着两个关联表中的行。这个第三表的表名就是我们在代码片段末尾的存储实体集中找到的内容。
var map = associationSetMaps.Single(m => m.AssociationSet.Name == relType.Name);
var sourceMapping =
new CLR2ColumnMapping
{
ColumnProperty = map.SourceEndMapping.PropertyMappings[0].Column,
CLRProperty = map.SourceEndMapping.PropertyMappings[0].Property,
};
var targetMapping =
new CLR2ColumnMapping
{
ColumnProperty = map.TargetEndMapping.PropertyMappings[0].Column,
CLRProperty = map.TargetEndMapping.PropertyMappings[0].Property,
};
fkMapping.FromType =
(map.SourceEndMapping.AssociationEnd.TypeUsage.EdmType as RefType)?.ElementType.Name;
fkMapping.ToType =
(map.TargetEndMapping.AssociationEnd.TypeUsage.EdmType as RefType)?.ElementType.Name;
fkMapping.AssociationMapping = new AssociationMapping
{
TableName = new TableName
{
Name = map.StoreEntitySet.Table,
Schema = map.StoreEntitySet.Schema,
},
Source = sourceMapping,
Target = targetMapping
};
批量插入
当我们的映射排序好后,我们就可以开始实际的批量插入了。我们分三个步骤进行。首先,我们找到所有一对一的导航属性,如果它们是新的,我们就通过递归调用批量插入来插入它们,并在返回时,将生成的主键值设置到我们实体上的相应外键属性。其次,我们插入作为此批量 insert
调用参数传递的所有实体。最后,我们遍历所有集合类型的导航属性,即一对多和多对多的导航属性,并通过递归调用,批量插入它们。
如果我们运气好的话,这将导致整个实体图被插入。
批量更新
批量 update
不支持实体图,所以这里没有递归操作。选定的列,或者如果没有选择,则所有映射的列,无论是否已修改,都将被更新。为了加快速度,我们首先将传递的实体批量 insert
到一个临时数据库表中,然后运行一个常规的 SQL update
命令,将目标表与我们的临时表连接起来。
实际代码
/*
* Copyright © 2017, 2018 Tånneryd IT AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://apache.ac.cn/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Entity;
using System.Data.Entity.Core.Mapping;
using System.Data.Entity.Infrastructure;
using System.Data.Entity.Core.Metadata.Edm;
using System.Data.SqlClient;
using System.Dynamic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading;
namespace Tanneryd.BulkInsert
{
public static class DbContextExtensionMethods
{
#region Public API
/// <summary>
/// Update all entities using a temp table and System.Data.SqlClient.SqlBulkCopy.
/// Only tables with primary keys will be updated.
/// </summary>
/// <param name="ctx"></param>
/// <param name="entities"></param>
/// <param name="updatedColumns">If defined, only these columns will be updated.</param>
/// <param name="transaction"></param>
public static void BulkUpdateAll(
this DbContext ctx,
IList entities,
string[] updatedColumns = null,
SqlTransaction transaction = null)
{
if (entities.Count == 0) return;
var globalId = CreateGlobalId(ctx);
using (var mutex = new Mutex(false, globalId))
{
if (mutex.WaitOne())
{
try
{
DoBulkUpdateAll(ctx, entities, updatedColumns, transaction);
}
finally
{
mutex.ReleaseMutex();
}
}
}
}
/// <summary>
/// Insert all entities using System.Data.SqlClient.SqlBulkCopy.
/// </summary>
/// <param name="ctx"></param>
/// <param name="entities"></param>
/// <param name="transaction"></param>
/// <param name="recursive">True if the entire entity graph should be inserted,
/// false otherwise.</param>
public static void BulkInsertAll(
this DbContext ctx,
IList entities,
SqlTransaction transaction = null,
bool recursive = false)
{
if (entities.Count == 0) return;
var globalId = CreateGlobalId(ctx);
using (var mutex = new Mutex(false, globalId))
{
if (mutex.WaitOne())
{
try
{
DoBulkInsertAll(ctx, entities, transaction, recursive,
new Dictionary<object, object>(new IdentityEqualityComparer<object>()));
}
finally
{
mutex.ReleaseMutex();
}
}
}
}
#endregion
#region Private methods
//
// Private methods
//
private static void DoBulkUpdateAll
(this DbContext ctx, IList entities, string[] updatedColumnNames = null,
SqlTransaction transaction = null)
{
if (entities.Count == 0) return;
Type t = entities[0].GetType();
var mappings = GetMappings(ctx, t);
var tableName = mappings.TableName;
var columnMappings = mappings.ColumnMappings;
//
// Check to see if the table has a primary key. If so,
// get a clr property name to table column name mapping.
//
dynamic declaringType = columnMappings
.Values
.First().ColumnProperty.DeclaringType;
var keyMembers = declaringType.KeyMembers;
var pkColumnMappings = columnMappings.Values
.Where(m => keyMembers.Contains(m.ColumnProperty.Name))
.ToArray();
if (pkColumnMappings.Any())
{
//
// Get a clr property name to table column name mapping
// for the columns we want to update.
//
var modifiedColumnMappingCandidates = columnMappings.Values
.Where(m => !keyMembers.Contains(m.ColumnProperty.Name))
.Select(m => m)
.ToArray();
if (updatedColumnNames != null &&
updatedColumnNames.Any())
{
modifiedColumnMappingCandidates =
modifiedColumnMappingCandidates.Where
(c => updatedColumnNames.Contains(c.CLRProperty.Name)).ToArray();
}
var modifiedColumnMappings = modifiedColumnMappingCandidates.ToArray();
//
// Create and populate a temp table to hold the updated values.
//
var conn = GetSqlConnection(ctx);
var tempTableName = FillTempTable(conn, entities, tableName,
columnMappings, pkColumnMappings, modifiedColumnMappings, transaction);
//
// Update the target table using the temp table we just created.
//
var setStatements = modifiedColumnMappings.Select
(c => $"t0.{c.ColumnProperty.Name} = t1.{c.ColumnProperty.Name}");
var setStatementsSql = string.Join(" , ", setStatements);
var conditionStatements = pkColumnMappings.Select
(c => $"t0.{c.ColumnProperty.Name} = t1.{c.ColumnProperty.Name}");
var conditionStatementsSql = string.Join(" AND ", conditionStatements);
var cmdBody = $@"UPDATE t0 SET {setStatementsSql}
FROM {tableName.Fullname} AS t0
INNER JOIN #{tableName.Name} AS t1 ON {conditionStatementsSql}
";
var cmd = new SqlCommand(cmdBody, conn, transaction);
cmd.ExecuteNonQuery();
//
// Clean up. Delete the temp table.
//
var cmdFooter = $@"DROP TABLE {tempTableName}";
cmd = new SqlCommand(cmdFooter, conn, transaction);
cmd.ExecuteNonQuery();
}
}
private static void DoBulkInsertAll(this DbContext ctx, IList entities,
SqlTransaction transaction, bool recursive, Dictionary<object, object> savedEntities)
{
if (entities.Count == 0) return;
Type t = entities[0].GetType();
var mappings = GetMappings(ctx, t);
if (recursive)
{
foreach (var fkMapping in mappings.ToForeignKeyMappings)
{
var navProperties = new HashSet<object>();
var modifiedEntities = new List<object[]>();
foreach (var entity in entities)
{
var navProperty = GetProperty(fkMapping.NavigationPropertyName, entity);
if (navProperty != null)
{
foreach (var foreignKeyRelation in fkMapping.ForeignKeyRelations)
{
var navPropertyKey =
GetProperty(foreignKeyRelation.ToProperty, entity);
if (navPropertyKey == 0)
{
var currentValue = GetProperty(foreignKeyRelation.FromProperty,
navProperty);
if (currentValue > 0)
{
SetProperty
(foreignKeyRelation.ToProperty, entity, currentValue);
}
else
{
if (navProperty != entity)
{
navProperties.Add(navProperty);
modifiedEntities.Add(new object[]
{ entity, navProperty });
}
}
}
}
}
}
if (!navProperties.Any()) continue;
DoBulkInsertAll(ctx, navProperties.ToArray(), transaction, true, savedEntities);
foreach (var modifiedEntity in modifiedEntities)
{
var e = modifiedEntity[0];
var p = modifiedEntity[1];
foreach (var foreignKeyRelation in fkMapping.ForeignKeyRelations)
{
SetProperty(foreignKeyRelation.ToProperty, e,
GetProperty(foreignKeyRelation.FromProperty, p));
}
}
}
}
var validEntities = new ArrayList();
var ignoredEntities = new ArrayList();
foreach (dynamic entity in entities)
{
if (savedEntities.ContainsKey(entity))
{
ignoredEntities.Add(entity);
continue;
}
validEntities.Add(entity);
savedEntities.Add(entity, entity);
}
DoBulkInsertAll(ctx, validEntities, t, mappings, transaction);
if (recursive)
{
foreach (var fkMapping in mappings.FromForeignKeyMappings)
{
var navigationPropertyName = fkMapping.NavigationPropertyName;
var navPropertyEntities = new List<dynamic>();
var navPropertySelfReferences = new List<SelfReference>();
foreach (var entity in entities)
{
if (fkMapping.BuiltInTypeKind == BuiltInTypeKind.CollectionType ||
fkMapping.BuiltInTypeKind == BuiltInTypeKind.CollectionKind)
{
var navProperties = GetProperty(navigationPropertyName, entity);
if (fkMapping.ForeignKeyRelations != null)
{
foreach (var navProperty in navProperties)
{
foreach (var foreignKeyRelation in fkMapping.ForeignKeyRelations)
{
SetProperty(foreignKeyRelation.ToProperty, navProperty,
GetProperty(foreignKeyRelation.FromProperty, entity));
}
navPropertyEntities.Add(navProperty);
}
}
else if (fkMapping.AssociationMapping != null)
{
foreach (var navProperty in navProperties)
{
dynamic np = new ExpandoObject();
AddProperty(np,
fkMapping.AssociationMapping.Source.ColumnProperty.Name,
GetProperty(fkMapping.AssociationMapping.Source.CLRProperty.Name,
entity));
AddProperty(np,
fkMapping.AssociationMapping.Target.ColumnProperty.Name,
GetProperty(fkMapping.AssociationMapping.Target.CLRProperty.Name,
navProperty));
navPropertyEntities.Add(np);
}
}
}
else
{
var navProperty = GetProperty(navigationPropertyName, entity);
if (navProperty != null)
{
foreach (var foreignKeyRelation in fkMapping.ForeignKeyRelations)
{
SetProperty(foreignKeyRelation.ToProperty, navProperty,
GetProperty(foreignKeyRelation.FromProperty, entity));
}
if (navProperty != entity)
navPropertyEntities.Add(navProperty);
else
navPropertySelfReferences.Add(new SelfReference
{
Entity = entity,
ForeignKeyProperties = fkMapping.ForeignKeyRelations.Select
(p => p.ToProperty).ToArray()
});
}
}
}
if (navPropertySelfReferences.Any())
{
DoBulkUpdateAll(
ctx,
navPropertySelfReferences.Select(e => e.Entity).Distinct().ToArray(),
navPropertySelfReferences.SelectMany
(e => e.ForeignKeyProperties).Distinct().ToArray(),
transaction);
}
if (navPropertyEntities.Any())
{
if (navPropertyEntities.First() is ExpandoObject)
{
// We have to create our own mappings for this one. Nothing
// available in our context. There should be something in there
// we could use but I cannot find it.
var expandoMappings = new Mappings
{
TableName = fkMapping.AssociationMapping.TableName,
ColumnMappings = new Dictionary<string, CLR2ColumnMapping>()
};
expandoMappings.ColumnMappings.Add(
fkMapping.AssociationMapping.Source.ColumnProperty.Name,
new CLR2ColumnMapping
{
CLRProperty =
fkMapping.AssociationMapping.Source.ColumnProperty,
ColumnProperty =
fkMapping.AssociationMapping.Source.ColumnProperty
});
expandoMappings.ColumnMappings.Add(
fkMapping.AssociationMapping.Target.ColumnProperty.Name,
new CLR2ColumnMapping
{
CLRProperty =
fkMapping.AssociationMapping.Target.ColumnProperty,
ColumnProperty =
fkMapping.AssociationMapping.Target.ColumnProperty
});
DoBulkInsertAll(ctx, navPropertyEntities.ToArray(),
typeof(ExpandoObject), expandoMappings, transaction);
}
else
DoBulkInsertAll(ctx, navPropertyEntities.ToArray(), transaction,
true, savedEntities);
}
}
}
}
private static void DoBulkInsertAll(this DbContext ctx,
IList entities, Type t, Mappings mappings, SqlTransaction transaction)
{
// If we for some reason are called with an empty list we return immediately.
if (entities.Count == 0) return;
var tableName = mappings.TableName;
var columnMappings = mappings.ColumnMappings;
var conn = GetSqlConnection(ctx);
var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, transaction)
{ DestinationTableName = tableName.Fullname };
var table = new DataTable();
var properties = GetProperties(entities[0])
.Where(p => columnMappings.ContainsKey(p.Name)).ToArray();
foreach (var property in properties)
{
Type propertyType = property.Type;
// Nullable properties need special treatment.
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
// Ignore all properties that we have no mappings for.
if (columnMappings.ContainsKey(property.Name))
{
// Since we cannot trust the CLR type properties to be in the same order as
// the table columns we use the SqlBulkCopy column mappings.
table.Columns.Add(new DataColumn(property.Name, propertyType));
var clrPropertyName = property.Name;
var tableColumnName = columnMappings[property.Name].ColumnProperty.Name;
bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping
(clrPropertyName, tableColumnName));
}
}
// Check to see if the table has a primary key.
dynamic declaringType = columnMappings
.Values
.First().ColumnProperty.DeclaringType;
var keyMembers = declaringType.KeyMembers;
var pkColumnMappings = columnMappings.Values
.Where(m => keyMembers.Contains(m.ColumnProperty.Name))
.ToArray();
var pkColumns = pkColumnMappings.Select(m => m.ColumnProperty).ToArray();
// We have no primary key/s. Just add it all.
if (pkColumns.Length == 0)
{
foreach (var entity in entities)
{
var e = entity;
table.Rows.Add(properties.Select(p => GetProperty
(p.Name, t, e, DBNull.Value)).ToArray());
}
bulkCopy.BulkCopyTimeout = 5 * 60;
bulkCopy.WriteToServer(table);
}
// We have a non composite primary key that is either computed or an identity key
else if (pkColumns.Length == 1 &&
(pkColumns[0].IsStoreGeneratedIdentity || pkColumns[0].IsStoreGeneratedComputed))
{
var pkColumn = pkColumns[0];
var newEntities = new ArrayList();
foreach (var entity in entities)
{
var pk = GetProperty(pkColumn.Name, entity);
if (pk == 0)
newEntities.Add(entity);
}
if (newEntities.Count > 0)
{
var pkColumnType =
Type.GetType(pkColumn.PrimitiveType.ClrEquivalentType.FullName);
var cmd = conn.CreateCommand();
cmd.CommandTimeout = (int)TimeSpan.FromMinutes(30).TotalSeconds;
cmd.Transaction = transaction;
// Get the number of existing rows in the table.
cmd.CommandText = $@"SELECT COUNT(*) FROM {tableName.Fullname}";
var result = cmd.ExecuteScalar();
var count = Convert.ToInt64(result);
// Get the identity increment value
cmd.CommandText = $"SELECT IDENT_INCR('{tableName.Fullname}')";
result = cmd.ExecuteScalar();
dynamic identIncrement = Convert.ChangeType(result, pkColumnType);
// Get the last identity value generated for our table
cmd.CommandText = $"SELECT IDENT_CURRENT('{tableName.Fullname}')";
result = cmd.ExecuteScalar();
dynamic identcurrent = Convert.ChangeType(result, pkColumnType);
var nextId = identcurrent + (count > 0 ? identIncrement : 0);
// Add all our new entities to our data table
foreach (var entity in newEntities)
{
var e = entity;
table.Rows.Add
(properties.Select(p => GetProperty(p.Name, t, e, DBNull.Value))
.ToArray());
}
bulkCopy.BulkCopyTimeout = 5 * 60;
bulkCopy.WriteToServer(table);
cmd.CommandText = $"SELECT SCOPE_IDENTITY()";
result = cmd.ExecuteScalar();
dynamic lastId = Convert.ChangeType(result, pkColumnType);
cmd.CommandText =
$"SELECT {pkColumn.Name} From {tableName.Fullname}
WHERE {pkColumn.Name} >= {nextId} and {pkColumn.Name} <= {lastId}";
var reader = cmd.ExecuteReader();
var ids = (from IDataRecord r in reader
let pk = r[pkColumn.Name]
select pk)
.OrderBy(i => i)
.ToArray();
if (ids.Length != newEntities.Count)
throw new ArgumentException(
"More id values generated than we had entities.
Something went wrong, try again.");
for (int i = 0; i < newEntities.Count; i++)
{
SetProperty(pkColumn.Name, newEntities[i], ids[i]);
}
}
}
// We have a composite primary key.
else
{
var nonPrimaryKeyColumnMappings = columnMappings
.Values
.Except(pkColumnMappings)
.ToArray();
var tempTableName = FillTempTable(conn, entities, tableName,
columnMappings, pkColumnMappings, nonPrimaryKeyColumnMappings, transaction);
var conditionStatements =
pkColumnMappings.Select(c => $"t0.{c.ColumnProperty.Name} =
t1.{c.ColumnProperty.Name}");
var conditionStatementsSql = string.Join(" AND ", conditionStatements);
string cmdBody;
SqlCommand cmd;
//
// Update existing entities in the target table using the temp
// table we just created.
//
if (nonPrimaryKeyColumnMappings.Any())
{
var setStatements = nonPrimaryKeyColumnMappings.Select(c =>
$"t0.{c.ColumnProperty.Name} = t1.{c.ColumnProperty.Name}");
var setStatementsSql = string.Join(" , ", setStatements);
cmdBody = $@"UPDATE t0 SET {setStatementsSql}
FROM {tableName.Fullname} AS t0
INNER JOIN #{tableName.Name} AS t1 ON {conditionStatementsSql}
";
cmd = new SqlCommand(cmdBody, conn, transaction);
cmd.ExecuteNonQuery();
}
//
// Insert any new entities.
//
string listOfPrimaryKeyColumns = string.Join(",",
pkColumnMappings.Select(c => c.ColumnProperty));
string listOfColumns = string.Join(",",
pkColumnMappings.Concat
(nonPrimaryKeyColumnMappings).Select(c => c.ColumnProperty));
cmdBody = $@"INSERT INTO {tableName.Fullname} ({listOfColumns})
SELECT {listOfColumns}
FROM #{tableName.Name} AS t0
WHERE NOT EXISTS (
SELECT {listOfPrimaryKeyColumns}
FROM {tableName.Fullname} AS t1
WHERE {conditionStatementsSql}
)
";
cmd = new SqlCommand(cmdBody, conn, transaction);
cmd.ExecuteNonQuery();
//
// Clean up. Delete the temp table.
//
var cmdFooter = $@"DROP TABLE {tempTableName}";
cmd = new SqlCommand(cmdFooter, conn, transaction);
cmd.ExecuteNonQuery();
}
}
private static string CreateGlobalId(DbContext ctx)
{
var ds = ctx.Database.Connection.DataSource.Replace(@"\", "_");
var dbname = ctx.Database.Connection.Database.Replace(@"\", "_");
var globalId = $@"Global\{ds}_{dbname}";
return globalId;
}
private static string FillTempTable(
SqlConnection conn,
IList entities,
TableName tableName,
Dictionary<string, CLR2ColumnMapping> columnMappings,
CLR2ColumnMapping[] primaryKeyColumnMappings,
CLR2ColumnMapping[] nonPrimaryKeyColumnMappings,
SqlTransaction sqlTransaction)
{
var tempTableName = $@"#{tableName.Name}";
var columns = primaryKeyColumnMappings.Select(m => m.ColumnProperty.Name)
.Concat(nonPrimaryKeyColumnMappings.Select(m => m.ColumnProperty.Name)).ToArray();
var columnNames = string.Join(",", columns.Select(c => c));
var cmdHeader = $@"
IF OBJECT_ID('tempdb..#{tableName.Name}')
IS NOT NULL DROP TABLE #{tableName.Name}
SELECT {columnNames}
INTO #{tableName.Name}
FROM {tableName.Fullname}
WHERE 1=0
";
if (primaryKeyColumnMappings.Length == 1 &&
(primaryKeyColumnMappings[0].ColumnProperty.IsStoreGeneratedIdentity ||
primaryKeyColumnMappings[0].ColumnProperty.IsStoreGeneratedComputed))
{
cmdHeader += $@"SET IDENTITY_INSERT #{tableName.Name} ON";
}
var cmd = new SqlCommand(cmdHeader, conn, sqlTransaction);
cmd.ExecuteNonQuery();
//
// Setup a bulk copy instance to populate the temp table.
//
var bulkCopy =
new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, sqlTransaction)
{
DestinationTableName = tempTableName,
BulkCopyTimeout = 5 * 60,
};
var allProperties = GetProperties(entities[0]);
//
// Select the primary key clr properties
//
var pkColumnProperties = allProperties
.Where(p => primaryKeyColumnMappings.Any(m => m.CLRProperty.Name == p.Name))
.ToArray();
//
// Select the clr properties for the selected non primary key columns.
//
var selectedColumnProperties = allProperties
.Where(p => nonPrimaryKeyColumnMappings.Any(m => m.CLRProperty.Name == p.Name))
.ToArray();
var properties = pkColumnProperties.Concat(selectedColumnProperties).ToArray();
//
// Configure a data table to use for the bulk copy
// operation into the temp table.
//
var table = new DataTable();
foreach (var property in properties)
{
Type propertyType = property.Type;
// Nullable properties need special treatment.
if (propertyType.IsGenericType &&
propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = Nullable.GetUnderlyingType(propertyType);
}
// Ignore all properties that we have no mappings for.
if (columnMappings.ContainsKey(property.Name))
{
// Since we cannot trust the CLR type properties to be in the same order as
// the table columns we use the SqlBulkCopy column mappings.
table.Columns.Add(new DataColumn(property.Name, propertyType));
var clrPropertyName = property.Name;
var tableColumnName = columnMappings[property.Name].ColumnProperty.Name;
bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping
(clrPropertyName, tableColumnName));
}
}
//
// Fill the data table with our entities.
//
foreach (var entity in entities)
{
var e = entity;
table.Rows.Add(properties.Select
(p => GetProperty(p.Name, e, DBNull.Value)).ToArray());
}
//
// Fill the temp table.
//
bulkCopy.WriteToServer(table);
return tempTableName;
}
private static PropInfo[] GetProperties(object o)
{
if (o is ExpandoObject)
{
var props = new List<PropInfo>();
var dict = (IDictionary<string, object>)o;
foreach (var kvp in dict)
{
props.Add(new PropInfo
{
Name = kvp.Key,
Type = kvp.Value.GetType()
});
}
return props.ToArray();
}
Type t = o.GetType();
return t.GetProperties().Select(p => new PropInfo
{
Type = p.PropertyType,
Name = p.Name,
}).ToArray();
}
private static void AddProperty
(ExpandoObject expando, string propertyName, object propertyValue)
{
// ExpandoObject supports IDictionary so we can extend it like this
var expandoDict = expando as IDictionary<string, object>;
if (expandoDict.ContainsKey(propertyName))
expandoDict[propertyName] = propertyValue;
else
expandoDict.Add(propertyName, propertyValue);
}
private static SqlConnection GetSqlConnection(this DbContext ctx)
{
var conn = (SqlConnection)ctx.Database.Connection;
if (conn.State == ConnectionState.Closed)
conn.Open();
return conn;
}
private static TableName GetTableName(this DbContext ctx, Type t)
{
var dbSet = ctx.Set(t);
var sql = dbSet.ToString();
var regex = new Regex(@"FROM (?<table>.*) AS");
var match = regex.Match(sql);
var name = match.Groups["table"].Value;
var n = name.Replace("[", "").Replace("]", "");
var m = Regex.Match(n, @"(.*)\.(.*)");
if (m.Success)
{
return new TableName { Schema = m.Groups[1].Value, Name = m.Groups[2].Value };
}
m = Regex.Match(n, @"(.*)");
if (m.Success)
{
return new TableName { Schema = "dbo", Name = m.Groups[1].Value };
}
throw new ArgumentException
($"Failed to parse tablename {name}. Bulk operation failed.");
}
private static Mappings GetMappings(DbContext ctx, Type t)
{
var objectContext = ((IObjectContextAdapter)ctx).ObjectContext;
var workspace = objectContext.MetadataWorkspace;
var containerName = objectContext.DefaultContainerName;
var entityName = t.Name;
var storageMapping = (EntityContainerMapping)workspace.GetItem<GlobalItem>
(containerName, DataSpace.CSSpace);
var entitySetMaps = storageMapping.EntitySetMappings.ToList();
var associationSetMaps = storageMapping.AssociationSetMappings.ToList();
//
// Add mappings for all scalar properties. That is, for all properties
// that do not represent other entities (navigation properties).
//
var entitySetMap = entitySetMaps.Single
(m => m.EntitySet.ElementType.Name == entityName);
var typeMappings = entitySetMap.EntityTypeMappings;
EntityTypeMapping typeMapping = typeMappings[0];
var fragments = typeMapping.Fragments;
var fragment = fragments[0];
var properties = fragment.PropertyMappings;
var columnMappings = new Dictionary<string, CLR2ColumnMapping>();
foreach (var property in properties.Where
(p => p is ScalarPropertyMapping).Cast<ScalarPropertyMapping>())
{
var clrProperty = property.Property;
var columnProperty = property.Column;
columnMappings.Add(clrProperty.Name, new CLR2ColumnMapping
{
CLRProperty = clrProperty,
ColumnProperty = columnProperty,
});
}
//
// Add mappings for all navigation properties.
//
//
var foreignKeyMappings = new List<ForeignKeyMapping>();
var navigationProperties =
typeMapping.EntityType.DeclaredMembers.Where
(m => m.BuiltInTypeKind == BuiltInTypeKind.NavigationProperty)
.Cast<NavigationProperty>()
.Where(p => p.RelationshipType is AssociationType)
.ToArray();
foreach (var navigationProperty in navigationProperties)
{
var relType = (AssociationType)navigationProperty.RelationshipType;
// Only bother with unknown relationships
if (foreignKeyMappings.All(m => m.Name != relType.Name))
{
var fkMapping = new ForeignKeyMapping
{
NavigationPropertyName = navigationProperty.Name,
BuiltInTypeKind = navigationProperty.TypeUsage.EdmType.BuiltInTypeKind,
Name = relType.Name,
};
//
// Many-To-Many
//
if (associationSetMaps.Any() &&
associationSetMaps.Any(m => m.AssociationSet.Name == relType.Name))
{
var map = associationSetMaps.Single
(m => m.AssociationSet.Name == relType.Name);
var sourceMapping =
new CLR2ColumnMapping
{
ColumnProperty = map.SourceEndMapping.PropertyMappings[0].Column,
CLRProperty = map.SourceEndMapping.PropertyMappings[0].Property,
};
var targetMapping =
new CLR2ColumnMapping
{
ColumnProperty = map.TargetEndMapping.PropertyMappings[0].Column,
CLRProperty = map.TargetEndMapping.PropertyMappings[0].Property,
};
fkMapping.FromType = (map.SourceEndMapping.AssociationEnd.TypeUsage.EdmType
as RefType)?.ElementType.Name;
fkMapping.ToType = (map.TargetEndMapping.AssociationEnd.TypeUsage.EdmType
as RefType)?.ElementType.Name;
fkMapping.AssociationMapping = new AssociationMapping
{
TableName = new TableName
{
Name = map.StoreEntitySet.Table,
Schema = map.StoreEntitySet.Schema,
},
Source = sourceMapping,
Target = targetMapping
};
}
//
// One-To-One or One-to-Many
//
else
{
fkMapping.FromType =
relType.Constraint.FromProperties.First().DeclaringType.Name;
fkMapping.ToType =
relType.Constraint.ToProperties.First().DeclaringType.Name;
var foreignKeyRelations = new List<ForeignKeyRelation>();
for (int i = 0; i < relType.Constraint.FromProperties.Count; i++)
{
foreignKeyRelations.Add(new ForeignKeyRelation
{
FromProperty = relType.Constraint.FromProperties[i].Name,
ToProperty = relType.Constraint.ToProperties[i].Name,
});
}
fkMapping.ForeignKeyRelations = foreignKeyRelations.ToArray();
}
foreignKeyMappings.Add(fkMapping);
}
}
var tableName = GetTableName(ctx, t);
return new Mappings
{
TableName = tableName,
ColumnMappings = columnMappings,
ToForeignKeyMappings =
foreignKeyMappings.Where(m => m.ToType == entityName).ToArray(),
FromForeignKeyMappings = foreignKeyMappings.Where
(m => m.FromType == entityName).ToArray()
};
}
/// <summary>
/// Use reflection to get the property value by its property
/// name from an object instance.
/// </summary>
/// <param name="property"></param>
/// <param name="instance"></param>
/// <param name="def"></param>
/// <returns></returns>
private static dynamic GetProperty(string property, object instance, object def = null)
{
var type = instance.GetType();
return GetProperty(property, type, instance, def);
}
private static dynamic GetProperty(string property, Type type,
object instance, object def = null)
{
if (instance is ExpandoObject)
{
var dict = (IDictionary<string, object>)instance;
return dict[property];
}
var p = type.InvokeMember(property, BindingFlags.GetProperty |
BindingFlags.Public |
BindingFlags.Instance, Type.DefaultBinder, instance, null);
if (p == null) return def;
return p;
}
/// <summary>
/// Use reflection to set a property value by its property
/// name to an object instance.
/// </summary>
/// <param name="property"></param>
/// <param name="instance"></param>
/// <param name="value"></param>
private static void SetProperty(string property, object instance, object value)
{
var type = instance.GetType();
type.InvokeMember(property, BindingFlags.SetProperty |
BindingFlags.Public | BindingFlags.Instance,
Type.DefaultBinder, instance, new[] { value });
}
#endregion
}
internal class PropInfo
{
public Type Type { get; set; }
public string Name { get; set; }
}
internal class TableName
{
public string Schema { get; set; }
public string Name { get; set; }
public string Fullname => $"[{Schema}].[{Name}]";
}
internal class Mappings
{
public TableName TableName { get; set; }
public Dictionary<string, CLR2ColumnMapping> ColumnMappings { get; set; }
public ForeignKeyMapping[] ToForeignKeyMappings { get; set; }
public ForeignKeyMapping[] FromForeignKeyMappings { get; set; }
}
/// <summary>
/// The EdmProperty class is used both to represent entity properties
/// and columns properties, it contains more information than just the
/// property/column name. This class is used to map an entity property
/// to a table column.
/// </summary>
internal class CLR2ColumnMapping
{
public EdmProperty CLRProperty { get; set; }
public EdmProperty ColumnProperty { get; set; }
}
internal class AssociationMapping
{
public TableName TableName { get; set; }
public CLR2ColumnMapping Source { get; set; }
public CLR2ColumnMapping Target { get; set; }
}
/// <summary>
///
/// We support two kinds of foreign key mappings.
/// (1) One-To-One, One-To-Many
/// (2) Many-To-Many
///
/// The property ForeignKeyRelations holds mapping data used for (1)
/// and the property AssociationMapping holds data used for (2).
///
/// We do not support Many-To-Many relationships with compound keys.
///
/// </summary>
internal class ForeignKeyMapping
{
public BuiltInTypeKind BuiltInTypeKind { get; set; }
public string NavigationPropertyName { get; set; }
public string Name { get; set; }
public string FromType { get; set; }
public string ToType { get; set; }
public ForeignKeyRelation[] ForeignKeyRelations { get; set; }
public AssociationMapping AssociationMapping { get; set; }
}
internal class ForeignKeyRelation
{
/// <summary>
/// This is the primary key property
/// </summary>
public string FromProperty { get; set; }
/// <summary>
/// This is the foreign key property
/// </summary>
public string ToProperty { get; set; }
}
internal class SelfReference
{
public dynamic Entity { get; set; }
public string[] ForeignKeyProperties { get; set; }
}
internal class IdentityEqualityComparer<T> : IEqualityComparer<T> where T : class
{
public int GetHashCode(T value)
{
return RuntimeHelpers.GetHashCode(value);
}
public bool Equals(T left, T right)
{
return left == right; // Reference identity comparison
}
}
}
Nuget
这段代码可以通过 nuget 获得。包名为 Tanneryd.BulkInsert
,包的 URL 为 https://nuget.net.cn/packages/Tanneryd.BulkOperations.EF6。