65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SQL Bulk Load 上传定宽文本文件

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.20/5 (3投票s)

2009 年 3 月 3 日

CPOL

3分钟阅读

viewsIcon

45672

downloadIcon

421

本项目演示了将那些定宽报表从大型机上传到 SQL Server 表中最快的方法。

引言

该类需要 SQL Server 中的某个表用于导入,需要连接到该数据库,需要加载的文件,需要加载数据的字段名称以及文件中需要加载的字段。我将所有内容设置为从应用程序设置中读取,并快速构建了一个 WPF 文件浏览器前端来演示如何使用该类。

背景

在我工作过的所有地方,能够加载来自大型机报表的定宽文件都很重要,这些报表来自无法(或至少不提供)Web 服务、ODBC 或其他来源的数据。每次 Visual Studio 有新版本时,编写起来都会变得更容易。这是我最新的用于导入的类。

Using the Code

使用我的 BulkLoad 类的最简单方法是设置要加载的内容的字符串,然后将其传递给 BulkLoadFromStrings 函数。在此示例中,我将所有内容都设置为应用程序设置。

配置文件的外观如下

<configuration>
  <configsections>
    <sectiongroup name=""userSettings"" publickeytoken="b77a5c561934e089""
          culture="neutral," version="2.0.0.0,"
          type=""System.Configuration.UserSettingsGroup,">
      <section name=""Bulk_Import.Properties.Settings""
          publickeytoken="b77a5c561934e089"" culture="neutral," version="2.0.0.0,"
          type=""System.Configuration.ClientSettingsSection,"
          requirepermission=""false"" allowexedefinition=""MachineToLocalUser"">
    </sectiongroup>
  </configsections>
  <usersettings>
    <bulk_import.properties.settings>
      <setting name=""TableName"" serializeas=""String"">
        <value>drcAlden</value>
      </setting>
      <setting name=""FieldNamesCSV"" serializeas=""String"">
        <value>CLSNumber,YrPeriod,Vendor,Department,BoxCode,StoreNumber,StoreName,
         TypeScan,UPC   , Manufac,Description,Qty     ,Base   ,Credit  ,GSNumber</value>
      </setting>
      <setting name=""DefaultDir"" serializeas=""String"">
        <value>C:\</value>
      </setting>
      <setting name=""DefaultFile"" serializeas=""String"">
        <value>FixedWidthFile.txt</value>
      </setting>
      <setting name=""FormatCSV"" serializeas=""String"">
        <value>4-8     ,10-14    ,15-22  ,23-26    ,27-36   ,38-48      ,48-77   ,
           78-80    ,80-94  ,94-103 ,103-123   ,124-133 ,133-146 ,147-160,161-170</value>
      </setting>
    </bulk_import.properties.settings>
  </usersettings>
  <connectionstrings>
    <add name=""DamageReclaimEntities"" security="True;MultipleActiveResultSets=False"""
        catalog="Database;Integrated" source="Server;Initial"
        string=""Data" providername=""System.Data.EntityClient""
        connectionstring=
        ""metadata=res://*/Model1.csdl|res://*/Model1.ssdl|res://*/
        Model1.msl;provider=System.Data.SqlClient;provider">
    <add name=""Bulk_Import.Properties.Settings.DamageReclaimConnectionString""
        security="True"" catalog="Database;Integrated"
        source="DBServer;Initial" providername=""System.Data.SqlClient""
        connectionstring=""Data">
  </connectionstrings>
</configuration>

应用程序设置只是通过右键单击项目文件添加的,所以,除非你喜欢 XML,否则不用担心需要重新键入一堆完全有效的 XML。

这个 BulkLoadFromStrings 函数基本上会解析这些 strings 并使用它们来加载文件,调用类的解析例程。

/// <summary>
/// Does bulk load based on settings for file and table to be loaded.
/// </summary>
/// <param name=""TableName"">Name of table to be loaded.</param>
/// <param name=""FieldNamesCSV"">Comma Separated list of field names </param>
/// <param name=""FieldPositionCSV"">Comma separated list of field positions
/// separated by dashes</param>
/// <param name=""FileNameAndPath"">Path to fixed width file with data to be
/// loaded.</param>
/// <param name=""SQLConnectionString"">Connection string for DB to load
/// data in.</param>
/// <returns></returns>
public int BulkLoadFromStrings (   string TableName_in
                                ,   string FieldNamesCSV_in
                                ,   string FieldPositionCSV_in
                                ,   string FileNameAndPath_in
                                ,   string SQLConnectionString_in
                                )
{
    int iRecordsLoadedOut = 0;
    BulkLoadFields blfldsToLoad;
    BulkLoadTable bltblToLoad;
    SqlConnection con = null;
    Stream strmToLoad = null;
    DataTable dtToLoad = null;
    int iRecordsPreLoaded = 0;
    con = MakeConnection(SQLConnectionString_in);
    blfldsToLoad = new BulkLoadFields(FieldNamesCSV_in, FieldPositionCSV_in);
    bltblToLoad = MakeBulkLoadTable(TableName_in, con, blfldsToLoad.FieldNames);
    strmToLoad = FileToStream(FileNameAndPath_in);
    dtToLoad = bltblToLoad.Table;
    iRecordsPreLoaded = LoadTable(ref dtToLoad, blfldsToLoad.FieldPositions,
        strmToLoad);
    strmToLoad.Close();
    strmToLoad = null;
    bltblToLoad.Table = dtToLoad;
    iRecordsLoadedOut = DoBulkLoad(bltblToLoad, con);
    AddExportDate(FileNameAndPath_in, con.Database.ToString(),
        con.DataSource.ToString());
    return iRecordsLoadedOut;
}

该类使用 2 个结构来传递数据。BulkLoadFields 结构用于存储字段位置和名称的集合,而 BulkLoadTable 则是一个带有为每个列设置的 SQLBulkInsert 映射的 DataTable。这些映射对于 Bulk insert 的成功至关重要。

由于 SQLBulkCopy 对象似乎对数据类型和数据大小的差异非常敏感,因此最佳解决方案似乎是创建一个表来基于现有表进行加载。我看到其他示例中使用了数据读取器,似乎应该有更好的方法(或者至少一种不那么特定于 SQL Server 的方法)来获取表定义,但从 SQL 的系统表中选择描述对我来说效果很好。

/// <summary>
/// Creates BulkLoad table from a table definition from SQL system
/// objects and the list of column names
/// </summary>
/// <param name=""TableName"">Name of table to be loaded or copied from</param>
/// <param name=""con"">Database to get table def from</param>
/// <param name=""aFilesColumns"">Columns to define.</param>
/// <returns>BulkLoad table from a table definition from SQL system
/// objects</returns>
public BulkLoadTable MakeBulkLoadTable  (   string TableName
                                    ,   System.Data.SqlClient.SqlConnection con
                                    ,   string[] aFilesColumns
                                    )
{
    BulkLoadTable bltOut;
    bltOut = new BulkLoadTable();
    DataTable dtOut = null;
    SqlBulkCopyColumnMappingCollection sbcMapsOut = null;
    System.Data.SqlClient.SqlBulkCopy sbc = null;
    SortedList slFields = null;
    string[] aFilesFormat = null;
    string strFieldDFormatCSV = string.Empty;
    string strColumnName = string.Empty;
    string strColumnType = string.Empty;
    string strFromFile = string.Empty;
    string strFile = string.Empty;
    SqlDataReader sdr = null;
    SqlDbType FieldType = SqlDbType.Variant;
    SqlCommand sqlCmd = null;
    int iCharacterMaximumLength = 0;
    StringBuilder sbInsertFields = null;
    StringBuilder sbSelectFields = null;
    //Get a data reader to peek at the column types that we will be
    //adding rows to.
    sqlCmd = new SqlCommand(" select Column_NAme, Data_Type, ISNULL(
        Character_Maximum_Length, 0) Max " +
                            " from information_schema.columns " +
                            " where table_name like '" + TableName + "'", con);
    sqlCmd.CommandType = CommandType.Text;
    if (sqlCmd != null)
    {
        aFilesFormat = strFieldDFormatCSV.Split(char.Parse(","));
        if (aFilesFormat != null)
        {
            if (con.State != ConnectionState.Open)
            {
                con.Open();
            }
            sdr = sqlCmd.ExecuteReader(CommandBehavior.CloseConnection);
            if (sdr != null)
            {
                sbSelectFields = new StringBuilder();
                foreach (string strColName in aFilesColumns)
                {
                    if (sbSelectFields.Length!=0)
                    {
                        sbSelectFields.Append(",");
                    }
                    sbSelectFields.Append("[");
                    sbSelectFields.Append(strColName.Trim());
                    sbSelectFields.Append("]");
                }
                SqlDataAdapter sdaForWrite = new SqlDataAdapter(
                    "Select " + sbSelectFields.ToString() + " from " +
                    TableName + " where 1= -1", con);
                sbc = new System.Data.SqlClient.SqlBulkCopy(con);
                sbcMapsOut = sbc.ColumnMappings;
                sbInsertFields = new StringBuilder("INSERT INTO ");
                sbInsertFields.Append(TableName);
                sbInsertFields.Append(" (");
                sbInsertFields.Append(sbSelectFields.ToString());
                sbInsertFields.Append(") VALUES (");
                sbInsertFields.Append(sbSelectFields.Replace(
                    "[", "@").Replace("]", ""));
                sbInsertFields.Append(")");
                sdaForWrite.InsertCommand = new SqlCommand();
                sdaForWrite.InsertCommand.CommandText = sbInsertFields.ToString();
                sbInsertFields = null;
                slFields = new SortedList();
                if (sdr.HasRows)
                {
                    if (sdr.Read())
                    {
                        do
                        {
                            strColumnName = sdr["Column_NAme"].ToString().Trim();
                            if (HasColumn(aFilesColumns, strColumnName))
                            {
                                strColumnType = sdr["Data_Type"].ToString();
                                if (sdr["Max"] != null)
                                {
                                    iCharacterMaximumLength = (int)sdr["Max"];
                                }
                                else
                                {
                                    iCharacterMaximumLength = 0;
                                }
                                 switch (strColumnType)
                                {
                                    case "bigint":
                                        FieldType = SqlDbType.BigInt;
                                        break;
                                    case "int":
                                        FieldType = SqlDbType.Int;
                                        break;
                                    case "smallint":
                                        FieldType = SqlDbType.SmallInt;
                                        break;
                                    case "tinyint":
                                        FieldType = SqlDbType.TinyInt;
                                        break;
                                    case "bit":
                                        FieldType = SqlDbType.Bit;
                                        break;
                                    case "decimal":
                                        FieldType = SqlDbType.Decimal;
                                        break;
                                    case "money":
                                        FieldType = SqlDbType.Money;
                                        break;
                                    case "smallmoney":
                                        FieldType = SqlDbType.SmallMoney;
                                        break;
                                    case "nchar":
                                        FieldType = SqlDbType.NChar;
                                        break;
                                    case "ntext":
                                        FieldType = SqlDbType.NText;
                                        break;
                                    case "nvarchar":
                                        FieldType = SqlDbType.NVarChar;
                                        break;
                                    case "datetime":
                                        FieldType = SqlDbType.DateTime;
                                        break;
                                    case "smalldatetime":
                                        FieldType = SqlDbType.SmallDateTime;
                                        break;
                                    case "char":
                                        FieldType = SqlDbType.Char;
                                        break;
                                    case "varchar":
                                        FieldType = SqlDbType.VarChar;
                                        break;
                                }
                                sdaForWrite.InsertCommand.Parameters.Add(
                                    "@" + strColumnName, FieldType,
                                    iCharacterMaximumLength, strColumnName);
                                sbcMapsOut.Add(new SqlBulkCopyColumnMapping(
                                    strColumnName, strColumnName));
                            }
                        }
                        while (sdr.Read());
                    }
                }
                //Done with the data reader now that we have the field types.
                sdr.Close();
                DataSet ds = new DataSet();
                sdaForWrite.Fill(ds, TableName);
                dtOut = ds.Tables[TableName];
                sdr = null;
                bltOut = new BulkLoadTable(dtOut, sbcMapsOut);
            }
        }
    }
    return bltOut;
}

定义了表和字段后,下一步就是从平面文件中加载数据。我已经将文件加载到流中,我基本上按行结束符将其拆分,然后遍历结果字符串数组以从每行中解析字段。

/// <summary>
/// Loads a datatable by parsing a fixed width file.
/// </summary>
/// <param name=""TableToLoad"">Data table to load</param>
/// <param name=""FieldPositions_in"">Field positions that the file will
/// be parsed with</param>
/// <param name=""FiletoLoad"">Stream to read data from.</param>
/// <returns></returns>
public int LoadTable    (   ref DataTable TableToLoad
                        ,   SortedList<string,> FieldPositions_in
                        ,   Stream FiletoLoad
                        )
{
    int iRecordsLoadedOut = 0;
    string strSQLCon = string.Empty;
    string strTableName = string.Empty;
    DataRow dr = null;
    string[] astrFile = null;
    string strFromFile = string.Empty;
    string strFld = string.Empty;
    string strFile = string.Empty;
    int iStart = 0;
    int iEnd = 0;
    int iFileLen = 0;
    StreamReader sr = null;
    iFileLen = (int)FiletoLoad.Length;
    sr = new StreamReader(FiletoLoad,System.Text.Encoding.UTF8,false,iFileLen);
    if (FiletoLoad != null)
    {
        if (sr != null)
        {
            //FiletoLoad.Seek(1, SeekOrigin.Begin);
            strFile = sr.ReadToEnd();
            sr.Close();
            sr = null;
            if (strFile.Contains("\r"))
            {
                strFile.Replace("\n", "");
                astrFile = strFile.Split(Char.Parse("\r"));
            }
            if (astrFile != null)
            {
                foreach (string strLine in astrFile)
                {
                    if (strLine != "\n")
                    {
                        dr = TableToLoad.NewRow();
                        foreach (DataColumn col in TableToLoad.Columns)
                        {
                            strFld = col.ColumnName;
                            dr[strFld.Trim()] = DBNull.Value;
                            iStart = FieldPositions_in[strFld].Start;
                            iEnd = FieldPositions_in[strFld].End;
                            if ((iStart <= strLine.Replace("\n", "").Length) &
                                (iStart + (iEnd - iStart) <=
                                strLine.Replace("\n", "").Length))
                            {
                                strFromFile = strLine.Replace("\n",
                                    "").Substring(iStart, iEnd - iStart).Trim();
                                if (strFromFile.Length > 0)
                                {
                                    if (TableToLoad.Columns[strFld.Trim()] != null)
                                    {
                                        if (strFromFile != string.Empty)
                                        {
                                            strFromFile = strFromFile.Replace(
                                                "'", "''");
                                            if (strFromFile != ".")
                                            {
                                                dr[strFld.Trim()] =
                                                    strFromFile.Trim().ToString();
                                            }
                                        }
                                    }
                                }
                            }
                         }
                        TableToLoad.Rows.Add(dr);
                    }
                }
            }
        }
    }
    //TableToLoad.AcceptChanges();
    return iRecordsLoadedOut;
}

LoadTable 加载的 DataTable 然后会传递回 BulkLoadTable 结构,以便其列可以与相应的 SqlBulkCopyColumnMappingCollection 保持关联,并在调用 WriteToServer 后,记录就会出现在新表中。

/// <summary>
/// Bulk loads a BulkLoadTable with SQL Server's BCP components.
/// </summary>
/// <param name=""bct_in"">Table to load</param>
/// <param name=""Con_in"">Open Connection to use.</param>
/// <returns></returns>
public int DoBulkLoad(BulkLoadTable bct_in, SqlConnection Con_in)
{
    int iRecordsLoadeOut = 0;
    SqlBulkCopy sbc = null;
    SqlBulkCopyColumnMappingCollection sbcMaps = null;
    DataTable dtToLoad =null;
    string strTableName = null;
    dtToLoad = bct_in.Table;
    strTableName = dtToLoad.TableName;
    sbc = new SqlBulkCopy(Con_in);
    sbcMaps = bct_in.Maps;
    foreach (SqlBulkCopyColumnMapping map in sbcMaps)
    {
        sbc.ColumnMappings.Add(map);
    }
    sbc.BatchSize = dtToLoad.Rows.Count;
    sbc.DestinationTableName = strTableName;
    sbc.BulkCopyTimeout = 0;
    if (Con_in.State != ConnectionState.Open)
    {
        Con_in.Open();
    }
    sbc.WriteToServer(dtToLoad, DataRowState.Added);
    if (Con_in.State != ConnectionState.Closed)
    {
        Con_in.Close();
    }
    iRecordsLoadeOut = dtToLoad.Rows.Count;
    return iRecordsLoadeOut;
}

关注点

我将 load 函数分解出来,以便如果需要在单个字段上进行一些特殊的验证和/或转换,可以在数据加载到 ADO 数据表中之后,但在发送到服务器之前完成。此外,在某个时候,为特殊表或其他文件格式覆盖 load 或 make table 函数可能是有意义的。

尽管本示例通过解析字符串中的所有设置来演示,但没有理由说明另一个应用程序可能想以不同的方式加载字段位置结构,或者使用 FTP 套接字流来获取文件而不是从磁盘读取。

希望在未来的 ADO 版本中,这会变得更容易,并且表将能更好地了解其自身模式信息。

历史

  • 2009 年 3 月 3 日:初始发布
© . All rights reserved.