用于工作流的 WS-Transfer 服务






4.93/5 (15投票s)
本文介绍了一种将 WF 工作流连接到 Windows Communication Foundation (WCF) 服务以实现 WS-Transfer 操作契约的设计和实现。
目录
注释
本文使用以下内容编写:
- .NET Framework 3.0 (RTM)
- VS 2005 Windows Workflow Foundation 扩展 (RTM)
引言
Microsoft Windows Workflow Foundation (WF) 是 Windows 平台的一部分。它是下一代 .Net Framework 3.0 的核心组件。WF 能够将应用程序解耦为由活动驱动的业务工作流。工作流可以加载到宿主进程中,例如 Windows NT 服务、IIS、WinForm、控制台等,并通过托管层连接到 WF Runtime 核心。该层提供持久化、跟踪、调度、事务和通信支持。当然,还可以创建扩展——例如自定义托管服务,以在宿主进程和工作流之间基于接口/事件契约添加自定义通信服务。
工作流可以通过通信服务,以事件/委托时尚模式相互本地或远程通信。消息交换模式 (MEP) 基于请求和响应事件,传递应用程序特定的数据契约。本文介绍了一种工作流托管连接(管道)到 Windows Communication Foundation (WCF) 服务以实现 WS-Transfer 操作契约的设计和实现。这是我 WS-Transfer for WCF 文章的扩展,其中 WCF 服务被封装到通用的服务层和物理适配器中,用于处理特定资源。在这种情况下,我们的资源是 WF 的工作流实例。
注意,本项目中包含的工作流仅用于演示和测试目的。
拥有一个 WS-* 驱动的工作流模型层后,我们可以将工作流插入“WS-* 服务总线
”,如下图所示:
WCF 服务(带有可配置的工作流适配器)能够创建逻辑(分布式)模型,将业务逻辑封装到工作流和活动中。注意,上图仅显示了服务。WCF/WF 客户端可以以类似的方式创建,并轻松插入到服务总线中。当然,任何“传统”的 WS-* 服务/客户端也可以通过工作流模型层以透明的方式连接到服务总线。
本文将仅介绍 WS-Transfer 服务,但其概念和实现对于其他服务(如 WS-Eventing、WS-Enum 等)类似且直接。让我们从连接 WCF 和 WF 这两个基础设施的概念开始。为了理解这个模式,我假设您熟悉我的文章 WS-Transfer for WCF,并且至少有一些 Microsoft WCF 和 WF 技术方面的经验(知识)。
概念与设计实现
实现的概念基于使用 Indigo 范例,通过 WS-Transfer 堆栈驱动 WCF 服务层来封装资源(物理)层。在我们的例子中,物理资源由 WF 的工作流实例表示。
下图显示了 WCF 和 WF 之间连接的最高级别。
如您所见,上述模型已解耦为通信层和业务处理层。这两个层是独立的,拥有自己的技术。服务层负责与 WS-* 堆栈(在本例中为 WS-Transfer)发送和接收消息,以及与特定资源进行通信。这是一个通用的服务层,可以通过编程方式或通过配置文件进行管理,具备服务行为扩展的能力。另一个层——资源层,是业务特定的层,由业务工作流模型表示。这两个层都可以根据应用程序的需求,在自己的行为中同步或异步运行。
配置文件的以下部分显示了内存存储工作流操作的服务行为扩展。
<configuration>
<system.serviceModel>
<behaviors>
<serviceBehaviors>
<behavior name="WxfServiceExtension" >
<wsTransferAdapter
name="PublicStorage"
type="RKiss.WSTransfer.Adapters.WFAdapter, WFAdapter"
TransactionScopeRequired="true"
TransactionAutoComplete="true"
Topic="Imaging"
WorkflowTypeCreate=
"WFLibTest.MemoryStorage.WorkflowCreate, WFLibTest"
WorkflowTypeGet="WFLibTest.MemoryStorage.WorkflowGet, WFLibTest"
WorkflowTypePut="WFLibTest.MemoryStorage.WorkflowPut, WFLibTest"
WorkflowTypeDelete=
"WFLibTest.MemoryStorage.WorkflowDelete, WFLibTest" />
</behavior>
</serviceBehaviors>
</behaviors>
<extensions>
<behaviorExtensions>
<add name="wsTransferAdapter"
type="RKiss.WSTransfer.ServiceAdapterBehaviorElement, WSTransfer,
Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"/>
</behaviorExtensions>
</extensions>
</system.serviceModel>
</configuration>
在上例中,每个 WS-Transfer 操作(Create、Get、Put 和 Delete)都已配置为处理与 Topic="Imaging
" 相关的事务性工作流。存储的名称是“PublicStorage
”。管道适配器——WCF 服务和 WF 之间的连接——是在 WFAdapter 程序集中实现的。
用于工作流处理的 WS-Transfer 适配器内置了以下属性:
属性名称 | 默认值 | 注释 |
名称 |
null |
适配器名称 |
type |
null |
适配器类的类型 |
TransactionScopeRequired |
"false " |
需要事务范围 |
TransactionAutoComplete |
"true " |
事务范围完成 |
主题 |
null |
资源的主题 |
WorkflowType |
null |
通用工作流类的类型 |
WorkflowTypeCreate |
null |
用于创建资源的类的工作流类型 |
WorkflowTypeGet |
null |
用于获取资源的类的工作流类型 |
WorkflowTypePut |
null |
用于放置资源的类的工作流类型 |
WorkflowTypeDelete |
null |
用于删除资源的类的工作流类型 |
注意,属性集合在工作流初始化阶段传递给工作流实例。
托管 WF Runtime 服务
WF Runtime 服务可以通过 ServiceHost
扩展机制附加到 WCF 托管进程(每个 AppDomain 只能有一个)。以下代码片段展示了由控制台程序托管的 WSTransferService
的示例。
using (System.ServiceModel.ServiceHost host =
new System.ServiceModel.ServiceHost(typeof(WSTransferService)))
{
// Service extension for WF
WFServiceHostExtension extension =
new WFServiceHostExtension("WorkflowRuntimeConfig",
"LocalServicesConfig");
// Add the Extension to the ServiceHost collection
host.Extensions.Add(extension);
host.Open();
Console.WriteLine("Press any key to stop server...");
Console.ReadLine();
host.Close();
}
其中,WFServiceHostExtension
是实现了 WF Runtime 服务 IExtension
接口的宿主扩展类。
void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner)
{
// add services from config file
if(_workflowServicesConfig == null)
_workflowRuntime = new WorkflowRuntime();
else
_workflowRuntime = new WorkflowRuntime(_workflowServicesConfig);
// not handled exception
_workflowRuntime.ServicesExceptionNotHandled +=
new EventHandler<SERVICESEXCEPTIONNOTHANDLEDEVENTARGS>
(workflowRuntime_ServicesExceptionNotHandled);
// external services
_exchangeServices =
_workflowRuntime.GetService<EXTERNALDATAEXCHANGESERVICE>();
if (_exchangeServices == null)
{
if (_localServicesConfig == null)
_exchangeServices = new ExternalDataExchangeService();
else
_exchangeServices = new ExternalDataExchangeService(_localServicesConfig);
// add service for exchange data
_workflowRuntime.AddService(_exchangeServices);
}
// Start all services registered in this container
_workflowRuntime.StartRuntime();
}
void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner)
{
_workflowRuntime.StopRuntime();
}
注意,“WorkflowRuntimeConfig
”是配置节的名称,其中包含 WF Runtime 服务并对特定工作流处理是必需的。例如:System.Workflow.Runtime.Hosting.ManualWorkflowSchedulerService
允许在与 WCF 服务操作相同的线程内同步运行工作流活动。
以下配置片段显示了用于添加运行时和本地服务的配置节。
<configSections>
<section name="WorkflowRuntimeConfig"
type="System.Workflow.Runtime.Configuration.WorkflowRuntimeSection,
System.Workflow.Runtime,
Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
<section name="LocalServicesConfig"
type="System.Workflow.Activities.ExternalDataExchangeServiceSection,
System.Workflow.Activities,
Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
</configSections>
<WorkflowRuntimeConfig >
<Services>
<add type="System.Workflow.Runtime.Hosting.ManualWorkflowSchedulerService,
System.Workflow.Runtime,
Version=3.0.00000.0, Culture=neutral,
PublicKeyToken=31bf3856ad364e35"/>
</Services>
</WorkflowRuntimeConfig>
<LocalServicesConfig>
<Services>
<add type="RKiss.WSTransfer.Adapters.WSTransferLocalService, WFAdapter,
Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" />
</Services>
</LocalServicesConfig>
WF 本地通信服务 (LCS)
本地通信服务代表了在 WorkflowRuntime
中注册的通信接口的实现,这些接口允许工作流和宿主进程层之间基于 .NET 事件/委托时尚模式进行数据交换。接口必须使用 ExternalDataExchange
属性进行装饰,以便 WF Runtime 找到并提供正确的事件拦截。
让我们使用 Reflector 详细查看这一层。为了将自定义 LCS 引入管道业务,我们必须通过内置的 ExternalDataExchangeService
服务将其附加到 WorkflowRuntime
。
以下代码片段显示了 AddService
方法的实现。注意,行内注释是在 Reflector 后添加的。
public void AddService(object service)
{
// validation
if (service == null)
{
throw new ArgumentNullException("service");
}
if (base.Runtime == null)
{
throw new InvalidOperationException
("Error_ExternalRuntimeContainerNotFound");
}
// plumbing of the local service to the workflow instance
this.InterceptService(service, true);
// add local service into runtime collection
base.Runtime.AddService(service);
}
实际的管道(事件订阅)是在 InterceptService
方法中完成的。以下代码片段是由 Reflector 捕获的,手动注释的行显示了其实现。
private void InterceptService(object service, bool add)
{
bool flag1 = false;
Type[] typeArray1 = service.GetType().GetInterfaces();
// walk through all interfaces and select ones with
// ExternalDataExchange attributed
Type[] typeArray2 = typeArray1;
for (int num2 = 0; num2 < typeArray2.Length; num2++)
{
Type type1 = typeArray2[num2];
object[] objArray1 =
type1.GetCustomAttributes
(typeof(ExternalDataExchangeAttribute), false);
if (objArray1.Length != 0)
{
// get all events for this interface contract
flag1 = true;
EventInfo[] infoArray1 = type1.GetEvents();
if (infoArray1 != null)
{
// walk through all events and assign
// WorkflowMessageEventHandler handler
EventInfo[] infoArray2 = infoArray1;
for (int num3 = 0; num3 < infoArray2.Length; num3++)
{
EventInfo info1 = infoArray2[num3];
WorkflowMessageEventHandler handler1 = null;
int num1 = type1.GetHashCode() ^ info1.Name.GetHashCode();
lock (this.sync)
{
// check collection for this event
if (!this.eventHandlers.ContainsKey(num1))
{
// create event handler to workflow
handler1 = new WorkflowMessageEventHandler
(type1,info1,base.Runtime);
// add to the collection
this.eventHandlers.Add(num1, handler1);
}
else
{
// use handler from the collection
handler1 = this.eventHandlers[num1];
}
}
// add delegate of the event handler to the local service event
this.AddRemove(service, handler1.Delegate, add, info1.Name);
}
}
}
}
if (!flag1)
{
throw new InvalidOperationException("Error_ServiceMissing ...");
}
}
拦截器的概念基于捕获 LCS 中所有正确的事件并订阅它们。LCS 通信接口契约由自定义 ExternalDataExchange
属性找到。对于这些选定的接口,拦截器会遍历每个事件。拦截器创建一个 WorkflowMessageEventHandler
对象,然后将处理程序的委托订阅到事件。此时,事件/委托管道已完成并可供运行时使用。
如上所述,WF Runtime 拦截器负责注入通信层,以抽象化并将源事件传递到工作流事件接收器,例如 HandleExternalEvent
活动。下图显示了宿主和工作流实例之间的主要序列。
宿主(在本例中为 WCF 服务 - 适配器)会引发一个事件,例如 CreateRequest
,并附带应用程序特定的参数,其委托将调用 WorkflowMessageEventhandler
对象上的事件处理程序。处理程序根据事件源参数向 WorkflowRuntime
请求工作流实例。事件源参数是工作流实例 ID。
之后,处理程序将创建一个 MethodMesage
,并在工作流实例上调用 EnqueueItem
方法。该方法会将事件排队到 WorkflowQueuingService
提供的相应队列中。为此,工作流实例将从 WorkflowRuntime
加载 WorkflowExecutor
。一旦事件(消息)存储在队列中,它就可以被订阅的监听器(如 HandleExternalEvent
活动)取消排队。
来自工作流的响应方法比上述请求方法要简单。WF 有一个内置的 CallExternalMethod
活动,它会在 LCS 的通信接口上引发一个事件。宿主应用程序需要实现一个事件处理程序,使用标准的事件/委托时尚模式接收来自工作流的响应参数。
WCF/WF 管道
上述描述显示了到工作流实例或从工作流实例的通信机制。我们需要在 WCF 服务端也有类似的层。下图显示了工作流和服务管道的模型。
如您所见,上述管道需要插入一个特定的适配器来实现服务行为扩展。在本例中,该适配器面向 WS-Transfer,并具有向工作流实例引发事件并以阻塞方式等待其响应的能力。该图显示了一个顺序工作流的示例,其中包含输入活动(事件接收器)、应用程序特定的活动(将资源存储到内存中)和输出活动(将响应消息发送到适配器)。工作流可以根据业务需求进行扩展,添加额外的活动。注意,HandleExternalEvent
和 CallExternalMethod
活动对于基于请求/响应消息交换模式的与服务适配器的正确通信是必需的。将所有层连接在一起,WS-Transfer 消息最终到达可配置的工作流事件接收器,并在那里分派。基于工作流活动,结果通过调用活动传递回服务。注意,适配器以阻塞方式等待此结果。
实现
作为实现的第一步,有必要创建用于适配器和工作流之间消息交换的通信接口。我们可以有一个通用的事件源(Request),用于所有 WS-Transfer 接口的请求方法。请参阅以下代码片段:
[ExternalDataExchange]
public interface IWSTransferLocalServiceIn
{
// to workflow
event EventHandler<RequestEventArgs> Request;
}
下一步是创建一个应用程序特定的 EventArgs
对象,以根据委托签名(如源对象和 EventArgs
)将请求参数传递给事件接收器。以下代码片段显示了此类派生类。
[Serializable]
public class RequestEventArgs : ExternalDataEventArgs
{
private ResourceDescriptor _rd;
private object _resource;
public RequestEventArgs
(Guid InstanceId, ResourceDescriptor rd, object resource) :
base(InstanceId)
{
this._resource = resource;
this._rd = rd;
}
public object Resource
{
get { return _resource; }
set { _resource = value; }
}
public ResourceDescriptor ResourceDescriptor
{
get { return _rd; }
set { _rd = value; }
}
}
在上例的特定 EventArgs
对象中,我们传递了资源描述符和资源主体,这些是工作流进程中实现的资源工厂所需要的。
来自工作流的响应接口契约操作非常简单,只有一个用于传递响应参数的方法。此方法由 CallExternalMethod
工作流活动调用。
[ExternalDataExchange]
public interface IWSTransferLocalServiceOut
{
// from workflow
void RaiseResponseEvent(Guid workflowInstanceId,
ResourceDescriptor rd, object resource);
}
现在,我们可以创建一个本地服务,称之为 WSTransferLocalService
。此类代表了从外部“宿主”层到工作流的通信层。其实现很简单,并且有简单的逻辑用于引发请求或响应的事件。注意,根据 ManualWorkflowScheduler Service
,向工作流引发事件可以选择在同一线程中运行工作流和适配器。以下代码片段显示了 WS-Transfer 适配器的本地服务实现。
public class WSTransferLocalService :
IWSTransferLocalServiceIn, IWSTransferLocalServiceOut
{
private WorkflowRuntime _workflowRuntime;
private ManualWorkflowSchedulerService _scheduler;
public WorkflowRuntime WorkflowRuntime
{ get { return _workflowRuntime; } }
public ManualWorkflowSchedulerService Scheduler
{ get { return _scheduler; } }
public WSTransferLocalService(WorkflowRuntime workflowRuntime)
{
if (workflowRuntime == null)
throw new ArgumentNullException("workflowRuntime");
// the workflow runtime core
_workflowRuntime = workflowRuntime;
// manual workflow scheduler
_scheduler =
_workflowRuntime.GetService<MANUALWORKFLOWSCHEDULERSERVICE>();
}
#region IWSTransferLocalServiceIn Members - to workflow
public event EventHandler<RequestEventArgs> Request;
public void RaiseRequestEvent
(Guid workflowInstanceId, ResourceDescriptor rd)
{
RaiseRequestEvent(workflowInstanceId, rd, null);
}
public void RaiseRequestEvent(Guid workflowInstanceId,
ResourceDescriptor rd, object resource)
{
// Create the EventArgs for this event
RequestEventArgs e =
new RequestEventArgs(workflowInstanceId, rd, resource);
// Raise the event
if (this.Request != null)
{
if (this.Scheduler == null)
{
this.Request(null, e);
}
else
{
this.Scheduler.RunWorkflow(workflowInstanceId);
this.Request(null, e);
this.Scheduler.RunWorkflow(workflowInstanceId);
}
}
}
#endregion
#region IWSTransferLocalServiceOut Members - from workflow
public event EventHandler<ResponseEventArgs> Response;
public void RaiseResponseEvent(Guid workflowInstanceId,
ResourceDescriptor rd, object resource)
{
// Create the EventArgs for this event
ResponseEventArgs e =
new ResponseEventArgs(workflowInstanceId, rd, resource);
// Raise the event
if (this.Response != null)
{
this.Response(null, e);
}
}
#endregion
}
以上是工作流通信层的内容。让我们看看另一边,即 WCF 服务。正如我在 WS-Transfer for WCF 文章中所述,处理物理资源(工厂和操作)的层被封装到适配器中。每条 WS-Transfer 消息都会调用适配器中的特定方法,并将响应返回给服务层,以便映射到 WS-Transfer 消息。方法中实现的逻辑很简单且轻量级,并提供以下功能:
- 根据配置文件创建特定的工作流
- 注册工作流响应的事件接收器
- 向工作流引发请求事件
- 等待工作流响应
- 将结果返回给服务层
注意,上述步骤对于请求/响应消息交换模式是必需的。对于“即发即弃”事件,我们只需要将请求事件发送到工作流接收器。
以下代码片段显示了 Get
方法的实现,用于将消息传递到 WorkflowTypeGet
。
public object Get(MessageHeaders resourceIdentifier)
{
// transaction context
base.Properties["TransactionDependentClone"] =
Transaction.Current == null ?
null : Transaction.Current.DependentClone
(DependentCloneOption.BlockCommitUntilComplete);
// create workflow
WorkflowInstance wi = CreateWorkflow(Defaults.Keys.WorkflowTypeGet);
// register response event
AsyncResponseFromLocalService ar =
new AsyncResponseFromLocalService
(this.wstransferLocalService, wi.InstanceId);
// resource identifier
ResourceDescriptor rd = GetResourceIdentifier(resourceIdentifier);
// fire request
this.wstransferLocalService.RaiseRequestEvent(wi.InstanceId, rd);
// processing
ar.WaitForResponse(workflowTimeoutInSec);
// result
object result = ar.Response.Resource;
// convert xml text formatted resource back to the XmlElement
XmlDocument doc = new XmlDocument();
doc.LoadXml(result as string);
result = doc.DocumentElement;
return result;
}
在同步请求/响应消息交换中,响应以阻塞方式等待消息。以下代码片段显示了为此目的的内部类。调用 WaitForResponse
方法时,线程将等待工作流调用 localservice_FireResponse
方法的事件。此方法将存储响应消息,然后设置同步对象以解除线程阻塞。如果工作流抛出异常,此类将重新抛出异常。
internal class AsyncResponseFromLocalService
{
AutoResetEvent _waitForResponse = new AutoResetEvent(false);
WorkflowTerminatedEventArgs _e;
WSTransferLocalService _localservice;
ResponseEventArgs _response;
Guid _istanceId;
public AsyncResponseFromLocalService
(WSTransferLocalService localservice, Guid instanceid)
{
_istanceId = instanceid;
_localservice = localservice;
// response handler
_localservice.Response +=
new EventHandler<ResponseEventArgs>(localservice_FireResponse);
// exception handler
_localservice.WorkflowRuntime.WorkflowTerminated +=
new EventHandler<WorkflowTerminatedEventArgs>(OnWorkflowTerminated);
}
public void OnWorkflowTerminated
(object sender, WorkflowTerminatedEventArgs e)
{
if (_istanceId == e.WorkflowInstance.InstanceId)
{
this._e = e;
_waitForResponse.Set();
_localservice.WorkflowRuntime.WorkflowTerminated -=
new EventHandler<WorkflowTerminatedEventArgs>
(OnWorkflowTerminated);
}
}
public void WaitForResponse(int secondsTimeOut)
{
bool retval = _waitForResponse.WaitOne(secondsTimeOut * 1000, false);
if (_e != null)
throw this._e.Exception;
if (retval == false)
throw new FaultException("The workflow timeout expired");
}
public ResponseEventArgs Response
{
get { return _response; }
}
public WorkflowTerminatedEventArgs WorkflowTerminatedEventArgs
{
get { return _e; }
}
public void localservice_FireResponse(object sender, ResponseEventArgs e)
{
if (_istanceId == e.InstanceId)
{
_response = e;
_waitForResponse.Set();
_localservice.Response -=
new EventHandler<ResponseEventArgs>(localservice_FireResponse);
}
}
}
测试
完整的解决方案已创建并解耦为小型项目。下图显示了其布局。
我复制了我先前文章 WS-Transfer for WCF 中的 WSTransfer 文件夹,并将其集成到解决方案中,使其成为一个统一的包。然后,我创建了 WFAdapter 文件夹用于实现 WCF 和工作流之间的连接。通过 WFServiceHostExtension,工作流已连接到宿主进程,并且是完全可重用的程序集。应用程序特定的工作流位于 WSTransferWorkflowLibrary 中。注意,这是一个测试库,必须为特定资源创建。我仅为了演示目的创建了 MemoryStorage
工作流。
最后,我们需要一个测试客户端和服务,因此我为该项目创建了一个单独的文件夹。此外,我还添加了我的 Logger,该 Logger 基于消息拦截器,并在宿主控制台程序上发布消息。
当然,要构建此解决方案,我们必须安装 WF 扩展。
下载解决方案并编译后,我们就可以进行测试了。服务宿主程序必须在客户端应用程序控制台程序之前启动。测试期间将产生以下屏幕截图。测试非常简单,即在由工作流中的活动处理的 MemoryStorage
中创建一个资源,然后客户端调用 get、put、delete 等操作。Logger 将通过 WFAdapter 和 Workflow 显示 WS-Transfer 对话。
结论
在本文中,我描述了如何为 WS-Transfer 服务连接 WCF 和工作流。将业务逻辑封装到工作流活动中,使我们能够创建逻辑业务模型,其中连接性完全透明,基于部署架构。带有 WFAdapter 的 WCF Transfer 服务统一了基于 WS-Transfer 规范的工作流之间的通信。基于此概念,我们可以构建其他 WS-* 规范驱动的工作流服务,并将它们插入“服务总线”中,从而利用 SOA 的优势。