65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SSIS 对象模型导入数据

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.74/5 (8投票s)

2007年2月3日

CPOL

6分钟阅读

viewsIcon

129920

downloadIcon

1062

介绍了如何使用 SSIS 对象模型将数据导入到 SQL Server 2005

Sample image

引言

SQL Server 2005 Integration Service 提供了一套丰富的 API,在本文中,我将向您展示一种通过 C# 代码以编程方式创建和执行 SSIS 包的方法。

背景

几周前,我需要将一个巨大的 CSV(逗号分隔值)文件导入 SQL Server 数据库。嗯,这并不是一件非常困难的事情,我本不必为此写一篇文章。任何人都可以使用 SQL Server 2005 导入向导来完成这项工作,它是一个非常灵活的工具,可以处理此类任务。但我需要以编程方式完成这项工作。这促使我思考并在网上搜索解决方案。经过几个小时的研究,我发现 SQL Server 2005 提供了 SSIS(SQL Server 2005 Integration Service)丰富的 API,任何人都可以轻松地创建 SSIS 包(以前称为 DTS 包)并执行该包。不幸的是,我没有找到创建从 CSV 文件导入数据到 SQL Server 2005 数据库的包的代码。在我搜索过程中发现的大多数演示代码都是将数据从 SQL Server 导入到 SQL Server,或者从 SQL Server 导入到任何平面文件。通过遵循现有的演示代码,为我的目的实现一个包应该相对简单。但很快我发现这与那些示例代码略有不同。经过两天的深思熟虑,我创建了一个让我满意的代码库。在本文中,我将与您分享我的代码和想法。

使用代码

在我的代码库中,我为这个导入业务创建了两个项目。第一个项目(ImportLib)是一个类库,实现了 SSIS 包创建任务;第二个项目(Import)是一个 Windows 窗体应用程序,实际上是一个 DEMO,它以第一个项目为引用并启动一个导入作业。因此,研究第一个项目就足以对我的实际工作有一个很好的了解。

我创建的对象模型基本上基于几个接口,即 ImportLib.ISourceStorageImportLib.IDestinationStorageImportLib.IntegrationService.ISourceImportLib.IntegrationService.IDestination。在深入细节之前,我想表达一下我的想法。

在探索 SSIS 后,您会发现将数据从任何类型的数据源导入任何数据目标都非常容易。SSIS 对象模型实现了一些定义良好的接口来支持这个出色的功能。既然我正在以编程方式创建包,为什么不以一种可以更好地扩展的方式来实现我的代码呢?目前它将能够从 CSV 文件导入数据到 SQL 数据库,但谁知道明天我可能需要将数据从 Excel 文件导入到 SQL Server 或其他任何数据目标呢?这个想法促使我编写了 ImportLib.ISourceStorageImportLib.IDestinationStorage 接口。我想您已经对这些接口的用途有了一个大致的了解——对吗?是的,对于所有类型的源存储,我都定义了 ImportLib.ISourceStorage 接口。目前我只为 CSV 文件源实现了这个接口,但将来当我编写 Excel 源时,我将再次实现这个接口。同样的想法也适用于目标存储,接口是 ImportLib.IDestinationStorage

让我们来看看这些接口

/// <summary>
/// Defines a contact for the data source
/// </summary>
public interface ISourceStorage
{
    /// <summary>
    /// Initializes the data source
    /// </summary>
    /// <param name="properties">
    /// A <see cref="T:IDictionary<string,
    /// string>"/> that contains
    /// the initialization properties</param>
    void InitializeSource(IDictionary<string, string> properties);

    /// <summary>
    /// Get the schema table for the data source
    /// </summary>
    /// <returns>An instance of <
    /// see cref="T:DataTable"/> that contains the schema
    /// information of the data source</returns>
    DataTable GetSchemaTable();

    /// <summary>
    /// Get the Category of the underlying persistent storage
    /// </summary>
    StorageMedium StorageMedium { get; }

    /// <summary>
    /// Get or set the mapping manager
    /// </summary>
    ColumnMappingController MapManager { get; set; }
}

并且

/// <summary>
///     Defines a contact for the Data destination
/// </summary>
public interface IDestinationStorage
{
    /// <summary>
    /// Initializes the data destination
    /// </summary>
    /// <param name="properties">
    /// A <see cref="T:IDictionary<string,
    /// string>"/> that contains
    /// the initialization properties</param>
    void InitializeDestination(IDictionary<string,
        string> properties);

    /// <summary>
    /// Get the Category of the underlying persistent storage
    /// </summary>
    StorageMedium StorageMedium { get; }

    /// <summary>
    /// Get or set the mapping manager
    /// </summary>
    ColumnMappingController MapManager { get; set; }
}

现在让我们看看具体的实现。ImportLib.Delimited.DelimitedDataSource 是实现了 ImportLib.ISourceStorage 的具体类。这个类简单地封装了接口定义的、针对 CSV 文件的功能。它使用 System.Data.OleDb.OleDbConnection 类读取 CSV 文件以获取源架构。另一个我命名为 ImportLib.Sql.SqlStorageDestination 的具体类实现了 ImportLib.IDestinationStorage 接口。这个类提供了接口定义的功能,并且还包含用于创建目标数据存储(数据库和表)的例程。

到目前为止一切顺利。现在,让我们专注于 SSIS 特定的类和接口。嗯,在这里,我再次定义了之前提到的两个接口:ImportLib.IntegrationService.ISourceImportLib.IntegrationService.IDestination。让我们来看看它们。

/// <summary>
/// Defines a contact for the SQL Server
/// Integration Service source objects.
/// </summary>
/// <remarks>
/// Different SSIS source objects should satisfy this interface.
/// Like
/// Flat file data source, OleDB data source etc.
/// </remarks>
public interface ISource
{
    /// <summary>
    /// get or set the <
    /// see cref="T:ImportLib.IDataSource"/> instance
    /// that represents the physical data source.
    /// </summary>
    ISourceStorage DataSource
    {
        get;
        set;
    }

    /// <summary>
    /// Creates a connection manager instance for a given source file
    /// into the context of the given <
    /// see cref="T:ImportLib.IntegrationService
    /// .SsisPackage"/>
    /// object.
    /// </summary>
    /// <param name="package">
    /// An instance of <
    /// see
    /// cref="T:ImportLib.IntegrationService
    /// .SsisPackage"/>.</param>
    void CreateConnection(DTSPackage package);

    /// <summary>
    /// Creates the source dataflow component.
    /// </summary>
    /// <param name="package">The package
    /// instance.</param>
    /// <param name="dataflowTask">
    /// The dataflow task under which the component will be
    /// created.</param>
    /// <returns>An instance of <see
    /// cref="T:SsisDataflowComponent"/>.</returns>
    DataflowComponent CreateSourceDataFlowComponent(
        DTSPackage package, DTSExecutable dataflowTask);

    /// <summary>
    /// Initialize the source dataflow component
    /// </summary>
    /// <param name="sourceDataFlowComponent">
    /// An instance of <see cref="T:
    /// SsisDataflowComponent"/></param>
    void InitializeDataflowComponent(DataflowComponent
        sourceDataFlowComponent);
}

还有另一个是

/// <summary>
/// Defines a contact for the SQL Server Integration
/// Service Destination objects.
/// </summary>
/// <remarks>
/// Different data destinations should satisfy
/// this interface. such as Flat file
/// destinations, OleDB destination etc.
/// </remarks>
public interface IDestination
{
    /// <summary>
    /// Get or set the <see cref="T:ImportLib.
    /// IDataDestination"/> instance
    /// that represents the physical data destination.
    /// </summary>
    IDestinationStorage DataDestination
    {
        get;
        set;
    }

    /// <summary>
    /// Get or set the <see cref="T:ImportLib.
    /// ISsisSource"/> instance
    /// </summary>
    ISource SsisSource
    {
        get;
        set;
    }

    /// <summary>
    /// Creates a connection manager instance for a given source file
    /// into the context of the
    /// given <see cref="T:ImportLib.IntegrationService.
    /// SsisPackage"/>
    /// object.
    /// </summary>
    /// <param name="package">
    /// An instance
    /// of <see cref="T:ImportLib.
    /// IntegrationService.SsisPackage"/>.</param>
    void CreateConnection(DTSPackage package);

    /// <summary>
    /// Creates the task that will create the
    /// destination storage (ex. database, table etc);
    /// </summary>
    /// <param name="package">The
    /// package where the task should be created</param>
    /// <returns>
    /// An instance of <see cref="T:ImportLib.IntegrationService.
    /// SsisExecutable"/></returns>
    DTSExecutable CreateStorageCreationTask(DTSPackage package);

    /// <summary>
    /// Creates the destination dataflow component.
    /// </summary>
    /// <param name="package">The package
    /// instance.</param>
    /// <param name="dataflowTask">
    /// The dataflow task under which the component will
    /// be created.</param>
    /// <returns>
    /// An instance of <see cref="T:SsisDataflowComponent
    /// "/>.</returns>
    DataflowComponent CreateDestinationDataFlowComponent
        (DTSPackage package, DTSExecutable dataflowTask);

    /// <summary>
    /// Initialize the destination dataflow component
    /// </summary>
    /// <param name="destinationDataFlowComponent">
    /// An instance of <see
    /// cref="T:SsisDataflowComponent"/></param>
    void InitializeDataflowComponent(DataflowComponent
        destinationDataFlowComponent);
}

这些接口严格定义了 SSIS 特定的例程,例如创建连接、创建数据流组件以及一些初始化例程。定义 SSIS 数据流超出了本文的范围,有关这些 SSIS 组件的详细信息,请参阅 MSDN。现在我们将看一下具体的实现。ImportLib.IntegrationService.Specialized.FlatFileSourceImportLib.IntegrationService.ISource 接口的具体实现。看看这个类。这里重要的部分是我们必须显式创建源列(CreateSourceColumns 方法),SSIS 永远不会为您创建这些列。

namespace ImportLib.IntegrationService.Specialized
{
    /// <summary>
    /// Contains the implementations of the
    /// <see
    /// cref="T:ImportLib.IntegrationService.
    /// ISsisSource"/>
    /// interface.
    /// </summary>
    public class FlatFileSource : ISource
    {
        // The delimited data source instance
        private DelimitedDataSource delimitedDataSource;
        // The Moniker text for the flatfile source
        public const string FlatFileMoniker = @"FLATFILE";
        // The source data flow component GUID
        public const string SourceDataFlowComponentID
          = "{90C7770B-DE7C-435E-880E-E718C92C0573}";
        // schema table
        private DataTable schemaTable = null;
        // The connection manager instance
        private ConnectionManager connectionManager;

        /// <summary>
        /// Creates a new instance
        /// </summary>
        public FlatFileSource()
        {

        }

        #region ISsisSource Members

        /// <summary>
        /// get or set the <see
        /// cref="T:ImportLib.IDataSource"/> instance
        /// that represents the physical data source.
        /// </summary>
        public ISourceStorage DataSource
        {
            get { return delimitedDataSource; }
            set
            {
                // initializing
                delimitedDataSource = value as DelimitedDataSource;
                // Get the schema table
                schemaTable = value.GetSchemaTable();
                // Assert
                Debug.Assert(delimitedDataSource != null);
            }
        }

        /// <summary>
        /// Creates a connection manager instance for a given source file
        /// into the context of the given <see
        /// cref="T:ImportLib.IntegrationService.SsisPackage"/>
        /// object.
        /// </summary>
        /// <param name="package">An instance of
        /// <see cref="T:ImportLib.IntegrationService.
        /// SsisPackage"/>.</param>
        public void CreateConnection(DTSPackage package)
        {
            #region Logging
            Logger.WriteInformation(
              "Creating connection to the source file.");
            #endregion
            // creating a connection manager
            // instance using the FLATFILE moniker
            connectionManager =
              package.InnerObject.Connections.Add(FlatFileMoniker);
            connectionManager.ConnectionString =
                     delimitedDataSource.FileName;
            connectionManager.Name =
               "SSIS Connection Manager for Files";
            connectionManager.Description =
            string.Concat("SSIS Connection Manager");
            // Setting some common properties of the connection manager object
            connectionManager.Properties
            ["ColumnNamesInFirstDataRow"].
                 SetValue(connectionManager,
              delimitedDataSource.FirstRowIsHeader);
            connectionManager.Properties["Format"].SetValue
               (connectionManager, "Delimited");
            connectionManager.Properties["HeaderRowDelimiter"].

                 SetValue(connectionManager,
              delimitedDataSource.HeaderRowDelimiter);
            if (delimitedDataSource.TextQualifier != null)
            {   // If user has been specified a text qualifier then put
               // it into the connection string property
                connectionManager.Properties["TextQualifier"]

                 .SetValue(connectionManager,
                    delimitedDataSource.TextQualifier);
            }
            // create the source columns into the connection manager
            CreateSourceColumns();

            #region Logging
            Logger.WriteInformation(
                "Creating connection to the source file.....Completed");
            #endregion
        }

        /// <summary>
        /// Creates the source dataflow component.
        /// </summary>
        /// <param name="package">The package
        /// instance.</param>
        /// <param name="dataflowTask">
        /// The dataflow task under which the component will be
        /// created.</param>
        /// <returns>An instance of <see
        /// cref="T:SsisDataflowComponent"/>.</returns>
        public DataflowComponent CreateSourceDataFlowComponent
                  (DTSPackage package, DTSExecutable dataflowTask)
        {
            // create the component
            DataflowComponent sourceDataFlowComponent =
             new DataflowComponent(dataflowTask, SourceDataFlowComponentID,
                        "Source Data Flow component");

            return sourceDataFlowComponent;
        }

        /// <summary>
        /// Initialize the source dataflow component
        /// </summary>
        /// <param name="sourceDataFlowComponent">
        /// An instance of <see
        /// cref="T:SsisDataflowComponent"/>
        /// </param>
        public void InitializeDataflowComponent(
            DataflowComponent sourceDataFlowComponent)
        {
            #region Logging
            Logger.WriteInformation(
            "Initializing the managed instance for the source file.");
            #endregion
            // load the COM for the given GUID
            CManagedComponentWrapper managedFlatFileInstance =
                  sourceDataFlowComponent.ComponentInstance;
            // get the populate the properties
            managedFlatFileInstance.ProvideComponentProperties();
            // putting the connection
            if (
              sourceDataFlowComponent.InnerObject.
            RuntimeConnectionCollection.Count > 0)
            {   // If connection is necessary
                sourceDataFlowComponent.InnerObject.
              RuntimeConnectionCollection[0].ConnectionManagerID =
                    connectionManager.ID;
                    sourceDataFlowComponent.InnerObject.
            RuntimeConnectionCollection[0].ConnectionManager =
                                DtsConvert.ToConnectionManager90
                                      (connectionManager);

            }
            // establish a connection
            managedFlatFileInstance.AcquireConnections(null);
            // Initialize the metadata
            managedFlatFileInstance.ReinitializeMetaData();
            // create the mapping now
            IDTSExternalMetadataColumn90 exOutColumn;
            foreach (IDTSOutputColumn90 outColumn in
                sourceDataFlowComponent.InnerObject.
                      OutputCollection[0].OutputColumnCollection)
            {   // create the MAP
                exOutColumn =
                    sourceDataFlowComponent.InnerObject.
                    OutputCollection[0].
                 ExternalMetadataColumnCollection[outColumn.Name];
                // map it
                managedFlatFileInstance.MapOutputColumn(
                    sourceDataFlowComponent.InnerObject.
                      OutputCollection[0].ID, outColumn.ID,
                    exOutColumn.ID, true);
            }
            // Release the connection now
            managedFlatFileInstance.ReleaseConnections();

            #region Logging
            Logger.WriteInformation
                 ("Initializing the managed instance for the source
                   file......completed");
            #endregion
        }

        #endregion

        /// <summary>
        /// Creates the source columns for the flat file connection manager
        /// instance
        /// </summary>
        private void CreateSourceColumns()
        {
            // get the actual connection manger instance
            RuntimeWrapper.IDTSConnectionManagerFlatFile90
            flatFileConnection =
                connectionManager.InnerObject
               as RuntimeWrapper.IDTSConnectionManagerFlatFile90;

            RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
            RuntimeWrapper.IDTSName90 name;

            // trace the current count
            Debug.WriteLine(flatFileConnection.Columns.Count);

            DataTable schemaTable
             = DataSource.GetSchemaTable(); // get the schema table

            foreach (DataRow row in schemaTable.Rows)
            {   // iterate
                string colName
                = row["ColumnName"] as string;
                     // get the col name
                // now create a new column for the connection manager
                column
                 = flatFileConnection.Columns.Add();
                        // if this is the last row

                if (schemaTable.Rows.IndexOf(row)
                     == (schemaTable.Rows.Count - 1))
                    column.ColumnDelimiter =
                        delimitedDataSource.HeaderRowDelimiter;
                // add the row delimiter
                else
                    column.ColumnDelimiter = delimitedDataSource.Delimiter;

                column.TextQualified =
                delimitedDataSource.TextQualifier != null;
                column.ColumnType = "Delimited";
                column.DataType =
                 RuntimeWrapper.DataType.DT_WSTR;
                column.DataPrecision = 0;
                column.DataScale = 0;
                name = (RuntimeWrapper.IDTSName90)column;
                name.Name = colName;
            }
        }
    }
}

现在让我们看看对应的目标实现。

namespace ImportLib.IntegrationService.Specialized
{
    /// <summary>
    /// Contains the implementation of the IDataDestination interface.
    /// </summary>
    public class SqlServerDataDestination : IDestination
    {
        // The SQL server data destination
        private SqlStorageDestination sqlDataDestination;
        // The Moniker text for the Ole DB
        public const string OleDBMoniker = "OLEDB";
        // the component ID
        public const string OleDBDestinationDataFlowComponentID
                 = "{E2568105-9550-4F71-A638-B7FE42E66922}";
        // source
        private ISource ssisSource;
        // connection manager
        private ConnectionManager connectionManager;

        #region ISsisDestination Members

        /// <summary>
        /// Get or set the <see
        /// cref="T:ImportLib.IDataDestination"/> instance
        /// that represents the physical data destination.
        /// </summary>
        public IDestinationStorage DataDestination
        {
            get { return sqlDataDestination; }
            set
            {
                // initializing
                sqlDataDestination = value as SqlStorageDestination;
                // Assert
                Debug.Assert(sqlDataDestination != null);
            }
        }

        /// <summary>
        /// Get or set the <see
        /// cref="T:ImportLib.ISsisSource"/> instance
        /// </summary>
        public ISource SsisSource
        {
            get { return ssisSource; }
            set { ssisSource = value; }
        }

        /// <summary>
        /// Creates a connection manager instance for a given source file
        /// into the context of the given <see
        /// cref="T:ImportLib.IntegrationService.SsisPackage"/>
        /// object.
        /// </summary>
        /// <param name="package">An instance of
        ///  <see cref="T:ImportLib.IntegrationService.SsisPackage
        /// "/>.</param>
        public void CreateConnection(DTSPackage package)
        {
            // Creating a connection using the oledb moniker
            connectionManager =
              package.InnerObject.Connections.Add(OleDBMoniker);
            connectionManager.ConnectionString =
                    GetSsisConnectionString();
            connectionManager.Name =
               "SSIS Connection Manager for Oledb";
            connectionManager.Description =
          string.Concat(
             "SSIS Connection Manager for ",
                   sqlDataDestination.DatabaseName);
        }

        /// <summary>
        /// Creates the task that will create the destination
        /// storage (ex. database, table etc);
        /// </summary>
        /// <param name="package">The package
        /// where the task should be created</param>
        /// <returns>An instance of
        /// <see cref="T:ImportLib.
        /// IntegrationService.SsisExecutable"/></returns>
        public DTSExecutable CreateStorageCreationTask(DTSPackage package)
        {
            // get the SQL task type
            Type taskType = typeof(ExecuteSQLTask);
            // create a task of type ExecuteSQLTask
            DTSExecutable executable = new DTSExecutable(package, taskType);

            // now configuring the new task
            TaskHost taskHost = executable.InnerObject as TaskHost;
                     // get the Task host instance
            ExecuteSQLTask sqlTask = taskHost.InnerObject as ExecuteSQLTask;
                     // get the SQL task from the host
            sqlTask.Connection = connectionManager.Name;
                     // set the connection manager
            sqlTask.SqlStatementSource =
                sqlDataDestination.GetDestinationTableCreationSql
                    (ssisSource.DataSource.GetSchemaTable());
                     // set the SQL that generates the table

            return executable;
        }

        /// <summary>
        /// Creates the destination dataflow component.
        /// </summary>
        /// <param name="package">The package
        ///  instance.</param>
        /// <param name="dataflowTask">The dataflow task
        /// under which
        /// the component will be created.</param>
        /// <returns>An instance of <see
        /// cref="T:SsisDataflowComponent"/>.</returns>
        public DataflowComponent CreateDestinationDataFlowComponent
                  (DTSPackage package, DTSExecutable dataflowTask)
        {
            #region Logging
            Logger.WriteInformation(
              "Creating managed instances for the destination database");
            #endregion
            // create the component now
            DataflowComponent destinationDataFlowComponent
             = new DataflowComponent(dataflowTask,
                OleDBDestinationDataFlowComponentID,
            "Destination Oledb Component");

            // Before going thru the initialization we need
            // to create the destination table
            // because the SSIS object model will try to
            // access that table fore reading the metadata
            sqlDataDestination.CreateDataStore
                    (ssisSource.DataSource.GetSchemaTable());

            // get the COM instance
            CManagedComponentWrapper managedOleInstance =
                        destinationDataFlowComponent.ComponentInstance;
            // populate the properties
            managedOleInstance.ProvideComponentProperties();
            // setting the connection
            if (destinationDataFlowComponent.InnerObject.
                      RuntimeConnectionCollection.Count > 0)
            {   // If connection is necessary
                destinationDataFlowComponent.InnerObject.
                     RuntimeConnectionCollection[0].
                    ConnectionManagerID =
                    connectionManager.ID;
                destinationDataFlowComponent.InnerObject.
                     RuntimeConnectionCollection[0].
                        ConnectionManager =
                                DtsConvert.ToConnectionManager90
                                    (connectionManager);

            }
            // Set the custom properties.
            managedOleInstance.SetComponentProperty(
                  "AccessMode", 0);
                          // Table of View mode
            managedOleInstance.SetComponentProperty
                ("AlwaysUseDefaultCodePage", false);
                          // Default Codepage
            managedOleInstance.SetComponentProperty
                        ("DefaultCodePage", 1252);
                          // Set it
            managedOleInstance.SetComponentProperty
                          ("FastLoadKeepIdentity", false);
                          // Fast load
            managedOleInstance.SetComponentProperty
                           ("FastLoadKeepNulls", false);
            managedOleInstance.SetComponentProperty
                          ("FastLoadMaxInsertCommitSize", 0);
            managedOleInstance.SetComponentProperty
                                 ("FastLoadOptions",
                            "TABLOCK,CHECK_CONSTRAINTS");
            managedOleInstance.SetComponentProperty("OpenRowset",
                string.Format("[{0}].[dbo].[{1}]",
                   sqlDataDestination.DatabaseName,
                        sqlDataDestination.TableName));



            #region Logging
            Logger.WriteInformation(
          "Creating managed instances for the destination
            database....completed");
            #endregion
            return destinationDataFlowComponent;
        }

        /// <summary>
        /// Initialize the destination dataflow component
        /// </summary>
        /// <param name="destinationDataFlowComponent">
        /// An instance of <see
        /// cref="T:SsisDataflowComponent"/></param>
        public void InitializeDataflowComponent(DataflowComponent
                  destinationDataFlowComponent)
        {
            #region Logging
            Logger.WriteInformation(
                "Creating the destination columns and their mappings");
            #endregion
            // Get the COM instance
            CManagedComponentWrapper managedOleInstance

                     = destinationDataFlowComponent.ComponentInstance;

            // Now activate a connection and create the mappings

            // Establish a connection
            managedOleInstance.AcquireConnections(null);
            // initialize the metadata
            managedOleInstance.ReinitializeMetaData();
            // Get the destination's default input and virtual input.
            IDTSInput90 input = destinationDataFlowComponent.
                      InnerObject.InputCollection[0];
            IDTSVirtualInput90 vInput = input.GetVirtualInput();

            // Iterate through the virtual input column collection.
            foreach (IDTSVirtualInputColumn90 vColumn
                   in vInput.VirtualInputColumnCollection)
            {
                bool res = sqlDataDestination.MapManager.
              IsSuppressedSourceColumn
                  (vColumn.Name,ssisSource.DataSource.GetSchemaTable());
                if (!res)
                {
                    // Call the SetUsageType method of the destination
                    //  to add each available virtual input column as
                    // an input column.
                    managedOleInstance.SetUsageType(
                       input.ID, vInput, vColumn.LineageID,
                    DTSUsageType.UT_READONLY);
                }
            }

            IDTSExternalMetadataColumn90 exColumn;
            foreach (IDTSInputColumn90 inColumn in
                destinationDataFlowComponent.InnerObject.
                  InputCollection[0].InputColumnCollection)
            {   // create the map
                exColumn = destinationDataFlowComponent.InnerObject.
                      InputCollection[0].
                 ExternalMetadataColumnCollection[inColumn.Name];
                string destName = sqlDataDestination.MapManager.
                      GetDestinationColumn(exColumn.Name).ColumnName;
                exColumn.Name = destName;

                managedOleInstance.MapInputColumn(
                           destinationDataFlowComponent.
                   InnerObject.InputCollection[0].ID,
                           inColumn.ID, exColumn.ID);
            }
            // Now release the connection
            managedOleInstance.ReleaseConnections();

            // Now remove the table that we did create
            // for the SSIS object model
            sqlDataDestination.DeleteDataStore();

            #region Logging
            Logger.WriteInformation

              ("Creating the destination columns and their
                 mappings.....completed");
            #endregion
        }

        #endregion

        /// <summary>
        /// Get the SSIS compatible connection string.
        /// </summary>
        /// <returns>A Connection string that is compatible with
        /// SSIS</returns>
        /// <remarks>
        ///     The SSIS Oledb connections uses a provider different than
        /// usual SQL client provider.
        /// It is "SQLNCLI". Without this provider the SSIS cant
        /// create a connection. (!)
        /// On the other hand this provider (SQLNCLI) cant be used as SQL
        /// Client (.NET class) connections.
        /// So this procedure converts a SQL client connection string to SSIS
        /// compatible connection
        /// string by modifying the provider.
        /// </remarks>
        private string GetSsisConnectionString()
        {
            //connectionManager.ConnectionString =
              "Data Source=VSTS;
            Initial Catalog=TEST;Provider=SQLNCLI;Integrated
                 Security=SSPI;Auto Translate=false;";
            //ConMgr.ConnectionString = "
            // Data Source=VSTS;Initial Catalog=TEST;Integrated
            // Security=True";
            string connectionString
                  = sqlDataDestination.ConnectionString;
                 // get the SQL connection string
            Dictionary<string,
              string> connectionProperties =
                 new Dictionary<string, string>();

            foreach( string part in
            connectionString.Split(";".ToCharArray()))
            {   // Iterate thru the properties of the connection string
                string[] keyValue = part.Split("=".ToCharArray(),
                    StringSplitOptions.RemoveEmptyEntries);
                if (keyValue != null && keyValue.Length == 2)
                {
                    string propertyName = keyValue[0].Trim();
                              // the name of the property
                    string valueName = keyValue[1].Trim();
                              // the value of the property
                    // create the entry
                    connectionProperties.Add(propertyName, valueName);
                }
            }
            // Now update these followings
            connectionProperties["Provider"]
                        = "SQLNCLI";
            connectionProperties["Integrated Security"]
                      = "SSPI";
            connectionProperties["Auto Translate"]
                         = "false";

            // Now we are going to create the SSIS compatible
            // connectionstring

            StringBuilder ssisCompatibaleConnectionString
                     = new StringBuilder();
            for (Dictionary<string, string>.Enumerator iterator
               =
            connectionProperties.GetEnumerator();
                   iterator.MoveNext(); )
            {   // Iterate
                if (ssisCompatibaleConnectionString.Length > 0)
                {   // If already there is some properties added
                    ssisCompatibaleConnectionString.Append(";");
                }
                ssisCompatibaleConnectionString.Append(
                    string.Format("{0}={1}",
                     iterator.Current.Key, iterator.Current.Value));
            }

            return ssisCompatibaleConnectionString.ToString();
        }
    }
}

除了这些核心的导入业务类之外,我还为 SSIS 包编写了一个日志提供程序,通过它可以在包执行期间将日志记录到 GUI。创建日志提供程序意味着您必须扩展 Microsoft 提供的 LogProviderBase 类。这是我的日志提供程序类。

namespace ImportLib.IntegrationService.Logging
{
    /// <summary>
    /// A custom log provider class
    /// </summary>
    /// <author>
    ///     Moim Hossain
    /// </author>
    [DtsLogProvider(DisplayName = "LogProvider",

  Description = "
    Log provider for DTS packages.",
               LogProviderType = "Custom")]
    public class EventLogProvider : LogProviderBase
    {
        /// <summary>
        /// Configuration string
        /// </summary>
        public override string ConfigString
        {
            get
            {
                return string.Empty;
            }
            set
            {

            }
        }

        /// <summary>
        /// Open log
        /// </summary>
        public override void OpenLog()
        {
            base.OpenLog();
        }

        /// <summary>
        /// Closing log
        /// </summary>
        public override void CloseLog()
        {
            base.CloseLog();
        }

        /// <summary>
        /// Initializations
        /// </summary>
        /// <param name="connections"></param>
        /// <param name="events"></param>
        /// <param name="refTracker"></param>
        public override void InitializeLogProvider
      (Connections connections,
      IDTSInfoEvents events,
        ObjectReferenceTracker refTracker)
        {
            base.InitializeLogProvider(connections, events, refTracker);
        }

        /// <summary>
        /// Write a log
        /// </summary>
        /// <param name="logEntryName"></param>
        /// <param name="computerName"></param>
        /// <param name="operatorName"></param>
        /// <param name="sourceName"></param>
        /// <param name="sourceID"></param>
        /// <param name="executionID"></param>
        /// <param name="messageText"></param>
        /// <param name="startTime"></param>
        /// <param name="endTime"></param>
        /// <param name="dataCode"></param>
        /// <param name="dataBytes"></param>
        public override void Log(string logEntryName,
               string computerName,
               string operatorName,
               string sourceName,
               string sourceID,
               string executionID, string messageText,
               DateTime startTime,
               DateTime endTime, int dataCode, byte[] dataBytes)
        {
            //base.Log(logEntryName, computerName, operatorName,
            // sourceName, sourceID, executionID,
            // messageText, startTime, endTime, dataCode, dataBytes);

            LogEventArgs e = new
                   LogEventArgs(logEntryName,computerName,messageText);
            //
            OnLogCreated(e);
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnLogCreated(LogEventArgs e)
        {
            LogCreatedDelegate mLogCreated = this.LogCreated;

            if (mLogCreated != null)
            {
                mLogCreated(this, e);
            }
        }

        /// <summary>
        ///
        /// </summary>
        public event LogCreatedDelegate LogCreated;
    }

    /// <summary>
    ///
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    public delegate void LogCreatedDelegate ( object sender , LogEventArgs e );
}

好了,最后但同样重要的一点是映射。这意味着,在导入任务期间,可以将一个列的数据导入到另一个列。因此,您可以提供源列和目标列之间的映射。您甚至可以在导入任务期间忽略一些源列的数据。尽管我没有创建用于定义映射的 GUI,但我通过代码创建了映射。但要创建一个交互式 GUI 让用户指定映射,这仅仅是时间问题。这是封装映射信息的类。

namespace ImportLib.Mappings
{
    /// <summary>
    ///     Encapsulates the mapping information for a source data source
    /// and a destination data source
    /// </summary>
    [Serializable()]
    public class ColumnMappingController
    {
        /// <summary>
        /// Creates a new instance
        /// </summary>
        public ColumnMappingController()
        {
            mappings = new List<Map>();
        }

        private List<Map> mappings;

        /// <summary>
        /// Get or set the mappings
        /// </summary>
        public List<Map> Mappings
        {
            get { return mappings; }
            set { mappings = value; }
        }

        private Column[] destinationColumns;

        /// <summary>
        /// Get or set the destination columns
        /// </summary>
        public Column[] DestinationColumns
        {
            get { return destinationColumns; }
            set { destinationColumns = value; }
        }

        /// <summary>
        /// Get the destination columns array where the
        /// columns are not bind to any source columns
        /// </summary>
        public Column[] UnmappedDestinationColumns
        {
            get
            {
                List<Column> unmappedColumns = new List<Column>();
                foreach (Column destinationColumn in destinationColumns)
                {   // iterate
                    if (!ContainsInDestinationMap(destinationColumn))
                    {   // if no mapping found
                        unmappedColumns.Add(destinationColumn);
                    }
                }
                return unmappedColumns.ToArray();
            }
        }

        /// <summary>
        /// Get the source columns where the columns are not bind with any
        /// destination columns
        /// </summary>
        public Column[] SuppressedSourceColumns(DataTable srcSchemaTable)
        {
            List<Column> suppressedColumns = new List<Column>();
            foreach (DataRow row in srcSchemaTable.Rows)
            {   // iterate
                string columnName = row["columnName"] as string;
                if (!ContainsInSourceMap(columnName))
                {   // if no mapping found
                    suppressedColumns.Add(new Column(columnName));
                }
            }
            return suppressedColumns.ToArray();
        }

        /// <summary>
        /// Determine if the specified source column is in suppressed
        /// list or not
        /// </summary>
        public bool IsSuppressedSourceColumn(
               string sourceColumnName, DataTable srcSchemaTable)
        {
            return Array.IndexOf<Column>(
                SuppressedSourceColumns(srcSchemaTable),
                new Column(sourceColumnName)) > -1;
        }

        /// <summary>
        /// Get the destination column for a given source column
        /// </summary>
        /// <param name="sourceColumnName">The
        /// specified source column</param>
        /// <returns>An instance of <see
        /// cref="T:ImportLib.Mappings.Column"/>
        /// which is the destination for the given source column
        /// name.</returns>
        public Column GetDestinationColumn(string sourceColumnName)
        {
            foreach (Map map in mappings)
            {   // iterate
                if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
                    return map.DestinationColumn;
            }
            throw new ApplicationException
            ("No mapping defined for the source column " +
                  sourceColumnName);
        }

        /// <summary>
        /// Determines if the specified column is contains into the
        /// mapping as a source
        /// </summary>
        /// <param name="destinationColumn">The source
        /// column</param>
        /// <returns>
        /// <c>true</c> if the source column found into
        /// the mapping, <c>false</c> otherwise.</returns>
        private bool ContainsInSourceMap(string sourceColumnName)
        {
            foreach (Map map in mappings)
            {   // iterate
                if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
                    return true;
            }
            return false;
        }

        /// <summary>
        /// Determines if the specified column is contains into the
        /// mapping as a destination
        /// </summary>
        /// <param name="destinationColumn">
        /// The destination column</param>
        /// <returns><c>true</c>
        /// if the destination column found into the mapping,
        ///  <c>false</c> otherwise.</returns>
        private bool ContainsInDestinationMap(Column destinationColumn)
        {
            foreach (Map map in mappings)
            {   // iterate
                if (map.DestinationColumn.Equals(destinationColumn))
                    return true;
            }
            return false;
        }

        /// <summary>
        /// Validate the content
        /// </summary>
        private void Validate()
        {
            //if (sourceColumns == null)
             throw
              new NullReferenceException
          ("SourceColumns is not set to an instance of an object");
            if (destinationColumns == null)
               throw
            new NullReferenceException(
             "DestinationColumns is
                 not set to an instance of an object");
        }
    }
}

您会看到,当我创建一个导入作业时,我会将这个类的实例传递给导入管理器。因此,您可以从代码中创建这个实例(就像我所做的那样),或者让您的用户从 GUI 中创建它。

希望您会喜欢这个!

关注点

我还没有编写任何可以执行数据转换的代码,例如,您需要在数据被导入到目标数据存储之前对其进行一些处理。我希望很快能在这方面编写一些代码,然后我将尝试发布一篇关于该主题的文章。

© . All rights reserved.