SSIS:通过编码
使用 C# 分步创建 SSIS 包。
引言
SSIS
,更广为人知的名称是“SQL Server Integration Services (SSIS)”,是 SQL Server 的一个组件。
根据Wikipedia:
“SSIS 是一个数据集成和工作流应用程序的平台。它提供了一个快速灵活的数据仓库工具,用于数据提取、转换和加载 (ETL)。该工具还可以用于自动化 SQL Server 数据库的维护以及多维数据集数据的更新。”
在这篇文章中,我将通过 C# 演示在同一服务器上的两个表之间传输数据,并分步完全通过编码创建包。虽然通过 GUI 创建 SSIS 包非常容易(只需设置几个属性即可执行),但这里我将演示通过编码创建 SSIS 包的方法。
您将学到 (目录)
- 在数据库中创建两个表 (源和目标)
- 设置 SSIS 项目的初始任务
- 创建 SSIS 包
- 创建源和目标连接管理器对象
- 创建数据流任务 (管道) – 包的核心
- 创建源数据源并分配源连接管理器
- 创建目标数据源并分配目标连接管理器
- 在源输出和目标之间创建路径
- 将源输出列映射到目标输入列
- 保存包并执行实际代码
步骤详解
1. 在数据库中创建两个表 (源和目标)
创建两个名为SourceTable
和 DestinationTable1
的 dummy 表,字段相同,如下所示:CREATE TABLE [dbo].[SourceTable](
[ID] [int] NOT NULL,
[Name] [varchar](20) NULL,
[Age] [int] NULL
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
CREATE TABLE [dbo].[DestinationTable1](
[ID] [int] NOT NULL,
[Name] [varchar](20) NULL,
[Age] [int] NULL
) ON [PRIMARY]
我还向源表中填充了一些 dummy 数据ID | 名称 | 年龄 |
1 | Alok | 30 |
2 | Ashish | 30 |
3 | Jasdeep | 30 |
4 | Ritesh | 35 |
2. 设置 SSIS 项目的初始任务
- 创建一个
C# Windows 窗体项目
。无需花哨的设计。 - 在窗体上添加一个按钮,并包含
OnClick
事件处理程序,我们将在事件处理程序中进行所有编程。 - 现在,将支持的 SSIS DotNet 程序集添加到项目中。为此,在解决方案资源管理器中右键单击“引用”,然后浏览到 %Program Files%\Microsoft SQL Server\100\SDK\Assemblies 并包含以下文件:
- Microsoft.SQLServer.DTSPipelineWrap
- Microsoft.SqlServer.DTSRuntimeWrap
- Microsoft.SqlServer.ManagedDTS
- 关闭“添加程序集”对话框并保存解决方案。
3. 创建 SSIS 包
以下代码将创建包对象:
Package objPackage = new Package();
objPackage.Name = "SSISExample";
objPackage.PackageType = DTSPackageType.DTSDesigner100;
objPackage.VersionBuild = 1;
上面,我们创建了 SSIS 包对象,并将其命名为“SSISExample”,并将包类型定义为 DTSPackageType.DTSDesigner100
。还有许多其他选项。请参阅此处(我们指的是我们正在创建面向设计器的包)4. 创建源和目标连接管理器对象
在继续之前,最好让我们的连接管理器运行起来。这里我创建了两个指向同一数据库的 Oledb 连接,这个连接将由 OleDB 源和 OleDB 目标使用。var connectingString =
@"Data Source=localhost;Integrated Security=SSPI;Initial Catalog=TestDB;Provider=SQLNCLI10.1;Persist Security Info=True;";
ConnectionManager oleDBConnSrc = objPackage.Connections.Add("OLEDB");
oleDBConnSrc.ConnectionString = connectingString;
oleDBConnSrc.Name = "SourceConnection";
ConnectionManager oleDBDestination = objPackage.Connections.Add("OLEDB");
oleDBDestination.ConnectionString = connectingString;
oleDBDestination.Name = "DestinationConnection";
这里我提供了连接字符串,其中包含我连接的 SQL 数据库的连接字符串。您需要注意一件事:您登录数据库的用户应该具有写入访问权限。oleDBConnSrc
和 oleDBDestination
是两个 ConnectionManager 对象。5. 创建数据流任务 (管道) – 包的核心
现在我们来到包中最重要的一部分,所有导入/导出都在这里进行。虽然它的后端名称是 Pipeline,但在设计器屏幕上它被称为“数据流任务”,我们所有源和目标的编码都将在该数据流任务中完成,而且创建起来非常简单。//Create DataFlowTask in the package
TaskHost dataFlowTaskHost = (TaskHost)objPackage.Executables.Add("SSIS.Pipeline.2");
dataFlowTaskHost.Name = @"SSISPipeline";
dataFlowTaskHost.FailPackageOnFailure = true;
dataFlowTaskHost.FailParentOnFailure = true;
dataFlowTaskHost.DelayValidation = false;
dataFlowTaskHost.Description = @"Data Flow Task";
//-----------Data Flow Inner component starts----------------
MainPipe dataFlowTask = dataFlowTaskHost.InnerObject as MainPipe;
从我们的包对象中,我们将使用 objPackage.Executables.Add()
方法添加 Pipeline 对象,并传递 "SSIS.Pipeline.2"
作为要创建的对象名称,并为任务对象提供默认属性。然后,我们获取创建的数据流任务主管道 MainPipe
,其余组件将附加到这个 MainPipe
(数据流任务)。6. 创建源数据源并分配源连接管理器
在步骤 5 中,我们的管道已成功创建。现在,使用其主管道,我们将添加源 oledb 连接。// Create and configure an OLE DB source component.
IDTSComponentMetaData100 sourceOleDB = dataFlowTask.ComponentMetaDataCollection.New();
sourceOleDB.ComponentClassID = "DTSAdapter.OLEDBSource.2";
CManagedComponentWrapper srcDesignTime = sourceOleDB.Instantiate();
// The ProvideComponentProperties method creates a default output.
srcDesignTime.ProvideComponentProperties();
sourceOleDB.Name = "TestDB DATA Source";
在这里,我们在数据流任务对象中创建一个对象,并分配 ComponentClassID = "DTSAdapter.OLEDBSource.2"
,以告知包我们正在添加 OLEDBSource 适配器。之后,我们实例化其设计时组件,并获取其属性,我们将在后续分配。// Assign the connection manager.
sourceOleDB.RuntimeConnectionCollection[0].ConnectionManagerID = oleDBConnSrc.ID;
sourceOleDB.RuntimeConnectionCollection[0].ConnectionManager =
DtsConvert.GetExtendedInterface(oleDBConnSrc);
现在,将源数据库适配器与源连接管理器关联起来,上面的代码完成了这个部分。现在我们将设置打开表的属性以进行读取模式。 // Set the custom properties of the source.
srcDesignTime.SetComponentProperty("AccessMode", 0); // Mode 0 : OpenRowset / Table - View
srcDesignTime.SetComponentProperty("OpenRowset", "[dbo].[SourceTable]");
// Connect to the data source, and then update the metadata for the source.
srcDesignTime.AcquireConnections(null);
srcDesignTime.ReinitializeMetaData();
srcDesignTime.ReleaseConnections();
这里 AccessMode=0
表示我们在视图模式下打开数据库,OpenRowSet
属性表示我们要打开的表。7. 创建目标数据源并分配目标连接管理器
代码与源适配器类似,除了我们将传递ComponentClassID
为 "DTSAdapter.OleDbDestination",AccessMode
= 3,并且 OpenRowSet
属性包含目标表。IDTSComponentMetaData100 destinationOleDb =
dataFlowTask.ComponentMetaDataCollection.New();
destinationOleDb.ComponentClassID = "DTSAdapter.OleDbDestination";
CManagedComponentWrapper destDesignTime = destinationOleDb.Instantiate();
destDesignTime.ProvideComponentProperties();
// Assign the connection manager.
destinationOleDb.RuntimeConnectionCollection[0].ConnectionManagerID = oleDBDestination.ID;
destinationOleDb.RuntimeConnectionCollection[0].ConnectionManager =
DtsConvert.GetExtendedInterface(oleDBDestination);
// Set the custom properties of the source.
destDesignTime.SetComponentProperty("AccessMode", 3);
destDesignTime.SetComponentProperty("OpenRowset",
"[dbo].[DestinationTable1]");
// Connect to the data source, and then update the metadata for the source.
destDesignTime.AcquireConnections(null);
destDesignTime.ReinitializeMetaData();
destDesignTime.ReleaseConnections();
8. 在源输出和目标之间创建路径
现在我们的源和目标适配器已准备就绪。源输出列也已准备好,现在我们将源输出列与目标输入列连接起来,以便目标了解其接收到的内容。为此,我们将在它们之间添加一个路径。 // Create the path from source to destination
IDTSPath100 pathDestination = dataFlowTask.PathCollection.New();
pathDestination.AttachPathAndPropagateNotifications(sourceOleDB.OutputCollection[0],
destinationOleDb.InputCollection[0]);
这里 AttachPathAndPropagateNotifications
方法从源 oledb 输出初始化目标输入。由于源数据库只有一个输出,我们可以安全地假设其数组位置为 0,目标同理。9. 将源输出列映射到目标输入列
// Get the destination's default input and virtual input.
IDTSInput100 destinationinput = destinationOleDb.InputCollection[0];
IDTSVirtualInput100 vdestinationinput = destinationinput.GetVirtualInput();
// Iterate through the virtual input column collection.
foreach (IDTSVirtualInputColumn100 vColumn in
vdestinationinput.VirtualInputColumnCollection)
{
IDTSInputColumn100 vCol = destDesignTime.SetUsageType(destinationinput.ID, vdestinationinput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
// check if the column exist in the destination table
string cinputColumnName = vColumn.Name;
var columnExist = (from item in destinationinput.ExternalMetadataColumnCollection.Cast<IDTSExternalMetadataColumn100>()
where item.Name == cinputColumnName
select item).Count();
// if yes map it
if (columnExist > 0)
destDesignTime.MapInputColumn(destinationinput.ID, vCol.ID,
destinationinput.ExternalMetadataColumnCollection[vColumn.Name].ID);
}
在这里,我们获取目标输入列,在设计视图中将其标记为可用,然后搜索目标表中匹配的列名。如果找到匹配项,我们使用 MapInputColumn 方法进行映射。10. 保存包并执行包
SSIS 文件实际上是 XML 文件。如果您将 dtsx 文件重命名为 xml,您可以看到我们设置的所有属性。我们将保存它,并在 Business Intelligence Studio 中查看它。Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
app.SaveToXml(string.Format(@"c:\workSamplePackage\SamplePackage{0}.dtsx",
DateTime.Now.ToString("hhmmss")),
objPackage, null);
objPackage.Execute();
objPackage.Dispose();
app = null;
截图
如果我们在 Business Intelligence Studio 中打开上面创建的包文件,它看起来会是这样的:
您可以清楚地看到我们添加的两个连接管理器。还有我们创建的源和目标连接。现在,如果您运行它,它会显示类似这样的内容:
它显示有 4 行数据已从源数据库传输到目标数据库。这里看一下目标表: