65.9K
CodeProject 正在变化。 阅读更多。
Home

SqlBulkCopy异常后检索失败的记录

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (18投票s)

2012年5月18日

CPOL

5分钟阅读

viewsIcon

122854

如何获取导致SqlBulkCopy操作异常(或几个异常)的数据行列表

引言

首先声明,我在本文中使用的方法并非我原创,但由于我只听说过而未能在任何地方找到实际实现示例,因此我编写了代码来处理它。

言归正传——这篇文说的是:任何使用过 .NET 的 SqlBulkCopy 类的人都知道它有多快、多强大。它在将大量数据泵入 SQL Server 数据库方面,比其他机制要快很多,而且它之所以如此之快,原因之一就是它不会记录任何操作。

缺乏日志记录确实加快了速度,但当您正在泵送数十万行数据,突然因为约束而导致其中一行失败时,您就陷入了困境。所有 SqlException 会告诉您的是,给定的约束出了问题(至少您会得到约束的名称),但仅此而已。然后,您将被迫返回源数据,对其运行单独的 SELECT 语句(或进行手动搜索),并自己找到有问题的行。

更糟糕的是,如果您处理的数据包含多个潜在的失败项,这可能是一个漫长而反复的过程,因为 SqlBulkCopy 一旦遇到第一个失败就会停止。纠正了第一个错误后,您需要重新运行加载以查找第二个错误,依此类推。

本文介绍的方法有以下优点: 

  • 报告 SqlBulkCopy 可能遇到的所有错误
  • 报告所有有问题的具体数据行,以及该行将导致的异常
  • 整个过程在一个事务中运行,最后事务会被回滚,因此没有更改会被提交。

……以及缺点

  • 对于非常大的数据量,可能需要几分钟时间。
  • 此解决方案是反应式的;也就是说,错误不会作为 SqlBulkCopy.WriteToServer() 进程引发的异常的一部分返回。相反,这个辅助方法在引发异常后执行,以尝试捕获所有可能的错误及其相关数据。这意味着,在发生异常的情况下,您的进程运行时间会比仅仅运行批量复制更长。
  • 您无法重新使用来自失败的 SqlBulkCopy 的同一个 DataReader 对象,因为读取器是仅向前推进的、一次性的数据流,无法重置。您需要创建一个类型相同的新读取器(例如,重新发出原始 SqlCommand,或基于同一个 DataTable 重建读取器等)。

背景

主要思路很简单。重新运行批量复制,但一次只处理一行。在处理行的过程中,捕获复制它们时引发的单个异常(如果有),并将消息和行的数据添加到增量消息中,但不要停止将数据复制到服务器。完成后,您最终的错误消息将是一个很好的日志,显示所有问题以及导致它们的数据。从那时起,很容易回到源头,找到那些记录,修复问题,然后重新发出批量复制。

使用代码

重要的是要注意,并非所有批量复制的失败都是由数据引起的。您可能会遇到连接问题、身份验证失败、超时等。这些情况都不会由您的数据解释,因此在这种情况下调用此辅助方法没有意义。您需要在调用辅助方法时考虑到这一点,并且只针对特定类型的异常调用它(下面的示例代码会处理这一点)。

另外,请注意您捕获的异常不一定是由 SqlServer 引发的,它可能包含在内部异常中。因此,如果您计划仅在发生与数据相关的错误时调用辅助方法,则需要检查该异常(及其所有内部异常)是否包含此信息。下面的示例代码会处理这一点,尽管异常是直接来自服务器的;在您的实际情况中,您可能在异常被其他异常包装后,在更高级别处理它。

测试批量复制方法

下面的 TestMethod() 是一个简单的设置批量复制操作并将其包含在 try/catch 块中的方法。这个批量复制被假定因数据问题而失败,因此在 catch 块中,我们会检查异常(及其所有内部异常)的消息是否包含单词“constraint”(这似乎是查找约束失败的唯一方法,因为所有来自 SqlServer 的异常都是 SqlException 类型)。如果找到这样的异常消息,我们就会调用 GetBulkCopyFailedData() 来获取失败的行。后者方法最好放在一个单独的辅助类中。

诚然,这种检查本可以在辅助方法内完成,但我试图让它足够通用,以便显示所有异常,而不假设调用者想要过滤掉什么。

private void TestMethod()
{
   // new code
   SqlConnection connection = null;
   SqlBulkCopy bulkCopy = null;
   
   DataTable dataTable = new DataTable();
   // load some sample data into the DataTable
   IDataReader reader = dataTable.CreateDataReader();
 
   try 
   {
      connection = new SqlConnection("connection string goes here ...");
      connection.Open();
      bulkCopy = new SqlBulkCopy(connection); 
      bulkCopy.DestinationTableName = "Destination table name";
      bulkCopy.WriteToServer(reader);
   }
   catch (Exception exception)
   {
      // loop through all inner exceptions to see if any relate to a constraint failure
      bool dataExceptionFound = false;
      Exception tmpException = exception;
      while (tmpException != null)
      {
         if (tmpException is SqlException
            && tmpException.Message.Contains("constraint"))
         {
            dataExceptionFound = true;
            break;
         }
         tmpException = tmpException.InnerException;
      }

      if (dataExceptionFound)
      {
         // call the helper method to document the errors and invalid data
         string errorMessage = GetBulkCopyFailedData(
            connection.ConnectionString,
            bulkCopy.DestinationTableName,
            dataTable.CreateDataReader());
         throw new Exception(errorMessage, exception);
      }
   }
   finally
   {
      if (connection != null && connection.State == ConnectionState.Open)
      {
         connection.Close();
      }
   }
}

记录错误和有问题的具体数据行

GetBulkCopyFailedData() 然后打开一个新的数据库连接,创建一个事务,并开始逐行进行批量复制。它通过读取提供的 DataReader,并将每一行复制到一个空的 DataTable 中来实现。然后,该 DataTable 被批量复制到目标数据库,并捕获由此产生的任何异常,记录下来(以及导致它的 DataRow),然后使用下一行重复该过程。

在 DataReader 结束时,我们回滚事务并返回完整的错误消息。现在修复数据源中的问题应该很容易了。

/// <summary>
/// Build an error message with the failed records and their related exceptions.
/// </summary>
/// <param name="connectionString">Connection string to the destination database</param>
/// <param name="tableName">Table name into which the data will be bulk copied.</param>
/// <param name="dataReader">DataReader to bulk copy</param>
/// <returns>Error message with failed constraints and invalid data rows.</returns>
public static string GetBulkCopyFailedData(
   string connectionString,
   string tableName,
   IDataReader dataReader)
{
   StringBuilder errorMessage = new StringBuilder("Bulk copy failures:" + Environment.NewLine);
   SqlConnection connection = null;
   SqlTransaction transaction = null;
   SqlBulkCopy bulkCopy = null;
   DataTable tmpDataTable = new DataTable();
   
   try
   { 
      connection = new SqlConnection(connectionString); 
      connection.Open();
      transaction = connection.BeginTransaction();
      bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints, transaction);
      bulkCopy.DestinationTableName = tableName;
      
      // create a datatable with the layout of the data.
      DataTable dataSchema = dataReader.GetSchemaTable();
      foreach (DataRow row in dataSchema.Rows)
      {
         tmpDataTable.Columns.Add(new DataColumn(
            row["ColumnName"].ToString(), 
            (Type)row["DataType"]));
      }
      
      // create an object array to hold the data being transferred into tmpDataTable 
      //in the loop below.
      object[] values = new object[dataReader.FieldCount];

      // loop through the source data
      while (dataReader.Read())
      {
         // clear the temp DataTable from which the single-record bulk copy will be done
         tmpDataTable.Rows.Clear();

         // get the data for the current source row
         dataReader.GetValues(values);

         // load the values into the temp DataTable
         tmpDataTable.LoadDataRow(values, true);

         // perform the bulk copy of the one row
         try
         {
            bulkCopy.WriteToServer(tmpDataTable);
         }
         catch (Exception ex)
         {
            // an exception was raised with the bulk copy of the current row. 
            // The row that caused the current exception is the only one in the temp 
            // DataTable, so document it and add it to the error message.
            DataRow faultyDataRow = tmpDataTable.Rows[0];
            errorMessage.AppendFormat("Error: {0}{1}", ex.Message, Environment.NewLine);
            errorMessage.AppendFormat("Row data: {0}", Environment.NewLine);
            foreach (DataColumn column in tmpDataTable.Columns)
            {
               errorMessage.AppendFormat(
                  "\tColumn {0} - [{1}]{2}", 
                  column.ColumnName, 
                  faultyDataRow[column.ColumnName].ToString(), 
                  Environment.NewLine);
            }
         }
      }
   }
   catch (Exception ex) 
   { 
      throw new Exception(
         "Unable to document SqlBulkCopy errors. See inner exceptions for details.", 
         ex); 
   }
   finally
   {
      if (transaction != null)
      {
         transaction.Rollback();
      }
      if (connection.State != ConnectionState.Closed)
      {
         connection.Close();
      }
   }
   return errorMessage.ToString();
}

结论

我肯定浪费了不少时间试图弄清楚我的数据出了什么问题,因为批量复制操作在那方面帮不了我,所以我希望这能帮助其他人避免浪费时间。

一如既往——评论、问题和建议随时欢迎。请不要忘记投票! 

© . All rights reserved.