目录

入门

构建Biml文件

C#代码

类块

Biml脚本

构建SSIS项目

完整的Biml文件


使用SQL Server作为关键业务数据的备份提供了一个基本的安全网,可以防止丢失,并使业务用户能够更轻松地将这些数据与报告、分析等功能连接起来。Biml是一种XML方言,可用于创建Microsoft SQL Server BI对象,如SSIS包。将自定义SSIS组件与Biml配对使您能够轻松构建SSIS包,从而可以访问标准SSIS连接之外的数据,如Salesforce。主要优势包括:

  • 内置元数据发现——许多自定义SSIS组件都像使用SQL Server一样公开元数据,甚至为无模式数据源动态生成模式
  • 动态SSIS任务生成——使用Biml中的代码块通过迭代已发现的元数据来构建SSIS任务
  • 读取和写入外部源——本机源和目标组件使外部数据看起来就像数据库

本文演示如何将Biml与自定义SSIS组件一起使用,以动态构建SSIS任务(每个Salesforce实体一个),以将Salesforce数据复制到Microsoft SQL Server数据库。我们一次单步执行Biml文件,但在文章末尾包含了完整的Biml文件。虽然本文使用CData SSIS组件,但任务生成的原则适用于任何自定义SSIS组件。

入门

要在Visual Studio中的SSIS项目中使用Biml,请安装BimlExpress。安装BimlExpress后,打开Visual Studio,创建一个新的Integration Services项目,然后添加一个新的Biml文件。

构建Biml文件

使用Biml,您可以编写脚本来动态生成SSIS项目、包和任务。要查看现有项目的Biml文件(并获得有关在项目中使用Biml与任何自定义SSIS任务的见解),只需创建任务,然后右键单击该项目并选择“将SSIS包转换为Biml”。

C#代码

1、使用指令<#@ ..#>导入必要的命名空间和用于Salesforce的CData SSIS组件的程序集。

<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.Salesforce"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for Salesforce 2018\lib\CData.SSIS2017.Salesforce.dll"#>

2、在新的控件块<# ... #>中,编写代码以检索外部数据源的元数据。使用Biml时,通常的做法是使用存储在数据库中的元数据。对于CData组件,您只需编写ADO.NET代码即可动态检索元数据。首先,为将在整个Biml脚本中使用的值创建变量,包括Salesforce的连接字符串和存储Salesforce元数据的结构。

var salesforceConnectionString = "User=username;Password=password;SecurityToken=Your_Security_Token;";
var replicationServer = "SERVER";
var replicationCatalog = "CATALOG";
var replicationUserID = "sqluser";
var replicationPassword = "sqlpassword";
List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();

3、在用于定义变量的同一控件块中,使用ADO.NET代码以编程方式查询Salesforce实体(表)和字段(列)。

using (SalesforceConnection connection = new SalesforceConnection(salesforceConnectionString)) {connection.Open();var entities = connection.GetSchema("Tables").Rows;foreach (DataRow entity in entities){allEntityNames.Add(entity["TABLE_NAME"].ToString());}foreach (string entity in allEntityNames){var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;entitySchema.Add(entity,columns);}
}

类块

在我们创建复制任务的Biml脚本中,有几个地方动态创建重复的XML元素(主要用于SSIS任务中的列)。而不是重复代码,添加一个类块<#+ ... #>并使用方法创建一个帮助器类来合并重复的代码(文章末尾的完整代码)。

1、添加公共静态变量以确定要创建的XML元素类型。

public static int OUTPUT_WITH_ERROR = 0;
public static int EXTERNAL = 1;
public static int OUTPUT = 2;
public static int DATAOVERRIDE_COLUMN = 4;

2、添加一个公共方法来构建SQL语句,以便在ExecuteSQL任务中使用,该任务用于删除现有表并为复制数据创建新表。

// Dynamically builds a DROP TABLE and CREATE statement
// for each entity (table) in Salesforce using the table name and metadata.
public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {...
}

3、添加公共方法以构建基于列的XML元素的集合。

// Dynamically build various column-based XML elements
// for each entity (table) in Salesforce based on the column
// metadata and the parent element
public static string GetColumnDefs(DataRowCollection columns, int columnType){...
}

Biml脚本

现在您已拥有表元数据和Helper类来减少重复代码,请编写Biml脚本以动态创建复制包。

1、首先为自定义SSIS任务添加CustomSsisConnection元素。请注意,ObjectData属性必须是XML编码的。典型的连接字符串类似于以下内容(请注意ConnectionString属性的salesforceConnectionString变量的使用:

<SalesforceConnectionManager><Property Name="ConnectionString"><#=salesforceConnectionString#></Property>
</SalesforceConnectionManager>

配置与自定义SSIS任务的连接后,配置与复制数据库的连接。完成的Connections元素如下所示(注意使用文本块<#= ... #>来为连接字符串值添加变量):

<Connections><CustomSsisConnection Name="CData Salesforce Connection Manager" CreationName = "CDATA_SALESFORCE" ObjectData = "<SalesforceConnectionManager> <Property Name="ConnectionString"> <#=salesforceConnectionString#></Property> </SalesforceConnectionManager>" /><Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
</Connections>

2、随着Connections元素被配置,你就可以建立我们的复制软件包。在包中,为要复制的每个表创建一个ExecuteSQL任务和一个Dataflow任务。要构建每组任务,请在控件块中使用while循环来遍历实体(表)名称:

int entityCounter = 0; while(entityCounter < allEntityNames.Count){
var tableName = allEntityNames[entityCounter].ToString();
DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);
  • ExecuteSQL任务

在ExecuteSQL任务中,执行SQL查询以删除与Salesforce实体(表)同名的任何现有表,并根据使用CData SSIS组件发现的元数据创建新表。要动态创建查询,请使用Helper.GetDeleteAndCreateStatement()帮助程序函数。

  • Dataflow任务

在Dataflow任务中,使用CustomComponent作为源组件,使用OleDbDestination 作为目标组件。

​​​​​​​a)、CustomComponent元素

CustomComponent元素使用CData SSIS 源组件来检索Salesforce数据。首先配置要与CData组件一起使用的组件。

<CustomComponent Name="CData Salesforce Source" ComponentTypeName="CData.SSIS.Salesforce.SalesforceSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
...
</CustomComponent>

                  DataflowOverridesOutputPaths元素

配置连接后的下一步是将Columns元素添加到DataflowOverrides元素的OutputPath子元素中。为此,请调用Helper.GetColumnDefs()帮助函数。使用相同的Helper类像不同的OutputPaths元素的OutputColumns和ExternalColumns子元素添加列。创建的定义提供有关SSIS组件的输入、输出和错误信息的信息。

<DataflowOverrides><OutputPath OutputPathName="CData Salesforce Source Output"><Columns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #></Columns></OutputPath>
</DataflowOverrides>
...
<OutputPaths><OutputPath Name="CData Salesforce Source Output"><OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #></OutputColumns><ExternalColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #></ExternalColumns></OutputPath><OutputPath Name="CData Salesforce Source Error Output" IsErrorOutput="true"><OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #      </OutputColumns></OutputPath>
</OutputPaths>

                       CustomProperties元素

自定义组件通常有自己的自定义配置界面,需要一系列CustomProperties:

<CustomProperties><CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty><CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.Salesforce.AccessModeToStringConverter">0</CustomProperty><CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty><CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
</CustomProperties>

                      Connections 元素

添加到CustomComponent元素的最后一个元素是一个Connections元素,将先前定义的连接附加到任务:

<Connections><Connection Name="Salesforce 2018 Connection" ConnectionName="CData Salesforce Connection Manager" />
</Connections>

​​​​​​​             b)、OleDbDestination元素

Dataflow任务的最后一部分是OleDbDestination元素。将先前定义的OleDbConnection附加到元素,设置InputPath和ExternalTableOutput:

<OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false"><InputPath OutputPathName="CData Salesforce Source.CData Salesforce Source Output" /><ExternalTableOutput Table="[<#=tableName#>]" />
</OleDbDestination>

3、使用控制块来递增用于迭代实体(表)名称集合的计数器。在Tasks元素结束后在Dataflow元素内执行此操作:

...</Dataflow>
<# entityCounter++;}#></Tasks></Package></Packages>
</Biml>

构建SSIS项目

编写Biml文件后,右键单击Server Explorer中的Biml文件,然后选择Generate SSIS Packages。此时,Visual Studio和BimlExpress会将Biml文件转换为SSIS包,准备运行。

运行程序包以开始将Salesforce数据复制到SQL Server数据库(或您选择的任何其他目标)。

完整的Biml文件

<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.Salesforce"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for Salesforce 2018\lib\CData.SSIS2017.Salesforce.dll"#>
<#
var salesforceConnectionString = ""User=username;Password=password;SecurityToken=Your_Security_Token;";
var replicationServer = "JDG";
var replicationCatalog = "BIML";
var replicationUserID = "sqltest";
var replicationPassword = "sqltest";
List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();
using (SalesforceConnection connection = new SalesforceConnection(salesforceConnectionString)) {connection.Open();var entities = connection.GetSchema("Tables").Rows;foreach (DataRow entity in entities){allEntityNames.Add(entity["TABLE_NAME"].ToString());}foreach (string entity in allEntityNames){var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;entitySchema.Add(entity,columns);}
}#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd"><Connections><CustomSsisConnection Name="CData Salesforce Connection Manager" CreationName="CDATA_SALESFORCE" ObjectData="<SalesforceConnectionManager><Property Name="ConnectionString"><#=salesforceConnectionString#></Property></SalesforceConnectionManager>"/><Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/></Connections><Packages><Package Name="Replicate Salesforce Package" Language="None" ConstraintMode="LinearOnCompletion" ProtectionLevel="EncryptSensitiveWithUserKey"><Tasks>
<# int entityCounter = 0; while(entityCounter < allEntityNames.Count){var tableName = allEntityNames[entityCounter].ToString();if (tableName.Equals("IdpEventLog")) break;DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);#><ExecuteSQL Name="Create <#=tableName#> Replication Table" ConnectionName="Destination"><DirectInput>
<#=HelperClass.GetDeleteAndCreateStatement(tableName,columns)#></DirectInput></ExecuteSQL><Dataflow Name="Replicate <#=tableName#>"><Transformations><CustomComponent Name="CData Salesforce Source" ComponentTypeName="CData.SSIS.Salesforce.SalesforceSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true"><DataflowOverrides><OutputPath OutputPathName="CData Salesforce Source Output"><Columns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #></Columns></OutputPath></DataflowOverrides><CustomProperties><CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty><CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.Salesforce.AccessModeToStringConverter">0</CustomProperty><CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty><CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty></CustomProperties><OutputPaths><OutputPath Name="CData Salesforce Source Output"><OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #></OutputColumns><ExternalColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #></ExternalColumns></OutputPath><OutputPath Name="CData Salesforce Source Error Output" IsErrorOutput="true"><OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #>                     </OutputColumns></OutputPath></OutputPaths><Connections><Connection Name="Salesforce 2018 Connection" ConnectionName="CData Salesforce Connection Manager" /></Connections></CustomComponent><OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false"><InputPath OutputPathName="CData Salesforce Source.CData Salesforce Source Output" /><ExternalTableOutput Table="[<#=tableName#>]" /></OleDbDestination></Transformations></Dataflow>
<# entityCounter++;}#></Tasks></Package></Packages>
</Biml>
<#+
public static class HelperClass {public static int OUTPUT_WITH_ERROR = 0;public static int EXTERNAL = 1;public static int OUTPUT = 2;public static int DATAOVERRIDE_COLUMN = 4;public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {var dropAndCreateStatement = "IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{0}]') AND type IN (N'U'))\r\n" + "DROP TABLE [{0}];\r\n" + "CREATE TABLE [{0}]\r\n" + "(\r\n" + "{1}\r\n" + ")\r\n" + "ON \"default\";";string columnDefs = "";foreach (DataRow column in columns){string columnDef = "    [{0}] {1}";string dataType = column["DATA_TYPE"].ToString();if (dataType.ToLower().StartsWith("bool")) {dataType = "bit";} else if (dataType.ToLower().Equals("real")) {dataType = "float";} else if (dataType.ToLower().Contains("varchar")) {var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];dataType = "nvarchar(" + ((int)columnLength > 4000 ? "MAX" : columnLength) + ")";         } columnDefs += String.Format(columnDef,column["COLUMN_NAME"],dataType) + ",\r\n";}columnDefs = columnDefs.Remove(columnDefs.LastIndexOf(",\r\n"),",\r\n".Length);return String.Format(dropAndCreateStatement,tableName,columnDefs);}public static string GetColumnDefs(DataRowCollection columns, int columnType){var columnDefTemplate = "";var columnElements = "";if (columnType == DATAOVERRIDE_COLUMN) {columnDefTemplate = "                      <Column ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" ColumnName=\"{0}\" />\r\n";foreach(DataRow column in columns) {var columnName = column["COLUMN_NAME"];columnElements += String.Format(columnDefTemplate,columnName);}return columnElements;} if (columnType == OUTPUT_WITH_ERROR)columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} ExternalMetadataColumnName=\"{0}\" ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" />\r\n";else if (columnType == EXTERNAL)columnDefTemplate = "                      <ExternalColumn Name=\"{0}\" {1} />\r\n";else if (columnType == OUTPUT)columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} />\r\n";foreach(DataRow column in columns){ var columnName = column["COLUMN_NAME"];var dataTypeRaw = column["DATA_TYPE"].ToString().ToLower();var typeAndRelatedInfo = "";if (dataTypeRaw.Equals("bool")) {typeAndRelatedInfo = "DataType=\"Boolean\"";} else if (dataTypeRaw.Equals("date")) {typeAndRelatedInfo = "DataType=\"Date\" SsisDataTypeOverride=\"DT_DBDATE\"";} else if (dataTypeRaw.Equals("datetime")) {typeAndRelatedInfo = "DataType=\"DateTime\"";} else if (dataTypeRaw.Equals("real")) {typeAndRelatedInfo = ((int)column["NumericPrecision"] > 0 ? "Precision=\"18\" " : " ") + ((int)column["NumericScale"] > 0 ? "Scale=\"15\" " : " ") + "DataType=\"Decimal\"";} else if (dataTypeRaw.Equals("varchar")) {var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];if ((int)columnLength > 4000) {typeAndRelatedInfo = "DataType=\"String\"";} else {typeAndRelatedInfo = "Length=\"" + columnLength + "\" DataType=\"String\" CodePage=\"1252\"";}}columnElements += String.Format(columnDefTemplate,columnName,typeAndRelatedInfo);}return columnElements;}
}
#>

原文地址:https://www.codeproject.com/Articles/5131984/Automated-Salesforce-Data-Replication-with-SQL-SSI

使用SQL SSIS和BIML自动化Salesforce数据复制相关推荐

  1. SQL 将一张表的数据复制到另一张表

    destTable 需复制的表 srcTable 来源表 最基础的复制 INSERT INTO destTable SELECT * FROM srcTable 如果两个表的字段不对应,select后 ...

  2. VS用SSIS实现SQL Server数据库与Excel表格数据的相互导入

    VS用SSIS实现SQL Server数据库与Excel表格数据的相互导入 打开Visual Studio 2019,新建Integration Services项目 拖一个数据流任务到控制流中 切换 ...

  3. ssis 角本组件更新数据_使用SSIS脚本组件作为数据源

    ssis 角本组件更新数据 介绍 (Introduction) SSIS Script component is one data transformation tasks in SQL Server ...

  4. SQL截断增强功能:SQL Server 2019中的静默数据截断

    In this article, we'll take a look into SQL truncate improvement in SQL Server 2019. 在本文中,我们将研究SQL S ...

  5. 《PowerShell V3——SQL Server 2012数据库自动化运维权威指南》——1.2 在你开始之前:使用SQL Server和PowerShell工作...

    本节书摘来自异步社区出版社<PowerShell V3-SQL Server 2012数据库自动化运维权威指南>一书中的第1章,第1.2节,作者:[加拿大]Donabel Santos,更 ...

  6. ExcelToDatabase:批量导入Excel文件到MySQL/Oracle/SQL Server数据库的自动化工具

    ExcelToDatabase:批量导入Excel到MySQL/Oracle/SQL Server数据库的自动化工具 简介 ExcelToDatabase 是一个可以批量导入excel到mysql/o ...

  7. SQL on Hadoop在快手大数据平台的实践与优化 | 分享实录

    快手大数据架构工程师钟靓 本文是根据快手大数据架构工程师钟靓于 5月18-19日在A2M人工智能与机器学习创新峰会<SQL on Hadoop在快手大数据平台的实践与优化>演讲中的分享内容 ...

  8. oracle判断非空并拼接,oracle sql 判断字段非空,数据不重复,插入多跳数据

     oracle sql 判断字段非空,数据不重复 select distinct(mobile) from wx_user_mobile where active_time is not null ...

  9. 一次 SQL 查询优化原理分析(900W+ 数据,从 17s 到 300ms)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 来源:Muscleape jianshu.com/p/0768eb ...

最新文章

  1. 基于java的IO流的文件读取系统
  2. 剑指offer-调整数组顺序使奇数位于偶数前面
  3. rtems 4.11 RTC驱动 (arm, beagle)
  4. raid5 合适 多少块硬盘_raid1 raid2 raid5 raid6 raid10如何选择使用?各需要几块硬盘?...
  5. android 继承现有控件,Android继承现有控件拓展实现自定义控件textView
  6. 软件测试基础方法总结
  7. “能耗大户”数据中心供配电系统
  8. 海康SDK设备信息NET_DVR_GET_DEVICECFG解析
  9. sql插入数据的方式
  10. 【MySQL】关系型数据库基本知识点
  11. BYD Mes系统接入示例图源码
  12. 线性代数 | (3) 行列式
  13. python 步数_用python如何修改微信和支付宝每天走路的步数
  14. 单模连接器损耗与影响因素
  15. Python—异常处理
  16. 计算机专业前沿算法,CNCC2018 | 研究经典计算机算法已经过时了吗?
  17. nginxssl证书配置
  18. 【AI视野·今日CV 计算机视觉论文速览 第247期】Fri, 22 Apr 2022
  19. 聪明好用的学习助手,帮孩子提升专注力,有道智能学习灯上手
  20. 一篇文章带你搞定线程池的自定义创建和扩展

热门文章

  1. sublime怎么运行go_使用SublimeGDB调试Go程序
  2. mysql文件导出NULL值处理_Mysql select into outfile NULL值导出的处理方法
  3. 设计灵感|网页建议页面(联系页面)版式案例
  4. 设计师交流社区,让你的原创设计作品展示给世界
  5. 年底了,各大电商大促会员活动反馈万能模板,必备的PSD分层格式
  6. 手机海报模板,收藏就对了!
  7. 最短寻道时间算法c语言,如果北京到上海有千亿条路,寻找最短路径用C语言编程用枚举法没效率,应该用什么算法才能高效解决它?...
  8. js能调用c语言吗,如何从Javascript调用Object-C?
  9. Qt程序缺少dll解决方案
  10. Linux开机启动过程(3):显示模式初始化和进入保护模式