65.9K
CodeProject 正在变化。 阅读更多。
Home

在 SSIS 中创建自定义转换组件(邮件验证器)- 分步指南

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.71/5 (6投票s)

2011 年 8 月 3 日

CPOL

6分钟阅读

viewsIcon

50063

downloadIcon

849

在本文中,我们将探讨如何创建自定义 SSIS 转换组件。该自定义组件是一个简单的

引言

SSIS 是一个出色的 ETL 工具,并且已经存在很长时间了。但我们并非总能获得所需的所有转换组件。为此,我们会使用脚本组件。但其缺点是它特定于应用程序。如果我们需要在其他 SSIS 包中执行相同类型的操作,那么脚本组件将无济于事。相反,我们可以创建自己的自定义转换组件,甚至可以实现重用。本文将以分步方式重点介绍如何构建、部署和使用自定义脚本转换组件。

背景

SSIS 转换组件帮助我们将源数据转换为所需的格式。尽管 SSIS 提供了许多转换组件,但有时我们可能需要执行没有可用组件的操作。在这种情况下,自定义组件非常方便。在本文中,我们将探讨如何创建自定义 SSIS 转换组件。该自定义组件是一个简单的电子邮件验证器,它将从某个源读取电子邮件,并告知我们它是否是有效电子邮件。我们将选择 C# 作为语言。

使用代码

步骤 1

首先,让我们创建一个类库项目(例如 DFCEmailValidator)。

第二步

创建项目后,让我们在项目中添加以下 DLL 引用。

  1. Microsoft.SqlServer.Dts.Design
  2. Microsoft.SqlServer.DTSPipelineWrap
  3. Microsoft.SqlServer.DTSRuntimeWrap
  4. Microsoft.SqlServer.ManagedDTS
  5. Microsoft.SqlServer.PipelineHost

步骤 3

让我们创建一个名为 CustomEmailValidator.cs 的类。将该类继承自 PipelineComponent 类,并使用 DtsPipelineComponent 属性 进行装饰,如下所示:

using System;
using System.Text.RegularExpressions;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace DFCEmailValidator
{
    [DtsPipelineComponent(DisplayName = "CustomEmailValidator"
                        , Description = "Custom Email Validator of Data Flow 
                                         Component-Niladri Biswas"
                        , ComponentType = ComponentType.Transform)]
    public class CustomEmailValidator : PipelineComponent
    {
    }
}

由于我们正在准备自定义数据流组件,因此它需要继承 PipelineComponent 基类。DTSPipelineComponentAttribute 的存在使此类成为一个转换类型的数据流组件。管道组件是 SSIS 组件的支柱,并与连接管理器协同工作以移动和转换数据。

步骤 4:重写 ProvideComponentProperties 方法

组件初始化在 ProvideComponentProperties 方法中进行。当组件首次添加到数据流任务时会调用此方法,以初始化组件的 Microsoft.SqlServer.Dts.Pipeline.PipelineComponent.ComponentMetaData。我们将在该方法中执行以下操作:

  1. 设置组件信息
  2. 添加输入对象
  3. 添加输出对象
  4. 添加错误对象
public override void ProvideComponentProperties()
{
    // Set component information
    ComponentMetaData.Name = "CustomEmailValidator";
    ComponentMetaData.Description = "Custom Email Validator of Data Flow Component-Niladri Biswas";
    ComponentMetaData.ContactInfo = "mail2nil123@gmail.com";

    // Reset the component.
    //Deletes each Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSInput100 
    //and Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSOutput100 object from the component.
    RemoveAllInputsOutputsAndCustomProperties();

    // Add an input object
    IDTSInput100 inputObject = ComponentMetaData.InputCollection.New();
    inputObject.Name = "InputToCustomEmailValidator";
    inputObject.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent;

    // Add an output object.
    IDTSOutput100 outputObject = ComponentMetaData.OutputCollection.New();
    outputObject.Name = "OutputFromCustomEmailValidator";
    outputObject.SynchronousInputID = inputObject.ID; //Synchronous Transform           

    //Add an error object
    AddErrorOutput("ErrorFromCustomEmailValidator", inputObject.ID, outputObject.ExclusionGroup);

}

步骤 5:重写 Validate 方法

此方法验证组件是否已正确配置。它返回一个 DTSValidationStatus 枚举,如下所示:

枚举成员名称 描述
VS_ISVALID 0 组件已正确配置,可供执行
VS_ISBROKEN 1 组件配置不正确;通常,这表示某个属性设置不正确
VS_NEEDSNEWMETADATA 2 组件的元数据已过时或损坏,调用 Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSDesigntimeComponent100.ReinitializeMetaData() 将会修复该组件
VS_ISCORRUPT 3 组件已严重损坏,必须完全重置。设计器会响应此调用组件的 Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSDesigntimeComponent100.ProvideComponentProperties() 方法。

我们将在该方法中执行以下操作:

  1. 检查列类型是否为 DT_STR/DT_WSTR
  2. 动态创建布尔类型的输出列以对应每个输入列。例如,如果输入列名为 MyInput,则输出列将命名为 IsValidEmail_ MyInput。
public override DTSValidationStatus Validate()
{
    bool pbCancel = false;
    IDTSInput100 input = ComponentMetaData.InputCollection[0];
    string errorMsg = "Wrong datatype specified for {0}. It accepts DT_STR and DT_WSTR";

    //Check whether the column type is DT_STR/DT_WSTR or not
    for (int x = 0; x < input.InputColumnCollection.Count; x++)
    {
	if (!(input.InputColumnCollection[x].DataType == DataType.DT_STR
	   || input.InputColumnCollection[x].DataType == DataType.DT_WSTR))
	{
	    ComponentMetaData.FireError(
					0
					, ComponentMetaData.Name
					, String.Format(errorMsg, input.InputColumnCollection[x].Name)
					, string.Empty
					, 0
					, out pbCancel);
	    return DTSValidationStatus.VS_ISCORRUPT;


	}
    }

   // Dynamically create the output column of Boolean type
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    foreach (IDTSInputColumn100 inputColumn in input.InputColumnCollection)
    {
	bool IsPresent = false;
	foreach (IDTSOutputColumn100 outputColumn in output.OutputColumnCollection)
	{
	    if (outputColumn.Name == "IsValidEmail_" + inputColumn.Name)
	    {
		IsPresent = true;
	    }
	}

	if (!IsPresent)
	{
	    IDTSOutputColumn100 outputcol = output.OutputColumnCollection.New();
	    outputcol.Name = "IsValidEmail_" + inputColumn.Name;
	    outputcol.Description = String.Format("Tells if {0} is a Valid Email Address.", inputColumn.Name);
	    outputcol.SetDataTypeProperties(DataType.DT_BOOL, 0, 0, 0, 0);
	}
    }
    return DTSValidationStatus.VS_ISVALID;
}

步骤 6:重写 ReinitializeMetaData 方法

此方法会修复在验证期间识别出的任何错误,这些错误会导致组件在设计时返回 Microsoft.SqlServer.Dts.Pipeline.Wrapper.DTSValidationStatus.VS_NEEDSNEWMETADATA。

public override void ReinitializeMetaData()
{
    ComponentMetaData.RemoveInvalidInputColumns();
    ReinitializeMetaData();
}

步骤 7:重写 InsertOutputColumnAt 方法

阻止从“高级编辑器”添加新的输出列,因为我们已经在 Validate 方法中添加了输出列。

public override IDTSOutputColumn100 InsertOutputColumnAt(
							     int outputID,
							     int outputColumnIndex,
							     string name,
							     string description)
{
    throw new Exception(
	string.Format("Fail to add output column name to {0} ", ComponentMetaData.Name)
	, null);

}

步骤 8:重写 PreExecute 方法

在数据执行流中,它对每个组件调用一次。

public override void PreExecute()
{
    IDTSInput100 input = ComponentMetaData.InputCollection[0];
    inputBufferColumnIdx = new int[input.InputColumnCollection.Count];

    Enumerable
	.Range(0, input.InputColumnCollection.Count)
	.ToList()
	.ForEach(i =>
	{
	    IDTSInputColumn100 inputCol = input.InputColumnCollection[i];
	    inputBufferColumnIdx[i] = BufferManager
				       .FindColumnByLineageID(input.Buffer, inputCol.LineageID);
	});


    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
    outputBufferColumnIdx = new int[output.OutputColumnCollection.Count];

    Enumerable
	.Range(0, input.InputColumnCollection.Count)
	.ToList()
	.ForEach(i =>
	{
	    IDTSOutputColumn100 outputCol = output.OutputColumnCollection[i];
	    outputBufferColumnIdx[i] = BufferManager
					.FindColumnByLineageID(input.Buffer, outputCol.LineageID);
	});
}

步骤 9:重写 ProcessInput 方法

当上游组件提供的 PipelineBuffer 可供组件处理时,在运行时会反复调用此方法,以便组件处理传入的行。在此方法中实现电子邮件验证的逻辑。

public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
    if (!buffer.EndOfRowset)
    {
	while (buffer.NextRow())
	{
	    for (int x = 0; x < inputBufferColumnIdx.Length; x++)
	    {
		bool isValidEmail = false;
		DataType BufferColDataType = buffer.GetColumnInfo(inputBufferColumnIdx[x]).DataType;

		if (BufferColDataType == DataType.DT_STR ||
		    BufferColDataType == DataType.DT_WSTR)
		{
		    isValidEmail = IsValidEmail(buffer.GetString(inputBufferColumnIdx[x]));
		}

		buffer.SetBoolean(outputBufferColumnIdx[x], isValidEmail);
	    }
	}
    }
}

IsValidEmail 方法如下:

private bool IsValidEmail(string email)
{
    string regexPattern = @"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}$";
    return new Regex(regexPattern, RegexOptions.IgnoreCase).IsMatch(email);
}

完整代码如下:

using System;
using System.Linq;
using System.Text.RegularExpressions;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace DFCEmailValidator
{
    [DtsPipelineComponent(DisplayName = "CustomEmailValidator"
                        , Description = "Custom Email Validator of Data Flow Component-Niladri Biswas"
                        , ComponentType = ComponentType.Transform)]
    public class CustomEmailValidator : PipelineComponent
    {
        private int[] inputBufferColumnIdx;
        private int[] outputBufferColumnIdx;

        #region ProvideComponentProperties
        public override void ProvideComponentProperties()
        {
            // Set component information
            ComponentMetaData.Name = "CustomEmailValidator";
            ComponentMetaData.Description = "Custom Email Validator of Data Flow Component-Niladri Biswas";
            ComponentMetaData.ContactInfo = "mail2nil123@gmail.com";

            // Reset the component.
            //Deletes each Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSInput100 
            //and Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSOutput100 object from the component.
            RemoveAllInputsOutputsAndCustomProperties();

            // Add an input object
            IDTSInput100 inputObject = ComponentMetaData.InputCollection.New();
            inputObject.Name = "InputToCustomEmailValidator";
            inputObject.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent;

            // Add an output object.
            IDTSOutput100 outputObject = ComponentMetaData.OutputCollection.New();
            outputObject.Name = "OutputFromCustomEmailValidator";
            outputObject.SynchronousInputID = inputObject.ID; //Synchronous Transform           

            //Add an error object
            AddErrorOutput("ErrorFromCustomEmailValidator", inputObject.ID, outputObject.ExclusionGroup);

        }
       
        public override DTSValidationStatus Validate()
        {
            bool pbCancel = false;
            IDTSInput100 input = ComponentMetaData.InputCollection[0];
            string errorMsg = "Wrong datatype specified for {0}. It accepts DT_STR and DT_WSTR";

            //Check whether the column type is DT_STR/DT_WSTR or not
            for (int x = 0; x < input.InputColumnCollection.Count; x++)
            {
                if (!(input.InputColumnCollection[x].DataType == DataType.DT_STR
                   || input.InputColumnCollection[x].DataType == DataType.DT_WSTR))
                {
                    ComponentMetaData.FireError(
                                                0
                                                , ComponentMetaData.Name
                                                , String.Format(errorMsg, input.InputColumnCollection[x].Name)
                                                , string.Empty
                                                , 0
                                                , out pbCancel);
                    return DTSValidationStatus.VS_ISCORRUPT;
                    
                    
                }
            }

           // Dynamically create the output column of Boolean type
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

            foreach (IDTSInputColumn100 inputColumn in input.InputColumnCollection)
            {
                bool IsPresent = false;
                foreach (IDTSOutputColumn100 outputColumn in output.OutputColumnCollection)
                {
                    if (outputColumn.Name == "IsValidEmail_" + inputColumn.Name)
                    {
                        IsPresent = true;
                    }
                }

                if (!IsPresent)
                {
                    IDTSOutputColumn100 outputcol = output.OutputColumnCollection.New();
                    outputcol.Name = "IsValidEmail_" + inputColumn.Name;
                    outputcol.Description = String.Format("Tells if {0} is a Valid Email Address.", inputColumn.Name);
                    outputcol.SetDataTypeProperties(DataType.DT_BOOL, 0, 0, 0, 0);
                }
            }
            return DTSValidationStatus.VS_ISVALID;
        }
       
        public override void ReinitializeMetaData()
        {
            ComponentMetaData.RemoveInvalidInputColumns();
            ReinitializeMetaData();
        }
        
        public override IDTSOutputColumn100 InsertOutputColumnAt(
             int outputID,
             int outputColumnIndex,
             string name,
             string description)
        {
            throw new Exception(
                string.Format("Fail to add output column name to {0} ", ComponentMetaData.Name)
                , null);

        }
        #endregion


        #region Runtime methods

        public override void PreExecute()
        {
            IDTSInput100 input = ComponentMetaData.InputCollection[0];
            inputBufferColumnIdx = new int[input.InputColumnCollection.Count];

            Enumerable
                .Range(0, input.InputColumnCollection.Count)
                .ToList()
                .ForEach(i =>
                {
                    IDTSInputColumn100 inputCol = input.InputColumnCollection[i];
                    inputBufferColumnIdx[i] = BufferManager
                                               .FindColumnByLineageID(input.Buffer, inputCol.LineageID);
                });


            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
            outputBufferColumnIdx = new int[output.OutputColumnCollection.Count];

            Enumerable
                .Range(0, input.InputColumnCollection.Count)
                .ToList()
                .ForEach(i =>
                {
                    IDTSOutputColumn100 outputCol = output.OutputColumnCollection[i];
                    outputBufferColumnIdx[i] = BufferManager
                                                .FindColumnByLineageID(input.Buffer, outputCol.LineageID);
                });
        }

        public override void ProcessInput(int inputID, PipelineBuffer buffer)
        {
            if (!buffer.EndOfRowset)
            {
                while (buffer.NextRow())
                {
                    for (int x = 0; x < inputBufferColumnIdx.Length; x++)
                    {
                        bool isValidEmail = false;
                        DataType BufferColDataType = buffer.GetColumnInfo(inputBufferColumnIdx[x]).DataType;

                        if (BufferColDataType == DataType.DT_STR ||
                            BufferColDataType == DataType.DT_WSTR)
                        {
                            isValidEmail = IsValidEmail(buffer.GetString(inputBufferColumnIdx[x]));
                        }

                        buffer.SetBoolean(outputBufferColumnIdx[x], isValidEmail);
                    }
                }
            }
        }


        private bool IsValidEmail(string email)
        {
            string regexPattern = @"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}$";
            return new Regex(regexPattern, RegexOptions.IgnoreCase).IsMatch(email);
        }

        #endregion Run Time Methods
    }
}

步骤 10:构建组件并将其部署到 GAC

现在,我们应该构建类库,生成 强名称 ,然后使用以下命令将其部署到 gac:

C:\Program Files\Microsoft Visual Studio 9.0\VC>gacutil  -i "D:\DFCEmailValidator\bin\Debug\DFCEmailValidator.dll"

注意:DLL 路径会有所不同... 请相应更改。

如果成功,我们将收到消息:

程序集已成功添加到缓存

1.jpg

我们还可以通过浏览至 C:\WINDOWS\assembly 来验证这一点,在那里我们会找到名为 DFCEmailValidator 的程序集。

2.jpg

步骤 11:复制 DLL 并将其放入 PipeLineComponents 文件夹

完成上一步后,接下来我们可以将 DLL 复制到 PipelineComponents 文件夹,该文件夹位于 C:\Program Files\Microsoft SQL Server\100\DTS\PipelineComponents

3.jpg

第 12 步

打开 BIDS。转到 DataFlow 编辑器的工具箱。右键单击 -> 选择项。访问 SSIS DataFlow 项目选项卡,找到 CustomEmailValidator。勾选复选框,然后单击“确定”按钮。

4.jpg

我们可以发现自定义转换已添加到工具箱中。

5.jpg

第 13 步

拖放一个平面文件源并进行配置。假设我们有一个文本文件(Source.txt),其内容如下:

testmailwrong
testmail@test.com
invalid@invalid
valid@gmail.com
valid_123@yahoo.co.in
123@123@123@1123#456~xud.com

接下来,拖放我们的 CustomEmailValidator,然后添加从平面文件源到它的数据流路径。双击 CustomEmailValidator 以打开编辑器。

6.jpg

屏幕显示它有三个选项卡:自定义属性、输入列和输入输出属性。

第一个选项卡显示我们设置的自定义属性。

访问“输入列”选项卡,我们会找到可用的输入列,需要选择。在这种情况下,它是 Column0

7.jpg

在“输入输出属性”中,如果展开“输出列”,我们会发现我们的输出列已自动创建,名为 IsValidEmail_Column0

8.jpg

单击“确定”。接下来,拖放一个条件拆分,然后添加从 CustomEmailValidator 到它的数据流路径。并按如下方式配置条件拆分组件:

9.jpg

输出名称 条件
成功 [IsValidEmail_Column0] == TRUE
失败 [IsValidEmail_Column0] == FALSE

最后,添加两个平面文件目标组件,一个的源将是条件路径的成功路径,另一个将是失败路径。目标文件分别命名为 ValidEmail.txt 和 InvalidEmail.txt。

整个包如下:

10.jpg

第 14 步

让我们运行应用程序,我们将在 ValidEmail.txt 中看到有效的电子邮件。

testmail@test.com
valid@gmail.com
valid_123@yahoo.co.in

而 InvalidEmail.txt 包含以下内容:

testmailwrong
invalid@invalid
123@123@123@1123#456~xud.com

11.jpg

结论

本文通过一个非常简单的示例,以分步方式展示了如何构建自定义转换组件。我希望它能帮助许多对这一概念感到陌生的新开发人员。我们可以遵循这个概念,在 SSIS 中构建任何类型的自定义组件。在下一篇文章中,我们将探讨如何在 SSIS 中构建自定义任务。

感谢阅读。

© . All rights reserved.