65.9K
CodeProject 正在变化。 阅读更多。
Home

批量数据管理

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (6投票s)

2012 年 11 月 19 日

CPOL

6分钟阅读

viewsIcon

33276

downloadIcon

599

这是处理或管理批量数据插入 SQL Server 的最佳方法,摘自我的技术博客 http://www.srinetinfo.com/2012/11/bulk-data-management.html

引言

Microsoft .NET 提供了非常好的功能来实现。它提供了各种功能来满足我们的功能需求。唯一的问题是我们不知道应该使用哪个命名空间和哪个类。Microsoft 提供的其中一个非常有用的类是 SqlBulkCopy,它位于 System.Data.SqlClient 命名空间下。

这个类在将数据从应用程序传输到 SQL Server 时非常有用。这是每个 .NET 开发者都应该实践的最佳实践之一。

背景

我四年前在 ValueLabs 面试时,面试官问了我一个简单的问题。

问。你如何将数据插入表?
答。我回答说使用存储过程。
    
问。如果你有大约十条记录要一次性插入,你会怎么做?
答。我会准备一个 XML 并将其发送给存储过程。在 SP 内部,我会编写代码来处理 XML。

问。如果你有一个 Excel 文件要插入大约 1000 条记录。准备 XML 并发送会花费大量时间,有时你 VARCHAR 参数类型甚至可能不支持,在这种情况下你会怎么做?
答。XML 是我到目前为止使用的唯一一种插入一组记录的方法。那样它就能工作,并且从未给我任何错误。

问。如果我们想更新一组记录,你的 XML 场景有效吗?
答。我没有答案,因为我从未遇到过这种情况。所以我保持沉默。然后他回答并解释说,你听说过 SqlBulkCopy 吗?它可以在几分之一秒内插入任意数量的记录?然后我惊叹不已,也很高兴知道这一点。我虽然没有通过那次面试,但我了解了一个新概念。于是我在公司里实现了同样的功能,并得到了高层领导的赞赏。现在我想与您分享我的经验和 SqlBulkCopy 类的用法。

SqlBulkCopy - 不同场景

场景一:联系人文件上传

在你的应用程序中,你可能会遇到一个需求,比如将 EXCEL、CSV 或 XML 文件中的联系人数量上传到数据库表。这是最常见和重复性的工作,每个开发者一生都会做这类应用程序。

场景二:生产者-消费者问题

这也是一个应用程序问题,需求是所有生产者(用户或服务)将消息或请求发送到服务器队列。服务器队列会定期通过处理或解析每条消息并将其插入数据库表来清理。

场景三:从 UI 插入多条记录

假设你的需求是在一个页面中添加多个客户记录。提供了多个行来在单个页面中添加多个客户。在这种情况下你会怎么做?你是逐条插入记录,这需要大量的数据库调用并会降低应用程序性能吗?还有许多类似的场景可以讨论。对于所有这些将一条以上记录插入数据库表的操作,我们需要一种机制来非常高效地处理并获得最佳性能。 .NET Framework 有一个非常好的类来处理这个问题。现在我们将在本文中讨论它。你可以 在这里 参考 SqlBulkCopy 类。

使用代码 

让我们从一个例子开始。只需以 3 种不同的方式将多行插入数据库表并检查结果

软件要求

Visual Studio 2010 / 2012

SQL Server 2008 R2 / 2012

使用以下脚本在数据库上创建 Customers 表

USE [BlogSamples] 
GO
/****** Object:  Table [dbo].[Customers]    Script Date: 11/17/2012 1:07:38 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[Customers](
 [ID] [bigint] IDENTITY(1,1) NOT NULL,
 [FirstName] [varchar](50) NOT NULL,
 [LastName] [varchar](50) NULL,
 [Address1] [varchar](50) NOT NULL,
 [Address2] [varchar](50) NULL,
 [City] [varchar](50) NOT NULL,
 [State] [varchar](50) NOT NULL,
 [Country] [varchar](50) NOT NULL,
 [ZipCode] [varchar](50) NULL,
 CONSTRAINT [PK_Customers] PRIMARY KEY CLUSTERED 
(
 [ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, 
   IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, 
   ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO

在 Visual Studio 中创建一个控制台应用程序,如下图所示

 

Bulk Data Management Application

 

现在我们将从三种不同的方式插入行到表中,并检查性能

多行多次数据库调用

创建一个存储过程以将单行插入数据库

       -- ================================================ 
-- Template generated from Template Explorer using:
-- Create Procedure (New Menu).SQL
--
-- Use the Specify Values for Template Parameters 
-- command (Ctrl-Shift-M) to fill in the parameter 
-- values below.
--
-- This block of comments will not be included in
-- the definition of the procedure.
-- ================================================
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:  <Author,,Name>
-- Create date: <Create Date,,>
-- Description: <Description,,>
-- =============================================
CREATE PROCEDURE Customers_Insert 
 -- Add the parameters for the stored procedure here
 @firstname varchar(50),
 @lastname varchar(50),
 @address1 varchar(50),
 @address2 varchar(50),
 @city varchar(50),
 @state varchar(50),
 @country varchar(50),
 @zipcode varchar(50)
AS
BEGIN
 -- SET NOCOUNT ON added to prevent extra result sets from
 -- interfering with SELECT statements.
 SET NOCOUNT ON;
    INSERT INTO [dbo].[Customers]
           ([FirstName]
           ,[LastName]
           ,[Address1]
           ,[Address2]
           ,[City]
           ,[State]
           ,[Country]
           ,[ZipCode])
     VALUES
           (@firstname,@lastname,@address1,@address2,@city,@state,@country,@zipcode)
END
GO

创建一个新类来处理所有数据库调用,并将其命名为 BulkDataManagement.cs。添加以下代码以逐行插入。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data.SqlClient;
namespace BulkDataManagement
{
    public class BulkDataManagement
    {
        public BulkDataManagement()
        {
            //strConnectionString=@"Data Source=WLW7-SRINIVASUP\PEMMA;
            //Initial Catalog=Pagination;User ID=sa;Password=srijailu123*";
        }
        public static void AddCustomer(string strConnectionString, string firstname, 
          string lastname, string address1, string address2, 
          string city, string state, string country, string zipcode)
        {
            using (SqlConnection con = new SqlConnection(strConnectionString))
            {
                con.Open();
                SqlCommand cmd = new SqlCommand("Customers_Insert", con);
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                cmd.Parameters.Add(new SqlParameter("@firstname", firstname));
                cmd.Parameters.Add(new SqlParameter("@lastname", lastname));
                cmd.Parameters.Add(new SqlParameter("@address1", address1));
                cmd.Parameters.Add(new SqlParameter("@address2", address2));
                cmd.Parameters.Add(new SqlParameter("@city", city));
                cmd.Parameters.Add(new SqlParameter("@state", state));
                cmd.Parameters.Add(new SqlParameter("@country", country));
                cmd.Parameters.Add(new SqlParameter("@zipcode", zipcode));
                cmd.ExecuteNonQuery();
            }
        }
    }
}

现在准备你的 program.cs 文件,如以下所示将值输入应用程序

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
namespace BulkDataManagement
{
    class Program
    {
        static string connectionString = @"Data Source=WLW7-SRINIVASUP\" + 
          @"PEMMA;Initial Catalog=<database name>;User ID=<userid>;Password=<password>";
        static void Main(string[] args)
        {
            Console.WriteLine("Enter Number of Records to be Inserted: ");
            int noofrecords = int.Parse(Console.ReadLine());
            Stopwatch sw = new Stopwatch();
            sw.Start();
            for (int i = 0; i < noofrecords; i++)
                BulkDataManagement.AddCustomer(connectionString, "First Name - " + i,
                    "Last Name - " + i, "Address1 - " + i, 
                    "Address2 - " + i, "City - I" + i, 
                    "State - " + i, "Country - " + i, "Zip Code - " + i);
            sw.Stop();
            Console.WriteLine("Total elapsed time in milli seconds : " + 
              sw.ElapsedMilliseconds.ToString());            Console.ReadLine();
        }
    }
}

现在运行应用程序并输入你的数字,假设 10000,并测量应用程序性能,它需要多长时间将在应用程序中记录。它给了我大约 13 秒的时间,如下图所示。

Bulk Data Management with 10000 records

图。10000 条记录

现在运行 100,000 条记录,并按照上一步测量性能。结果显示插入这么多记录需要 129.23 秒。

Bulk Data Management With 100000 records

图。100000 条记录

通过单个 XML 数据插入多条记录

现在我们将尝试另一种方法,通过准备一个 XML 并将其发送到数据库存储过程来插入数据。

XML 可以是以下格式。

Bulk Data Management Customer XML Format

图。XML 格式

现在修改你的代码并根据以下内容添加新的存储过程

存储过程 

-- ================================================
-- Template generated from Template Explorer using:
-- Create Procedure (New Menu).SQL
-- 
-- Use the Specify Values for Template Parameters 
-- command (Ctrl-Shift-M) to fill in the parameter 
-- values below.
--
-- This block of comments will not be included in
-- the definition of the procedure.
-- ================================================
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:  <Author,,Name>
-- Create date: <Create Date,,>
-- Description: <Description,,>
-- =============================================
CREATE PROCEDURE Customer_Insert_XML 
 -- Add the parameters for the stored procedure here
 @customerxml varchar(max)
AS
BEGIN
 -- SET NOCOUNT ON added to prevent extra result sets from
 -- interfering with SELECT statements.
 SET NOCOUNT ON;
 DECLARE @idoc int
    EXEC sp_xml_preparedocument @idoc OUTPUT, @customerxml
 Insert into Customers(FirstName,LastName,Address1,Address2,City,State,Country,ZipCode) 
 select * from OPENXML (@idoc,'/Root/Customer') WITH
 (
  firstname varchar(50),
  lastname varchar(50),
  address1 varchar(50),
  address2 varchar(50),
  city varchar(50),
  state varchar(50),
  country varchar(50),
  zipcode varchar(50)
 )
END
GO

BulkDataManagement.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data.SqlClient;
namespace BulkDataManagement
{
    public class BulkDataManagement
    {
        public BulkDataManagement()
        {
            //strConnectionString=@"Data Source=WLW7-SRINIVASUP\PEMMA;Initial Catalog=Pagination;User ID=sa;Password=srijailu123*";
        }
        public static void AddCustomer(string strConnectionString, 
               string firstname, string lastname, string address1, 
               string address2, string city, string state, string country, string zipcode)
        {
            using (SqlConnection con = new SqlConnection(strConnectionString))
            {
                con.Open();
                SqlCommand cmd = new SqlCommand("Customers_Insert", con);
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                cmd.Parameters.Add(new SqlParameter("@firstname", firstname));
                cmd.Parameters.Add(new SqlParameter("@lastname", lastname));
                cmd.Parameters.Add(new SqlParameter("@address1", address1));
                cmd.Parameters.Add(new SqlParameter("@address2", address2));
                cmd.Parameters.Add(new SqlParameter("@city", city));
                cmd.Parameters.Add(new SqlParameter("@state", state));
                cmd.Parameters.Add(new SqlParameter("@country", country));
                cmd.Parameters.Add(new SqlParameter("@zipcode", zipcode));
                cmd.ExecuteNonQuery();
            }
        }
        public static void AddCustomerWithXML(string strConnectionString, string customerxml)
        {
            using (SqlConnection con = new SqlConnection(strConnectionString))
            {
                con.Open();
                SqlCommand cmd = new SqlCommand("Customer_Insert_XML", con);
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                SqlParameter param = new SqlParameter("@customerxml", System.Data.SqlDbType.VarChar);
                param.Value = customerxml;
                cmd.Parameters.Add(param);
                cmd.ExecuteNonQuery();
            }
        }
    }
}

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace BulkDataManagement
{
    class Program
    {
        static string connectionString = 
          @"Data Source=WLW7-SRINIVASUP\PEMMA;Initial Catalog" + 
          @"=BlogSamples;User ID=sa;Password=srijailu123*";
        static void Main(string[] args)
        {
            Console.WriteLine("Enter Number of Records to be Inserted: ");
            int noofrecords = int.Parse(Console.ReadLine());
            Console.WriteLine("Enter Which Method to Execute /nNormal Insert->1/nXML Insert->2/nSqlBulkCopy->3");
            int choice = int.Parse(Console.ReadLine());
            Stopwatch sw = new Stopwatch();
            sw.Start();
            if (choice == 1)
                Customer_Insert(noofrecords);
            else if (choice == 2)
                Customer_XML_Insert(noofrecords);
           sw.Stop();
            Console.WriteLine("Total elapsed time in milli seconds : " + sw.ElapsedMilliseconds.ToString());
            Console.ReadLine();
        }
        static void Customer_Insert(int num)
        {
            for (int i = 0; i < num; i++)
                BulkDataManagement.AddCustomer(connectionString, "First Name - " + i,
                    "Last Name - " + i, "Address1 - " + i, 
                    "Address2 - " + i, "City - I" + i, 
                    "State - " + i, "Country - " + i, "Zip Code - " + i);
        }
        static void Customer_XML_Insert(int num)
        {
            StringBuilder sb = new StringBuilder("<Root>");
            for (int i = 0; i < num; i++)
            {
                sb.Append("<Customer firstname='First Name - XML -" + i + "' ");
                sb.Append("lastname='Last Name -  XML -" + i + "' ");
                sb.Append("address1='Address1 -  XML -" + i + "' ");
                sb.Append("address2='Address2 -  XML -" + i + "' ");
                sb.Append("city='City -  XML -" + i + "' ");
                sb.Append("state='State -  XML -" + i + "' ");
                sb.Append("country='Country -  XML -" + i + "' ");
                sb.Append("zipcode='Zipcode -  XML -" + i + "' />");
            }
            sb.Append("</Root>");
            BulkDataManagement.AddCustomerWithXML(connectionString, sb.ToString());
        }
        static void Customer_Insert_With_SqlBulkCopy(int num)
        {
        }
    }
}

我在这里添加了选择部分,这样在运行时你可以选择合适的并记下时间

1-> 正常插入

2-> XML 插入

3-> 使用 SqlBulkCopy

现在执行应用程序并记下 XML 插入的结果,就像我们在上一节中所做的那样。你可以检查插入行到数据库表的速度。

Bulk Data Management with XML for 10000 Records

图。XML 10000 条记录

现在为 100,000 条记录执行相同的操作。查看应用程序的性能有多快,应用程序有多么简洁。100,000 条记录也只需要 15 秒,性能惊人。

Bulk Data Management With XML 100000 Records

图。XML 100000 条记录

XML 的性能真是惊人。

使用 SqlBulkCopy 类进行批量数据处理

现在将你的 BulkDataManagement.cs 代码更新如下。这里我们不需要任何存储过程。我们可以直接执行命令。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data.SqlClient;
using System.Data;
namespace BulkDataManagement
{
    public class BulkDataManagement
    {
        public BulkDataManagement()
        {
            //strConnectionString=@"Data Source=WLW7-SRINIVASUP\PEMMA;Initial Catalog=Pagination;User ID=sa;Password=srijailu123*";
        }
        public static void AddCustomer(string strConnectionString, 
          string firstname, string lastname, string address1, string address2, 
          string city, string state, string country, string zipcode)
        {
            using (SqlConnection con = new SqlConnection(strConnectionString))
            {
                con.Open();
                SqlCommand cmd = new SqlCommand("Customers_Insert", con);
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                cmd.Parameters.Add(new SqlParameter("@firstname", firstname));
                cmd.Parameters.Add(new SqlParameter("@lastname", lastname));
                cmd.Parameters.Add(new SqlParameter("@address1", address1));
                cmd.Parameters.Add(new SqlParameter("@address2", address2));
                cmd.Parameters.Add(new SqlParameter("@city", city));
                cmd.Parameters.Add(new SqlParameter("@state", state));
                cmd.Parameters.Add(new SqlParameter("@country", country));
                cmd.Parameters.Add(new SqlParameter("@zipcode", zipcode));
                cmd.ExecuteNonQuery();
            }
        }
        public static void AddCustomerWithXML(string strConnectionString, string customerxml)
        {
            using (SqlConnection con = new SqlConnection(strConnectionString))
            {
                con.Open();
                SqlCommand cmd = new SqlCommand("Customer_Insert_XML", con);
                cmd.CommandType = System.Data.CommandType.StoredProcedure;
                SqlParameter param = new SqlParameter("@customerxml", System.Data.SqlDbType.VarChar);
                param.Value = customerxml;
                cmd.Parameters.Add(param);
                cmd.ExecuteNonQuery();
            }
        }
        public static void CustomersBulkCopy(string connectionstring, DataTable dtCustomers)
        {
            using (SqlBulkCopy scopy = new SqlBulkCopy(connectionstring, SqlBulkCopyOptions.TableLock))
            {
                scopy.DestinationTableName = "dbo.Customers";
                scopy.WriteToServer(dtCustomers);
            }
        }
    }
}

并像下面一样更改你的 program.cs 文件代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data;
namespace BulkDataManagement
{
    class Program
    {
        static string connectionString = 
           @"Data Source=WLW7-SRINIVASUP\PEMMA;Initial Catalog=BlogSamples;User ID=sa;Password=srijailu123*";
        static void Main(string[] args)
        {
            Console.WriteLine("Enter Number of Records to be Inserted: ");
            int noofrecords = int.Parse(Console.ReadLine());
            Console.WriteLine("Enter Which Method to Execute /nNormal Insert->1/nXML Insert->2/nSqlBulkCopy->3");
            int choice = int.Parse(Console.ReadLine());
            Stopwatch sw = new Stopwatch();
            sw.Start();
            if (choice == 1)
                Customer_Insert(noofrecords);
            else if (choice == 2)
                Customer_XML_Insert(noofrecords);
            else if (choice == 3)
                Customer_Insert_With_SqlBulkCopy(noofrecords);
            sw.Stop();
            Console.WriteLine("Total elapsed time in milli seconds : " + sw.ElapsedMilliseconds.ToString());
            Console.ReadLine();
        }
        static void Customer_Insert(int num)
        {
            for (int i = 0; i < num; i++)
                BulkDataManagement.AddCustomer(connectionString, "First Name - " + i,
                    "Last Name - " + i, "Address1 - " + i, "Address2 - " + i, 
                    "City - I" + i, "State - " + i, "Country - " + i, "Zip Code - " + i);
        }
        static void Customer_XML_Insert(int num)
        {
            StringBuilder sb = new StringBuilder("<Root>");
            for (int i = 0; i < num; i++)
            {
                sb.Append("<Customer firstname='First Name - XML -" + i + "' ");
                sb.Append("lastname='Last Name -  XML -" + i + "' ");
                sb.Append("address1='Address1 -  XML -" + i + "' ");
                sb.Append("address2='Address2 -  XML -" + i + "' ");
                sb.Append("city='City -  XML -" + i + "' ");
                sb.Append("state='State -  XML -" + i + "' ");
                sb.Append("country='Country -  XML -" + i + "' ");
                sb.Append("zipcode='Zipcode -  XML -" + i + "' />");
            }
            sb.Append("</Root>");
            BulkDataManagement.AddCustomerWithXML(connectionString, sb.ToString());
        }
        static void Customer_Insert_With_SqlBulkCopy(int num)
        {
            DataTable dtCustomers = new DataTable("Customers");
            DataColumn dc = new DataColumn("FirstName");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("LastName");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("Address1");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("Address2");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("City");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("State");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("Country");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            dc = new DataColumn("ZipCode");
            dc.DataType = System.Type.GetType("System.String");
            dtCustomers.Columns.Add(dc);
            for (int i = 0; i < num; i++)
            {
                DataRow dr = dtCustomers.NewRow();
                dr["FirstName"] = "First Name sc - " + i;
                dr["LastName"] = "Last Name sc - " + i;
                dr["Address1"] = "Address1 sc - " + i;
                dr["Address2"] = "Address2 sc - " + i;
                dr["City"] = "City sc - " + i;
                dr["State"] = "State sc - " + i;
                dr["Country"] = "Country sc - " + i;
                dr["ZipCode"] = "ZipCode sc - " + i;
                dtCustomers.Rows.Add(dr);
            }
            BulkDataManagement.CustomersBulkCopy(connectionString, dtCustomers);
        }
    }
}

现在,就像在最后 2 个部分中所做的那样,执行 sqlbulkcopy 模式下的应用程序并检查应用程序性能。

哇,真是太棒了,10000 条记录的插入时间似乎非常不错,而且我们实现了这三种方法中的最佳。

Bulk Data Management with SqlBulkCopy 10000 records

图。SqlBulkCopy 10000 条记录

现在检查 100,000 条记录。应用程序性能非常惊人,我们现在构建了最高效的解决方案。看看性能,它可以在 2 秒内插入 100,000 条记录。这就是我们一直以来试图证明的

 

 

结论

     从上面的实验可以看出,这三种方法都表现良好,尽管 SqlBulkCopy 在处理海量数据集方面是最好的。下面是我实验的简单比较。

Comparison for Normal, XML and SqlBulkCopy bulk data management 

 

值得关注的点  

在这里,我们可以学习如何在各种场景下以最佳的优化性能将批量数据插入数据库。处理批量数据的方法。

© . All rights reserved.