在 SSIS 中创建自定义转换组件(邮件验证器)- 分步指南






4.71/5 (6投票s)
在本文中,我们将探讨如何创建自定义 SSIS 转换组件。该自定义组件是一个简单的
引言
SSIS 是一个出色的 ETL 工具,并且已经存在很长时间了。但我们并非总能获得所需的所有转换组件。为此,我们会使用脚本组件。但其缺点是它特定于应用程序。如果我们需要在其他 SSIS 包中执行相同类型的操作,那么脚本组件将无济于事。相反,我们可以创建自己的自定义转换组件,甚至可以实现重用。本文将以分步方式重点介绍如何构建、部署和使用自定义脚本转换组件。
背景
SSIS 转换组件帮助我们将源数据转换为所需的格式。尽管 SSIS 提供了许多转换组件,但有时我们可能需要执行没有可用组件的操作。在这种情况下,自定义组件非常方便。在本文中,我们将探讨如何创建自定义 SSIS 转换组件。该自定义组件是一个简单的电子邮件验证器,它将从某个源读取电子邮件,并告知我们它是否是有效电子邮件。我们将选择 C# 作为语言。
使用代码
步骤 1
首先,让我们创建一个类库项目(例如 DFCEmailValidator)。
第二步
创建项目后,让我们在项目中添加以下 DLL 引用。
- Microsoft.SqlServer.Dts.Design
- Microsoft.SqlServer.DTSPipelineWrap
- Microsoft.SqlServer.DTSRuntimeWrap
- Microsoft.SqlServer.ManagedDTS
- Microsoft.SqlServer.PipelineHost
步骤 3
让我们创建一个名为 CustomEmailValidator.cs 的类。将该类继承自 PipelineComponent 类,并使用 DtsPipelineComponent 属性 进行装饰,如下所示:
using System; using System.Text.RegularExpressions; using Microsoft.SqlServer.Dts.Pipeline; using Microsoft.SqlServer.Dts.Pipeline.Wrapper; using Microsoft.SqlServer.Dts.Runtime.Wrapper; namespace DFCEmailValidator { [DtsPipelineComponent(DisplayName = "CustomEmailValidator" , Description = "Custom Email Validator of Data Flow Component-Niladri Biswas" , ComponentType = ComponentType.Transform)] public class CustomEmailValidator : PipelineComponent { } }
由于我们正在准备自定义数据流组件,因此它需要继承 PipelineComponent 基类。DTSPipelineComponentAttribute 的存在使此类成为一个转换类型的数据流组件。管道组件是 SSIS 组件的支柱,并与连接管理器协同工作以移动和转换数据。
步骤 4:重写 ProvideComponentProperties 方法
组件初始化在 ProvideComponentProperties 方法中进行。当组件首次添加到数据流任务时会调用此方法,以初始化组件的 Microsoft.SqlServer.Dts.Pipeline.PipelineComponent.ComponentMetaData。我们将在该方法中执行以下操作:
- 设置组件信息
- 添加输入对象
- 添加输出对象
- 添加错误对象
public override void ProvideComponentProperties() { // Set component information ComponentMetaData.Name = "CustomEmailValidator"; ComponentMetaData.Description = "Custom Email Validator of Data Flow Component-Niladri Biswas"; ComponentMetaData.ContactInfo = "mail2nil123@gmail.com"; // Reset the component. //Deletes each Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSInput100 //and Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSOutput100 object from the component. RemoveAllInputsOutputsAndCustomProperties(); // Add an input object IDTSInput100 inputObject = ComponentMetaData.InputCollection.New(); inputObject.Name = "InputToCustomEmailValidator"; inputObject.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent; // Add an output object. IDTSOutput100 outputObject = ComponentMetaData.OutputCollection.New(); outputObject.Name = "OutputFromCustomEmailValidator"; outputObject.SynchronousInputID = inputObject.ID; //Synchronous Transform //Add an error object AddErrorOutput("ErrorFromCustomEmailValidator", inputObject.ID, outputObject.ExclusionGroup); }
步骤 5:重写 Validate 方法
此方法验证组件是否已正确配置。它返回一个 DTSValidationStatus 枚举,如下所示:
枚举成员名称 | 值 | 描述 |
VS_ISVALID | 0 | 组件已正确配置,可供执行 |
VS_ISBROKEN | 1 | 组件配置不正确;通常,这表示某个属性设置不正确 |
VS_NEEDSNEWMETADATA | 2 | 组件的元数据已过时或损坏,调用 Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSDesigntimeComponent100.ReinitializeMetaData() 将会修复该组件 |
VS_ISCORRUPT | 3 | 组件已严重损坏,必须完全重置。设计器会响应此调用组件的 Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSDesigntimeComponent100.ProvideComponentProperties() 方法。 |
我们将在该方法中执行以下操作:
- 检查列类型是否为 DT_STR/DT_WSTR
- 动态创建布尔类型的输出列以对应每个输入列。例如,如果输入列名为 MyInput,则输出列将命名为 IsValidEmail_ MyInput。
public override DTSValidationStatus Validate() { bool pbCancel = false; IDTSInput100 input = ComponentMetaData.InputCollection[0]; string errorMsg = "Wrong datatype specified for {0}. It accepts DT_STR and DT_WSTR"; //Check whether the column type is DT_STR/DT_WSTR or not for (int x = 0; x < input.InputColumnCollection.Count; x++) { if (!(input.InputColumnCollection[x].DataType == DataType.DT_STR || input.InputColumnCollection[x].DataType == DataType.DT_WSTR)) { ComponentMetaData.FireError( 0 , ComponentMetaData.Name , String.Format(errorMsg, input.InputColumnCollection[x].Name) , string.Empty , 0 , out pbCancel); return DTSValidationStatus.VS_ISCORRUPT; } } // Dynamically create the output column of Boolean type IDTSOutput100 output = ComponentMetaData.OutputCollection[0]; foreach (IDTSInputColumn100 inputColumn in input.InputColumnCollection) { bool IsPresent = false; foreach (IDTSOutputColumn100 outputColumn in output.OutputColumnCollection) { if (outputColumn.Name == "IsValidEmail_" + inputColumn.Name) { IsPresent = true; } } if (!IsPresent) { IDTSOutputColumn100 outputcol = output.OutputColumnCollection.New(); outputcol.Name = "IsValidEmail_" + inputColumn.Name; outputcol.Description = String.Format("Tells if {0} is a Valid Email Address.", inputColumn.Name); outputcol.SetDataTypeProperties(DataType.DT_BOOL, 0, 0, 0, 0); } } return DTSValidationStatus.VS_ISVALID; }
步骤 6:重写 ReinitializeMetaData 方法
此方法会修复在验证期间识别出的任何错误,这些错误会导致组件在设计时返回 Microsoft.SqlServer.Dts.Pipeline.Wrapper.DTSValidationStatus.VS_NEEDSNEWMETADATA。
public override void ReinitializeMetaData() { ComponentMetaData.RemoveInvalidInputColumns(); ReinitializeMetaData(); }
步骤 7:重写 InsertOutputColumnAt 方法
阻止从“高级编辑器”添加新的输出列,因为我们已经在 Validate 方法中添加了输出列。
public override IDTSOutputColumn100 InsertOutputColumnAt( int outputID, int outputColumnIndex, string name, string description) { throw new Exception( string.Format("Fail to add output column name to {0} ", ComponentMetaData.Name) , null); }
步骤 8:重写 PreExecute 方法
在数据执行流中,它对每个组件调用一次。
public override void PreExecute() { IDTSInput100 input = ComponentMetaData.InputCollection[0]; inputBufferColumnIdx = new int[input.InputColumnCollection.Count]; Enumerable .Range(0, input.InputColumnCollection.Count) .ToList() .ForEach(i => { IDTSInputColumn100 inputCol = input.InputColumnCollection[i]; inputBufferColumnIdx[i] = BufferManager .FindColumnByLineageID(input.Buffer, inputCol.LineageID); }); IDTSOutput100 output = ComponentMetaData.OutputCollection[0]; outputBufferColumnIdx = new int[output.OutputColumnCollection.Count]; Enumerable .Range(0, input.InputColumnCollection.Count) .ToList() .ForEach(i => { IDTSOutputColumn100 outputCol = output.OutputColumnCollection[i]; outputBufferColumnIdx[i] = BufferManager .FindColumnByLineageID(input.Buffer, outputCol.LineageID); }); }
步骤 9:重写 ProcessInput 方法
当上游组件提供的 PipelineBuffer 可供组件处理时,在运行时会反复调用此方法,以便组件处理传入的行。在此方法中实现电子邮件验证的逻辑。
public override void ProcessInput(int inputID, PipelineBuffer buffer) { if (!buffer.EndOfRowset) { while (buffer.NextRow()) { for (int x = 0; x < inputBufferColumnIdx.Length; x++) { bool isValidEmail = false; DataType BufferColDataType = buffer.GetColumnInfo(inputBufferColumnIdx[x]).DataType; if (BufferColDataType == DataType.DT_STR || BufferColDataType == DataType.DT_WSTR) { isValidEmail = IsValidEmail(buffer.GetString(inputBufferColumnIdx[x])); } buffer.SetBoolean(outputBufferColumnIdx[x], isValidEmail); } } } }
IsValidEmail 方法如下:
private bool IsValidEmail(string email) { string regexPattern = @"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}$"; return new Regex(regexPattern, RegexOptions.IgnoreCase).IsMatch(email); }
完整代码如下:
using System; using System.Linq; using System.Text.RegularExpressions; using Microsoft.SqlServer.Dts.Pipeline; using Microsoft.SqlServer.Dts.Pipeline.Wrapper; using Microsoft.SqlServer.Dts.Runtime.Wrapper; namespace DFCEmailValidator { [DtsPipelineComponent(DisplayName = "CustomEmailValidator" , Description = "Custom Email Validator of Data Flow Component-Niladri Biswas" , ComponentType = ComponentType.Transform)] public class CustomEmailValidator : PipelineComponent { private int[] inputBufferColumnIdx; private int[] outputBufferColumnIdx; #region ProvideComponentProperties public override void ProvideComponentProperties() { // Set component information ComponentMetaData.Name = "CustomEmailValidator"; ComponentMetaData.Description = "Custom Email Validator of Data Flow Component-Niladri Biswas"; ComponentMetaData.ContactInfo = "mail2nil123@gmail.com"; // Reset the component. //Deletes each Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSInput100 //and Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSOutput100 object from the component. RemoveAllInputsOutputsAndCustomProperties(); // Add an input object IDTSInput100 inputObject = ComponentMetaData.InputCollection.New(); inputObject.Name = "InputToCustomEmailValidator"; inputObject.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent; // Add an output object. IDTSOutput100 outputObject = ComponentMetaData.OutputCollection.New(); outputObject.Name = "OutputFromCustomEmailValidator"; outputObject.SynchronousInputID = inputObject.ID; //Synchronous Transform //Add an error object AddErrorOutput("ErrorFromCustomEmailValidator", inputObject.ID, outputObject.ExclusionGroup); } public override DTSValidationStatus Validate() { bool pbCancel = false; IDTSInput100 input = ComponentMetaData.InputCollection[0]; string errorMsg = "Wrong datatype specified for {0}. It accepts DT_STR and DT_WSTR"; //Check whether the column type is DT_STR/DT_WSTR or not for (int x = 0; x < input.InputColumnCollection.Count; x++) { if (!(input.InputColumnCollection[x].DataType == DataType.DT_STR || input.InputColumnCollection[x].DataType == DataType.DT_WSTR)) { ComponentMetaData.FireError( 0 , ComponentMetaData.Name , String.Format(errorMsg, input.InputColumnCollection[x].Name) , string.Empty , 0 , out pbCancel); return DTSValidationStatus.VS_ISCORRUPT; } } // Dynamically create the output column of Boolean type IDTSOutput100 output = ComponentMetaData.OutputCollection[0]; foreach (IDTSInputColumn100 inputColumn in input.InputColumnCollection) { bool IsPresent = false; foreach (IDTSOutputColumn100 outputColumn in output.OutputColumnCollection) { if (outputColumn.Name == "IsValidEmail_" + inputColumn.Name) { IsPresent = true; } } if (!IsPresent) { IDTSOutputColumn100 outputcol = output.OutputColumnCollection.New(); outputcol.Name = "IsValidEmail_" + inputColumn.Name; outputcol.Description = String.Format("Tells if {0} is a Valid Email Address.", inputColumn.Name); outputcol.SetDataTypeProperties(DataType.DT_BOOL, 0, 0, 0, 0); } } return DTSValidationStatus.VS_ISVALID; } public override void ReinitializeMetaData() { ComponentMetaData.RemoveInvalidInputColumns(); ReinitializeMetaData(); } public override IDTSOutputColumn100 InsertOutputColumnAt( int outputID, int outputColumnIndex, string name, string description) { throw new Exception( string.Format("Fail to add output column name to {0} ", ComponentMetaData.Name) , null); } #endregion #region Runtime methods public override void PreExecute() { IDTSInput100 input = ComponentMetaData.InputCollection[0]; inputBufferColumnIdx = new int[input.InputColumnCollection.Count]; Enumerable .Range(0, input.InputColumnCollection.Count) .ToList() .ForEach(i => { IDTSInputColumn100 inputCol = input.InputColumnCollection[i]; inputBufferColumnIdx[i] = BufferManager .FindColumnByLineageID(input.Buffer, inputCol.LineageID); }); IDTSOutput100 output = ComponentMetaData.OutputCollection[0]; outputBufferColumnIdx = new int[output.OutputColumnCollection.Count]; Enumerable .Range(0, input.InputColumnCollection.Count) .ToList() .ForEach(i => { IDTSOutputColumn100 outputCol = output.OutputColumnCollection[i]; outputBufferColumnIdx[i] = BufferManager .FindColumnByLineageID(input.Buffer, outputCol.LineageID); }); } public override void ProcessInput(int inputID, PipelineBuffer buffer) { if (!buffer.EndOfRowset) { while (buffer.NextRow()) { for (int x = 0; x < inputBufferColumnIdx.Length; x++) { bool isValidEmail = false; DataType BufferColDataType = buffer.GetColumnInfo(inputBufferColumnIdx[x]).DataType; if (BufferColDataType == DataType.DT_STR || BufferColDataType == DataType.DT_WSTR) { isValidEmail = IsValidEmail(buffer.GetString(inputBufferColumnIdx[x])); } buffer.SetBoolean(outputBufferColumnIdx[x], isValidEmail); } } } } private bool IsValidEmail(string email) { string regexPattern = @"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}$"; return new Regex(regexPattern, RegexOptions.IgnoreCase).IsMatch(email); } #endregion Run Time Methods } }
步骤 10:构建组件并将其部署到 GAC
现在,我们应该构建类库,生成 强名称 ,然后使用以下命令将其部署到 gac:
C:\Program Files\Microsoft Visual Studio 9.0\VC>gacutil -i "D:\DFCEmailValidator\bin\Debug\DFCEmailValidator.dll"
注意:DLL 路径会有所不同... 请相应更改。
如果成功,我们将收到消息:
程序集已成功添加到缓存
我们还可以通过浏览至 C:\WINDOWS\assembly 来验证这一点,在那里我们会找到名为 DFCEmailValidator 的程序集。
步骤 11:复制 DLL 并将其放入 PipeLineComponents 文件夹
完成上一步后,接下来我们可以将 DLL 复制到 PipelineComponents 文件夹,该文件夹位于 C:\Program Files\Microsoft SQL Server\100\DTS\PipelineComponents。
第 12 步
打开 BIDS。转到 DataFlow 编辑器的工具箱。右键单击 -> 选择项。访问 SSIS DataFlow 项目选项卡,找到 CustomEmailValidator。勾选复选框,然后单击“确定”按钮。
我们可以发现自定义转换已添加到工具箱中。
第 13 步
拖放一个平面文件源并进行配置。假设我们有一个文本文件(Source.txt),其内容如下:
testmailwrong testmail@test.com invalid@invalid valid@gmail.com valid_123@yahoo.co.in 123@123@123@1123#456~xud.com
接下来,拖放我们的 CustomEmailValidator,然后添加从平面文件源到它的数据流路径。双击 CustomEmailValidator 以打开编辑器。
屏幕显示它有三个选项卡:自定义属性、输入列和输入输出属性。
第一个选项卡显示我们设置的自定义属性。
访问“输入列”选项卡,我们会找到可用的输入列,需要选择。在这种情况下,它是 Column0。
在“输入输出属性”中,如果展开“输出列”,我们会发现我们的输出列已自动创建,名为 IsValidEmail_Column0。
单击“确定”。接下来,拖放一个条件拆分,然后添加从 CustomEmailValidator 到它的数据流路径。并按如下方式配置条件拆分组件:
输出名称 | 条件 |
成功 | [IsValidEmail_Column0] == TRUE |
失败 | [IsValidEmail_Column0] == FALSE |
最后,添加两个平面文件目标组件,一个的源将是条件路径的成功路径,另一个将是失败路径。目标文件分别命名为 ValidEmail.txt 和 InvalidEmail.txt。
整个包如下:
第 14 步
让我们运行应用程序,我们将在 ValidEmail.txt 中看到有效的电子邮件。
testmail@test.com valid@gmail.com valid_123@yahoo.co.in
而 InvalidEmail.txt 包含以下内容:
testmailwrong invalid@invalid 123@123@123@1123#456~xud.com
结论
本文通过一个非常简单的示例,以分步方式展示了如何构建自定义转换组件。我希望它能帮助许多对这一概念感到陌生的新开发人员。我们可以遵循这个概念,在 SSIS 中构建任何类型的自定义组件。在下一篇文章中,我们将探讨如何在 SSIS 中构建自定义任务。
感谢阅读。