Importing Data Using the SSIS Object Model






Describes how to use the SSIS object model to import data into SQL Server 2005
Introduction
SQL Server 2005 Integration Services (SSIS) provides a rich API. In this article I will show one way to create and execute an SSIS package programmatically from C# code.
Background
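A few weeks ago I needed to import a huge CSV (comma-separated values) file into a SQL Server database. That is not a difficult job, and on its own it would not be worth an article: anyone can do it with the SQL Server 2005 Import Wizard, a flexible tool built for exactly this kind of task. But I needed to do it programmatically, which sent me searching the web for a solution. After a few hours of research I found that SQL Server 2005 exposes a rich API for SSIS (SQL Server 2005 Integration Services), with which anyone can easily create an SSIS package (formerly called a DTS package) and execute it. Unfortunately, I could not find any code that creates a package to import data from a CSV file into a SQL Server 2005 database. Most of the demo code I found either copied data from SQL Server to SQL Server or exported data from SQL Server to a flat file. I expected that adapting those samples to my purpose would be fairly simple, but I soon found that my case differed from them in a few important ways. After two days of work I had a code base I was happy with. In this article I will share that code and the ideas behind it.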
Using the Code
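My code base contains two projects for this import work. The first project (ImportLib) is a class library that implements the SSIS package creation. The second project (Import) is a Windows Forms application, really just a demo, that references the first project and starts an import job. Studying the first project is therefore enough to get a good understanding of the actual work.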
The object model I created is built mainly on a few interfaces: ImportLib.ISourceStorage, ImportLib.IDestinationStorage, ImportLib.IntegrationService.ISource and ImportLib.IntegrationService.IDestination. Before going into the details, let me explain the idea behind them.
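Once you explore SSIS, you will find that it makes importing data from almost any kind of data source into almost any data destination quite easy. The SSIS object model implements a set of well-defined interfaces to support this. Since I was creating the package programmatically anyway, why not structure my own code so that it can be extended in the same way? Today it imports data from a CSV file into a SQL Server database, but tomorrow I may need to import data from an Excel file into SQL Server, or into some other destination. That idea led me to write the ImportLib.ISourceStorage and ImportLib.IDestinationStorage interfaces. You can probably already guess their purpose: ImportLib.ISourceStorage is the interface for every kind of source storage. So far I have implemented it only for CSV file sources, but when I write an Excel source in the future, it will implement the same interface. The same idea applies to destination storage, through the ImportLib.IDestinationStorage interface.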
Let's take a look at these interfaces:
/// <summary>
/// Defines a contract for the data source
/// </summary>
public interface ISourceStorage
{
/// <summary>
/// Initializes the data source
/// </summary>
/// <param name="properties">
/// A <see cref="IDictionary{TKey,TValue}"/> that contains
/// the initialization properties</param>
void InitializeSource(IDictionary<string, string> properties);
/// <summary>
/// Get the schema table for the data source
/// </summary>
/// <returns>An instance of <see cref="DataTable"/> that contains the schema
/// information of the data source</returns>
DataTable GetSchemaTable();
/// <summary>
/// Get the Category of the underlying persistent storage
/// </summary>
StorageMedium StorageMedium { get; }
/// <summary>
/// Get or set the mapping manager
/// </summary>
ColumnMappingController MapManager { get; set; }
}
and
/// <summary>
/// Defines a contract for the data destination
/// </summary>
public interface IDestinationStorage
{
/// <summary>
/// Initializes the data destination
/// </summary>
/// <param name="properties">
/// A <see cref="IDictionary{TKey,TValue}"/> that contains
/// the initialization properties</param>
void InitializeDestination(IDictionary<string,
string> properties);
/// <summary>
/// Get the Category of the underlying persistent storage
/// </summary>
StorageMedium StorageMedium { get; }
/// <summary>
/// Get or set the mapping manager
/// </summary>
ColumnMappingController MapManager { get; set; }
}
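To make the pluggability idea concrete, here is a minimal, self-contained sketch that depends on neither ImportLib nor SSIS. ISchemaSource, HeaderLineSource and SchemaPrinter are hypothetical names, and the "HeaderLine" and "Delimiter" property keys are assumptions made for illustration; the real DelimitedDataSource reads its schema through OleDbConnection instead. What the sketch shows is that consumer code depends only on the interface and on the shape of the schema table: one row per source column, with a ColumnName field, which is the field CreateSourceColumns reads later.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical, stripped-down stand-in for ImportLib.ISourceStorage:
// only the two members that matter for this illustration.
public interface ISchemaSource
{
    void InitializeSource(IDictionary<string, string> properties);
    DataTable GetSchemaTable();
}

// Hypothetical source that derives its schema from a delimited header line.
public class HeaderLineSource : ISchemaSource
{
    private string headerLine;
    private char delimiter = ',';

    public void InitializeSource(IDictionary<string, string> properties)
    {
        // "HeaderLine" and "Delimiter" are illustrative keys, not ImportLib's.
        headerLine = properties["HeaderLine"];
        string d;
        if (properties.TryGetValue("Delimiter", out d) && d.Length == 1)
            delimiter = d[0];
    }

    public DataTable GetSchemaTable()
    {
        // One row per source column, each with a "ColumnName" field.
        DataTable schema = new DataTable("Schema");
        schema.Columns.Add("ColumnName", typeof(string));
        foreach (string name in headerLine.Split(delimiter))
            schema.Rows.Add(name.Trim());
        return schema;
    }
}

// Consumer code only sees the interface, so another source type
// (for example an Excel-based one) could be swapped in unchanged.
public static class SchemaPrinter
{
    public static List<string> ColumnNames(ISchemaSource source)
    {
        List<string> names = new List<string>();
        foreach (DataRow row in source.GetSchemaTable().Rows)
            names.Add((string)row["ColumnName"]);
        return names;
    }
}
```

Because SchemaPrinter never mentions HeaderLineSource, adding a new source is a matter of writing one more class that implements the interface.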
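Now let's look at the concrete implementations. ImportLib.Delimited.DelimitedDataSource is the concrete class that implements ImportLib.ISourceStorage. It simply encapsulates, for CSV files, the functionality that the interface defines. It uses the System.Data.OleDb.OleDbConnection class to read the CSV file and obtain the source schema. Another concrete class, which I named ImportLib.Sql.SqlStorageDestination, implements the ImportLib.IDestinationStorage interface. It provides the functionality defined by the interface and also contains the routines that create the destination data store (the database and the table).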
So far, so good. Now let's focus on the SSIS-specific classes and interfaces. Here again I defined the two interfaces mentioned earlier: ImportLib.IntegrationService.ISource and ImportLib.IntegrationService.IDestination. Let's look at them.
/// <summary>
/// Defines a contract for the SQL Server
/// Integration Service source objects.
/// </summary>
/// <remarks>
/// Different SSIS source objects should satisfy this interface,
/// such as flat file data sources, OLE DB data sources, etc.
/// </remarks>
public interface ISource
{
/// <summary>
/// Get or set the <see cref="T:ImportLib.ISourceStorage"/> instance
/// that represents the physical data source.
/// </summary>
ISourceStorage DataSource
{
get;
set;
}
/// <summary>
/// Creates a connection manager for the source file
/// in the context of the given <see cref="DTSPackage"/>
/// object.
/// </summary>
/// <param name="package">
/// An instance of <see cref="DTSPackage"/>.</param>
void CreateConnection(DTSPackage package);
/// <summary>
/// Creates the source dataflow component.
/// </summary>
/// <param name="package">The package
/// instance.</param>
/// <param name="dataflowTask">
/// The dataflow task under which the component will be
/// created.</param>
/// <returns>An instance of <see
/// cref="DataflowComponent"/>.</returns>
DataflowComponent CreateSourceDataFlowComponent(
DTSPackage package, DTSExecutable dataflowTask);
/// <summary>
/// Initialize the source dataflow component
/// </summary>
/// <param name="sourceDataFlowComponent">
/// An instance of <see cref="DataflowComponent"/></param>
void InitializeDataflowComponent(DataflowComponent
sourceDataFlowComponent);
}
And the other one is:
/// <summary>
/// Defines a contract for the SQL Server Integration
/// Service Destination objects.
/// </summary>
/// <remarks>
/// Different data destinations should satisfy this interface,
/// such as flat file destinations, OLE DB destinations, etc.
/// </remarks>
public interface IDestination
{
/// <summary>
/// Get or set the <see cref="T:ImportLib.IDestinationStorage"/> instance
/// that represents the physical data destination.
/// </summary>
IDestinationStorage DataDestination
{
get;
set;
}
/// <summary>
/// Get or set the <see cref="T:ImportLib.IntegrationService.ISource"/> instance
/// </summary>
ISource SsisSource
{
get;
set;
}
/// <summary>
/// Creates a connection manager for the destination database
/// in the context of the given <see cref="DTSPackage"/>
/// object.
/// </summary>
/// <param name="package">
/// An instance of <see cref="DTSPackage"/>.</param>
void CreateConnection(DTSPackage package);
/// <summary>
/// Creates the task that will create the
/// destination storage (e.g. database, table).
/// </summary>
/// <param name="package">The
/// package where the task should be created</param>
/// <returns>
/// An instance of <see cref="DTSExecutable"/></returns>
DTSExecutable CreateStorageCreationTask(DTSPackage package);
/// <summary>
/// Creates the destination dataflow component.
/// </summary>
/// <param name="package">The package
/// instance.</param>
/// <param name="dataflowTask">
/// The dataflow task under which the component will
/// be created.</param>
/// <returns>
/// An instance of <see cref="DataflowComponent"/>.</returns>
DataflowComponent CreateDestinationDataFlowComponent
(DTSPackage package, DTSExecutable dataflowTask);
/// <summary>
/// Initialize the destination dataflow component
/// </summary>
/// <param name="destinationDataFlowComponent">
/// An instance of <see cref="DataflowComponent"/></param>
void InitializeDataflowComponent(DataflowComponent
destinationDataFlowComponent);
}
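These interfaces define strictly SSIS-specific routines, such as creating connections, creating dataflow components and a few initialization routines. Explaining the SSIS data flow is beyond the scope of this article; for details about these SSIS components, please refer to MSDN. Now let's look at the concrete implementations. ImportLib.IntegrationService.Specialized.FlatFileSource is the concrete implementation of the ImportLib.IntegrationService.ISource interface. Take a look at this class. The important part here is that we must create the source columns explicitly (the CreateSourceColumns method); SSIS will never create them for you.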
namespace ImportLib.IntegrationService.Specialized
{
/// <summary>
/// Contains the implementations of the
/// <see
/// cref="T:ImportLib.IntegrationService.ISource"/>
/// interface.
/// </summary>
public class FlatFileSource : ISource
{
// The delimited data source instance
private DelimitedDataSource delimitedDataSource;
// The Moniker text for the flatfile source
public const string FlatFileMoniker = @"FLATFILE";
// The source data flow component GUID
public const string SourceDataFlowComponentID
= "{90C7770B-DE7C-435E-880E-E718C92C0573}";
// schema table
private DataTable schemaTable = null;
// The connection manager instance
private ConnectionManager connectionManager;
/// <summary>
/// Creates a new instance
/// </summary>
public FlatFileSource()
{
}
#region ISsisSource Members
/// <summary>
/// get or set the <see
/// cref="T:ImportLib.ISourceStorage"/> instance
/// that represents the physical data source.
/// </summary>
public ISourceStorage DataSource
{
get { return delimitedDataSource; }
set
{
// initializing
delimitedDataSource = value as DelimitedDataSource;
// Get the schema table
schemaTable = value.GetSchemaTable();
// Assert
Debug.Assert(delimitedDataSource != null);
}
}
/// <summary>
/// Creates a connection manager instance for a given source file
/// into the context of the given <see cref="DTSPackage"/>
/// object.
/// </summary>
/// <param name="package">An instance of
/// <see cref="DTSPackage"/>.</param>
public void CreateConnection(DTSPackage package)
{
#region Logging
Logger.WriteInformation(
"Creating connection to the source file.");
#endregion
// creating a connection manager
// instance using the FLATFILE moniker
connectionManager =
package.InnerObject.Connections.Add(FlatFileMoniker);
connectionManager.ConnectionString =
delimitedDataSource.FileName;
connectionManager.Name =
"SSIS Connection Manager for Files";
connectionManager.Description =
string.Concat("SSIS Connection Manager");
// Setting some common properties of the connection manager object
connectionManager.Properties
["ColumnNamesInFirstDataRow"].
SetValue(connectionManager,
delimitedDataSource.FirstRowIsHeader);
connectionManager.Properties["Format"].SetValue
(connectionManager, "Delimited");
connectionManager.Properties["HeaderRowDelimiter"].
SetValue(connectionManager,
delimitedDataSource.HeaderRowDelimiter);
if (delimitedDataSource.TextQualifier != null)
{ // If the user has specified a text qualifier, set it
// on the connection manager
connectionManager.Properties["TextQualifier"]
.SetValue(connectionManager,
delimitedDataSource.TextQualifier);
}
// create the source columns into the connection manager
CreateSourceColumns();
#region Logging
Logger.WriteInformation(
"Creating connection to the source file.....Completed");
#endregion
}
/// <summary>
/// Creates the source dataflow component.
/// </summary>
/// <param name="package">The package
/// instance.</param>
/// <param name="dataflowTask">
/// The dataflow task under which the component will be
/// created.</param>
/// <returns>An instance of <see
/// cref="DataflowComponent"/>.</returns>
public DataflowComponent CreateSourceDataFlowComponent
(DTSPackage package, DTSExecutable dataflowTask)
{
// create the component
DataflowComponent sourceDataFlowComponent =
new DataflowComponent(dataflowTask, SourceDataFlowComponentID,
"Source Data Flow component");
return sourceDataFlowComponent;
}
/// <summary>
/// Initialize the source dataflow component
/// </summary>
/// <param name="sourceDataFlowComponent">
/// An instance of <see
/// cref="DataflowComponent"/>
/// </param>
public void InitializeDataflowComponent(
DataflowComponent sourceDataFlowComponent)
{
#region Logging
Logger.WriteInformation(
"Initializing the managed instance for the source file.");
#endregion
// load the COM for the given GUID
CManagedComponentWrapper managedFlatFileInstance =
sourceDataFlowComponent.ComponentInstance;
// populate the component's default properties
managedFlatFileInstance.ProvideComponentProperties();
// putting the connection
if (
sourceDataFlowComponent.InnerObject.
RuntimeConnectionCollection.Count > 0)
{ // If connection is necessary
sourceDataFlowComponent.InnerObject.
RuntimeConnectionCollection[0].ConnectionManagerID =
connectionManager.ID;
sourceDataFlowComponent.InnerObject.
RuntimeConnectionCollection[0].ConnectionManager =
DtsConvert.ToConnectionManager90
(connectionManager);
}
// establish a connection
managedFlatFileInstance.AcquireConnections(null);
// Initialize the metadata
managedFlatFileInstance.ReinitializeMetaData();
// create the mapping now
IDTSExternalMetadataColumn90 exOutColumn;
foreach (IDTSOutputColumn90 outColumn in
sourceDataFlowComponent.InnerObject.
OutputCollection[0].OutputColumnCollection)
{ // create the MAP
exOutColumn =
sourceDataFlowComponent.InnerObject.
OutputCollection[0].
ExternalMetadataColumnCollection[outColumn.Name];
// map it
managedFlatFileInstance.MapOutputColumn(
sourceDataFlowComponent.InnerObject.
OutputCollection[0].ID, outColumn.ID,
exOutColumn.ID, true);
}
// Release the connection now
managedFlatFileInstance.ReleaseConnections();
#region Logging
Logger.WriteInformation(
"Initializing the managed instance for the source file......completed");
#endregion
}
#endregion
/// <summary>
/// Creates the source columns for the flat file connection manager
/// instance
/// </summary>
private void CreateSourceColumns()
{
// get the actual connection manager instance
RuntimeWrapper.IDTSConnectionManagerFlatFile90
flatFileConnection =
connectionManager.InnerObject
as RuntimeWrapper.IDTSConnectionManagerFlatFile90;
RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
RuntimeWrapper.IDTSName90 name;
// trace the current count
Debug.WriteLine(flatFileConnection.Columns.Count);
DataTable schemaTable
= DataSource.GetSchemaTable(); // get the schema table
foreach (DataRow row in schemaTable.Rows)
{ // iterate
string colName
= row["ColumnName"] as string;
// get the col name
// now create a new column for the connection manager
column
= flatFileConnection.Columns.Add();
if (schemaTable.Rows.IndexOf(row)
== (schemaTable.Rows.Count - 1))
// last column: it is terminated by the row delimiter
column.ColumnDelimiter =
delimitedDataSource.HeaderRowDelimiter;
else
// other columns are terminated by the field delimiter
column.ColumnDelimiter = delimitedDataSource.Delimiter;
column.TextQualified =
delimitedDataSource.TextQualifier != null;
column.ColumnType = "Delimited";
column.DataType =
RuntimeWrapper.DataType.DT_WSTR;
column.DataPrecision = 0;
column.DataScale = 0;
name = (RuntimeWrapper.IDTSName90)column;
name.Name = colName;
}
}
}
}
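The delimiter rule in CreateSourceColumns is easy to miss: the last column of a row is terminated by the row delimiter rather than by the field delimiter, so it gets the row delimiter while every other column gets the field delimiter. The SSIS-free sketch below isolates that rule; FlatColumnSpec and FlatColumnPlanner are hypothetical names that only model the three properties involved. It also uses an index-based loop, which avoids calling Rows.IndexOf once per row.

```csharp
using System.Collections.Generic;

// Hypothetical, SSIS-free model of one flat file column definition.
public class FlatColumnSpec
{
    public string Name;
    public string ColumnDelimiter;
    public bool TextQualified;
}

public static class FlatColumnPlanner
{
    // Every column is terminated by the field delimiter except the last,
    // which is terminated by the row delimiter (for example "\r\n").
    public static List<FlatColumnSpec> Plan(IList<string> columnNames,
        string fieldDelimiter, string rowDelimiter, string textQualifier)
    {
        List<FlatColumnSpec> specs = new List<FlatColumnSpec>();
        for (int i = 0; i < columnNames.Count; i++)
        {
            bool isLast = i == columnNames.Count - 1;
            specs.Add(new FlatColumnSpec
            {
                Name = columnNames[i],
                ColumnDelimiter = isLast ? rowDelimiter : fieldDelimiter,
                // Columns are text-qualified only when a qualifier was given.
                TextQualified = textQualifier != null
            });
        }
        return specs;
    }
}
```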
Now let's look at the corresponding destination implementation.
namespace ImportLib.IntegrationService.Specialized
{
/// <summary>
/// Contains the implementation of the IDestination interface.
/// </summary>
public class SqlServerDataDestination : IDestination
{
// The SQL server data destination
private SqlStorageDestination sqlDataDestination;
// The Moniker text for the Ole DB
public const string OleDBMoniker = "OLEDB";
// the component ID
public const string OleDBDestinationDataFlowComponentID
= "{E2568105-9550-4F71-A638-B7FE42E66922}";
// source
private ISource ssisSource;
// connection manager
private ConnectionManager connectionManager;
#region ISsisDestination Members
/// <summary>
/// Get or set the <see
/// cref="T:ImportLib.IDestinationStorage"/> instance
/// that represents the physical data destination.
/// </summary>
public IDestinationStorage DataDestination
{
get { return sqlDataDestination; }
set
{
// initializing
sqlDataDestination = value as SqlStorageDestination;
// Assert
Debug.Assert(sqlDataDestination != null);
}
}
/// <summary>
/// Get or set the <see
/// cref="T:ImportLib.IntegrationService.ISource"/> instance
/// </summary>
public ISource SsisSource
{
get { return ssisSource; }
set { ssisSource = value; }
}
/// <summary>
/// Creates a connection manager for the destination database
/// in the context of the given <see cref="DTSPackage"/>
/// object.
/// </summary>
/// <param name="package">An instance of
/// <see cref="DTSPackage"/>.</param>
public void CreateConnection(DTSPackage package)
{
// Creating a connection using the oledb moniker
connectionManager =
package.InnerObject.Connections.Add(OleDBMoniker);
connectionManager.ConnectionString =
GetSsisConnectionString();
connectionManager.Name =
"SSIS Connection Manager for Oledb";
connectionManager.Description =
string.Concat(
"SSIS Connection Manager for ",
sqlDataDestination.DatabaseName);
}
/// <summary>
/// Creates the task that will create the destination
/// storage (e.g. database, table).
/// </summary>
/// <param name="package">The package
/// where the task should be created</param>
/// <returns>An instance of
/// <see cref="DTSExecutable"/></returns>
public DTSExecutable CreateStorageCreationTask(DTSPackage package)
{
// get the SQL task type
Type taskType = typeof(ExecuteSQLTask);
// create a task of type ExecuteSQLTask
DTSExecutable executable = new DTSExecutable(package, taskType);
// now configure the new task: get the task host instance
TaskHost taskHost = executable.InnerObject as TaskHost;
// get the SQL task from the host
ExecuteSQLTask sqlTask = taskHost.InnerObject as ExecuteSQLTask;
// set the connection manager
sqlTask.Connection = connectionManager.Name;
// set the SQL that creates the destination table
sqlTask.SqlStatementSource =
sqlDataDestination.GetDestinationTableCreationSql
(ssisSource.DataSource.GetSchemaTable());
return executable;
}
/// <summary>
/// Creates the destination dataflow component.
/// </summary>
/// <param name="package">The package
/// instance.</param>
/// <param name="dataflowTask">The dataflow task
/// under which
/// the component will be created.</param>
/// <returns>An instance of <see
/// cref="DataflowComponent"/>.</returns>
public DataflowComponent CreateDestinationDataFlowComponent
(DTSPackage package, DTSExecutable dataflowTask)
{
#region Logging
Logger.WriteInformation(
"Creating managed instances for the destination database");
#endregion
// create the component now
DataflowComponent destinationDataFlowComponent
= new DataflowComponent(dataflowTask,
OleDBDestinationDataFlowComponentID,
"Destination Oledb Component");
// Before the initialization we need to create the destination
// table, because the SSIS object model will access that
// table to read its metadata
sqlDataDestination.CreateDataStore
(ssisSource.DataSource.GetSchemaTable());
// get the COM instance
CManagedComponentWrapper managedOleInstance =
destinationDataFlowComponent.ComponentInstance;
// populate the properties
managedOleInstance.ProvideComponentProperties();
// setting the connection
if (destinationDataFlowComponent.InnerObject.
RuntimeConnectionCollection.Count > 0)
{ // If connection is necessary
destinationDataFlowComponent.InnerObject.
RuntimeConnectionCollection[0].
ConnectionManagerID =
connectionManager.ID;
destinationDataFlowComponent.InnerObject.
RuntimeConnectionCollection[0].
ConnectionManager =
DtsConvert.ToConnectionManager90
(connectionManager);
}
// Set the custom properties.
// AccessMode 0 = table or view
managedOleInstance.SetComponentProperty(
"AccessMode", 0);
// Do not force DefaultCodePage on every text column
managedOleInstance.SetComponentProperty
("AlwaysUseDefaultCodePage", false);
// Default code page (1252 = Windows Western European)
managedOleInstance.SetComponentProperty
("DefaultCodePage", 1252);
// Fast load options (SSIS applies these only when AccessMode
// selects a fast-load mode; with AccessMode 0 they have no effect)
managedOleInstance.SetComponentProperty
("FastLoadKeepIdentity", false);
managedOleInstance.SetComponentProperty
("FastLoadKeepNulls", false);
managedOleInstance.SetComponentProperty
("FastLoadMaxInsertCommitSize", 0);
managedOleInstance.SetComponentProperty
("FastLoadOptions",
"TABLOCK,CHECK_CONSTRAINTS");
managedOleInstance.SetComponentProperty("OpenRowset",
string.Format("[{0}].[dbo].[{1}]",
sqlDataDestination.DatabaseName,
sqlDataDestination.TableName));
#region Logging
Logger.WriteInformation(
"Creating managed instances for the destination database....completed");
#endregion
return destinationDataFlowComponent;
}
/// <summary>
/// Initialize the destination dataflow component
/// </summary>
/// <param name="destinationDataFlowComponent">
/// An instance of <see
/// cref="DataflowComponent"/></param>
public void InitializeDataflowComponent(DataflowComponent
destinationDataFlowComponent)
{
#region Logging
Logger.WriteInformation(
"Creating the destination columns and their mappings");
#endregion
// Get the COM instance
CManagedComponentWrapper managedOleInstance
= destinationDataFlowComponent.ComponentInstance;
// Now activate a connection and create the mappings
// Establish a connection
managedOleInstance.AcquireConnections(null);
// initialize the metadata
managedOleInstance.ReinitializeMetaData();
// Get the destination's default input and virtual input.
IDTSInput90 input = destinationDataFlowComponent.
InnerObject.InputCollection[0];
IDTSVirtualInput90 vInput = input.GetVirtualInput();
// Iterate through the virtual input column collection.
foreach (IDTSVirtualInputColumn90 vColumn
in vInput.VirtualInputColumnCollection)
{
bool res = sqlDataDestination.MapManager.
IsSuppressedSourceColumn
(vColumn.Name,ssisSource.DataSource.GetSchemaTable());
if (!res)
{
// Call the SetUsageType method of the destination
// to add each available virtual input column as
// an input column.
managedOleInstance.SetUsageType(
input.ID, vInput, vColumn.LineageID,
DTSUsageType.UT_READONLY);
}
}
IDTSExternalMetadataColumn90 exColumn;
foreach (IDTSInputColumn90 inColumn in
destinationDataFlowComponent.InnerObject.
InputCollection[0].InputColumnCollection)
{ // create the map
exColumn = destinationDataFlowComponent.InnerObject.
InputCollection[0].
ExternalMetadataColumnCollection[inColumn.Name];
string destName = sqlDataDestination.MapManager.
GetDestinationColumn(exColumn.Name).ColumnName;
exColumn.Name = destName;
managedOleInstance.MapInputColumn(
destinationDataFlowComponent.
InnerObject.InputCollection[0].ID,
inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstance.ReleaseConnections();
// Now remove the table that we did create
// for the SSIS object model
sqlDataDestination.DeleteDataStore();
#region Logging
Logger.WriteInformation(
"Creating the destination columns and their mappings.....completed");
#endregion
}
#endregion
/// <summary>
/// Get the SSIS compatible connection string.
/// </summary>
/// <returns>A connection string that is compatible with
/// SSIS</returns>
/// <remarks>
/// SSIS OLE DB connections use a different provider from the usual
/// SQL client connection: "SQLNCLI" (SQL Native Client).
/// Without this provider SSIS cannot create the connection.
/// On the other hand, the SQLNCLI provider cannot be used in
/// SqlClient (.NET class) connection strings.
/// So this method converts a SqlClient connection string into an
/// SSIS-compatible connection string by setting the provider
/// (and a few related properties).
/// </remarks>
private string GetSsisConnectionString()
{
//connectionManager.ConnectionString = "Data Source=VSTS;Initial Catalog=TEST;Provider=SQLNCLI;Integrated Security=SSPI;Auto Translate=false;";
//ConMgr.ConnectionString = "Data Source=VSTS;Initial Catalog=TEST;Integrated Security=True";
// get the SQL client connection string
string connectionString
= sqlDataDestination.ConnectionString;
// Connection string keywords are case-insensitive, so compare them
// that way; otherwise "provider" and "Provider" would both be kept
Dictionary<string, string> connectionProperties =
new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (string part in
connectionString.Split(";".ToCharArray()))
{ // Iterate through the properties of the connection string
// Split on the first '=' only, so values containing '=' survive
string[] keyValue = part.Split("=".ToCharArray(), 2);
if (keyValue.Length == 2)
{
// the name of the property
string propertyName = keyValue[0].Trim();
// the value of the property
string valueName = keyValue[1].Trim();
// create (or overwrite a duplicate) entry
connectionProperties[propertyName] = valueName;
}
}
// Now update these followings
connectionProperties["Provider"]
= "SQLNCLI";
connectionProperties["Integrated Security"]
= "SSPI";
connectionProperties["Auto Translate"]
= "false";
// Now we are going to create the SSIS compatible
// connectionstring
StringBuilder ssisCompatibaleConnectionString
= new StringBuilder();
for (Dictionary<string, string>.Enumerator iterator
=
connectionProperties.GetEnumerator();
iterator.MoveNext(); )
{ // Iterate
if (ssisCompatibaleConnectionString.Length > 0)
{ // If already there is some properties added
ssisCompatibaleConnectionString.Append(";");
}
ssisCompatibaleConnectionString.Append(
string.Format("{0}={1}",
iterator.Current.Key, iterator.Current.Value));
}
return ssisCompatibaleConnectionString.ToString();
}
}
}
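GetSsisConnectionString splits the string on ';' by hand, which breaks on quoted values that contain a semicolon. A sketch of the same conversion using System.Data.Common.DbConnectionStringBuilder, which parses quoted values and treats keywords case-insensitively, could look like this. SsisConnectionStrings is a hypothetical name; the three overridden keywords are the ones the original method sets.

```csharp
using System.Data.Common;

public static class SsisConnectionStrings
{
    // Hypothetical alternative to GetSsisConnectionString: let the framework
    // parse the SqlClient string, then override the OLE DB specific keywords.
    public static string ToOleDbConnectionString(string sqlClientConnectionString)
    {
        DbConnectionStringBuilder builder = new DbConnectionStringBuilder();
        builder.ConnectionString = sqlClientConnectionString;
        // Keyword lookup is case-insensitive, so an existing "provider"
        // entry is replaced rather than duplicated.
        builder["Provider"] = "SQLNCLI";
        builder["Integrated Security"] = "SSPI";
        builder["Auto Translate"] = "false";
        return builder.ConnectionString;
    }
}
```

Like the original method, this always switches the connection to Windows authentication (Integrated Security=SSPI).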
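Besides these core import classes, I also wrote a log provider for the SSIS package, which reports log entries to the GUI while the package executes. Creating a log provider means extending the LogProviderBase class that Microsoft provides. Here is my log provider class: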
namespace ImportLib.IntegrationService.Logging
{
/// <summary>
/// A custom log provider class
/// </summary>
/// <author>
/// Moim Hossain
/// </author>
[DtsLogProvider(DisplayName = "LogProvider",
Description = "Log provider for DTS packages.",
LogProviderType = "Custom")]
public class EventLogProvider : LogProviderBase
{
/// <summary>
/// Configuration string
/// </summary>
public override string ConfigString
{
get
{
return string.Empty;
}
set
{
}
}
/// <summary>
/// Open log
/// </summary>
public override void OpenLog()
{
base.OpenLog();
}
/// <summary>
/// Closing log
/// </summary>
public override void CloseLog()
{
base.CloseLog();
}
/// <summary>
/// Initializations
/// </summary>
/// <param name="connections"></param>
/// <param name="events"></param>
/// <param name="refTracker"></param>
public override void InitializeLogProvider
(Connections connections,
IDTSInfoEvents events,
ObjectReferenceTracker refTracker)
{
base.InitializeLogProvider(connections, events, refTracker);
}
/// <summary>
/// Write a log
/// </summary>
/// <param name="logEntryName"></param>
/// <param name="computerName"></param>
/// <param name="operatorName"></param>
/// <param name="sourceName"></param>
/// <param name="sourceID"></param>
/// <param name="executionID"></param>
/// <param name="messageText"></param>
/// <param name="startTime"></param>
/// <param name="endTime"></param>
/// <param name="dataCode"></param>
/// <param name="dataBytes"></param>
public override void Log(string logEntryName,
string computerName,
string operatorName,
string sourceName,
string sourceID,
string executionID, string messageText,
DateTime startTime,
DateTime endTime, int dataCode, byte[] dataBytes)
{
//base.Log(logEntryName, computerName, operatorName,
// sourceName, sourceID, executionID,
// messageText, startTime, endTime, dataCode, dataBytes);
LogEventArgs e = new
LogEventArgs(logEntryName,computerName,messageText);
//
OnLogCreated(e);
}
/// <summary>
/// Raises the <see cref="LogCreated"/> event
/// </summary>
/// <param name="e">The log entry data</param>
protected virtual void OnLogCreated(LogEventArgs e)
{
LogCreatedDelegate mLogCreated = this.LogCreated;
if (mLogCreated != null)
{
mLogCreated(this, e);
}
}
/// <summary>
/// Raised each time the package writes a log entry
/// </summary>
public event LogCreatedDelegate LogCreated;
}
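/// <summary>
/// Represents the method that handles the LogCreated event
/// </summary>
/// <param name="sender">The log provider that raised the event</param>
/// <param name="e">The log entry data</param>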
public delegate void LogCreatedDelegate ( object sender , LogEventArgs e );
}
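The essential job of this provider is to turn each Log call into an event the form can subscribe to. The SSIS-free sketch below shows that forwarding pattern using the standard EventHandler<T> delegate instead of a custom delegate type; LogForwarder and LogMessageEventArgs are hypothetical names, and nothing here derives from LogProviderBase.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event payload, similar in spirit to the article's LogEventArgs.
public class LogMessageEventArgs : EventArgs
{
    public LogMessageEventArgs(string entryName, string message)
    {
        EntryName = entryName;
        Message = message;
    }

    public string EntryName { get; private set; }
    public string Message { get; private set; }
}

// Stand-in for the forwarding part of EventLogProvider.Log.
public class LogForwarder
{
    public event EventHandler<LogMessageEventArgs> LogCreated;

    public void Log(string entryName, string message)
    {
        OnLogCreated(new LogMessageEventArgs(entryName, message));
    }

    protected virtual void OnLogCreated(LogMessageEventArgs e)
    {
        // Copy to a local so an unsubscribe between the null check
        // and the call cannot cause a NullReferenceException.
        EventHandler<LogMessageEventArgs> handler = LogCreated;
        if (handler != null)
            handler(this, e);
    }
}
```

If the package is executed on a worker thread (for example, to keep the form responsive), the handler runs on that thread, so a Windows Forms subscriber should marshal back to the UI thread with Control.BeginInvoke before touching any controls.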
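Finally, last but not least: mappings. During an import, the data from one source column can be imported into a differently named destination column, so you can provide a mapping between source columns and destination columns. You can even skip the data in some source columns during the import. I have not built a GUI for defining mappings; I create them in code, but building an interactive GUI that lets users specify the mapping is only a matter of time. Here is the class that encapsulates the mapping information: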
namespace ImportLib.Mappings
{
/// <summary>
/// Encapsulates the mapping information for a source data source
/// and a destination data source
/// </summary>
[Serializable()]
public class ColumnMappingController
{
/// <summary>
/// Creates a new instance
/// </summary>
public ColumnMappingController()
{
mappings = new List<Map>();
}
private List<Map> mappings;
/// <summary>
/// Get or set the mappings
/// </summary>
public List<Map> Mappings
{
get { return mappings; }
set { mappings = value; }
}
private Column[] destinationColumns;
/// <summary>
/// Get or set the destination columns
/// </summary>
public Column[] DestinationColumns
{
get { return destinationColumns; }
set { destinationColumns = value; }
}
/// <summary>
/// Get the destination columns that are not bound
/// to any source column
/// </summary>
public Column[] UnmappedDestinationColumns
{
get
{
List<Column> unmappedColumns = new List<Column>();
foreach (Column destinationColumn in destinationColumns)
{ // iterate
if (!ContainsInDestinationMap(destinationColumn))
{ // if no mapping found
unmappedColumns.Add(destinationColumn);
}
}
return unmappedColumns.ToArray();
}
}
/// <summary>
/// Get the source columns that are not bound to any
/// destination column
/// </summary>
public Column[] SuppressedSourceColumns(DataTable srcSchemaTable)
{
List<Column> suppressedColumns = new List<Column>();
foreach (DataRow row in srcSchemaTable.Rows)
{ // iterate
string columnName = row["ColumnName"] as string;
if (!ContainsInSourceMap(columnName))
{ // if no mapping found
suppressedColumns.Add(new Column(columnName));
}
}
return suppressedColumns.ToArray();
}
/// <summary>
/// Determine if the specified source column is in suppressed
/// list or not
/// </summary>
public bool IsSuppressedSourceColumn(
string sourceColumnName, DataTable srcSchemaTable)
{
return Array.IndexOf<Column>(
SuppressedSourceColumns(srcSchemaTable),
new Column(sourceColumnName)) > -1;
}
/// <summary>
/// Get the destination column for a given source column
/// </summary>
/// <param name="sourceColumnName">The
/// specified source column</param>
/// <returns>An instance of <see
/// cref="T:ImportLib.Mappings.Column"/>
/// which is the destination for the given source column
/// name.</returns>
public Column GetDestinationColumn(string sourceColumnName)
{
foreach (Map map in mappings)
{ // iterate
if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
return map.DestinationColumn;
}
throw new ApplicationException
("No mapping defined for the source column " +
sourceColumnName);
}
/// <summary>
/// Determines whether the specified column appears in the
/// mapping as a source column
/// </summary>
/// <param name="sourceColumnName">The source
/// column name</param>
/// <returns>
/// <c>true</c> if the source column found into
/// the mapping, <c>false</c> otherwise.</returns>
private bool ContainsInSourceMap(string sourceColumnName)
{
foreach (Map map in mappings)
{ // iterate
if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
return true;
}
return false;
}
/// <summary>
/// Determines whether the specified column appears in the
/// mapping as a destination column
/// </summary>
/// <param name="destinationColumn">
/// The destination column</param>
/// <returns><c>true</c>
/// if the destination column found into the mapping,
/// <c>false</c> otherwise.</returns>
private bool ContainsInDestinationMap(Column destinationColumn)
{
foreach (Map map in mappings)
{ // iterate
if (map.DestinationColumn.Equals(destinationColumn))
return true;
}
return false;
}
/// <summary>
/// Validate the content
/// </summary>
private void Validate()
{
// This class keeps no source column list, so only the
// destination columns can be validated here
if (destinationColumns == null)
throw new InvalidOperationException(
"DestinationColumns is not set to an instance of an object");
}
}
}
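When I create an import job, I pass an instance of this class to the import manager. So you can either create this instance in code (as I did) or let your users create it from a GUI.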
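As a hypothetical illustration of building a mapping in code, here is a simplified, self-contained stand-in for ColumnMappingController. It keeps the same rules (a source column without a mapping is suppressed, and looking up the destination of an unmapped column throws) but stores the mapping in a dictionary keyed by source column name. SimpleColumnMap is not part of ImportLib; name comparison is ordinal and case-sensitive, matching the string Equals calls in the original class.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simplified mapping: source column name -> destination column name.
public class SimpleColumnMap
{
    private readonly Dictionary<string, string> map =
        new Dictionary<string, string>();

    // Returns this instance so mappings can be chained fluently.
    public SimpleColumnMap Map(string sourceColumn, string destinationColumn)
    {
        map[sourceColumn] = destinationColumn;
        return this;
    }

    // A source column with no mapping is skipped during the import.
    public bool IsSuppressed(string sourceColumn)
    {
        return !map.ContainsKey(sourceColumn);
    }

    public string GetDestination(string sourceColumn)
    {
        string destination;
        if (map.TryGetValue(sourceColumn, out destination))
            return destination;
        throw new KeyNotFoundException(
            "No mapping defined for the source column " + sourceColumn);
    }
}
```

A dictionary lookup replaces the linear scans over the mapping list, which only starts to matter for files with many columns.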
I hope you enjoy it!
Points of Interest
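I have not yet written any code that performs data transformations, for example when you need to process the data before it is imported into the destination data store. I hope to write some code for this soon, and then I will try to publish an article on that topic.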