65.9K
CodeProject 正在变化。 阅读更多。
Home

在 Biztalk 2006 中拆分大型消息并扩展平面文件管道反汇编器组件

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.62/5 (7投票s)

2007年1月13日

CPOL

5分钟阅读

viewsIcon

96193

本文旨在解释自定义平面文件反汇编器的基础知识

引言

扩展组件或 API 在 IT 行业并非新鲜事;早在面向对象编程模型中就已经引入了。根据您的需求扩展组件的方法被称为重写。Java、.NET 和许多其他现代编程语言都支持这种方法。

同样,BizTalk 2004/2006 也支持扩展管道组件或汇编器,为现有功能赋予新的含义。与 .NET Framework 或 Java 虚拟机基类库不同,BizTalk 2006 提供了非常好的所有支持类的文档,而且还有非常好的博客和新闻组为您提供更好的信息。

必备组件

本文假定您了解自定义管道的创建和部署。

拆分(Debatching)

集成项目通常需要在接收管道或编排中将大型消息拆分成小消息块。这个过程被称为拆分(De-batching)。使用 BizTalk 拆分消息可以通过以下多种方式实现:

(在许多技术博客中都有介绍)

  • 信封消息拆分:通常,在接收管道中配置两个架构,并从适配器或编排(仅限 BizTalk 2006)调用。
  • 编排拆分:编排被设计为循环处理消息直到结束,并通过调用自定义 .NET 组件或在编排表达式编辑器中使用 xpath 函数来拆分。
  • 自定义管道拆分:将重写管道 Disassembler 组件的Disassemble()GetNext()方法来拆分输入消息。

场景

假设您为一家医疗保险公司工作,您将从全国各地的第三方数据供应商那里收到来自不同医院的所有已完成“主要健康检查”活动的参与者名单。您计划每周收到一次参与者名单,这是一个大约 10 MB 的管道分隔的大型平面文件。

您的公司提供了一个 .NET Web 服务,该服务接受参与者详细信息数组作为输入,并帮助您将详细信息存储到全局数据库中。您的要求是使用 Biztalk 2006 轮询平面文件,将其拆分成 50 条参与者详细信息记录作为一个批次,然后将其传递给内部网 Web 服务。听起来很简单?

扩展 FF Disassembler 组件

可以通过继承“FFDasmComp”类来扩展自定义 Flatfile Disassembler 类,该类位于“Microsoft.BizTalk.Pipeline.Components”命名空间下。要访问“Microsoft.BizTalk.Pipeline.Components”命名空间,您必须在项目下添加以下 DLL 作为引用。

Microsoft BizTalk Server 2006\Pipeline Components\Microsoft.BizTalk.Pipeline.Components.dll

FFDasmComp 类公开以下方法和属性

核心成员的简要描述如下:

InitNew

  • 初始化当前的 FFDasm 类,此方法仅调用一次。
  • 不接受参数
  • 不返回值

Disassemble

  • 将给定的消息拆解成 Biztalk 可理解的 XML 消息,并将其存储到消息集中。此方法将只为给定的消息调用一次。
  • 接受 IPipelineContext IBaseMessage 作为输入参数,其中 IPipelineContext 是执行管道的上下文,IBaseMessage 在本例中是 Flatfile
  • 不返回值

获取下一个

  • 从此方法调用中存储的消息集中返回一条消息。此方法将一直调用,直到返回 Null
  • 接受 IPipelineContext 作为输入参数,其中 IPipelineContext 是执行管道的上下文。
  • 从消息集中返回 IBaseMessage

Load (加载)

  • 从属性包加载键值对。

保存

  • 将键值对存储到属性包中以供将来执行。

DocumentSpecName

  • 获取或设置 Disassemble 方法的架构名称,用于将给定的输入文件解析为 BizTalk 可理解的格式。

开发组件

现在让我们深入开发组件。

步骤 1

创建您自己的类,并按如下方式继承类和接口:

[ComponentCategory(CategoryTypes.CATID_PipelineComponent)] 
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)] 
[System.Runtime.InteropServices.Guid("57D51828-C973-4a62-A534-6DB3CB3CB251")] 
public class LargeFlatfileSplitter : 
FFDasmComp,
IBaseComponent, 
Microsoft.BizTalk.Component.Interop.IDisassemblerComponent, 
Microsoft.BizTalk.Component.Interop.IPersistPropertyBag 
{ 
publicLargeFlatfileSplitter() 
{ 

} 
……
}

第二步

照常为您的组件提供有意义的描述、名称和版本。

步骤 3

使用 .NET GUIDGen 工具创建您的 GUID,将其复制到剪贴板,然后粘贴到您的 GetClassID 方法中。

#regionIPersistPropertyBag Members 
void IPersistPropertyBag.GetClassID(out Guid classID) 
{ 
classID = new Guid("57D51828-C973-4a62-A534-6DB3CB3CB251"); 
} 
void IPersistPropertyBag.InitNew() 
{ 
base.InitNew(); 
} 
void IPersistPropertyBag.Load(IPropertyBag propertyBag, int errorLog) 
{ 
base.Load(propertyBag, errorLog); 
} 
void IPersistPropertyBag.Save
	(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties) 
{ 
base.Save(propertyBag, clearDirty, saveAllProperties); 
} 
#endregion 

请注意,InitNewLoad Save 方法分别调用基类(FFDasmComp)的方法。

步骤 4

public new void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg) 
{ 
try 
{ 
base.DocumentSpecName = this.DocumentSpecName;
base.Disassemble(pContext, pInMsg); 
} 
catch (Exception ex) 
{ 
System.Diagnostics.EventLog.WriteEntry("Disassemble:Error", ex.Message); 
} 
} 

以上方法会将输入的平面文件拆解成 XML 消息。这里将使用您在属性浏览器中指定的平面文件架构(DocumentSpecName)。

步骤 4

public new IBaseMessage GetNext(IPipelineContext pContext) 
{ 
//System.Diagnostics.EventLog.WriteEntry("GetNext:Called", currentMessage.ToString()); 
try 
{ 
if (stopCollectingMessages == false) 
{ 
IBaseMessage ibmTemp = base.GetNext(pContext); 
GetSplittedMessages(ibmTemp, pContext); 
stopCollectingMessages = true; 
if (0 == outboundMessages.Count) 
return null; 
} 
} 
catch (Exception ex) 
{ 
System.Diagnostics.EventLog.WriteEntry("GetNext:Error", ex.Message); 
} 
if (currentMessage == outboundMessages.Count) 
return null; 
// Return the current collected message 
return (IBaseMessage)outboundMessages[currentMessage++]; 
}

上面提到的方法是我们这里的核心方法。它将您的拆解后的消息流加载到 XPathDocument 中(如果消息大于 10 MB,性能会下降,稍后我们将讨论这一点)。

我们使用 Xpath 表达式拆分您的 XPathDocument,循环遍历您感兴趣的每个子节点 50 次,并将它们附加到新的 XmlDocument 中。

这样,您就在 XmlDocument 中收集了 50 个子节点,然后创建一个新的 IBaseMessage ,并将 XmlDocument 保存到 IBaseMessage 数据流中,同时将其存储到集合列表中。

执行这些步骤,直到您完成消息收集并将 stopCollectingMessage 标志设置为 true ,这样当 Biztalk 下次调用您的 GetNext 函数时,您就可以从 CollectionList 返回 IBaseMessage

编译项目,并将 DLL 复制到您的 PipelineComponents 目录并进行 GAC 注册。

就这样。您就可以开始使用了。

性能

此组件仅适用于消息大小小于 15 MB 的情况。由于本文仅旨在解释在管道中拆分消息的另一种方法,因此您可以使用 SeekableReadOnlyStream Microsoft.BizTalk.XPathReader 类轻松地增强此组件,以处理极大型消息(100 MB 以上)。

SeekableReadOnlyStream System.IO.Stream 的包装类,当您不需要修改输入流时,它提供了一种更快更好的访问 IBaseMessage 流的方式。XPathReader 类无法直接添加到组件的引用中。您需要从命令提示符浏览您的 GAC 目录,获取“Microsoft.Biztalk.XpathReader.dll”的路径,并在您的组件中引用它。最有可能的路径是“C:\WINDOWS\assembly\GAC_MSIL\Microsoft.BizTalk.XPathReader\3.0.1.0__31bf3856ad364e35”目录。

private void GetSplittedMessages(IBaseMessage ibmParam, IPipelineContext pCxt) 
{ 
System.Xml.XmlDocument xDoc; 
string temp = ""; 
if (ibmParam != null) 
{ 
XPathDocument xp = new XPathDocument(ibmParam.BodyPart.Data); 
XPathNodeIterator xNI = xp.CreateNavigator().Select
	("/*[local-name()='CustomersList' and namespace-uri()=
	'http://BiztalkArticle.Customers']/*[local-name()='Customers' 
	and namespace-uri()='']"); 
bool blnMoveNext = true; 
while (blnMoveNext) 
{ 
xDoc = new System.Xml.XmlDocument(); 
System.Xml.XmlElement xParent = xDoc.CreateElement
	("CustomersList", http://BiztalkArticle.Customers ); 
for (int i = 0; i < this.recordsPerMsg; i++) 
{ 
blnMoveNext = xNI.MoveNext(); 
if (blnMoveNext == false) break; 
XPathNavigator xn = xNI.Current; 
if (xn != null) 
{ 
temp = xn.InnerXml; 
System.Xml.XmlElement xe = xDoc.CreateElement("Customers"); 
xe.InnerXml = temp; 
xParent.AppendChild(xe); 
} 
} 
xDoc.AppendChild(xParent); 
//System.Diagnostics.EventLog.WriteEntry("GetNext:Xml", xDoc.OuterXml); 
IBaseMessage msg = null; 
msg = pCxt.GetMessageFactory().CreateMessage(); 
//System.Diagnostics.EventLog.WriteEntry("GetNext:After Msg", ""); 
msg.Context = ibmParam.Context; 
IBaseMessagePart msgPart = pCxt.GetMessageFactory().CreateMessagePart(); 
System.IO.MemoryStream memStrm = new MemoryStream(); 
xDoc.Save(memStrm); 
memStrm.Position = 0; 
memStrm.Seek(0, System.IO.SeekOrigin.Begin);
msgPart.Data = memStrm;
msg.AddPart(ibmParam.BodyPartName, msgPart, true); 
outboundMessages.Add(msg); 
pCxt.ResourceTracker.AddResource(memStrm);
} 
} 
} 

#regionIBaseComponent Members 
[Browsable(false)] 
public newstring Description 
{ 
get { return "Biztalk Large flatfile splitting"; } 
} 

[Browsable(false)] 
public new string Name 
{ 
get { return "Large Flat File Disassembler"; } 
} 

[Browsable(false)] 
public new string Version 
{ 
get { return "1.0"; } 
} 
#endregion

历史

  • 2007 年 1 月 13 日:首次发布
© . All rights reserved.