65.9K
CodeProject 正在变化。 阅读更多。
Home

从 Azure Functions 调用工作流

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2017年7月21日

CPOL

16分钟阅读

viewsIcon

11441

downloadIcon

174

从 Azure Function 调用 WF4 的设计、实现和用法

目录

特点

  • 推送模型
  • 消息中介器(管道)使用 VETER 模式
  • 消息封装到声明式无状态工作流中介器
  • 无服务器要求
  • 业务集成器
  • 支持元数据驱动架构
  • 契约优先和模型优先,使用无类型消息传递
  • 敏捷部署
  • Azure 平台
  • WF4 技术
  • WCF 技术

引言

Microsoft Azure Function 代表了一种在云无服务器平台中运行小块代码(称为函数)的解决方案——函数即服务 (FaaS)。基本上,云提供商会给你一个“函数样板”,其中包含用于处理你的业务逻辑的输入,以及提供的输出。所有输入和输出都基于声明式要求进行绑定。无服务器架构模型中的函数基于事件驱动的触发器和/或 REST API 网关以“推送”方式处理,用于将消息推(转发)到特定函数。触发器和/或 API 网关是无服务器架构的一部分,它们对函数完全透明,换句话说,用户只需关注函数的正文及其输入/输出绑定。

无服务器架构基于推送模型,事件被推送到函数。这种模型可以很好地扩展,并为事件流提供高吞吐量。请注意,无服务器 FaaS 平台的消费者无需处理可伸缩性、重启、回收、触发、托管等应用程序。这完全是透明的,由云提供商(如 Microsoft Azure)处理。

这是云计算的一项伟大功能和下一步演进。

因此,以下屏幕截图显示了 Azure Function 集成样板

集成到无服务器模型中的函数样板基于触发器类型的选择,事件发生的位置以及输入/输出绑定,例如获取函数输入/输出数据的方式(如果需要)。每个函数都有自己的定义,存储在 .json 格式的文本文件中,例如 *function.json*。有关 Azure Functions 的更多详细信息,可以在此处找到。

正如你所见,这个样板(签名)非常类似于运行时处理器调用的方法,数据/引用被传递到方法(函数)进行处理,并通过输出(或返回值)返回给调用者。请注意,函数是一个无状态的消息中介原语,具有敏捷能力,换句话说,函数说:“给我输入,我就会给你输出”。

从函数代码的角度来看,代码在“无服务器主机环境”中执行,这意味着它在云 PaaS 的某个地方被加载(托管)和执行,其处理对环境是敏捷的。函数正文的类型可以是命令式的,例如通过代码,也可以是声明式的,由 XAML 描述。

因此,基于上述描述,以下屏幕显示了一个带有 XAML 的函数

上图显示函数中介器被封装在由 XML 格式文本描述的 XAML 定义中。此资源可以本地存储在函数文件夹中,也可以集中存储到存储库中,例如存储容器。我的文章 Azure Virtual Bridge 中描述了这一概念,并在生产环境中得到了成功验证。XAML 调用器的实现已集成到 WCF 代理堆栈中,作为自定义工作流通道。这个设计思想和概念很久以前就已创建,它展示了正确的路线图(请参阅我在参考文献中的旧文章)。

以下屏幕截图展示了无服务器架构中 API 网关的概念,其中无类型消息被路由到自定义工作流通道以呈现 XAML 定义

在上面的示例中,WCF 路由服务托管在 Web 角色中,工作流 XAML 渲染器也托管在那里。换句话说,工作流渲染器(无状态 XAML 中介器)可以在任何托管了样板代码的地方运行。

部署上述“API REST/Soap 网关”非常简单。它只需要一个 *web.config* 文件和一个程序集,如 *WorkflowChannel.dll*。请注意,此部署包可以部署在 Azure 或本地/IIS 服务器上。在这种情况下,函数调用入口点的方式与其他任何 REST/Soap 服务一样。

在本文中,我想向您展示另一种调用 `WorkflowChannel` 的方法,即通过函数“托管” XAML 中介器,换句话说,在函数正文中调用带有自定义通道的 WCF 客户端代理。

因此,以下屏幕截图显示了 Azure Function 样板中的一个工作流通道,它基本上托管在无服务器框架中,这对 XAML 完全透明

在函数代码中使用自定义 **WCF 工作流通道**允许将 XAML 定义封装到存储库存储中。函数输入以同步方式传递给 WCF 客户端代理,然后返回到输出。这是 WCF 堆栈的标准消息交换模式,就像调用 REST 服务等一样。WCF 工作流通道的连接是通过编程完成的,并且对于任何 XAML 中介器都是完全可重用的。

从工作流基金会技术来看,XAML 中介器是无状态的、非持久化的工作流,在内存中运行。状态可以通过值或引用的形式在消息中流动。XAML 中介器支持无服务器架构的所有原则。我建议阅读我以前的文章《WF4 Custom activities for message mediation》,其中我详细描述了使用 XAML 等声明式方法的消息中介器。

消息通过消息管道从输入流经函数到达输出。此消息管道可以被公式化为众所周知的模式,例如 VETER。

以下屏幕截图显示了无状态函数消息中介的逻辑模式

来自 `Trigger` 的传入消息会经过 `Validation`,然后可以通过其他输入进行 `Enlarged`(扩展),接着到 `Transformation`,在此可以使用消息原语和活动转换载荷,之后,消息可以被 `Enlarged`(例如,添加一些标头),最后 `Route` 到 `Outputs`。

拥有上述在 XAML 定义中定义的 VETER 模式,并通过 Azure Function 样板“托管”,使我们能够将业务部分封装到资源中,并由业务人员(而不是开发人员/编码人员)处理。业务模型可以分解为小型业务函数(中介器),实际部署在逻辑上集中的资源(XAML、XSLT、JSON 等)中,位于存储库中,物理上分散在函数中,并在无服务器框架中进行运行时呈现。在设计时,我们可以根据运行时呈现部署的其他元数据,使用合适的工具设计这些资源(元数据)。因此,这一概念完全支持元数据驱动架构。

为了更好地理解上述模型,以下屏幕截图显示了由 Azure Function “托管”的 VETER 消息中介器

正如你所见,有一个设计工具用于在 Blob 存储存储库中创建 VETER (XAML) 定义,以及一个 WCF 工作流通道用于拉取此资源以在 Azure Function 中进行运行时呈现。请注意,VETER 定义是一种动态活动类型,可以在设计时重新编译为自定义活动类型,并且其程序集可以存储在程序集容器存储中。在这种情况下,存储库将只包含运行时资源,如 XSLT(如果需要)。

以下屏幕截图显示了一个 Azure Function (AFN) 在 IoT 数据流管道中的工作流示例

函数与 IoT Hub Stream 的集成由 *function.json* 文件中存储的元数据描述,请参阅以下代码片段

{
  "scriptFile": "..\\bin\\Microservices.dll",
  "entryPoint": "RKiss.AFNLib.AFNLib.EventToEvent",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "ed",
      "direction": "in",
      "path": "ba2017-iotdemo",
      "connection": "iothub",
      "consumerGroup": "$Default"
    },
    {
      "type": "eventHub",
      "name": "outputEventHubMessage",
      "path": "outeventhub",
      "connection": "bridge",
      "direction": "out"
    }
  ],
  "disabled": false
}

正如你所见,上面的绑定有两个定义,一个用于输入,另一个用于输出。入口点在 *Microservices.dll* 程序集中声明。这意味着在函数冷启动期间,没有用于运行时编译的代码(XAML 中介器除外)。

请注意,上述函数元数据对于任何类型的事件中介都是相同的,因为业务工作已委托给 XAML 定义,其中可以使用预构建的原语、xslt 技术等以声明方式中介事件。

太棒了!

好的,让我们来描述一下它的概念和设计。我假设你对 Azure Functions、WCF4 和 WF4 技术有一定的了解。

概念与设计

Azure 工作流通道的概念基于已有的 WCF 自定义工作流通道,该通道此处有详细描述。请注意,Azure Functions 的无服务器框架不允许使用自定义 *web.config*,在该文件中可以配置客户端代理。换句话说,我们必须通过编程方式创建带有自定义绑定的 WCF 客户端代理以用于 `WorkflowChannel`。

以下代码片段显示了一个 C# `Function` Http Trigger 调用 XAML 代码的示例

正如你所见,上述函数实现起来非常直接。基本上,所有强大的魔幻逻辑都隐藏在 `Mediator` 方法中,该方法位于外部程序集 *Microservices.dll* 中,存储在 *wwwroot* 的自定义文件夹 *bin* 中。正如你稍后将看到的,`Mediator` 实际上是 WCF 自定义通道代理的 Http 包装器,为了 Azure Function 使用而进行了装饰。

以下屏幕截图显示了此函数其余的文件,例如 *function.json* 和 *project.json*(可选)

正如你所见,绑定和依赖关系是标准声明的,因此与自定义方法 `Mediator` 没有特别之处。

下图显示了 `Mediator` 在由 Http 触发的 C# Function 中的位置。业务逻辑在工作流定义中声明

当然,`Mediator` 可以用作预编译的 Http Trigger Function,请参阅以下声明

太棒了!只需要一个文件,如 *function.json*,就可以进行消息中介。请注意,Azure Function Apps 的当前版本仅允许 C# Functions 使用 Nuget 包,因此没有 *project.json* 文件。

为了最大限度地减少 Azure Function 中的编码,创建了以下类

所有方法都代表预编译的 C# Azure Functions,请参阅其签名

namespace RKiss.AFNLib
{
  public class Microservices
  {  
    public static async Task<HttpResponseMessage> 
    Mediator(HttpRequestMessage req, TraceWriter log, ExecutionContext executionContext)
    
    public static async Task<BrokeredMessage> 
    BMsgToBMsg(BrokeredMessage bmsg, TraceWriter log, ExecutionContext executionContext)
              
    public static async Task<EventData> 
    EventToEvent(EventData ed, TraceWriter log, ExecutionContext executionContext)
               
    public static async Task<EventData> 
    EventToStream(EventData ed, TraceWriter log, 
    ExecutionContext executionContext, Stream outputMediator)
              
    public static async Task<EventData> 
    EventToAppendBlob(EventData ed, TraceWriter log, 
    ExecutionContext executionContext, ICloudBlob appendBlob)   
  }
}

主方法是 `HTTP Trigger` 的 `Mediator`,如以下屏幕截图所示

其他方法(预编译函数)基本上是特定消息类型的包装器,如 `BrokeredMessage`、`EventData` 等。

数据流管道

本示例展示了预编译函数在 Event Hubs 和输出 blob 之间的集成,用于将事件归档在 `AppendBlob` 中

来自 IoT Hub 的每个事件都会被转发到工作流通道进行中介。中介结果(VETER 模式)会返回给下一个 Event Hub 和 Blob 存储。基本上,此连接需要两个声明,如 *function.json* 中的集成和工作流定义(XAML)。一旦完成了集成(例如,作为通用模板),我们就可以专注于事件中介过程。

对于数据流管道,`Microservices` 类内置了 `EventData` 包装器,围绕 `Mediator` 方法,请参阅 `EventToEvent` 方法。此方法允许我们将 XAML 中介器与 Event Hubs 集成。额外的输出可以在 C# Function 中作为特定输出的后中介代码完成。在 `Microservices` 类中包含了两个实现,请参阅 `EventToStream` 和 `EventToAppendBlob` 方法。

请注意,Azure Functions 目前不支持 `appendBlob` 类型,因此我的解决方法是使用接口 `ICloudBlob`(方向 `inout`)。辅助方法 `DataToAppendBlob` 负责将此接口处理为 `CloudAppendBlob` 对象。

数据流管道的 C# Function

本示例展示了一个围绕 `EventToEvent` 方法的通用 C# 样板

function.json

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "ed",
      "direction": "in",
      "path": "ba2017-iotdemo",
      "connection": "iothub",
      "consumerGroup": "$Default"
    },
    {
      "type": "blob",
      "name": "outputMediator",
      "path": "archive/demo/transactions/{rand-guid}",
      "connection": "mySTORAGE",
      "direction": "out"
    },
    {
      "type": "eventHub",
      "name": "$return",
      "path": "eventhubdemo",
      "connection": "ehdemo",
      "direction": "out"
    }
  ],
  "disabled": false
}

run.csx

#r "Microsoft.ServiceBus"
#r "..\\bin\\Microservices.dll" 

using System;
using Microsoft.ServiceBus.Messaging;
using RKiss.AFNLib;

public static async Task<EventData> Run(EventData ed, TraceWriter log, 
              ExecutionContext executionContext, Stream outputMediator)
{
    log.Info($"[{executionContext.InvocationId}]EventHub 
                 trigger function EventToBlob: {ed.SequenceNumber}");

    // pre-mediator (e.g. routing message based on the payload data, headers, etc.)
    
    // mediator
    var response = await RKiss.AFNLib.Microservices.EventToEvent
                   (ed, log, executionContext);

    // post-mediator (e.g. routing response to the outputs, etc.)

    // return value
    var responseClone = response.Clone();

    //response payload
    var byteArray = response.GetBytes();
    await outputMediator.WriteAsync(byteArray, 0, byteArray.Length);

    // return event
    return responseClone;
}

正如你所见,上述 C# 函数体是一个非常直接的实现。根据需要,我们可以包含一个轻量级的预中介器和/或后中介器。例如,如果我们想根据输入事件调用特定的 XAML VETER 中介器,那么 `pre-mediator` 是正确的位置,并将 `ed.Properties["afn-mediator-name"]` 属性设置为处理此消息中介的工作流定义名称。

我希望上述示例能让你对 Azure Workflow Function 有所了解,其主体是通过自定义 WCF 通道实现的工作流调用器。

好的,该是时候了。让我们详细描述一下 Azure Workflow Function 如何配置以及准备好使用。我假设你对 Azure Functions 和 Workflow Foundation WF4 技术有一定的了解。

用法

使用 **Azure Workflow Function** 需要执行一些步骤,例如添加预编译函数的程序集以及用于工作流定义和自定义程序集存储库的应用设置。此连接的逻辑图如下所示

让我们一步一步开始,为我们的可运行 **Azure Workflow Function** 配置,以调用位于已配置存储库中的任何工作流定义。

步骤 1. 下载解决方案/二进制文件

在此步骤中,我们需要从(本文)下载完整的源代码解决方案或仅二进制文件。

Visual Studio 2017 解决方案

二进制包

上述解决方案有三个项目,例如

  • `Microservices` 项目,这是一个预编译的 Azure 工作流函数的库。请注意,该项目的一部分是 `WorkflowChannel` 程序集。
  • `WorklowActivities` 项目,这是一个用于工作流活动和类扩展的自定义程序集。
  • `ActivityLibraryTemplates` 项目,这是一个 XAML 工作流样本的项目。我们需要将这些 XAML 文件存储在 Blob 存储容器 `microservices` 中。

步骤 2. WorkflowDefinitionStorage (Blob Storage)

此步骤的目标是在你的 Blob 存储帐户中创建两个容器,例如容器 `microservices` 和 `assemblies`。将步骤 1 中的所有 XAML 文件复制到 `microservices` 容器中。使用以下 blob 命名和 `content-type`,如下图所示

将程序集 *WorkflowActivities.dll* 复制到 `assemblies` 容器中。请注意,blob 名称不带 .dll 扩展名。

现在,我们在 `microservices` 容器中只有三个示例,但你可以根据需要创建更多用于测试和工作,没有限制。

步骤 3. Loopback Workflow (用于测试目的)

此步骤展示了一个测试样本。它是一个非常简单直接的逻辑,其中输入有效负载被循环回到输出 JSON 有效负载。以下屏幕截图显示了这个 Loopback 工作流

请注意,`_outMessage` 响应消息已添加一个 `content-type` 标头,其值为 *application/json*。如你所见,上图显示了使用内置原语(如 `InvokeMethod` 活动)添加此标头的序列。使用自定义类扩展,我们可以简化消息标头的添加——稍后请参阅更多详细信息。

每个工作流所需的参数

`WorkflowChannel` 通过“硬编码参数”与动态工作流活动进行通信。每个工作流都必须声明以下参数,否则会抛出错误。

请注意,`_metadata` 参数是 blob(XAML)文件的名称/值对的集合,可用于局部变量。

步骤 4. Function Apps

为了在应用设置中进行配置,以下代码片段显示了主要预编译函数(如 `Mediator`)的所有可配置变量。

#region name of the host function
  string functionName = executionContext.FunctionName;         
#endregion

log.Info($"[{executionContext.InvocationId}]HTTP trigger function 
         Mediator hosted by {functionName}.");

#region storage for workflow definitions
  string storageAccount = string.Empty;
  if (ConfigurationManager.AppSettings.AllKeys.Contains
     ($"WorkflowDefinationStorage_{functionName}"))
    storageAccount = ConfigurationManager.AppSettings
                     [$"WorkflowDefinationStorage_{functionName}"];
  else if (ConfigurationManager.AppSettings.AllKeys.Contains
          ("WorkflowDefinationStorage"))
    storageAccount = ConfigurationManager.AppSettings["WorkflowDefinationStorage"];
  else
    return req.CreateErrorResponse(HttpStatusCode.BadRequest, 
           "Missing a Workflow Definition storage account");
#endregion

#region resolving name of the workflow definition           
  string xaml = string.Empty;
	
  // high priority to allow forcing a process based on the appSettings
  if (ConfigurationManager.AppSettings.AllKeys.Contains
     ($"WorkflowDefinationName_{functionName}"))
    xaml = ConfigurationManager.AppSettings[$"WorkflowDefinationName_{functionName}"];
	
  if (string.IsNullOrEmpty(xaml))
  {
    // parse query parameter
    xaml = req.GetQueryNameValuePairs()
	      .FirstOrDefault(q => string.Compare(q.Key, "xaml", true) == 0)
	      .Value;
	
    if (string.IsNullOrEmpty(xaml))
    {
      if (req.Headers.Contains("afn-mediator-name"))
        xaml = req.Headers.GetValues
               ("afn-mediator-name").First().Trim().Replace("$", "/");
      else if (ConfigurationManager.AppSettings.AllKeys.Contains
              ("WorkflowDefinationName"))
	xaml = ConfigurationManager.AppSettings["WorkflowDefinationName"];
      else
        return req.CreateErrorResponse(HttpStatusCode.BadRequest, 
               "Missing name of the microservice");
    }
  }
log.Info($"xaml={xaml}");
#endregion

上述 `Mediator` 函数中的配置逻辑基于变量的后缀名称。感谢 Azure Function 的 `ExecutionContext` 中新增的功能,该功能允许获取已执行函数的名称。这对于设置中的变量后缀来说非常完美。此后缀表示此配置设置是针对特定函数名称的,否则,如果不存在,它将使用没有后缀的变量,如默认名称。

上述逻辑还可以解析 XAML 工作流的名称,该名称可以在应用设置、URL 查询字符串或标头中配置。

最后,我们有一个 `Function` 步骤。

首先,我们需要为 `WorkflowChannel` 添加一些配置设置,我们的 XAML blob 位于此处,因此请将以下键添加到应用设置中

WorkflowDefinationStorage 		DefaultEndpointsProtocol=https;
                   AccountName=*****;AccountKey=*******;EndpointSuffix=core.windows.net
WorkflowDefinationName			template/loopback/1000 

特定函数的有效应用设置,键的名称带有函数名称的后缀,如下所示

WorkflowDefinationStorage_mediator	DefaultEndpointsProtocol=https;
AccountName=*****;AccountKey=*******;EndpointSuffix=core.windows.net 

好的,现在我们需要在 `wwwroot` 文件夹中为自定义程序集创建一个 *bin* 文件夹,该文件夹对所有函数都有效。将两个程序集,如 *Microservices.dll* 和 *WorkflowChannel.dll* 复制到 *bin* 文件夹中,如下屏幕截图所示

自定义程序集

现在,我们准备创建一个 Azure Function。转到门户并创建一个名为 `mediator` 的 `HttpTrigger` C# 函数。

function.json

{
  "scriptFile": "..\\bin\\Microservices.dll",
  "entryPoint": "RKiss.AFNLib.Microservices.Mediator",
  "bindings": [   
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ],
  "disabled": false
}

请注意,这是一个预编译的函数,因此删除 *run.csx* 文件。

完成后,你应该有一个 `Function`,如下图所示

步骤 5. 测试

Azure Workflow `HttpTrigger` Function 可以使用任何 REST 客户端进行测试,例如 Postman、Advanced REST Client 等。从门户函数,我们可以获取带授权代码的 **Get function URL** 地址。通过在 url 查询字符串中添加 `xaml=template/loopback/1000`,我们正在访问 Blob 存储中的工作流定义。访问 XAML 工作流的另一种方法是使用名为 `afn-mediator-name` 的标头。

请注意,工作流定义的名称格式如下,并且必须与存储在 `microservices` 容器中的 blob 文件名匹配,请参阅步骤 2。

{topic}/{name}/{version}

examples:
  template/loopback/1000
  demo/test/2000

以下屏幕截图显示了 Azure Workflow Function 与名为 mediator 的 REST API 调用

正如你在上面的响应中看到的,有效负载是输入循环,这正是工作流(步骤 3)所声明的。

注意:出于测试目的,您可以使用以下 URI 地址。它们有效期至 2017 年 8 月 15 日。

https://rk20170721-fnc.azurewebsites.net/api/mediator?code=06nMbUuvjZugxEQiCTer/4wfBDUUGCccuklLKQRt4sEqwapWMDuaoQ==&xaml=template/loopback/1000

https://rk20170721-fnc.azurewebsites.net/api/mediator?code=06nMbUuvjZugxEQiCTer/4wfBDUUGCccuklLKQRt4sEqwapWMDuaoQ==&xaml=template/test/1000

步骤 6. 在工作流中使用自定义程序集

在此步骤中,我将演示如何在工作流中使用自定义程序集。正如你在解决方案(步骤 1)中看到的,有一个 `WorkflowActivities` 项目。该项目包含一个自定义活动,如 `SendHttpRequest`,用于示例 *Test$1000.xaml*,以及一个类扩展,用于简化 XAML 表达式。程序集 *WorkflowActivities.dll* 存储在 `assemblies` 容器中,请参阅步骤 2。

以下代码片段显示了用于向消息添加标头的类扩展

public static class MessageExtension
{
  public static Message AddHeader(this Message message, XName name, string value)
  {
    if (message.Version == MessageVersion.None)
    {
      if (message.Properties.TryGetValue
         (HttpResponseMessageProperty.Name, out dynamic hrmp) == false)
      {
        hrmp = new HttpResponseMessageProperty();
        message.Properties.Add(HttpResponseMessageProperty.Name, hrmp);
      }
      hrmp.Headers.Add(name.LocalName, value);
    }
    else
    {
      var mh = new MessageHeader<string>(value);
      message.Headers.Add(mh.GetUntypedHeader(name.LocalName, name.NamespaceName));
    }
    return message;
  }
}

请注意,类扩展的方法签名返回相同类型的扩展(此输入/输出)。这种简单的模式允许在同一个 XAML 表达式中多次链接方法,请参阅以下图片

正如你所见,上面的 Loopback 工作流比步骤 3 中的前一个看起来好多了。

以下屏幕截图显示了自定义活动 `SendHttpRequest` 的用法,用于调用另一个 Azure Function

`SendHttpRequest` 是一个通用的 REST 调用异步活动,其使用非常直接,只需填充其属性即可

结论

本文描述了 Azure Function 的用法,用于托管在 Blob 存储库中定义的 XAML 资源中的无状态工作流。该概念基于自定义 WCF 工作流通道。使用这种连接堆栈技术,我们可以轻松地在许多不同的进程中托管无状态工作流,例如云、本地等。有关更多详细信息,请参阅我早期的文章,这些文章在下面的参考文献中有所提及。Azure Functions 是云无服务器架构的一部分,使用它们来托管工作流调用器可以为您的业务模型分解带来新的维度。希望你会觉得它很有用。

参考文献

历史

  • 2017 年 7 月 21 日:初始版本
© . All rights reserved.