SqlBulkCopy异常后检索失败的记录






4.98/5 (18投票s)
如何获取导致SqlBulkCopy操作异常(或几个异常)的数据行列表
引言
首先声明,我在本文中使用的方法并非我原创,但由于我只听说过而未能在任何地方找到实际实现示例,因此我编写了代码来处理它。
言归正传——这篇文说的是:任何使用过 .NET 的 SqlBulkCopy
类的人都知道它有多快、多强大。它在将大量数据泵入 SQL Server 数据库方面,比其他机制要快很多,而且它之所以如此之快,原因之一就是它不会记录任何操作。
缺乏日志记录确实加快了速度,但当您正在泵送数十万行数据,突然因为约束而导致其中一行失败时,您就陷入了困境。所有 SqlException
会告诉您的是,给定的约束出了问题(至少您会得到约束的名称),但仅此而已。然后,您将被迫返回源数据,对其运行单独的 SELECT 语句(或进行手动搜索),并自己找到有问题的行。
更糟糕的是,如果您处理的数据包含多个潜在的失败项,这可能是一个漫长而反复的过程,因为 SqlBulkCopy
一旦遇到第一个失败就会停止。纠正了第一个错误后,您需要重新运行加载以查找第二个错误,依此类推。
本文介绍的方法有以下优点:
- 报告
SqlBulkCopy
可能遇到的所有错误 - 报告所有有问题的具体数据行,以及该行将导致的异常
- 整个过程在一个事务中运行,最后事务会被回滚,因此没有更改会被提交。
……以及缺点
- 对于非常大的数据量,可能需要几分钟时间。
- 此解决方案是反应式的;也就是说,错误不会作为
SqlBulkCopy.WriteToServer()
进程引发的异常的一部分返回。相反,这个辅助方法在引发异常后执行,以尝试捕获所有可能的错误及其相关数据。这意味着,在发生异常的情况下,您的进程运行时间会比仅仅运行批量复制更长。 - 您无法重新使用来自失败的
SqlBulkCopy
的同一个DataReader
对象,因为读取器是仅向前推进的、一次性的数据流,无法重置。您需要创建一个类型相同的新读取器(例如,重新发出原始SqlCommand
,或基于同一个 DataTable 重建读取器等)。
背景
主要思路很简单。重新运行批量复制,但一次只处理一行。在处理行的过程中,捕获复制它们时引发的单个异常(如果有),并将消息和行的数据添加到增量消息中,但不要停止将数据复制到服务器。完成后,您最终的错误消息将是一个很好的日志,显示所有问题以及导致它们的数据。从那时起,很容易回到源头,找到那些记录,修复问题,然后重新发出批量复制。
使用代码
重要的是要注意,并非所有批量复制的失败都是由数据引起的。您可能会遇到连接问题、身份验证失败、超时等。这些情况都不会由您的数据解释,因此在这种情况下调用此辅助方法没有意义。您需要在调用辅助方法时考虑到这一点,并且只针对特定类型的异常调用它(下面的示例代码会处理这一点)。
另外,请注意您捕获的异常不一定是由 SqlServer 引发的,它可能包含在内部异常中。因此,如果您计划仅在发生与数据相关的错误时调用辅助方法,则需要检查该异常(及其所有内部异常)是否包含此信息。下面的示例代码会处理这一点,尽管异常是直接来自服务器的;在您的实际情况中,您可能在异常被其他异常包装后,在更高级别处理它。
测试批量复制方法
下面的 TestMethod()
是一个简单的设置批量复制操作并将其包含在 try/catch 块中的方法。这个批量复制被假定因数据问题而失败,因此在 catch 块中,我们会检查异常(及其所有内部异常)的消息是否包含单词“constraint”(这似乎是查找约束失败的唯一方法,因为所有来自 SqlServer 的异常都是 SqlException
类型)。如果找到这样的异常消息,我们就会调用 GetBulkCopyFailedData() 来获取失败的行。后者方法最好放在一个单独的辅助类中。
诚然,这种检查本可以在辅助方法内完成,但我试图让它足够通用,以便显示所有异常,而不假设调用者想要过滤掉什么。
private void TestMethod() { // new code SqlConnection connection = null; SqlBulkCopy bulkCopy = null; DataTable dataTable = new DataTable(); // load some sample data into the DataTable IDataReader reader = dataTable.CreateDataReader(); try { connection = new SqlConnection("connection string goes here ..."); connection.Open(); bulkCopy = new SqlBulkCopy(connection); bulkCopy.DestinationTableName = "Destination table name"; bulkCopy.WriteToServer(reader); } catch (Exception exception) { // loop through all inner exceptions to see if any relate to a constraint failure bool dataExceptionFound = false; Exception tmpException = exception; while (tmpException != null) { if (tmpException is SqlException && tmpException.Message.Contains("constraint")) { dataExceptionFound = true; break; } tmpException = tmpException.InnerException; } if (dataExceptionFound) { // call the helper method to document the errors and invalid data string errorMessage = GetBulkCopyFailedData( connection.ConnectionString, bulkCopy.DestinationTableName, dataTable.CreateDataReader()); throw new Exception(errorMessage, exception); } } finally { if (connection != null && connection.State == ConnectionState.Open) { connection.Close(); } } }
记录错误和有问题的具体数据行
GetBulkCopyFailedData()
然后打开一个新的数据库连接,创建一个事务,并开始逐行进行批量复制。它通过读取提供的 DataReader,并将每一行复制到一个空的 DataTable 中来实现。然后,该 DataTable 被批量复制到目标数据库,并捕获由此产生的任何异常,记录下来(以及导致它的 DataRow),然后使用下一行重复该过程。
在 DataReader 结束时,我们回滚事务并返回完整的错误消息。现在修复数据源中的问题应该很容易了。
/// <summary> /// Build an error message with the failed records and their related exceptions. /// </summary> /// <param name="connectionString">Connection string to the destination database</param> /// <param name="tableName">Table name into which the data will be bulk copied.</param> /// <param name="dataReader">DataReader to bulk copy</param> /// <returns>Error message with failed constraints and invalid data rows.</returns> public static string GetBulkCopyFailedData( string connectionString, string tableName, IDataReader dataReader) { StringBuilder errorMessage = new StringBuilder("Bulk copy failures:" + Environment.NewLine); SqlConnection connection = null; SqlTransaction transaction = null; SqlBulkCopy bulkCopy = null; DataTable tmpDataTable = new DataTable(); try { connection = new SqlConnection(connectionString); connection.Open(); transaction = connection.BeginTransaction(); bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints, transaction); bulkCopy.DestinationTableName = tableName; // create a datatable with the layout of the data. DataTable dataSchema = dataReader.GetSchemaTable(); foreach (DataRow row in dataSchema.Rows) { tmpDataTable.Columns.Add(new DataColumn( row["ColumnName"].ToString(), (Type)row["DataType"])); } // create an object array to hold the data being transferred into tmpDataTable //in the loop below. object[] values = new object[dataReader.FieldCount]; // loop through the source data while (dataReader.Read()) { // clear the temp DataTable from which the single-record bulk copy will be done tmpDataTable.Rows.Clear(); // get the data for the current source row dataReader.GetValues(values); // load the values into the temp DataTable tmpDataTable.LoadDataRow(values, true); // perform the bulk copy of the one row try { bulkCopy.WriteToServer(tmpDataTable); } catch (Exception ex) { // an exception was raised with the bulk copy of the current row. // The row that caused the current exception is the only one in the temp // DataTable, so document it and add it to the error message. DataRow faultyDataRow = tmpDataTable.Rows[0]; errorMessage.AppendFormat("Error: {0}{1}", ex.Message, Environment.NewLine); errorMessage.AppendFormat("Row data: {0}", Environment.NewLine); foreach (DataColumn column in tmpDataTable.Columns) { errorMessage.AppendFormat( "\tColumn {0} - [{1}]{2}", column.ColumnName, faultyDataRow[column.ColumnName].ToString(), Environment.NewLine); } } } } catch (Exception ex) { throw new Exception( "Unable to document SqlBulkCopy errors. See inner exceptions for details.", ex); } finally { if (transaction != null) { transaction.Rollback(); } if (connection.State != ConnectionState.Closed) { connection.Close(); } } return errorMessage.ToString(); }
结论
我肯定浪费了不少时间试图弄清楚我的数据出了什么问题,因为批量复制操作在那方面帮不了我,所以我希望这能帮助其他人避免浪费时间。
一如既往——评论、问题和建议随时欢迎。请不要忘记投票!