65.9K
CodeProject 正在变化。 阅读更多。
Home

用于工作流的 WS-Transfer 服务

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (15投票s)

2006 年 5 月 8 日

CPOL

12分钟阅读

viewsIcon

97905

downloadIcon

457

本文介绍了一种将 WF 工作流连接到 Windows Communication Foundation (WCF) 服务以实现 WS-Transfer 操作契约的设计和实现。

目录

注释

本文使用以下内容编写:

  • .NET Framework 3.0 (RTM)
  • VS 2005 Windows Workflow Foundation 扩展 (RTM)

引言

Microsoft Windows Workflow Foundation (WF) 是 Windows 平台的一部分。它是下一代 .Net Framework 3.0 的核心组件。WF 能够将应用程序解耦为由活动驱动的业务工作流。工作流可以加载到宿主进程中,例如 Windows NT 服务、IIS、WinForm、控制台等,并通过托管层连接到 WF Runtime 核心。该层提供持久化、跟踪、调度、事务和通信支持。当然,还可以创建扩展——例如自定义托管服务,以在宿主进程和工作流之间基于接口/事件契约添加自定义通信服务。

工作流可以通过通信服务,以事件/委托时尚模式相互本地或远程通信。消息交换模式 (MEP) 基于请求和响应事件,传递应用程序特定的数据契约。本文介绍了一种工作流托管连接(管道)到 Windows Communication Foundation (WCF) 服务以实现 WS-Transfer 操作契约的设计和实现。这是我 WS-Transfer for WCF 文章的扩展,其中 WCF 服务被封装到通用的服务层和物理适配器中,用于处理特定资源。在这种情况下,我们的资源是 WF 的工作流实例。

注意,本项目中包含的工作流仅用于演示和测试目的。

拥有一个 WS-* 驱动的工作流模型层后,我们可以将工作流插入“WS-* 服务总线”,如下图所示:

WCF 服务(带有可配置的工作流适配器)能够创建逻辑(分布式)模型,将业务逻辑封装到工作流和活动中。注意,上图仅显示了服务。WCF/WF 客户端可以以类似的方式创建,并轻松插入到服务总线中。当然,任何“传统”的 WS-* 服务/客户端也可以通过工作流模型层以透明的方式连接到服务总线。

本文将仅介绍 WS-Transfer 服务,但其概念和实现对于其他服务(如 WS-Eventing、WS-Enum 等)类似且直接。让我们从连接 WCF 和 WF 这两个基础设施的概念开始。为了理解这个模式,我假设您熟悉我的文章 WS-Transfer for WCF,并且至少有一些 Microsoft WCF 和 WF 技术方面的经验(知识)。

概念与设计实现

实现的概念基于使用 Indigo 范例,通过 WS-Transfer 堆栈驱动 WCF 服务层来封装资源(物理)层。在我们的例子中,物理资源由 WF 的工作流实例表示。

下图显示了 WCF 和 WF 之间连接的最高级别。

如您所见,上述模型已解耦为通信层和业务处理层。这两个层是独立的,拥有自己的技术。服务层负责与 WS-* 堆栈(在本例中为 WS-Transfer)发送和接收消息,以及与特定资源进行通信。这是一个通用的服务层,可以通过编程方式或通过配置文件进行管理,具备服务行为扩展的能力。另一个层——资源层,是业务特定的层,由业务工作流模型表示。这两个层都可以根据应用程序的需求,在自己的行为中同步或异步运行。

配置文件的以下部分显示了内存存储工作流操作的服务行为扩展。

<configuration>
 <system.serviceModel>

  <behaviors>
   <serviceBehaviors>
    <behavior name="WxfServiceExtension" >
      <wsTransferAdapter
        name="PublicStorage"
        type="RKiss.WSTransfer.Adapters.WFAdapter, WFAdapter"
        TransactionScopeRequired="true"
        TransactionAutoComplete="true"
        Topic="Imaging"
        WorkflowTypeCreate=
            "WFLibTest.MemoryStorage.WorkflowCreate, WFLibTest"
        WorkflowTypeGet="WFLibTest.MemoryStorage.WorkflowGet, WFLibTest"
        WorkflowTypePut="WFLibTest.MemoryStorage.WorkflowPut, WFLibTest"
        WorkflowTypeDelete=
            "WFLibTest.MemoryStorage.WorkflowDelete, WFLibTest" />
    </behavior>
   </serviceBehaviors>
  </behaviors>

  <extensions>
   <behaviorExtensions>
    <add name="wsTransferAdapter"
         type="RKiss.WSTransfer.ServiceAdapterBehaviorElement, WSTransfer,
               Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"/>
   </behaviorExtensions>
  </extensions>

 </system.serviceModel>
</configuration>

在上例中,每个 WS-Transfer 操作(Create、Get、Put 和 Delete)都已配置为处理与 Topic="Imaging" 相关的事务性工作流。存储的名称是“PublicStorage”。管道适配器——WCF 服务和 WF 之间的连接——是在 WFAdapter 程序集中实现的。

用于工作流处理的 WS-Transfer 适配器内置了以下属性:

属性名称 默认值 注释
名称 null 适配器名称
type null 适配器类的类型
TransactionScopeRequired "false" 需要事务范围
TransactionAutoComplete "true" 事务范围完成
主题 null 资源的主题
WorkflowType null 通用工作流类的类型
WorkflowTypeCreate null 用于创建资源的类的工作流类型
WorkflowTypeGet null 用于获取资源的类的工作流类型
WorkflowTypePut null 用于放置资源的类的工作流类型
WorkflowTypeDelete null 用于删除资源的类的工作流类型

注意,属性集合在工作流初始化阶段传递给工作流实例。

托管 WF Runtime 服务

WF Runtime 服务可以通过 ServiceHost 扩展机制附加到 WCF 托管进程(每个 AppDomain 只能有一个)。以下代码片段展示了由控制台程序托管的 WSTransferService 的示例。

using (System.ServiceModel.ServiceHost host =
    new System.ServiceModel.ServiceHost(typeof(WSTransferService)))
{
    // Service extension for WF
    WFServiceHostExtension extension =
      new WFServiceHostExtension("WorkflowRuntimeConfig", 
                        "LocalServicesConfig");

    // Add the Extension to the ServiceHost collection
    host.Extensions.Add(extension);

    host.Open();

    Console.WriteLine("Press any key to stop server...");
    Console.ReadLine();
    host.Close();
}

其中,WFServiceHostExtension 是实现了 WF Runtime 服务 IExtension 接口的宿主扩展类。

void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner)
{
  // add services from config file
  if(_workflowServicesConfig == null)
    _workflowRuntime = new WorkflowRuntime();
  else
    _workflowRuntime = new WorkflowRuntime(_workflowServicesConfig);

  // not handled exception
  _workflowRuntime.ServicesExceptionNotHandled +=
    new EventHandler<SERVICESEXCEPTIONNOTHANDLEDEVENTARGS>
            (workflowRuntime_ServicesExceptionNotHandled);

  // external services
  _exchangeServices = 
    _workflowRuntime.GetService<EXTERNALDATAEXCHANGESERVICE>();
  if (_exchangeServices == null)
  {
    if (_localServicesConfig == null)
     _exchangeServices = new ExternalDataExchangeService();
    else
     _exchangeServices = new ExternalDataExchangeService(_localServicesConfig);

    // add service for exchange data
    _workflowRuntime.AddService(_exchangeServices);
  }

  // Start all services registered in this container
  _workflowRuntime.StartRuntime();
}
void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner)
{
    _workflowRuntime.StopRuntime();
}

注意,“WorkflowRuntimeConfig”是配置节的名称,其中包含 WF Runtime 服务并对特定工作流处理是必需的。例如:System.Workflow.Runtime.Hosting.ManualWorkflowSchedulerService 允许在与 WCF 服务操作相同的线程内同步运行工作流活动。

以下配置片段显示了用于添加运行时和本地服务的配置节。

<configSections>
 <section name="WorkflowRuntimeConfig"
    type="System.Workflow.Runtime.Configuration.WorkflowRuntimeSection, 
        System.Workflow.Runtime,
         Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
  <section name="LocalServicesConfig"
    type="System.Workflow.Activities.ExternalDataExchangeServiceSection, 
        System.Workflow.Activities,
         Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
</configSections>

<WorkflowRuntimeConfig >
  <Services>
   <add type="System.Workflow.Runtime.Hosting.ManualWorkflowSchedulerService, 
            System.Workflow.Runtime,
             Version=3.0.00000.0, Culture=neutral, 
            PublicKeyToken=31bf3856ad364e35"/>
  </Services>
</WorkflowRuntimeConfig>

<LocalServicesConfig>
  <Services>
   <add type="RKiss.WSTransfer.Adapters.WSTransferLocalService, WFAdapter,
              Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" />
  </Services>
</LocalServicesConfig>

WF 本地通信服务 (LCS)

本地通信服务代表了在 WorkflowRuntime 中注册的通信接口的实现,这些接口允许工作流和宿主进程层之间基于 .NET 事件/委托时尚模式进行数据交换。接口必须使用 ExternalDataExchange 属性进行装饰,以便 WF Runtime 找到并提供正确的事件拦截。

让我们使用 Reflector 详细查看这一层。为了将自定义 LCS 引入管道业务,我们必须通过内置的 ExternalDataExchangeService 服务将其附加到 WorkflowRuntime

以下代码片段显示了 AddService 方法的实现。注意,行内注释是在 Reflector 后添加的。

public void AddService(object service)
{
    // validation
    if (service == null)
    {
        throw new ArgumentNullException("service");
    }
    if (base.Runtime == null)
    {
        throw new InvalidOperationException
            ("Error_ExternalRuntimeContainerNotFound");
    }

    // plumbing of the local service to the workflow instance 
    this.InterceptService(service, true);

    // add local service into runtime collection 
    base.Runtime.AddService(service);
}

实际的管道(事件订阅)是在 InterceptService 方法中完成的。以下代码片段是由 Reflector 捕获的,手动注释的行显示了其实现。

private void InterceptService(object service, bool add)
{
  bool flag1 = false;
  Type[] typeArray1 = service.GetType().GetInterfaces();

  // walk through all interfaces and select ones with 
  // ExternalDataExchange attributed
  Type[] typeArray2 = typeArray1;
  for (int num2 = 0; num2 < typeArray2.Length; num2++)
  {
    Type type1 = typeArray2[num2];
    object[] objArray1 =
        type1.GetCustomAttributes
        (typeof(ExternalDataExchangeAttribute), false);
    if (objArray1.Length != 0)
    {
      // get all events for this interface contract
      flag1 = true;
      EventInfo[] infoArray1 = type1.GetEvents();
      if (infoArray1 != null)
      {
        // walk through all events and assign 
        // WorkflowMessageEventHandler handler
        EventInfo[] infoArray2 = infoArray1;
        for (int num3 = 0; num3 < infoArray2.Length; num3++)
        {
          EventInfo info1 = infoArray2[num3];
          WorkflowMessageEventHandler handler1 = null;
          int num1 = type1.GetHashCode() ^ info1.Name.GetHashCode();
          lock (this.sync)
          {
            // check collection for this event
            if (!this.eventHandlers.ContainsKey(num1))
            {
                  // create event handler to workflow
                  handler1 = new WorkflowMessageEventHandler
                    (type1,info1,base.Runtime);

                  // add to the collection
                  this.eventHandlers.Add(num1, handler1);
            }
            else
            {
                  // use handler from the collection
                  handler1 = this.eventHandlers[num1];
            }
          }

          // add delegate of the event handler to the local service event
          this.AddRemove(service, handler1.Delegate, add, info1.Name);
        }
      }
    }
  }
  if (!flag1)
  {
        throw new InvalidOperationException("Error_ServiceMissing ...");
  }
}

拦截器的概念基于捕获 LCS 中所有正确的事件并订阅它们。LCS 通信接口契约由自定义 ExternalDataExchange 属性找到。对于这些选定的接口,拦截器会遍历每个事件。拦截器创建一个 WorkflowMessageEventHandler 对象,然后将处理程序的委托订阅到事件。此时,事件/委托管道已完成并可供运行时使用。

如上所述,WF Runtime 拦截器负责注入通信层,以抽象化并将源事件传递到工作流事件接收器,例如 HandleExternalEvent 活动。下图显示了宿主和工作流实例之间的主要序列。

宿主(在本例中为 WCF 服务 - 适配器)会引发一个事件,例如 CreateRequest ,并附带应用程序特定的参数,其委托将调用 WorkflowMessageEventhandler 对象上的事件处理程序。处理程序根据事件源参数向 WorkflowRuntime 请求工作流实例。事件源参数是工作流实例 ID。

之后,处理程序将创建一个 MethodMesage ,并在工作流实例上调用 EnqueueItem 方法。该方法会将事件排队到 WorkflowQueuingService 提供的相应队列中。为此,工作流实例将从 WorkflowRuntime 加载 WorkflowExecutor。一旦事件(消息)存储在队列中,它就可以被订阅的监听器(如 HandleExternalEvent 活动)取消排队。

来自工作流的响应方法比上述请求方法要简单。WF 有一个内置的 CallExternalMethod 活动,它会在 LCS 的通信接口上引发一个事件。宿主应用程序需要实现一个事件处理程序,使用标准的事件/委托时尚模式接收来自工作流的响应参数。

WCF/WF 管道

上述描述显示了到工作流实例或从工作流实例的通信机制。我们需要在 WCF 服务端也有类似的层。下图显示了工作流和服务管道的模型。

如您所见,上述管道需要插入一个特定的适配器来实现服务行为扩展。在本例中,该适配器面向 WS-Transfer,并具有向工作流实例引发事件并以阻塞方式等待其响应的能力。该图显示了一个顺序工作流的示例,其中包含输入活动(事件接收器)、应用程序特定的活动(将资源存储到内存中)和输出活动(将响应消息发送到适配器)。工作流可以根据业务需求进行扩展,添加额外的活动。注意,HandleExternalEvent CallExternalMethod 活动对于基于请求/响应消息交换模式的与服务适配器的正确通信是必需的。将所有层连接在一起,WS-Transfer 消息最终到达可配置的工作流事件接收器,并在那里分派。基于工作流活动,结果通过调用活动传递回服务。注意,适配器以阻塞方式等待此结果。

实现

作为实现的第一步,有必要创建用于适配器和工作流之间消息交换的通信接口。我们可以有一个通用的事件源(Request),用于所有 WS-Transfer 接口的请求方法。请参阅以下代码片段:

  [ExternalDataExchange]
  public interface IWSTransferLocalServiceIn
  {
      // to workflow
      event EventHandler<RequestEventArgs> Request;
  }

下一步是创建一个应用程序特定的 EventArgs 对象,以根据委托签名(如源对象和 EventArgs)将请求参数传递给事件接收器。以下代码片段显示了此类派生类。

  [Serializable]
  public class RequestEventArgs : ExternalDataEventArgs
  {
      private ResourceDescriptor _rd;
      private object _resource;
      public RequestEventArgs
        (Guid InstanceId, ResourceDescriptor rd, object resource) :
        base(InstanceId)
      {
          this._resource = resource;
          this._rd = rd;
      }
      public object Resource
      {
          get { return _resource; }
          set { _resource = value; }
      }
      public ResourceDescriptor ResourceDescriptor
      {
          get { return _rd; }
          set { _rd = value; }
      }
  }

在上例的特定 EventArgs 对象中,我们传递了资源描述符和资源主体,这些是工作流进程中实现的资源工厂所需要的。

来自工作流的响应接口契约操作非常简单,只有一个用于传递响应参数的方法。此方法由 CallExternalMethod 工作流活动调用。

  [ExternalDataExchange]
  public interface IWSTransferLocalServiceOut
  {
      // from workflow
      void RaiseResponseEvent(Guid workflowInstanceId, 
                ResourceDescriptor rd, object resource);
  }

现在,我们可以创建一个本地服务,称之为 WSTransferLocalService。此类代表了从外部“宿主”层到工作流的通信层。其实现很简单,并且有简单的逻辑用于引发请求或响应的事件。注意,根据 ManualWorkflowScheduler Service,向工作流引发事件可以选择在同一线程中运行工作流和适配器。以下代码片段显示了 WS-Transfer 适配器的本地服务实现。

 public class WSTransferLocalService :
  IWSTransferLocalServiceIn, IWSTransferLocalServiceOut
 {
    private WorkflowRuntime _workflowRuntime;
    private ManualWorkflowSchedulerService _scheduler;
    public WorkflowRuntime WorkflowRuntime 
                { get { return _workflowRuntime; } }
    public ManualWorkflowSchedulerService Scheduler 
                { get { return _scheduler; } }

    public WSTransferLocalService(WorkflowRuntime workflowRuntime)
    {
      if (workflowRuntime == null)
         throw new ArgumentNullException("workflowRuntime");

      // the workflow runtime core
      _workflowRuntime = workflowRuntime;

      // manual workflow scheduler
      _scheduler = 
        _workflowRuntime.GetService<MANUALWORKFLOWSCHEDULERSERVICE>();
    }

    #region IWSTransferLocalServiceIn Members - to workflow
    public event EventHandler<RequestEventArgs> Request;
    public void RaiseRequestEvent
            (Guid workflowInstanceId, ResourceDescriptor rd)
    {
      RaiseRequestEvent(workflowInstanceId, rd, null);
    }
    public void RaiseRequestEvent(Guid workflowInstanceId,
      ResourceDescriptor rd, object resource)
    {
        // Create the EventArgs for this event
        RequestEventArgs e =
          new RequestEventArgs(workflowInstanceId, rd, resource);

        // Raise the event
        if (this.Request != null)
        {
            if (this.Scheduler == null)
            {
                this.Request(null, e);
            }
            else
            {
                this.Scheduler.RunWorkflow(workflowInstanceId);
                this.Request(null, e);
                this.Scheduler.RunWorkflow(workflowInstanceId);
            }
        }
    }
    #endregion

    #region IWSTransferLocalServiceOut Members - from workflow
    public event EventHandler<ResponseEventArgs> Response;
    public void RaiseResponseEvent(Guid workflowInstanceId,
      ResourceDescriptor rd, object resource)
    {
        // Create the EventArgs for this event
        ResponseEventArgs e =
          new ResponseEventArgs(workflowInstanceId, rd, resource);

        // Raise the event
        if (this.Response != null)
        {
            this.Response(null, e);
        }
    }
    #endregion
 }

以上是工作流通信层的内容。让我们看看另一边,即 WCF 服务。正如我在 WS-Transfer for WCF 文章中所述,处理物理资源(工厂和操作)的层被封装到适配器中。每条 WS-Transfer 消息都会调用适配器中的特定方法,并将响应返回给服务层,以便映射到 WS-Transfer 消息。方法中实现的逻辑很简单且轻量级,并提供以下功能:

  • 根据配置文件创建特定的工作流
  • 注册工作流响应的事件接收器
  • 向工作流引发请求事件
  • 等待工作流响应
  • 将结果返回给服务层

注意,上述步骤对于请求/响应消息交换模式是必需的。对于“即发即弃”事件,我们只需要将请求事件发送到工作流接收器。

以下代码片段显示了 Get 方法的实现,用于将消息传递到 WorkflowTypeGet

 public object Get(MessageHeaders resourceIdentifier)
 {
    // transaction context
    base.Properties["TransactionDependentClone"] = 
        Transaction.Current == null ?
        null : Transaction.Current.DependentClone
        (DependentCloneOption.BlockCommitUntilComplete);

    // create workflow
    WorkflowInstance wi = CreateWorkflow(Defaults.Keys.WorkflowTypeGet);

    // register response event
    AsyncResponseFromLocalService ar =
      new AsyncResponseFromLocalService
        (this.wstransferLocalService, wi.InstanceId);

    // resource identifier
    ResourceDescriptor rd = GetResourceIdentifier(resourceIdentifier);

    // fire request
    this.wstransferLocalService.RaiseRequestEvent(wi.InstanceId, rd);

    // processing
    ar.WaitForResponse(workflowTimeoutInSec);

    // result
    object result = ar.Response.Resource;

    // convert xml text formatted resource back to the XmlElement
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(result as string);
    result = doc.DocumentElement;

    return result;
 }

在同步请求/响应消息交换中,响应以阻塞方式等待消息。以下代码片段显示了为此目的的内部类。调用 WaitForResponse 方法时,线程将等待工作流调用 localservice_FireResponse 方法的事件。此方法将存储响应消息,然后设置同步对象以解除线程阻塞。如果工作流抛出异常,此类将重新抛出异常。

  internal class AsyncResponseFromLocalService
  {
    AutoResetEvent _waitForResponse = new AutoResetEvent(false);
    WorkflowTerminatedEventArgs _e;
    WSTransferLocalService _localservice;
    ResponseEventArgs _response;
    Guid _istanceId;
    public AsyncResponseFromLocalService
        (WSTransferLocalService localservice, Guid instanceid)
    {
        _istanceId = instanceid;
        _localservice = localservice;

        // response handler
        _localservice.Response +=
          new EventHandler<ResponseEventArgs>(localservice_FireResponse);

        // exception handler
        _localservice.WorkflowRuntime.WorkflowTerminated +=
          new EventHandler<WorkflowTerminatedEventArgs>(OnWorkflowTerminated);
    }
    public void OnWorkflowTerminated
        (object sender, WorkflowTerminatedEventArgs e)
    {
        if (_istanceId == e.WorkflowInstance.InstanceId)
        {
            this._e = e;
            _waitForResponse.Set();
            _localservice.WorkflowRuntime.WorkflowTerminated -=
              new EventHandler<WorkflowTerminatedEventArgs>
                        (OnWorkflowTerminated);
        }
    }
    public void WaitForResponse(int secondsTimeOut)
    {
        bool retval = _waitForResponse.WaitOne(secondsTimeOut * 1000, false);
        if (_e != null)
            throw this._e.Exception;
        if (retval == false)
            throw new FaultException("The workflow timeout expired");
    }
    public ResponseEventArgs Response
    {
        get { return _response; }
    }
    public WorkflowTerminatedEventArgs WorkflowTerminatedEventArgs
    {
        get { return _e; }
    }
    public void localservice_FireResponse(object sender, ResponseEventArgs e)
    {
        if (_istanceId == e.InstanceId)
        {
            _response = e;
            _waitForResponse.Set();
            _localservice.Response -=
              new EventHandler<ResponseEventArgs>(localservice_FireResponse);
        }
    }
  }

测试

完整的解决方案已创建并解耦为小型项目。下图显示了其布局。

我复制了我先前文章 WS-Transfer for WCF 中的 WSTransfer 文件夹,并将其集成到解决方案中,使其成为一个统一的包。然后,我创建了 WFAdapter 文件夹用于实现 WCF 和工作流之间的连接。通过 WFServiceHostExtension,工作流已连接到宿主进程,并且是完全可重用的程序集。应用程序特定的工作流位于 WSTransferWorkflowLibrary 中。注意,这是一个测试库,必须为特定资源创建。我仅为了演示目的创建了 MemoryStorage 工作流。

最后,我们需要一个测试客户端和服务,因此我为该项目创建了一个单独的文件夹。此外,我还添加了我的 Logger,该 Logger 基于消息拦截器,并在宿主控制台程序上发布消息。

当然,要构建此解决方案,我们必须安装 WF 扩展。

下载解决方案并编译后,我们就可以进行测试了。服务宿主程序必须在客户端应用程序控制台程序之前启动。测试期间将产生以下屏幕截图。测试非常简单,即在由工作流中的活动处理的 MemoryStorage 中创建一个资源,然后客户端调用 get、put、delete 等操作。Logger 将通过 WFAdapter 和 Workflow 显示 WS-Transfer 对话。

结论

在本文中,我描述了如何为 WS-Transfer 服务连接 WCF 和工作流。将业务逻辑封装到工作流活动中,使我们能够创建逻辑业务模型,其中连接性完全透明,基于部署架构。带有 WFAdapter 的 WCF Transfer 服务统一了基于 WS-Transfer 规范的工作流之间的通信。基于此概念,我们可以构建其他 WS-* 规范驱动的工作流服务,并将它们插入“服务总线”中,从而利用 SOA 的优势。

© . All rights reserved.