65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SQL SSIS 和 BIML 自动执行 Salesforce 数据复制

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2019 年 6 月 19 日

CPOL
viewsIcon

5642

本文演示了如何使用 Biml 和自定义 SSIS 组件来动态构建 SSIS 任务(每个 Salesforce 实体一个),将 Salesforce 数据复制到 Microsoft SQL Server 数据库。

将 SQL Server 作为关键业务数据的备份,可以为防止数据丢失提供重要的安全网,并使用户能够更轻松地将数据与报表、分析等功能结合。Biml 是一种 XML 方言,可用于创建 Microsoft SQL Server BI 对象,例如 SSIS 包。将自定义 SSIS 组件与 Biml 配对,可以轻松构建访问 Salesforce 等标准 SSIS 连接之外的数据源的 SSIS 包。主要优势包括:

  • 内置元数据发现 — 许多自定义 SSIS 组件的元数据公开方式与使用 SQL Server 类似,甚至可以动态生成无架构数据源的架构
  • 动态 SSIS 任务生成 — 使用 Biml 中的代码片段,通过迭代发现的元数据来构建 SSIS 任务
  • 读写外部源 — 原生源和目标组件使外部数据看起来就像数据库一样

本文演示了如何使用 Biml 和自定义 SSIS 组件来动态构建 SSIS 任务(每个 Salesforce 实体一个),将 Salesforce 数据复制到 Microsoft SQL Server 数据库。我们将逐节介绍 Biml 文件,但已在文章末尾附上完整的 Biml 文件。本文使用 CData SSIS 组件,但任务生成原则适用于任何自定义 SSIS 组件。

入门

为了在 Visual Studio 中的 SSIS 项目中使用 Biml,请安装 BimlExpress。安装 BimlExpress 后,打开 Visual Studio,创建一个新的 Integration Services 项目,然后添加一个新的 Biml 文件。

构建 Biml 文件

使用 Biml,您可以编写脚本来动态生成 SSIS 项目、包和任务。要查看现有项目的 Biml 文件(并了解如何在项目中将 Biml 用于任何自定义 SSIS 任务),只需创建您的任务,然后右键单击项目并选择“将 SSIS 包转换为 Biml”。

C# 代码

  1. 使用指令 <#@ .. #> 导入必要的命名空间和 CData SSIS 组件 for Salesforce 的程序集。
<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.Salesforce"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for Salesforce 2018\lib\CData.SSIS2017.Salesforce.dll"#>
  1. 在新的控制片段 <# ... #> 中,编写代码以检索外部数据源的元数据。使用 Biml 时,通常将元数据存储在数据库中。对于 CData 组件,您只需编写 ADO.NET 代码即可动态检索元数据。首先,创建将在整个 Biml 脚本中使用的值的变量,包括 Salesforce 的连接字符串以及用于存储 Salesforce 元数据的结构。
var salesforceConnectionString = "User=username;Password=password;SecurityToken=Your_Security_Token;";
var replicationServer = "SERVER";
var replicationCatalog = "CATALOG";
var replicationUserID = "sqluser";
var replicationPassword = "sqlpassword";
List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();
  1. 在定义变量的同一个控制片段中,使用 ADO.NET 代码以编程方式查询 Salesforce 实体(表)和字段(列)。
using (SalesforceConnection connection = new SalesforceConnection(salesforceConnectionString)) {
  connection.Open();
  var entities = connection.GetSchema("Tables").Rows;
  foreach (DataRow entity in entities)
  {
    allEntityNames.Add(entity["TABLE_NAME"].ToString());
  }
  foreach (string entity in allEntityNames){
    var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
    entitySchema.Add(entity,columns);
  }
}

类片段

在我们的 Biml 脚本中,有几个地方会动态创建重复的 XML 元素(主要用于 SSIS 任务中的列)。为了避免重复代码,请添加一个类片段 <#+ ... #> 并创建一个具有用于合并重复代码的方法的帮助类(完整代码在文章末尾)。

  1. 添加公共静态变量以确定要创建的 XML 元素的类型。
public static int OUTPUT_WITH_ERROR = 0;
public static int EXTERNAL = 1;
public static int OUTPUT = 2;
public static int DATAOVERRIDE_COLUMN = 4;
  1. 添加一个公共方法来构建 SQL 语句,用于在 ExecuteSQL 任务中删除现有表并为复制的数据创建新表。
// Dynamically builds a DROP TABLE and CREATE statement
// for each entity (table) in Salesforce using the table name and metadata.
public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {
  ...
}
  1. 添加一个公共方法来构建基于列的 XML 元素集合。
// Dynamically build various column-based XML elements
// for each entity (table) in Salesforce based on the column 
// metadata and the parent element
public static string GetColumnDefs(DataRowCollection columns, int columnType){
  ...
}

Biml 脚本

现在您已经拥有了表元数据和用于减少重复代码的帮助类,请编写 Biml 脚本来动态创建复制包。

  1. 首先,为自定义 SSIS 任务添加一个 CustomSsisConnection 元素。请注意,ObjectData 属性必须经过 XML 编码。典型的连接字符串类似如下(注意 ConnectionString 属性中使用了 salesforceConnectionString 变量)
<SalesforceConnectionManager>
  <Property Name="ConnectionString"><#=salesforceConnectionString#></Property>
</SalesforceConnectionManager>

配置自定义 SSIS 任务的连接后,配置复制数据库的连接。完成的 Connections 元素如下所示(注意使用文本片段 <#= ... #> 添加连接字符串值的变量)

<Connections>
  <CustomSsisConnection Name="CData Salesforce Connection Manager" CreationName = "CDATA_SALESFORCE" ObjectData = "<SalesforceConnectionManager> <Property Name="ConnectionString"> <#=salesforceConnectionString#></Property> </SalesforceConnectionManager>" />
  <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
</Connections>
  1. 配置好 Connections 元素后,就可以构建复制包了。在包中,为要复制的每个表创建一个 ExecuteSQL 任务和一个 Dataflow 任务。要构建每个任务集,请在控制片段中使用 while 循环来迭代实体(表)名称
int entityCounter = 0; while(entityCounter < allEntityNames.Count){
var tableName = allEntityNames[entityCounter].ToString();
DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);
  • ExecuteSQL 任务

    在 ExecuteSQL 任务中,执行 SQL 查询以删除任何与我们的 Salesforce 实体(表)同名的现有表,并根据使用 CData SSIS Component 发现的元数据创建一个新表。要动态创建查询,请使用 Helper.GetDeleteAndCreateStatement() 帮助函数。

  • Dataflow 任务

    在 Dataflow 任务中,使用 CustomComponent 作为源组件,使用 OleDbDestination 作为目标。

    • CustomComponent 元素

      CustomComponent 元素使用 CData SSIS Source 组件检索 Salesforce 数据。首先配置组件以与 CData 组件一起使用。

      <CustomComponent Name="CData Salesforce Source" ComponentTypeName="CData.SSIS.Salesforce.SalesforceSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
      ...
      </CustomComponent>

      DataflowOverrides 和 OutputPaths 元素

      配置连接后,下一步是将 Columns 元素添加到 DataflowOverrides 元素的 OutputPath 子元素中。为此,请调用 Helper.GetColumnDefs() 帮助函数。使用相同的 Helper 类将列添加到各种 OutputPaths 元素的 OutputColumnsExternalColumns 子元素中。创建的定义提供了有关 SSIS 组件的输入、输出和错误信息。

      <DataflowOverrides>
        <OutputPath OutputPathName="CData Salesforce Source Output">
          <Columns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
          </Columns>
        </OutputPath>
      </DataflowOverrides>
      ...
      <OutputPaths>
        <OutputPath Name="CData Salesforce Source Output">
          <OutputColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
          </OutputColumns>
          <ExternalColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
          </ExternalColumns>
        </OutputPath>
        <OutputPath Name="CData Salesforce Source Error Output" IsErrorOutput="true">
          <OutputColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #      
          </OutputColumns>
        </OutputPath>
      </OutputPaths>

      CustomProperties 元素

      自定义组件通常有自己的自定义配置界面,其中包含一系列必需的 CustomProperties

      <CustomProperties>
        <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
        <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.Salesforce.AccessModeToStringConverter">0</CustomProperty>
        <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
        <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
      </CustomProperties>

      Connections 元素

      添加到 CustomComponent 元素的最后一个元素是 Connections 元素,将先前定义的连接附加到任务

      <Connections>
        <Connection Name="Salesforce 2018 Connection" ConnectionName="CData Salesforce Connection Manager" />
      </Connections>
    • OleDbDestination 元素

      Dataflow 任务的最后一部分是 OleDbDestination 元素。将先前定义的 OleDbConnection 附加到该元素,设置 InputPathExternalTableOutput

      <OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
        <InputPath OutputPathName="CData Salesforce Source.CData Salesforce Source Output" />
        <ExternalTableOutput Table="[<#=tableName#>]" />
      </OleDbDestination>
  1. 使用控制片段来递增用于迭代实体(表)名称集合的计数器。在 Tasks 元素内,在 Dataflow 元素结束后执行此操作
...
          </Dataflow>          
<# entityCounter++;}#>
        </Tasks>
    </Package>
  </Packages>
</Biml>

构建 SSIS 项目

编写完 Biml 文件后,在 Server Explorer 中右键单击 Biml 文件,然后选择“生成 SSIS 包”。此时,Visual Studio 和 BimlExpress 将 Biml 文件转换为 SSIS 包,即可运行。

运行包,开始将 Salesforce 数据复制到 SQL Server 数据库(或您选择的任何其他目标)。

完整的 Biml 文件

<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.Salesforce"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for Salesforce 2018\lib\CData.SSIS2017.Salesforce.dll"#>
<#
var salesforceConnectionString = ""User=username;Password=password;SecurityToken=Your_Security_Token;";
var replicationServer = "JDG";
var replicationCatalog = "BIML";
var replicationUserID = "sqltest";
var replicationPassword = "sqltest";
List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();
using (SalesforceConnection connection = new SalesforceConnection(salesforceConnectionString)) {
    connection.Open();
    var entities = connection.GetSchema("Tables").Rows;
    foreach (DataRow entity in entities)
    {
        allEntityNames.Add(entity["TABLE_NAME"].ToString());
    }
    foreach (string entity in allEntityNames){
        var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
        entitySchema.Add(entity,columns);
    }
}#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
  <Connections>
    <CustomSsisConnection Name="CData Salesforce Connection Manager" CreationName="CDATA_SALESFORCE" ObjectData="<SalesforceConnectionManager><Property Name="ConnectionString"><#=salesforceConnectionString#></Property></SalesforceConnectionManager>"/>
    <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
  </Connections>
  <Packages>
    <Package Name="Replicate Salesforce Package" Language="None" ConstraintMode="LinearOnCompletion" ProtectionLevel="EncryptSensitiveWithUserKey">
      <Tasks>
<# int entityCounter = 0; while(entityCounter < allEntityNames.Count){
   var tableName = allEntityNames[entityCounter].ToString();
   if (tableName.Equals("IdpEventLog")) break;
   DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);#>
        <ExecuteSQL Name="Create <#=tableName#> Replication Table" ConnectionName="Destination">
          <DirectInput>
<#=HelperClass.GetDeleteAndCreateStatement(tableName,columns)#>
          </DirectInput>
        </ExecuteSQL>
        <Dataflow Name="Replicate <#=tableName#>">
          <Transformations>
            <CustomComponent Name="CData Salesforce Source" ComponentTypeName="CData.SSIS.Salesforce.SalesforceSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
              <DataflowOverrides>
                <OutputPath OutputPathName="CData Salesforce Source Output">
                  <Columns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
                  </Columns>
                </OutputPath>
              </DataflowOverrides>
              <CustomProperties>
                <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
                <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.Salesforce.AccessModeToStringConverter">0</CustomProperty>
                <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
                <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
              </CustomProperties>
              <OutputPaths>
                <OutputPath Name="CData Salesforce Source Output">
                  <OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
                  </OutputColumns>
                  <ExternalColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
                  </ExternalColumns>
                </OutputPath>
                <OutputPath Name="CData Salesforce Source Error Output" IsErrorOutput="true">
                  <OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #>                     
                  </OutputColumns>
                </OutputPath>
              </OutputPaths>
              <Connections>
                <Connection Name="Salesforce 2018 Connection" ConnectionName="CData Salesforce Connection Manager" />
              </Connections>
            </CustomComponent>
            <OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
              <InputPath OutputPathName="CData Salesforce Source.CData Salesforce Source Output" />
              <ExternalTableOutput Table="[<#=tableName#>]" />
            </OleDbDestination>
          </Transformations>
        </Dataflow>          
<# entityCounter++;}#>
      </Tasks>
    </Package>
  </Packages>
</Biml>
<#+
public static class HelperClass {
    
    public static int OUTPUT_WITH_ERROR = 0;
    public static int EXTERNAL = 1;
    public static int OUTPUT = 2;
    public static int DATAOVERRIDE_COLUMN = 4;
    
    public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {
        var dropAndCreateStatement = 
            "IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{0}]') AND type IN (N'U'))\r\n" + 
            "DROP TABLE [{0}];\r\n" + 
            "CREATE TABLE [{0}]\r\n" + 
            "(\r\n" + 
            "{1}\r\n" + 
            ")\r\n" + 
            "ON \"default\";";
        string columnDefs = "";
        foreach (DataRow column in columns){
            string columnDef = "    [{0}] {1}";
            string dataType = column["DATA_TYPE"].ToString();
            if (dataType.ToLower().StartsWith("bool")) {
                dataType = "bit";
            } else if (dataType.ToLower().Equals("real")) {
                dataType = "float";
            } else if (dataType.ToLower().Contains("varchar")) {
                var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
                dataType = "nvarchar(" + ((int)columnLength > 4000 ? "MAX" : columnLength) + ")";         
            } 
            columnDefs += String.Format(columnDef,column["COLUMN_NAME"],dataType) + ",\r\n";
            
        }
        columnDefs = columnDefs.Remove(columnDefs.LastIndexOf(",\r\n"),",\r\n".Length);
        return String.Format(dropAndCreateStatement,tableName,columnDefs);
    }
    
    public static string GetColumnDefs(DataRowCollection columns, int columnType){
        var columnDefTemplate = "";
        var columnElements = "";
        
        if (columnType == DATAOVERRIDE_COLUMN) {
            columnDefTemplate = "                      <Column ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" ColumnName=\"{0}\" />\r\n";
            foreach(DataRow column in columns) {
                var columnName = column["COLUMN_NAME"];
                columnElements += String.Format(columnDefTemplate,columnName);
            }
            return columnElements;
        } 
        if (columnType == OUTPUT_WITH_ERROR)
            columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} ExternalMetadataColumnName=\"{0}\" ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" />\r\n";
        else if (columnType == EXTERNAL)
            columnDefTemplate = "                      <ExternalColumn Name=\"{0}\" {1} />\r\n";
        else if (columnType == OUTPUT)
            columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} />\r\n";
        
        foreach(DataRow column in columns){ 
            var columnName = column["COLUMN_NAME"];
            var dataTypeRaw = column["DATA_TYPE"].ToString().ToLower();
            var typeAndRelatedInfo = "";
            if (dataTypeRaw.Equals("bool")) {
                typeAndRelatedInfo = "DataType=\"Boolean\"";
            } else if (dataTypeRaw.Equals("date")) {
                typeAndRelatedInfo = "DataType=\"Date\" SsisDataTypeOverride=\"DT_DBDATE\"";
            } else if (dataTypeRaw.Equals("datetime")) {
                typeAndRelatedInfo = "DataType=\"DateTime\"";
            } else if (dataTypeRaw.Equals("real")) {
                typeAndRelatedInfo = ((int)column["NumericPrecision"] > 0 ? "Precision=\"18\" " : " ") + ((int)column["NumericScale"] > 0 ? "Scale=\"15\" " : " ") + "DataType=\"Decimal\"";
            } else if (dataTypeRaw.Equals("varchar")) {
                var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
                if ((int)columnLength > 4000) {
                    typeAndRelatedInfo = "DataType=\"String\"";
                } else {
                    typeAndRelatedInfo = "Length=\"" + columnLength + "\" DataType=\"String\" CodePage=\"1252\"";
                }
            }
            columnElements += String.Format(columnDefTemplate,columnName,typeAndRelatedInfo);
        }
        return columnElements;
    }
}
#>
© . All rights reserved.