65.9K
CodeProject 正在变化。 阅读更多。
Home

深入研究 SSIS 对象模型

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (11投票s)

2007 年 5 月 19 日

CPOL

14分钟阅读

viewsIcon

277184

downloadIcon

1012

描述 SQL Server 2005 Integration Service 对象模型和不同的数据流组件。

引言

Microsoft SQL Server 2005 Integration Services (SSIS) 是一个用于构建高性能数据集成解决方案的平台,其中包括用于数据仓库的提取、转换和加载 (ETL) 包。(更多信息请阅读 MSDN)

Business Intelligence (BI) Studio 提供用于设计和调试包的图形化工具;用于执行 FTP 操作、执行 SQL 语句或发送电子邮件消息等工作流功能的任务;用于提取和加载数据的源和目标;用于清理、聚合、合并和复制数据的转换。SSIS 附带一套丰富的应用程序接口 (API),可用于编程 Integration Services 对象模型。

Integration Services 取代了 Data Transformation Services (DTS),DTS 最早作为 SQL Server 7.0 的组件引入。

在本文中,我们将探讨 SSIS API 以及如何将其用于我们的目的。

背景

几个月前,我不得不处理 SSIS API 以实现一个自定义导入向导(类似于 SQL Server 导入向导),在那段时间里我探索了这些 API。我发现它比其前一版本(又名 DTS)有了很大的改进。

不幸的是,我没有找到足够多的关于这些 API 的代码示例,这些示例可以帮助我更快地完成业务工作。因此,我写了一篇 文章,解释了如何以编程方式在 Code Project 中创建 SSIS 包。我个人收到了几封要求我进一步解释的电子邮件。因此,我正在写另一篇文章。

快速架构概述

SSIS Architecture (source MSDN)

Integration Services (SSIS) 架构 (图片来源:MSDN)

Integration Services (SSIS) 架构由几个容器和任务组件组成。这些元素可以分为四个部分

  • Integration Services 服务
  • Integration Services 对象模型
  • Integration Services 运行时
  • Integration Services 数据流

第一个是 Integration Services 服务,它是一个 Windows 服务,用于监视正在运行的 SSIS 包并管理包的存储。它在 SQL Server Management Studio 中可用。

SSIS 对象模型是一套托管的 API,允许我们编写自定义应用程序、命令行实用程序等。

SSIS Runtime 是一个运行 SSIS 包、管理事务等的环境。运行时可执行文件是运行在该环境中的最小部分。例如,包、容器、任务、事件处理程序,所有这些都是运行时可执行文件。

数据流是 SSIS 架构中最重要(也是最关键)的部分。它包含一个数据流引擎,该引擎管理数据流组件。SSIS 安装附带了大量数据流组件。这些可以分为三类——源组件(从系统中提取数据)、转换组件(对提取的数据执行转换、修改)和加载组件(仅将数据加载到目标系统中)。除了可用的数据流组件外,我们还可以编写自己的自定义数据流组件来满足任何自定义需求。

虽然架构代码是用原生语言(C++、COM)编写的,但我们可以使用任何 CLR 语言来开始使用这些 API 进行编程。

有关 SSIS 架构的详细知识,请访问 MSDN

开始编写一个试点项目

我相信,为了理解细节,最好先实现一个“试点”应用程序。因此,让我们实现一个简单的包。对于这个试点项目,我们将使用 C#。

Pilot Package

设计器中的试点包

我将创建一个简单的 SSIS 包,其中包含一个数据流任务。该数据流任务包含两个数据流组件。一个源数据流组件读取源(CSV 文件),一个目标组件将数据加载到目标(SQL Server 数据库 - 在本示例中)。

Dataflow task details

数据流组件

在编写任何代码之前,让我们创建一个逗号分隔文件,我们将用它作为数据源。所以创建一个名为 *Sample.csv* 的文件,并将以下数据粘贴到文件中,然后保存。

"Name","Age","JoinDate","Salary","Retired" 
"Jon Tuite","45","2004-01-03 00:00:00","55.265","False" 
"Linda Hamilton","25","2002-01-03 00:00:00","155.265","False" 
"Sen Seldiver","42","2002-01-03 00:00:00,","458.2","True" 
"Rose Dalson","25","2004-01-03 00:00:00","55.265","False" 

这是一个由逗号分隔的 CSV 文件。文件用双引号 (") 作为文本限定符。

打开 Visual Studio .NET 2005 并创建一个新应用程序(你可以使用 Windows 应用程序或控制台应用程序,随你喜欢)。现在添加一些程序集引用,这些是针对 SSIS 对象模型的。程序集是

  • Microsoft.SQLServer.DTSPipelineWrap
  • Microsoft.SQLServer.DTSRuntimeWrap
  • Microsoft.SQLServer.ManagedDTS
  • Microsoft.SQLServer.PipelineHost

我们的项目引用将看起来像这样

References

创建一个名为 ImportPackage 的新类,并在 *ImportPackage.cs* 代码文件中添加以下指令。

using Microsoft.SqlServer.Dts.Runtime;
using PipeLineWrapper = Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;

我们现在将声明一些稍后将需要的成员变量。

private Package package;
private ConnectionManager flatFileConnectionManager;
private ConnectionManager destinationDatabaseConnectionManager;
private Executable dataflowTask;

在类中创建一个名为 CreatePackage 的方法。

package = new Package();       // create a new package
package.CreationDate = DateTime.Now;// set the creation time stamp
package.ProtectionLevel = DTSProtectionLevel.DontSaveSensitive;
package.Name = "Some name";
package.Description = "A simple package";
package.DelayValidation = true;
package.PackageType =
Microsoft.SqlServer.Dts.Runtime.
DTSPackageType.DTSDesigner90;

所以这个方法简单地实例化一个新包变量并为一些初始属性设置值。

根据 SSIS 设计器中创建的图(我们之前看到过),我们现在将创建一个数据流任务。数据流任务将包含两个数据流组件(源和目标)——这两个组件都需要一个连接组件来提取和加载数据到源和目标。所以,让我们先创建源的连接组件。

这里有一点值得一提,对于平面文件连接管理器,我们需要显式创建源列(CSV 文件中可用的列)。我们将编写一个名为 CreateSourceColumns() 的方法来完成此目的。你可能已经猜到,我们需要从源文件中读取可用的列名来实现这一点。对于这个试点应用程序,我们只是在代码中硬编码了这些。但它应该使用 .NET IO API 从文件中读取(你可以使用 Microsoft.Jet.OLEDB.4.0 对象作为 .NET IO API 的替代)。为了硬编码可用的列名,我在类中使用了一个 string 集合作为成员变量。

private List<string> srcColumns = new List<string>();

并在构造函数方法中填充其值

public ImportPackage()
{
     srcColumns.Clear();
     // Apparently we are hardcoding this. But it is supposed to be
     // read from the source file using IO APIs
     srcColumns.Add("\"Name\"");
     srcColumns.Add("\"Age\"");
     srcColumns.Add("\"JoinDate\"");
     srcColumns.Add("\"Salary\"");
     srcColumns.Add("\"Retired\"");
}

private void CreateFlatFileConnection()
{
     string flatFileName = @"C:\Sample.csv";
     string FlatFileMoniker = @"FLATFILE";
     flatFileConnectionManager =
         package.Connections.Add(FlatFileMoniker);
     flatFileConnectionManager.ConnectionString =
         flatFileName;
     flatFileConnectionManager.Name =
         "SSIS Connection Manager for Files";
     flatFileConnectionManager.Description =
         string.Concat("SSIS Connection Manager");
     // Setting some common properties of the
     // connection manager object
     flatFileConnectionManager.Properties
              ["ColumnNamesInFirstDataRow"]
              .SetValue(flatFileConnectionManager,
              true);
     flatFileConnectionManager.
        Properties["Format"].
         SetValue(flatFileConnectionManager, "Delimited");
     flatFileConnectionManager.
        Properties["HeaderRowDelimiter"].
         SetValue(flatFileConnectionManager, "\r\n");
     // If user has been specified a text qualifier
     // then put it into the connection string property
     flatFileConnectionManager.Properties
      ["TextQualifier"].
       SetValue(flatFileConnectionManager, "\"");

     // create the source columns into the connection manager
     CreateSourceColumns();
}

private void CreateSourceColumns()
{
     // get the actual connection manager instance
     RuntimeWrapper.IDTSConnectionManagerFlatFile90
        flatFileConnection =
        flatFileConnectionManager.InnerObject as
      RuntimeWrapper.IDTSConnectionManagerFlatFile90;

     RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
     RuntimeWrapper.IDTSName90 name;

     // trace the current count
     Debug.WriteLine(flatFileConnection.Columns.Count);

     foreach (String colName in srcColumns)
     {   // iterate
         // now create a new column for the connection manager
         column =
        flatFileConnection.Columns.Add();
          // if this is the last row
         if (srcColumns.IndexOf(colName) == (srcColumns.Count - 1))
             column.ColumnDelimiter =
             "\r\n";// add the row delimiter
         else
             column.ColumnDelimiter = ",";

         name = (RuntimeWrapper.IDTSName90)column;
         name.Name = colName.Replace("\"","");
         column.TextQualified = true;
         column.ColumnType =
            "Delimited";
         column.DataType =
         Microsoft.SqlServer.Dts.Runtime.
             Wrapper.DataType.DT_STR;
         column.ColumnWidth = 0;
         column.MaximumWidth = 255;
         column.DataPrecision = 0;
         column.DataScale = 0;
     }
}

你可能已经注意到我们正在使用 monikers 创建 COM 对象。现在我们将为目标数据流组件创建连接管理器对象。

private void CreateDestinationDatabaseConnection()
{
    string OleDBMoniker =
         @"OLEDB";
    string ConnectionString =
        @"Data Source=MOIM023;
       Initial Catalog=ImportDatabase;
       Integrated Security=SSPI;Provider=SQLNCLI;
       Auto Translate=false;";
    // Creating a connection using the oledb moniker
    destinationDatabaseConnectionManager
        = package.Connections.Add(OleDBMoniker);
    destinationDatabaseConnectionManager.ConnectionString
            = ConnectionString;
    destinationDatabaseConnectionManager.Name
            = "SSIS Connection Manager for Oledb";
    destinationDatabaseConnectionManager.Description
       = string.Concat("SSIS Connection Manager for OLEDB");
}

到目前为止一切顺利,现在是时候创建数据流组件了。我正在编写一个名为 CreateDataFlowTask() 的方法,它将创建和初始化数据流组件。

在编写这个数据流任务创建方法之前,让我们创建一个像下面这样的 Column 类

public class Column
{
    private string name;

    public string Name
    {
        get { return name; }
        set { name = value; }
    }
    private Microsoft.SqlServer.Dts.
        Runtime.Wrapper.DataType dataType;

    public Microsoft.SqlServer.Dts.
        Runtime.Wrapper.DataType DataType
    {
        get { return dataType; }
        set { dataType = value; }
    }
    private int length;

    public int Length
    {
        get { return length; }
        set { length = value; }
    }

    private int precision;

    public int Precision
    {
        get { return precision; }
        set { precision = value; }
    }

    private int scale;

    public int Scale
    {
        get { return scale; }
        set { scale = value; }
    }

    private int codePage = 0;

    public int CodePage
    {
        get { return codePage; }
        set { codePage = value; }
    }
}

让我们在 ImportPackage 类中创建一个方法,这是一个实用方法,用于帮助确定目标列的数据类型长度等。

private Column GetTargetColumnInfo(string sourceColumnName)
{
    Column cl = new Column();
    if( sourceColumnName.Contains("Name"))
    {
        cl.Name = "Name";
        cl.DataType =
            Microsoft.SqlServer.Dts.Runtime.
            Wrapper.DataType.DT_STR;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 255;
        cl.CodePage = 1252;
    }
    else if(
        sourceColumnName.Contains("Age"))
    {
        cl.Name = "Age";
        cl.DataType =
            Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I4;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    else if( sourceColumnName.Contains("JoinDate"))
    {
        cl.Name = "JoinDate";
        cl.DataType =
            Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_DATE;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    else if( sourceColumnName.Contains("Salary"))
    {
        cl.Name = "Salary";
        cl.DataType =
            Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_NUMERIC;
        cl.Precision = 6;
        cl.Scale = 3;
        cl.Length = 0;
    }
    else if( sourceColumnName.Contains("Retired"))
    {
        cl.Name = "Retired";
        cl.DataType =
            Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_BOOL;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    return cl;
}

现在我们可以编写数据流任务创建方法了。

private void CreateDataFlowTask()
{
    string dataFlowTaskMoniker = "DTS.Pipeline.1";
    dataflowTask = package.Executables.Add(dataFlowTaskMoniker);

这里我们使用 moniker 创建一个数据流任务可执行文件。数据流任务是一个可执行文件,它可以包含一个或多个数据流组件(即源、转换或目标组件)。

通常,所有数据流组件都有两个集合属性。一个是输入集合,另一个是输出集合。顾名思义,输入集合包含一个或多个输入(可以来自不同的源),输出集合对象包含一个或多个输出(可以为不同的目标提供数据)和一个错误输出——其中包含组件在数据操作期间发生的错误。

现在我们可以在此 dataflowTask 对象下创建这些数据流组件。

PipeLineWrapper.IDTSComponentMetaData90 sourceComponent =
        ((dataflowTask as TaskHost).InnerObject as
        PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
sourceComponent.Name
        = "Source File Component";
string SourceDataFlowComponentID =
        "{90C7770B-DE7C-435E-880E-E718C92C0573}";
// using the CLSID to instantiate
sourceComponent.ComponentClassID =
        SourceDataFlowComponentID;

数据流组件是 COM 对象。当我们调用 TaskHostComponentMetaDataCollection.New() 时,它会创建一个通用的组件元数据对象,但它还没有填充源组件所需的所有特定属性。接下来的两行将实际创建一个特定数据流组件的实例(在本例中是一个源组件 - 根据给定的 CLSID)。

// load the COM for the given GUID
PipeLineWrapper.CManagedComponentWrapper
    managedFlatFileInstance
        = sourceComponent.Instantiate();
// get the populate the properties
managedFlatFileInstance.ProvideComponentProperties();

源组件需要引用我们之前创建的连接管理器对象。

// putting the connection
if (sourceComponent.RuntimeConnectionCollection.Count > 0)
{   // If connection is necessary
    sourceComponent.RuntimeConnectionCollection[0]
        .ConnectionManagerID =
        flatFileConnectionManager.ID;
    sourceComponent.RuntimeConnectionCollection[0].
        ConnectionManager =
        DtsConvert.ToConnectionManager90
        (flatFileConnectionManager);
}
// establish a connection
managedFlatFileInstance.AcquireConnections(null);
// Initialize the metadata
managedFlatFileInstance.ReinitializeMetaData();

managedFlatFileInstance.ReinitializeMetaData() 将从连接管理器对象读取输入列名,并自动在其输入集合对象中填充一个输入对象。你可以检查 sourceComponent.InputCollectionCount 属性,你会发现它是 1。现在我们需要创建输入和输出列之间的映射——这样组件的输出就包含了它将从源文件中读取的值。

重要的是要理解,包中的所有对象都有一个 ID。事实上,每个列(输出或输入)都有一个在包范围内唯一的 ID。我将使用一个字典对象来跟踪每个输出列 ID 和其名称。稍后我们将需要此信息。

Dictionary<string, int> outputColumnLineageIDs
    = new Dictionary<string,int>();

// create the mapping now
PipeLineWrapper.IDTSExternalMetadataColumn90 exOutColumn;
foreach (PipeLineWrapper.IDTSOutputColumn90 outColumn in
    sourceComponent.OutputCollection[0].OutputColumnCollection)
{   // create the MAP
    exOutColumn =
        sourceComponent.OutputCollection
        [0].ExternalMetadataColumnCollection[outColumn.Name];
    // map it
    managedFlatFileInstance.MapOutputColumn(
        sourceComponent.OutputCollection[0].ID,
        outColumn.ID, exOutColumn.ID, true);

    outputColumnLineageIDs.Add(outColumn.Name, outColumn.ID);
}
// Release the connection now
managedFlatFileInstance.ReleaseConnections();

因此,源数据流组件现在可以使用了。让我们现在创建目标数据流组件。在创建数据目标组件之前,我们需要创建将作为目标的数据库和数据表。所以,让我们使用以下 SQL 脚本创建数据表。(为了简单起见,我们手动创建数据库——但这应该以编程方式创建。但在调用目标数据流组件的 ReinitializeMetaData() 之前它必须存在,否则它将生成一个错误。)

/****** Object:  Table [dbo].[DataTable]    ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[DataTable](
    [Name] [varchar](50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Age] [varchar](50),
    [JoinDate] [varchar](50),
    [Salary] [varchar](50) NULL,
    [Retired] [varchar](50) NULL
) ON [PRIMARY]

GO
SET ANSI_PADDING OFF

现在我们可以创建数据目标组件了。

string OleDBDestinationDataFlowComponentID
    = "{E2568105-9550-4F71-A638-B7FE42E66922}";
PipeLineWrapper.IDTSComponentMetaData90
    datadestinationComponent =
    ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
datadestinationComponent.Name =
    "Data Destination Component";
datadestinationComponent.ComponentClassID =
    OleDBDestinationDataFlowComponentID;
// get the COM instance
managedOleInstanceDestinationComponent =
    datadestinationComponent.Instantiate();
// populate the properties
managedOleInstanceDestinationComponent
    .ProvideComponentProperties();

数据目标组件已创建。我们需要在源组件的输出和目标组件的输入之间创建一个路径,以便从源组件输出的数据直接馈送到目标组件的输入。

PipeLineWrapper.IDTSPath90 pathDD
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathDD.AttachPathAndPropagateNotifications(
    sourceComponent.OutputCollection[0],
    destinationComponent.InputCollection[0]);

我们需要准备目标组件的输出,以便可以将数据写入目标数据库。

string DatabaseName = "ImportDatabase";
string TableName = "DataTable";
// setting the connection
if (destinationComponent
    .RuntimeConnectionCollection.Count > 0)
{   // If connection is necessary
    destinationComponent
    .RuntimeConnectionCollection[0].ConnectionManagerID =
        destinationDatabaseConnectionManager.ID;
    destinationComponent
    .RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert
        .ToConnectionManager90(destinationDatabaseConnectionManager);
}
// Set the custom properties
managedOleInstanceDestinationComponent.
SetComponentProperty(
    "AccessMode", 0);// Table of View mode
managedOleInstanceDestinationComponent.
    SetComponentProperty
    ("AlwaysUseDefaultCodePage", false);
managedOleInstanceDestinationComponent.
    SetComponentProperty
    ("DefaultCodePage", 1252);
managedOleInstanceDestinationComponent.
    SetComponentProperty(
    "FastLoadKeepIdentity", false);     // Fast load
managedOleInstanceDestinationComponent.
    SetComponentProperty(
    "FastLoadKeepNulls", false);
managedOleInstanceDestinationComponent.
    SetComponentProperty(
    "FastLoadMaxInsertCommitSize", 0);
managedOleInstanceDestinationComponent.
    SetComponentProperty(
    "FastLoadOptions",
    "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceDestinationComponent.
    SetComponentProperty(
    "OpenRowset",
    string.Format("[{0}].[dbo].[{1}]",
    DatabaseName, TableName));

// Establish a connection
managedOleInstanceDestinationComponent.
    AcquireConnections(null);
// initialize the metadata
managedOleInstanceDestinationComponent.
    ReinitializeMetaData();
// Get the destination's default input and virtual input
PipeLineWrapper.IDTSInput90 input =
    destinationComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput
    = input.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn
    in vInput.VirtualInputColumnCollection)
{
    if (outputColumnLineageIDs
        .ContainsKey(vColumn.LineageID))
    {   // if the column came from the derived
        // column dataflow component
        managedOleInstanceDestinationComponent.
            SetUsageType(
            input.ID, vInput,
            vColumn.LineageID,
            PipeLineWrapper.DTSUsageType.UT_READONLY);
    }
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    destinationComponent.InputCollection[0]
    .InputColumnCollection)
{   // create the map
    exColumn
        = destinationComponent.
        InputCollection[0]
        .ExternalMetadataColumnCollection[
        inColumn.Name.Replace(
        "\"","")];

    // our mapped column
    Column mappedColumn
        = GetTargetColumnInfo(exColumn.Name);
    string destName = mappedColumn.Name;
    // setting the new name
    exColumn.Name = destName;

    // creating the mapping
    managedOleInstanceDestinationComponent
        .MapInputColumn(
        destinationComponent.InputCollection[0].ID,
        inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstanceDestinationComponent.ReleaseConnections();

试点包项目够了。如果我们运行应用程序,它将把数据从 CSV 移动到目标数据库。

使用数据转换组件

到目前为止,我们的包假定目标列与源列具有相同的数据类型(这就是为什么我们的数据表的列被定义为 varchar)。现在我们将修改包,以便在将数据库移动到目标组件之前更改数据类型。

为了更改数据类型,我们需要使用另一个数据流组件——数据转换组件。所以我们的目标包将如下所示

Incorporating data conversion component

我们需要创建一个新的数据流组件(数据转换组件),并将其放置在源和目标之间。因此,我们现在将修改 CreateDataFlowTask() 方法。让我们将以下行插入到 managedFlatFileInstance.ReleaseConnections(); // close 行之后。

string DataConversionDataflowComponentID
    = "{C3BF62C8-7C5C-4F85-83C3-E0B6F6BE267C}";
PipeLineWrapper.IDTSComponentMetaData90
    dataconversionComponent =
        ((dataflowTask as TaskHost).InnerObject as
        PipeLineWrapper.MainPipe)
        .ComponentMetaDataCollection.New();
dataconversionComponent.Name
    = "Data conversion Component";
dataconversionComponent.ComponentClassID =
    DataConversionDataflowComponentID;
managedOleInstanceDataConversionComponent =
    dataconversionComponent.Instantiate();
managedOleInstanceDataConversionComponent
    .ProvideComponentProperties();
dataconversionComponent.InputCollection
    [0].ExternalMetadataColumnCollection.IsUsed
        = false;
dataconversionComponent
    .InputCollection[0].HasSideEffects = false;

我们需要在源组件的输出和数据转换组件的输入之间创建一个路径。

PipeLineWrapper.IDTSPath90 path =
    ((dataflowTask as TaskHost).InnerObject as
        PipeLineWrapper.MainPipe).PathCollection.New();
path.AttachPathAndPropagateNotifications(
    sourceComponent.OutputCollection[0],
        dataconversionComponent.InputCollection[0]);

现在是时候配置数据转换组件了。

// Get the derived's default input and virtual input.
PipeLineWrapper.IDTSInput90 input =
    dataconversionComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput
    = input.GetVirtualInput();

// Iterate through the virtual input column collection.
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn
    in vInput.VirtualInputColumnCollection)
{
    managedOleInstanceDataConversionComponent
        .SetUsageType(
        input.ID, vInput, vColumn.LineageID,
        PipeLineWrapper.DTSUsageType.UT_READONLY);
}

// putting the truncation row disposition
dataconversionComponent.OutputCollection[0]
    .TruncationRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// putting the error row disposition
dataconversionComponent
    .OutputCollection[0].ErrorRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// get the output column collection reference
PipeLineWrapper.IDTSOutput90 output =
    dataconversionComponent.OutputCollection[0];

foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    dataconversionComponent.InputCollection
    [0].InputColumnCollection)
{   // create the map
    // get the target column from the mapping information
    PipeLineWrapper.IDTSOutputColumn90 outputColumn =
        dataconversionComponent.OutputCollection[0]
        .OutputColumnCollection.New();
        outputColumn.Name = inColumn.Name;
    Column targetColumn
        = GetTargetColumnInfo(inColumn.Name);
    // find out the length of the column
    int length = targetColumn.Length;
    // get the precision of the target column
    int precision = targetColumn.Precision;
    // get the scale of the target column
    int scale = targetColumn.Scale;
    // get the SSSIS complaint datatype from the given mappings
    Microsoft.SqlServer.Dts.Runtime.
        Wrapper.DataType dataType
            = targetColumn.DataType;

    // setting the data properties
    outputColumn.SetDataTypeProperties
        (dataType, length, precision,
        scale, targetColumn.CodePage);
    // putting the external metadata column id to zero
    outputColumn.ExternalMetadataColumnID = 0;
    outputColumn.ErrorRowDisposition
        = PipeLineWrapper.
        DTSRowDisposition.RD_RedirectRow;
    outputColumn.TruncationRowDisposition
        = PipeLineWrapper.DTSRowDisposition.RD_RedirectRow;

    PipeLineWrapper.IDTSCustomProperty90 property
        = outputColumn.CustomPropertyCollection.New();
    property.Name
        = "SourceInputColumnLineageID";
    property.Value
        = GetSourceColumnLineageID(targetColumn.Name);
    property
        = outputColumn.CustomPropertyCollection.New();
    property.Name
        = "FastParse";
    property.Value = false;

    // Now we are preserving the Lineage id into a list.
    // you know, when later we will configure the
    // dataflowcomponent of SQL destination
    // then, we will find all
    // the inputs (the input came from flat file
    // and the inputs
    // came from the derived columns output).
    // And we need to distinguish among them.
    // we will only consider those inputs into the data
    // destination component, where the
    // inputs are coming from
    // the output of the derived column component
    // which is actually here.
    derivedLineageIdentifiers[outputColumn.LineageID] =
        outputColumn.Name;
}

数据转换组件配置现已准备就绪。让我们在数据转换组件和数据目标组件之间创建一个路径以完成数据流。

PipeLineWrapper.IDTSPath90 pathBetweenDCandDD
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathDD.AttachPathAndPropagateNotifications(
    dataconversionComponent.OutputCollection[0],
    destinationComponent.InputCollection[0]);

我们还需要更新数据目标配置代码。基本上,我们需要使用 derivedLineageIdentifiers 来检索源 lineage id 并更新目标组件的外部元数据列的数据类型。

在此之前,让我们快速将 SQL 数据库中的数据表结构更改为如下所示

CREATE TABLE [dbo].[DataTable](
    [Name] [varchar](50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Age] [int] NULL,
    [JoinDate] [datetime] NULL,
    [Salary] [numeric](18, 9) NULL,
    [Retired] [bit] NULL
) ON [PRIMARY]

请注意,我们现在使用的是不同的数据类型,而不是所有列都使用 varchar

// setting the connection
if (destinationComponent.RuntimeConnectionCollection.Count > 0)
{   // If connection is necessary
    destinationComponent.RuntimeConnectionCollection
        [0].ConnectionManagerID =
        destinationDatabaseConnectionManager.ID;
    destinationComponent
        .RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90
        (destinationDatabaseConnectionManager);
}
// Set the custom properties.
managedOleInstanceDestinationComponent
    .SetComponentProperty
    ("AccessMode", 0);// Table of View mode
managedOleInstanceDestinationComponent
    .SetComponentProperty
    ("AlwaysUseDefaultCodePage",
    false); // Default Codepage
managedOleInstance.
    SetComponentProperty
    ("DefaultCodePage", 1252);
managedOleInstanceDestinationComponent.
    SetComponentProperty
    ("FastLoadKeepIdentity", false);     // Fast load
managedOleInstanceDestinationComponent
    .SetComponentProperty("FastLoadKeepNulls", false);
managedOleInstanceDestinationComponent
    .SetComponentProperty
    ("FastLoadMaxInsertCommitSize", 0);
managedOleInstanceDestinationComponent
    .SetComponentProperty
    ("FastLoadOptions",
    "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceDestinationComponent
    .SetComponentProperty("OpenRowset",
    string.Format("[{0}].[dbo].[{1}]",
    DatabaseName, TableName));
// Establish a connection
managedOleInstanceDestinationComponent
    .AcquireConnections(null);
// initialize the metadata
managedOleInstanceDestinationComponent
    .ReinitializeMetaData();
// Get the destination's default input and virtual input
PipeLineWrapper.IDTSInput90 input =
    destinationComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput
    = input.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in
    vInput.VirtualInputColumnCollection)
{
    // Remember, we will find all columns here into
    // the 'vInput.VirtualInputColumnCollection'
    // Which is, if the total columns count into the flat file is
    // 6 then here you will
    // get 12 ( 6 * 2 ) columns into
    // the 'vInput.VirtualInputColumnCollection'. Because,
    // the data derived column usually provides all the inputs as
    // its outputs along with the outputs
    // that it really creates. And here we need to consider
    // only those inputs which came
    // from the derived column component (not from the flat file
    // source component).
    // How can we do that? we can do this by checking the
    // lineageid that we did populate during
    // the derived column creation process.
    if (derivedLineageIdentifiers.ContainsKey(vColumn.LineageID))
    {   // if the column came from the derived column dataflow
        // component
        managedOleInstanceDestinationComponent.
            SetUsageType(
            input.ID, vInput,
            vColumn.LineageID,
            PipeLineWrapper.DTSUsageType.UT_READONLY);
    }
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    destinationComponent.InputCollection[0]
    .InputColumnCollection)
{   // create the map
    exColumn = destinationComponent
        .InputCollection[0]
        .ExternalMetadataColumnCollection
        [inColumn.Name
        .Replace("\"","")];
    // our mapped column
    Column mappedColumn = GetTargetColumnInfo(exColumn.Name);
    string destName = mappedColumn.Name;
    // setting the new name
    exColumn.Name = destName;
    // setting the datatype
    exColumn.DataType =
        mappedColumn.DataType;  // Now we are
    // changing the data types here!
    // creating the mapping
    managedOleInstanceDestinationComponent.
        MapInputColumn(destinationComponent
        .InputCollection[0].ID,
        inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstanceDestinationComponent.ReleaseConnections();

完成!现在我们的包可以在数据导入过程中更改数据类型。

处理错误数据行

现在我们来考虑错误数据行。假设 CSV 包含以下数据

"Name","Age","JoinDate","Salary","Retired"
"Jon Tuite","45","","25.3","False"
"Linda Hamilton","25",2002-01-03 00:00:00,"BAD DATA","True"
"Sen Seldiver","42","2002-01-03 00:00:00,","458.2","True"
"Jon Clerk","DAMAGE","2001-01-03 00:00:00","455.265","False"
"Rose Dalson","25","2004-01-03 00:00:00","55.265","False" 

这里请注意,第一行在 JoinDate 字段中缺少数据,第二行在 Salary 字段中包含错误数据。因此,数据转换组件将无法转换这两行。现在,如果我们想在数据流任务处理过程中报告这些错误行,那么我们需要在包中创建一个额外的目标数据流组件。这个额外的目标组件将跟踪错误行并将它们移到一个 SQL 数据库。为了实现这一点,让我们再次修改代码库。现在我们期望的包将如下所示

Error Component

你可能已经猜到,我们将使用另一个 OLEDB 目标组件来跟踪错误行。所以,让我们为新的错误目标组件创建一个连接管理器(我们可以使用以前的——但我们创建一个新的,以便错误可以移到任何其他数据库而不是目标数据库)。

private void CreateErrorDatabaseConnection()
{
    string OleDBMoniker = @"OLEDB";
    errorDatabaseConnectionManager
        = package.Connections.Add(OleDBMoniker);
    errorDatabaseConnectionManager.ConnectionString
        = ConnectionString;
    errorDatabaseConnectionManager.Name
        = "SSIS error Connection Manager for Oledb";
    errorDatabaseConnectionManager.Description
        = string.Concat("SSIS Connection Manager for ");
}

我们需要创建另一个表来跟踪错误行。所以,让我们使用以下 SQL 脚本创建一个表

CREATE TABLE [ErrorTable] (
    [Name] VARCHAR(50),
    [Age] VARCHAR(50),
    [JoinDate] VARCHAR(50),
    [Salary] VARCHAR(50),
    [Retired] VARCHAR(50),
    [ErrorCode] INTEGER,
    [ErrorColumn] INTEGER
)

我们将修改 CreateDataFlowTask() 来实现这个错误处理功能。让我们将以下代码附加到该方法中。

PipeLineWrapper.IDTSComponentMetaData90 errorTrackerComponent =
    ((dataflowTask as TaskHost).InnerObject
    as PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
errorTrackerComponent.ComponentClassID =
    OleDBDestinationDataFlowComponentID;

// get the COM instance
managedOleInstanceErrorComponent =
    errorTrackerComponent.Instantiate();
// populate the properties
managedOleInstanceErrorComponent
    .ProvideComponentProperties();
errorTrackerComponent.Name
    = "Error Tracker component";

自然,我们需要在数据转换错误输出和错误目标组件的输入之间建立一条路径。所以这样做

PipeLineWrapper.IDTSPath90 pathDE
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathDE.AttachPathAndPropagateNotifications(
    dataconversionComponent.OutputCollection[1],
    errorTrackerComponent.InputCollection[0]);

正如我们在文章前面讨论过的,输出集合包含两个输出;一个是数据,另一个是错误输出。所以我们创建了带有 dataconversionComponent.OutputCollection[1] 的新路径。

现在我们将配置错误目标组件。

// setting the connection
if (errorTrackerComponent.RuntimeConnectionCollection.
    Count > 0)
{   // If connection is necessary
    errorTrackerComponent
        .RuntimeConnectionCollection[0].ConnectionManagerID =
        errorDatabaseConnectionManager.ID;
    errorTrackerComponent.
        RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90
        (errorDatabaseConnectionManager);
}
// Set the custom properties
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("AccessMode",
    0);// Table of View mode
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("AlwaysUseDefaultCodePage",
    false); // Default Codepage
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("DefaultCodePage", 1252);
managedOleInstanceErrorComponent.
    SetComponentProperty(
    "FastLoadKeepIdentity", false);     // Fast load
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("FastLoadKeepNulls", false);
managedOleInstanceErrorComponent.
    SetComponentProperty(
    "FastLoadMaxInsertCommitSize", 0);
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("FastLoadOptions",
    "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceErrorComponent.
    SetComponentProperty
    ("OpenRowset",
    string.Format("[{0}].[dbo].[{1}]",
    DatabaseName, ErrorTableName));
// Establish a connection
managedOleInstanceErrorComponent.AcquireConnections(null);
// initialize the metadata
managedOleInstanceErrorComponent.ReinitializeMetaData();
// Get the destination's default input and virtual input
PipeLineWrapper.IDTSInput90 input =
    errorTrackerComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput
    = input.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in
    vInput.VirtualInputColumnCollection)
{
    managedOleInstanceErrorComponent.SetUsageType(
        input.ID, vInput, vColumn.LineageID,
        PipeLineWrapper. DTSUsageType.UT_READONLY);
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    errorTrackerComponent.InputCollection[0]
    .InputColumnCollection)
{   // create the map
    exColumn
        = errorTrackerComponent.InputCollection[0].
        ExternalMetadataColumnCollection
        [inColumn.Name.Replace("\"","")];

    exColumn.DataType = Microsoft.SqlServer.
        Dts.Runtime.Wrapper.DataType.DT_STR;

    // creating the mapping
    managedOleInstanceErrorComponent.
        MapInputColumn(errorTrackerComponent.
        InputCollection[0].ID,
        inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstanceErrorComponent.ReleaseConnections();

错误组件通常将错误代码和错误列 lineage id 以及整个行数据存储在一起。因此,错误数据表包含 CSV 文件中的列,以及两个额外的列(errorCodeerrorColumn)。

这样,我们只会找到错误代码和输出列的错误行 lineage ID(它只是一个标识符)。但我们可以编写一些额外的代码来找出错误描述以及错误行的名称而不是其标识符。我在这上面发现了一篇有趣的 文章

现在,如果我们运行应用程序,它将把那两行错误移动到错误表中,并将所有其他行移动到数据表中。

使用派生列组件

我们上次讨论的 CSV 文件包含两行错误。我们目前的实现会将两行都移到错误目标表。但是,如果我们希望将第一行移到目标数据库而不是将其移到错误目标呢?例如,假设我们的目标数据表架构规定 JoinDatesalary 字段可以包含 null。但我们目前的实现永远不会将缺失数据行移到目标数据库。它总是会因为转换失败而将它们视为错误行。因此,对于一些业务来说,这可能确实是一个问题。为了解决这个问题,我们可以用派生列组件替换数据转换组件来解决这个问题。派生列组件是一个很棒的组件,主要用于转换数据。派生列转换通过对转换输入列应用表达式来创建新的列值。表达式可以包含变量、函数、运算符和来自转换输入的列的任何组合。结果可以添加为新列,也可以作为替换值插入到现有列中。派生列转换可以定义多个派生列,任何变量或输入列都可以出现在多个表达式中。有关更多详细信息,请阅读 MSDN

为了用派生列组件替换数据转换组件,我们需要对我们到目前为止开发的现有代码进行一些小的更改。

Derived Column Component

创建派生列组件与创建数据转换组件完全相同,除了组件类 ID。派生列组件的组件类 ID 是 "{9CF90BF0-5BCC-4C63-B91D-1F322DC12C26}"。

string DerivedColumnDataflowComponentID
    = "{9CF90BF0-5BCC-4C63-B91D-1F322DC12C26}";
PipeLineWrapper.IDTSComponentMetaData90
    derivedColumnComponent =
        ((dataflowTask as TaskHost).InnerObject as
        PipeLineWrapper.MainPipe).ComponentMetaDataCollection.New();
derivedColumnComponent.Name
    = "Derived Column Component";
        derivedColumnComponent.ComponentClassID =
        DerivedColumnDataflowComponentID;
managedOleInstanceDerivedColumnComponent =
    derivedColumnComponent.Instantiate();

managedOleInstanceDerivedColumnComponent.
    ProvideComponentProperties();
derivedColumnComponent.InputCollection[0].
    ExternalMetadataColumnCollection.IsUsed =
        false;
derivedColumnComponent.
    InputCollection[0].HasSideEffects = false;

正如你所看到的,组件创建与我们为数据转换组件创建的几乎相同。现在我们将编写派生列组件的配置代码。这与我们配置数据转换组件的方式相同——除了我们将在这里创建一个新属性用于表达式。配置代码如下

CManagedComponentWrapper managedOleInstance =
    derivedColumnDataFlowComponent.ComponentInstance;
// Get the derived's default input and virtual input.
IDTSInput90 input =
    derivedColumnDataFlowComponent.
InnerObject.InputCollection[0];
IDTSVirtualInput90 vInput = input.GetVirtualInput();

Dictionary<string, int> lineAgeIDs
    = new Dictionary<string, int>();

// Iterate through the virtual input column collection
foreach (IDTSVirtualInputColumn90 vColumn in
    vInput.VirtualInputColumnCollection)
{
    managedOleInstance.SetUsageType(
    input.ID, vInput, vColumn.LineageID,
    DTSUsageType.UT_READONLY);
    lineAgeIDs[vColumn.Name] = vColumn.LineageID;
}
// putting the truncation row disposition
derivedColumnDataFlowComponent
    .InnerObject.OutputCollection[0].TruncationRowDisposition =
    DTSRowDisposition.RD_NotUsed;
// putting the error row disposition
derivedColumnDataFlowComponent.
    InnerObject.OutputCollection[0].ErrorRowDisposition =
    DTSRowDisposition.RD_NotUsed;
// get the output column collection reference
IDTSOutput90 output = derivedColumnDataFlowComponent
    .InnerObject.OutputCollection[0];
foreach (IDTSInputColumn90 inColumn in
    derivedColumnDataFlowComponent.
    InnerObject.InputCollection[0].InputColumnCollection)
{   // create the map
    IDTSOutputColumn90 outputColumn =
        derivedColumnDataFlowComponent.
        InnerObject.
        OutputCollection[0].OutputColumnCollection.New();
    outputColumn.Name = inColumn.Name;
    Column targetColumn = GetTargetColumnInfo(inColumn.Name);
    // find out the length of the column
    int length = targetColumn.ColumnLength;
    // get the precision of the target column
    int precision = targetColumn.Precision;
    // get the scale of the target column
    int scale = targetColumn.Scale;
    // get the SSSIS complaint datatype from the given mappings
    Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType
        dataType = targetColumn.DataType;
    int codePage = 0;
    if (dataType ==
        Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR)
    {
        precision = 0;
        scale = 0;
        codePage = 1252;
    }
    else if(dataType ==
        Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_NUMERIC)
    {
        length = 0;
    }
    else if (dataType ==
        Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_DATE)
    {
        length = 0;
        precision = 0;
        scale = 0;
    }
    else if (dataType ==
        Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_BOOL)
    {
        length = 0;
        precision = 0;
        scale = 0;
    }

    // setting the data properties
    outputColumn.SetDataTypeProperties
        (dataType, length, precision, scale, codePage);
    // putting the external metadata column id to zero
    outputColumn.ExternalMetadataColumnID = 0;
    outputColumn.ErrorRowDisposition =
        DTSRowDisposition.RD_RedirectRow;
    outputColumn.TruncationRowDisposition =
        DTSRowDisposition.RD_RedirectRow;

    string expression = string.Empty;
    string friendlyExpression = string.Empty;

    GetExpressionString(ref expression,
        ref friendlyExpression,
        dataType, length, precision,
        scale,
        lineAgeIDs,  outputColumn.Name);

    IDTSCustomProperty90 property =
        outputColumn.CustomPropertyCollection.New();
    property.Name = "Expression";
    property.Value = expression;
    property = outputColumn.CustomPropertyCollection.New();
    property.Name
        = "FriendlyExpression";
    property.Value
        = friendlyExpression;// "getdate()";

    // Now we are preserving the Lineage id into a list.
    // You know, when later we will configure the
    // dataflowcomponent of SQL destination
    // then, we will find all the inputs (the input came from
    // flat file and the inputs
    // came from the derived columns output). And we need is to
    // distinguish among them.
    // We will only consider those inputs into the data
    // destination component, where the
    // inputs are coming from the output of the derived column
    // component which is actually here.
    derivedLineageIdentifiers[outputColumn.LineageID]
        = outputColumn.Name;

我们在这里定义了一个名为 GetExpressionString() 的方法,它实际上是创建表达式字符串。现在让我们来实现这个方法。

private void GetExpressionString(ref string expression, ref string
    friendlyExpression,
    Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType dataType,
    int length, int precision, int scale,
    Dictionary<string, int> lineAgeIDs,
    string columnName)
{
    expression = string.Empty;
    friendlyExpression = string.Empty;
    // get the lineage id for the column name
    int lineageID = lineAgeIDs[columnName];

    switch (dataType)
    {
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_STR:
        expression =
            string.Format("#{0}",lineageID);
        friendlyExpression = string.Format("{0}", columnName);
        break;
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I4:
        // 4 byte signed integer
        expression =
            string.Format("[ISNULL](#{0}) || #{1} ==
            \"\" ? NULL(DT_I4) : (DT_I4)#{2}",
            lineageID, lineageID, lineageID);
        friendlyExpression = string.Format("ISNULL([{0}]) || [{1}] ==
            \"\" ? NULL(DT_I4) : (DT_I4)[{2}]",
            columnName, columnName, columnName);
        break;
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_DECIMAL:

        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_NUMERIC:
        // Should we handle precision here?
        expression = string.Format("[ISNULL](#{0}) || #{1} ==
            \"\" ? NULL(DT_NUMERIC,{2},{3}) : (DT_NUMERIC,{4},{5})#
            {6}", lineageID, lineageID, precision, scale, precision,
            scale, lineageID);
        friendlyExpression =
            string.Format("[ISNULL]([{0}]) || [{1}] ==
            \"\" ? NULL(DT_NUMERIC,{2},{3}) : (DT_NUMERIC,{4},
            {5})[{6}]", columnName, columnName, precision, scale,
            precision, scale, columnName);
        break;
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_DATE:
        expression = string.Format("[ISNULL](#{0}) || #{1} ==
            \"\" ? NULL(DT_DATE) : (DT_DATE)#{2}", lineageID,
            lineageID, lineageID);
        friendlyExpression = string.Format("ISNULL([{0}]) || [{1}] ==
            \"\" ? NULL(DT_DATE) : (DT_DATE)[{2}]",
            columnName, columnName, columnName);
        break;
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I8:
        expression = string.Format("[ISNULL](#{0}) || #{1} ==
            \"\" ? NULL(DT_I8) : (DT_I8)#{2}", lineageID,
            lineageID, lineageID);
        friendlyExpression = string.Format("ISNULL([{0}]) || [{1}] ==
            \"\" ? NULL(DT_I8) : (DT_I8)[{2}]",
            columnName, columnName, columnName);
        break;
        case Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_BOOL:
        expression = string.Format("[ISNULL](#{0}) || #{1} ==
            \"\" ? NULL(DT_BOOL) : (DT_BOOL)#{2}",
            lineageID, lineageID, lineageID);
        friendlyExpression =
            string.Format("ISNULL([{0}]) || [{1}] ==
            \"\" ? NULL(DT_BOOL) : (DT_BOOL)[{2}]",
            columnName, columnName, columnName);
        break;
        default:
        expression = string.Format("#{0}", lineageID);
        friendlyExpression =
            string.Format("{0}", columnName);
        break;
    }
}

所以,这就是我们的派生列实现。现在,我们的包应该将缺失的行带到目标数据表中——正如它应该的那样。尽管派生列通常执行真正的数据派生任务,但它也可以用于解决这个问题。

结论

在本文中,我仅描述了如何以编程方式使用不同的数据流组件来满足我们的自定义目的。我希望你会喜欢它。

© . All rights reserved.