65.9K
CodeProject 正在变化。 阅读更多。
Home

SSIS:通过编码

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.91/5 (9投票s)

2013年6月9日

CPOL

5分钟阅读

viewsIcon

54185

downloadIcon

763

使用 C# 分步创建 SSIS 包。

引言

SSIS,更广为人知的名称是“SQL Server Integration Services (SSIS)”,是 SQL Server 的一个组件。
根据Wikipedia

“SSIS 是一个数据集成和工作流应用程序的平台。它提供了一个快速灵活的数据仓库工具,用于数据提取、转换和加载 (ETL)。该工具还可以用于自动化 SQL Server 数据库的维护以及多维数据集数据的更新。”

在这篇文章中,我将通过 C# 演示在同一服务器上的两个表之间传输数据,并分步完全通过编码创建包。虽然通过 GUI 创建 SSIS 包非常容易(只需设置几个属性即可执行),但这里我将演示通过编码创建 SSIS 包的方法。

您将学到 (目录)

  1. 在数据库中创建两个表 (源和目标)
  2. 设置 SSIS 项目的初始任务
  3. 创建 SSIS 包
  4. 创建源和目标连接管理器对象
  5. 创建数据流任务 (管道) – 包的核心
  6. 创建源数据源并分配源连接管理器
  7.  创建目标数据源并分配目标连接管理器
  8. 在源输出和目标之间创建路径
  9. 将源输出列映射到目标输入列
  10. 保存包并执行实际代码

步骤详解 

1. 在数据库中创建两个表 (源和目标)

 创建两个名为 SourceTableDestinationTable1 的 dummy 表,字段相同,如下所示:
CREATE TABLE [dbo].[SourceTable](
    [ID] [int] NOT NULL,
    [Name] [varchar](20) NULL,
    [Age] [int] NULL
) ON [PRIMARY]
GO 
SET ANSI_PADDING OFF
GO
CREATE TABLE [dbo].[DestinationTable1](
    [ID] [int] NOT NULL,
    [Name] [varchar](20) NULL,
    [Age] [int] NULL 
) ON [PRIMARY] 

我还向源表中填充了一些 dummy 数据
ID 名称 年龄
1Alok30
2Ashish30
3Jasdeep30
4Ritesh35

2. 设置 SSIS 项目的初始任务

  • 创建一个 C# Windows 窗体项目。无需花哨的设计。
  • 在窗体上添加一个按钮,并包含 OnClick 事件处理程序,我们将在事件处理程序中进行所有编程。
  • 现在,将支持的 SSIS DotNet 程序集添加到项目中。为此,在解决方案资源管理器中右键单击“引用”,然后浏览到 %Program Files%\Microsoft SQL Server\100\SDK\Assemblies 并包含以下文件:
    • Microsoft.SQLServer.DTSPipelineWrap
    • Microsoft.SqlServer.DTSRuntimeWrap
    • Microsoft.SqlServer.ManagedDTS
  • 关闭“添加程序集”对话框并保存解决方案。

3. 创建 SSIS 包

以下代码将创建包对象:
   
Package objPackage = new Package();
            objPackage.Name = "SSISExample";
            objPackage.PackageType = DTSPackageType.DTSDesigner100;
            objPackage.VersionBuild = 1;
上面,我们创建了 SSIS 包对象,并将其命名为“SSISExample”,并将包类型定义为 DTSPackageType.DTSDesigner100。还有许多其他选项。请参阅此处(我们指的是我们正在创建面向设计器的包)

4. 创建源和目标连接管理器对象

 在继续之前,最好让我们的连接管理器运行起来。这里我创建了两个指向同一数据库的 Oledb 连接,这个连接将由 OleDB 源和 OleDB 目标使用。
var connectingString =
@"Data Source=localhost;Integrated Security=SSPI;Initial Catalog=TestDB;Provider=SQLNCLI10.1;Persist Security Info=True;"; 

ConnectionManager oleDBConnSrc = objPackage.Connections.Add("OLEDB"); 
oleDBConnSrc.ConnectionString = connectingString;
oleDBConnSrc.Name = "SourceConnection";
 
ConnectionManager oleDBDestination = objPackage.Connections.Add("OLEDB");
oleDBDestination.ConnectionString = connectingString; 
oleDBDestination.Name = "DestinationConnection";
这里我提供了连接字符串,其中包含我连接的 SQL 数据库的连接字符串。您需要注意一件事:您登录数据库的用户应该具有写入访问权限。oleDBConnSrcoleDBDestination 是两个 ConnectionManager 对象。

5. 创建数据流任务 (管道) – 包的核心

现在我们来到包中最重要的一部分,所有导入/导出都在这里进行。虽然它的后端名称是 Pipeline,但在设计器屏幕上它被称为“数据流任务”,我们所有源和目标的编码都将在该数据流任务中完成,而且创建起来非常简单。
//Create DataFlowTask in the package 
TaskHost dataFlowTaskHost = (TaskHost)objPackage.Executables.Add("SSIS.Pipeline.2"); 
dataFlowTaskHost.Name = @"SSISPipeline"; 
dataFlowTaskHost.FailPackageOnFailure = true; 
dataFlowTaskHost.FailParentOnFailure = true; 
dataFlowTaskHost.DelayValidation = false; 
dataFlowTaskHost.Description = @"Data Flow Task";

 //-----------Data Flow Inner component starts---------------- 
 MainPipe dataFlowTask = dataFlowTaskHost.InnerObject as MainPipe;
 
从我们的包对象中,我们将使用 objPackage.Executables.Add() 方法添加 Pipeline 对象,并传递 "SSIS.Pipeline.2" 作为要创建的对象名称,并为任务对象提供默认属性。然后,我们获取创建的数据流任务主管道 MainPipe,其余组件将附加到这个 MainPipe (数据流任务)。

6. 创建源数据源并分配源连接管理器

在步骤 5 中,我们的管道已成功创建。现在,使用其主管道,我们将添加源 oledb 连接。
// Create and configure an OLE DB source component. 
IDTSComponentMetaData100 sourceOleDB = dataFlowTask.ComponentMetaDataCollection.New();
sourceOleDB.ComponentClassID = "DTSAdapter.OLEDBSource.2"; 
CManagedComponentWrapper srcDesignTime = sourceOleDB.Instantiate(); 

// The ProvideComponentProperties method creates a default output. 
srcDesignTime.ProvideComponentProperties(); 
sourceOleDB.Name = "TestDB DATA Source";
在这里,我们在数据流任务对象中创建一个对象,并分配 ComponentClassID = "DTSAdapter.OLEDBSource.2",以告知包我们正在添加 OLEDBSource 适配器。之后,我们实例化其设计时组件,并获取其属性,我们将在后续分配。
// Assign the connection manager. 
sourceOleDB.RuntimeConnectionCollection[0].ConnectionManagerID = oleDBConnSrc.ID;
 sourceOleDB.RuntimeConnectionCollection[0].ConnectionManager =
 DtsConvert.GetExtendedInterface(oleDBConnSrc);
现在,将源数据库适配器与源连接管理器关联起来,上面的代码完成了这个部分。现在我们将设置打开表的属性以进行读取模式。
 // Set the custom properties of the source. 
 srcDesignTime.SetComponentProperty("AccessMode", 0); // Mode 0 : OpenRowset / Table - View 
 srcDesignTime.SetComponentProperty("OpenRowset", "[dbo].[SourceTable]"); 
 
 // Connect to the data source, and then update the metadata for the source. 
 srcDesignTime.AcquireConnections(null); 
 srcDesignTime.ReinitializeMetaData(); 
 srcDesignTime.ReleaseConnections(); 
 
这里 AccessMode=0 表示我们在视图模式下打开数据库,OpenRowSet 属性表示我们要打开的表。

7. 创建目标数据源并分配目标连接管理器

代码与源适配器类似,除了我们将传递 ComponentClassID"DTSAdapter.OleDbDestination",AccessMode = 3,并且 OpenRowSet 属性包含目标表。
IDTSComponentMetaData100 destinationOleDb = 
dataFlowTask.ComponentMetaDataCollection.New(); 
destinationOleDb.ComponentClassID = "DTSAdapter.OleDbDestination";

CManagedComponentWrapper destDesignTime = destinationOleDb.Instantiate(); 
destDesignTime.ProvideComponentProperties(); 

// Assign the connection manager. 
destinationOleDb.RuntimeConnectionCollection[0].ConnectionManagerID = oleDBDestination.ID; 
destinationOleDb.RuntimeConnectionCollection[0].ConnectionManager = 
DtsConvert.GetExtendedInterface(oleDBDestination); 
// Set the custom properties of the source.

destDesignTime.SetComponentProperty("AccessMode", 3); 
destDesignTime.SetComponentProperty("OpenRowset", 
"[dbo].[DestinationTable1]"); 
// Connect to the data source, and then update the metadata for the source. 
destDesignTime.AcquireConnections(null); 
destDesignTime.ReinitializeMetaData(); 
destDesignTime.ReleaseConnections();

8. 在源输出和目标之间创建路径

现在我们的源和目标适配器已准备就绪。源输出列也已准备好,现在我们将源输出列与目标输入列连接起来,以便目标了解其接收到的内容。为此,我们将在它们之间添加一个路径。
    // Create the path from source to destination 
IDTSPath100 pathDestination = dataFlowTask.PathCollection.New(); 
pathDestination.AttachPathAndPropagateNotifications(sourceOleDB.OutputCollection[0],
destinationOleDb.InputCollection[0]);
这里 AttachPathAndPropagateNotifications 方法从源 oledb 输出初始化目标输入。由于源数据库只有一个输出,我们可以安全地假设其数组位置为 0,目标同理。

9. 将源输出列映射到目标输入列

// Get the destination's default input and virtual input. 
IDTSInput100 destinationinput = destinationOleDb.InputCollection[0]; 
IDTSVirtualInput100 vdestinationinput = destinationinput.GetVirtualInput(); 

// Iterate through the virtual input column collection. 
foreach (IDTSVirtualInputColumn100 vColumn in 
vdestinationinput.VirtualInputColumnCollection) 
{ 
  IDTSInputColumn100 vCol = destDesignTime.SetUsageType(destinationinput.ID, vdestinationinput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
  
 // check if the column exist in the destination table 
  string cinputColumnName = vColumn.Name; 
  var columnExist = (from item in destinationinput.ExternalMetadataColumnCollection.Cast<IDTSExternalMetadataColumn100>() 
                     where item.Name == cinputColumnName 
                     select item).Count();        
 // if yes map it 
 if (columnExist > 0) 
 destDesignTime.MapInputColumn(destinationinput.ID, vCol.ID, 
                               destinationinput.ExternalMetadataColumnCollection[vColumn.Name].ID); 
 }
 
在这里,我们获取目标输入列,在设计视图中将其标记为可用,然后搜索目标表中匹配的列名。如果找到匹配项,我们使用 MapInputColumn 方法进行映射。

10. 保存包并执行包

SSIS 文件实际上是 XML 文件。如果您将 dtsx 文件重命名为 xml,您可以看到我们设置的所有属性。我们将保存它,并在 Business Intelligence Studio 中查看它。
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application(); 
app.SaveToXml(string.Format(@"c:\workSamplePackage\SamplePackage{0}.dtsx", 
                             DateTime.Now.ToString("hhmmss")), 
                             objPackage, null);
objPackage.Execute(); 
objPackage.Dispose(); 
app = null;

截图

如果我们在 Business Intelligence Studio 中打开上面创建的包文件,它看起来会是这样的:

您可以清楚地看到我们添加的两个连接管理器。还有我们创建的源和目标连接。现在,如果您运行它,它会显示类似这样的内容:

它显示有 4 行数据已从源数据库传输到目标数据库。这里看一下目标表:

看点

敬请期待本系列的更多文章!
© . All rights reserved.