65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Azure Lease Blob

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2013年5月23日

CPOL

25分钟阅读

viewsIcon

40373

downloadIcon

169

本文描述了 Azure 租用 Blob 如何在分布式事件驱动的发布/订阅架构中,在运行时帮助业务模型的组成。

 

 

目录

 

特点

  • 推送模型
  • 分布式事件流处理
  • 基于 Azure 租用 Blob
  • 松散耦合模式
  • 为运行时业务组合、故障排除、监控等做好准备。
  • 无数据库

 

 

引言

Windows Azure 存储客户端库 2.0(包含在 Windows Azure for .NET SDK 2.0 中)引入了新功能,用于扩展 Azure 存储对 2012-02-12 REST API 版本的支持,并实现了 Blob 和容器租用。这项新功能使您能够设计一个事件驱动的分布式架构,其中业务处理器可以建立和管理 Azure 存储 Blob 和容器的锁定,以进行写入和删除操作。本文描述了如何将这些存储功能应用于基于 WF 4.X 声明式编程的现代元数据驱动架构。

让我们从一个简单的处理示例开始,其中一个有状态消息在三个业务处理器(如预处理器、处理器和后处理器)的管道中进行处理,如下图所示

 

 

如上图所示,消息在无状态服务(业务处理器)的管道中进行处理,每个处理器都可以更新消息。换句话说,消息可以保存业务处理器的状态。该过程是无状态的,并且业务上下文以顺序方式进行处理。

让我们假设传入消息包含多个部分,可以并行处理。下图显示了这个示例

如上图所示,预处理器是消息的拆分器,后处理器是消息的聚合器。聚合器负责根据处理器处理每条消息期间存储在数据库中的状态创建最终消息。此解决方案需要一个数据库来存储特定的业务上下文状态。 

业务上下文的复杂性可能会增加,例如:业务处理器可以是嵌套的拆分器等。下图显示了这个示例

基本上我们可以看到,服务和消息是无状态的,但是我们有一个带有应用程序特定模式的数据库来保存业务上下文状态。业务上下文状态通过聚合器(它也是一个在数据库中共享数据的无状态服务)持久化在后处理器之后。当然,我们可以创建一个模型,其中每个处理器将负责通过其处理发布业务上下文状态。此场景需要数据库服务的特定实现,其中存储业务上下文状态。

基于业务上下文的复杂性和对吞吐量和低延迟的要求,我们可以使用(而不是聚合器)Microsoft StreamInsight 技术进行复杂事件处理(CEP),请参阅 Windows Azure 的 StreamInsight 服务。在此场景中,业务处理器是复杂事件处理的源,用于生成诸如聚合消息之类的操作。我建议查阅 Windows Azure 的 StreamInsight 服务 测试 Windows Azure 的 StreamInsight 服务以获取有关云解决方案中事件处理的详细描述。

在事件驱动的分布式架构中保存业务上下文状态的另一种方法是使用 Azure 存储租用 Blob。下图显示了我们的租用 Blob 示例

基本上,租用 Blob 是 Azure 存储中的一个 Blob,具有独占写入操作。Blob 的内容是业务上下文中源生成的事件集合。与一个中央事件引擎处理器(如 CEP 中使用的)不同,此场景基于分布式事件处理,其中每个服务将在存储在租用 Blob 中的事件流上处理业务上下文状态。

假设我们有如上图所示的三个业务处理器 1、2 和 N。它们是在预处理器拆分器中创建的。每个服务都有责任在事件流中更新当前事件,并在过程继续时初始化下一个事件。基于此,预处理器/拆分器将创建事件流(如果尚不存在),更新/创建事件 0 并初始化三个事件,例如事件 1、2 和 N。当处理器正在处理时,其在事件流中的事件被更新并创建下一个事件,依此类推。如上图所示,处理器 2 是拆分器的最后一个处理器,因此它的职责是为后处理器创建消息并将事件 N+1 写入事件流。最后,后处理器将更新其在事件流中的事件并生成最终聚合消息。

存储在租用 Blob 中的事件流表示业务上下文的运行时行为。此流可以在运行时用于故障排除、监控以及看门狗处理,其中每个增量业务流程都可以在看门狗计时器下。换句话说,如果分布式进程中发生某些事情,例如连接丢失等,事件流会知道过程恢复的详细信息。事件流的另一个功能是允许根据用户请求中止正在运行的业务流程。

感谢 Azure 存储中的租用 Blob 功能,它简化了分布式模型中共享资源的实现,并具有独占写入功能,类似于应用程序域中的锁定资源。此功能内置于 Blob 服务中,因此其使用者需要为其操作(如写入、删除等)获取此特定 Blob 的租约。

好的,让我们继续讨论使用 Azure 租用 Blob 的概念和设计。我假设您对 Windows Azure 平台和 WF 技术有工作知识。

 

概念与设计

在分布式事件驱动架构中使用 Azure 租用 Blob 的概念基于在 Azure 存储中获取租用 Blob 的写入或删除的独占所有权。基本上,事件驱动的分布式过程并行执行,其中业务过程在业务上下文中并发运行。为了控制这种业务上下文,例如消息聚合,根据上下文状态触发事件等,我们需要对这些业务过程状态有一个小的知识库。这个“知识库”是事件到租用 Blob 的运行时日志,这个内容被称为事件流。    

以下代码片段显示了存储在事件流中的通用事件对象

[XmlRoot(ElementName = "Event")]
public class EventP
{
  [XmlAttribute(AttributeName = "name")]
  public string Name { get; set; }

  [XmlAttribute(AttributeName = "status")]
  public string Status { get; set; }

  [XmlAttribute(AttributeName = "id")]
  public string Id { get; set; }

  [XmlAttribute(AttributeName = "topic")]
  public string Topic { get; set; }

  [XmlAttribute(AttributeName = "key")]
  public string Key { get; set; }

  [XmlAttribute(AttributeName = "msg")]
  public string Msg { get; set; }

  [XmlAttribute(AttributeName = "trackingId")]
  public string TrackingId { get; set; }

  [XmlAttribute(AttributeName = "created")]
  public string Created { get; set; }

  [XmlAttribute(AttributeName = "timestamp")]
  public string Timestamp { get; set; }

  [XmlAttribute(AttributeName = "timeout")]
  public string Timeout { get; set; }

  [XmlAttribute(AttributeName = "ref")]
  public string Ref { get; set; }

  [XmlAttribute(AttributeName = "tag")]
  public string Tag { get; set; }

  [XmlAttribute(AttributeName = "prev")]
  public string Prev { get; set; }

  [XmlAttribute(AttributeName = "next")]
  public string Next { get; set; }
}

如您所见,上面的事件对象(元素)具有许多属性,用于在业务处理的特定位置捕获业务状态。这些属性允许引用消息、遍历事件流、持有看门狗计时器、主题、键等。

从抽象的角度来看,事件驱动的分布式业务模型可以在处理过程中发布事件,如下图所示

 

事件按照业务处理器发布的顺序存储在 blob 中,例如,处理器 P0 在时间 T1,P1/T2 等。在并发业务处理中,当多个业务流程并发运行时,不保证事件的精确序列,换句话说,两个或更多发布者希望同时存储其事件。为了避免 blob 存储上的写入冲突,每个发布者都需要对 blob 存储的独占访问。这是 Azure 存储 Blob 服务的一项强大功能,例如租用 Blob。  

Azure 租用 Blob

根据 MSDN 文档租用 Blob

租用 Blob 操作建立并管理对 Blob 的写入和删除操作的锁定。锁定期限可以是 15 到 60 秒,也可以是无限期。

租用 Blob 操作可以通过以下五种模式之一调用
获取,请求新的租用。
续约,续约现有租用。
更改,更改现有租用的 ID。
释放,如果不再需要,则释放租用,以便另一个客户端可以立即获取对 Blob 的租用。
中断,终止租用,但确保在当前租用期到期之前,其他客户端无法获取新租用。



好的,回到我们的抽象模型。我们知道,事件驱动的分布式业务模型将逐个将事件发布(存储)到租用 Blob 中。此事件集合称为事件流,它表示业务模型处理的运行时行为。基本上,每个进入业务模型的消息(请求)都可以有一个事件流,用于在其业务上下文中捕获状态。基于此事件流,业务流程可以被控制、监控、恢复、故障排除等。  

事件驱动分布式模型中业务处理器的每个发布者都必须拥有更新 Blob 存储中事件流的独占访问权限。为了获得这种写入排他性,发布者需要在操作之前获取 Blob 以进行租用(锁定)。写入/删除排他性 Blob 访问操作由LeaseId表示,客户端(发布者)必须将其包含在 Blob 操作中。租用在获取租用时指定的期限内授予,期限可以是 15 秒到一分钟之间,也可以是无限期。当然,我们不希望阻止其他发布者将其事件发布到事件流中,因此每个发布者都有责任尽快释放活动租用。

下图显示了发布者中的租用 BLOB 作用域功能

 

如上图所示,租用 Blob 范围开始调用 Blob 请求以获取租用。此请求在轮询周期中重复,直到 Azure Blob 服务获取到LeaseId。这是在分布式互联网连接模型中获取租用的方式(例如轮询)。一旦范围对 Blob 具有独占写入访问权限,我们就可以将事件流获取到内存中,进行必要的处理并写回(Put)到 Blob。之后,必须调用释放租用操作来释放范围。

基本上,范围操作花费的时间很短(几百毫秒),包括 Get/Put 操作,因此在分布式模型中锁定事件流并不关键,但在活动范围崩溃的情况下,所有其他客户端(发布者)必须等待 Azure Blob 服务释放租用 Blob,这最多可能需要 60 秒。     

正如我前面提到的,LeaseBlobScope将对存储在租用 Blob 中的 Get/Put事件流具有独占访问权限。此过程将占用很少的处理时间。它在内存中执行,因此在释放作用域后,我们内存中有一个更新的事件流,它可以用于其他操作。

下图显示了这个场景。

如上图所示,LeaseBlobScope需要更新存储在租用 Blob 中的事件流。基本上,此操作包括获取事件流、从事件流中选择当前事件、更新此当前事件、更新事件流中的状态,最后一个可选步骤是添加新事件(如果需要)。一旦事件流(位于内存中)已更新,我们可以将其存储回 Azure 存储上的租用 Blob。租用 Blob 在退出LeaseBlobScope序列期间会自动释放,因此它可用于另一个获取请求。

LeaseBlobScope之后的下一个序列可用于更详细的事件流处理,例如,触发如上图所示的操作。请注意,事件流包含业务上下文中的所有状态,因此可以扫描事件流状态以获取最后一个业务并行处理器,以便为下一个操作触发消息。      

为了简化在 WF 技术中处理 Windows Azure 存储中的租用 Blob,自定义LeaseBlobScope活动可以提供帮助。下图显示了此自定义活动,其实现包含在本文中

以下图片显示了使用自定义类扩展处理事件流的几个表达式编辑器分配示例: 

有关自定义活动和事件流库的更多详细信息,请参见实现部分。 

 

租用 Blob 中的事件流可以由任何对租用 Blob 具有权限访问的企业组件共享。这些组件可以使用事件流来实现仪表板功能(显示进度状态等)和/或用于控制进程,例如用户取消、通知等。这些功能的使用是完全透明的,并且与业务模型松散解耦。 

下图显示了连接到事件流的另外两个组件,即用户和看门狗

 

根据用户请求,用户发布者可以发布一个事件来取消正在运行的后台业务流程。在这种情况下,事件流会更新为取消状态,业务流程将在事件流处理期间采取行动。

上图中的第二个组件是看门狗服务。此服务负责检查事件流的过期时间。如果由于某种原因,业务流程未在指定的过期时间内执行,看门狗发布者将中止此事件或/和事件流。

 

事件流

事件流是源(如客户端、业务处理器等)在分布式模型的业务上下文中发布的事件的集合。每个事件都表示业务模型的状态。业务模型可以在运行时顺序和/或并发地组成,其中多个业务处理器并行运行。为了更好地组织和查询事件流中的事件,使用带有分隔符字符“.”的格式化 ID 值来表示事件流中的新组(或线程),例如:01.00

以下图片显示了针对几个业务处理步骤的 SearchOrder 事件流示例

步骤 1. 这是初始化步骤,其中 OrderStart 处理器正在创建事件流和接下来的 3 个事件以进行进一步处理,例如 SearchStart。这是一个拆分器场景,用于并行运行搜索。

如您所见,事件流有一个根事件(名称=OrderStart,id=00,status=inprocess)。OrderStart 为 SearchStart 处理器生成 3 条消息。此步骤的事件流状态为inprocess-init-init-init

步骤 2. 在此步骤中,调用 SearchStart 进程,并在事件流中更新事件 SearchStart,并为 SearchDone 创建新事件。下图显示了此场景

如您所见,上图显示了更多的运行时业务组合,其中第二个线程 02.00 处理速度更快。此线程具有 SearchDone 并且有 3 个结果可用。请注意,线程 03.00 失败并已中止,这是事件的最终状态。

步骤 3. 在此步骤中,我们有更多的业务组合,例如完成、中止和基于对接收结果的分析进行的下一个嵌套搜索处理

请注意,事件在完成、中止或取消的情况下会最终确定。如上图所示,事件流正在等待业务线程 01.02.00、01.04.00 和 02.03.00 中的业务流程最终确定。

事件流代表了运行时业务模型组合的小型知识库。基于事件,我们可以看到每个业务处理器的性能、消息流、数据等。如果进程崩溃,业务进程可以从崩溃点重新启动并继续。  

事件流以 xml 格式的文本存储在 Azure 存储租用 Blob 中。

 

好的,让我们看一下在 xaml 工作流和自定义 EventStream 库中处理租用 Blob 的第一个自定义活动。我假设您对 Windows Azure 平台和 WF 技术有工作知识。 

实现

将以下自定义活动添加到工作流设计器工具箱中,我们可以使用工作流编排中的租用 Blob 功能。

如您所见,上述自定义活动库包含 5 个基本的租用 Blob 操作和一个用于LeaseBlobScope的组合操作。LeaseBlobScope自定义原生活动允许执行一系列具有独占写入/删除租用 Blob 操作的活动。此自定义范围活动负责在正常或故障退出时释放租用 Blob。

让我们从实现的角度看一下LeaseBlobScope自定义活动。 

 

LeaseBlobScope 自定义活动

下图显示了LeaseBlobScope自定义活动的设计和属性。

LeaseBlobScope需要一些属性来创建租用 Blob,例如 BlobAddress、用于帐户的 ConfigurationName 等。

LeaseBlobScope自定义活动重要的是,在从 Azure Blob 服务收到冲突错误 409 的情况下,它会轮询租用 Blob 的获取请求。请注意,无限轮询取决于获取相同租用 Blob 的客户端数量以及活动租用 Blob 的快速释放。在分布式互联网连接中,这是我们如何获得对 Azure 存储 Blob 上资源进行写入/删除的独占访问权限的方式。   

以下代码片段显示了LeaseBlobScope自定义活动的Execute方法的实现

protected override void Execute(NativeActivityContext context)
{
  if (this.activities != null && this.Activities.Count > 0)
  {
    NoPersistHandle handle = this.noPersistHandle.Get(context);
    handle.Enter(context);

    CloudBlobClient client = StorageAccountHelper.GetCloudStorageAccount(this.ConfigurationName.Get(context)).CreateCloudBlobClient();
    CloudBlockBlob blob = client.GetBlockBlobReference(this.BlobAddress.Get(context));

    // lock the blob
    Func<CloudBlockBlob, TimeSpan, string, string> delegateFunc = new Func<CloudBlockBlob, TimeSpan, string, string>(LockingBlob);
    var ar =  delegateFunc.BeginInvoke(blob, this.LeaseTime.Get(context), this.ProposedLeaseId.Get(context), null, null);
    ar.AsyncWaitHandle.WaitOne();
    string lid = (string)delegateFunc.EndInvoke(ar);

    this.LeaseId.Set(context, lid);
    this.Result.Set(context, blob);

    // safety work on this blob
    context.ScheduleActivity(this.Activities[0], this.onChildComplete, OnFaulted);
  }
}

如您所见,在运行作用域中的第一个活动之前,我们需要获取一个租用 Blob。此请求通过委托函数LockingBlob异步完成

private string LockingBlob(CloudBlockBlob blob, TimeSpan timeout, string proposedLeaseId)
{
 int ii = 0;
 DateTime startDT = DateTime.Now;

 while (++ii > 0)
 {
   string lid = blob.TryAcquireLease(timeout, proposedLeaseId);
   if (string.IsNullOrEmpty(lid))
   {
     var delayTime = TimeSpan.FromMilliseconds(new Random(Guid.NewGuid().GetHashCode()).Next(750, 2500));
     Thread.Sleep(delayTime);
   }
   else
   {
     Trace.WriteLine(string.Format("TryAcquireLease[{0}, {1}ms]", ii, DateTime.Now - startDT));
     return lid;
   }
 }
 return null;
}

LockingBlob 函数只有一个职责,即从获取的租用 Blob 中获取leaseId。如果值为 null,它将等待随机时间,然后再次尝试获取租用 Blob。请注意,Thread.Sleep在委托线程中休眠,这与工作流线程不同。 

以下代码片段显示了活动范围内的故障处理程序。您可以看到租用 Blob 是如何释放的

private void OnFaulted(NativeActivityFaultContext context, Exception exception, ActivityInstance faultedInstance)
{
  this.Result.Get(context).ReleaseLease(AccessCondition.GenerateLeaseCondition(this.LeaseId.Get(context)));
  NoPersistHandle handle = this.noPersistHandle.Get(context);
  handle.Exit(context);
}

 

AcquireLeaseBlob 自定义活动

AcquireLeaseBlob自定义活动可用于简单地获取租用 Blob

 

这是一个异步自定义活动,只有一个请求用于获取租用 Blob。在发生冲突错误(代码 409)的情况下,我们可以在那里休眠指定的时间,然后退出。以下是其实现的代码片段

protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
  CloudBlobClient client = StorageAccountHelper.GetCloudStorageAccount(this.ConfigurationName.Get(context)).CreateCloudBlobClient();
  CloudBlockBlob blob = client.GetBlockBlobReference(this.BlobAddress.Get(context));
  context.UserState = blob;
  return blob.BeginAcquireLease(this.LeaseTime.Get(context), this.ProposedLeaseId.Get(context), callback, state);
}

protected override string EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
  CloudBlockBlob blob = (CloudBlockBlob)context.UserState;
  try
  {
    return blob.EndAcquireLease(result);
  }
  catch (StorageException ex)
  {
    if (ex.RequestInformation.HttpStatusCode == 409)
    {
        Trace.WriteLine(string.Format("AcquireLease conflict - sleeping for {0}]", this.RetryTime.Get(context)));
        Thread.Sleep(this.RetryTime.Get(context));
        return null;
    }
    else
        throw;
  }
}

如您所见,这是 Begin/End 方法的直接异步实现。对于此自定义活动,工作流编排将负责下一次轮询租用 Blob(使用 While 循环、重试时间等)。请注意,工作流有责任使用基本自定义活动 ReleaseLeaseBlob 或库函数释放租用 Blob。

正如我之前提到的,事件流是 xml 元素(EventP对象)的集合。事件流以 xml 格式的文本存储在租用 Blob 中。为了在 xaml 工作流中使用 Assign 活动处理事件流,已经实现了以下库。

事件流库

这是一个非常轻量级的库,用于对事件流中的事件进行基本操作,例如更新事件属性、检查状态等。下图显示了事件对象(EventP)的类视图、带有有用方法的事件容器(EventStreamProcess)以及EventStreamProcess的扩展类。

EventStreamLibrary的设计理念是基于在表达式文本中使用方法和属性,以便在 Assign 活动中进行链式操作。例如:在 Event-Stream 中添加事件并从 Event-Stream 中选择特定事件可以在一个 Assign 活动中完成

 

 

请注意,要选择事件流中的特定事件,我们还可以根据 xpath 表达式文本使用SelectMustSelect方法

 

一旦我们从事件流(esp)中获得了特定业务状态的事件,我们就可以在一个 Assign 活动中对其进行更新,请参阅以下示例

以下代码片段显示了EventStreamProcessExtension类中 Update 方法的实现

public static EventStreamProcess Update(this EventStreamProcess esp, XElement element, string name, string value)
{
  if (element == null || element.Name.LocalName != "Event")
      throw new ArgumentException("element");

  element.SetAttributeValue(name, value);
  element.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());

  XElement evnt = element;
  if (name == "status" && (value == "completed" || value == "aborted" || value == "canceled"))
  {
    while (esp.IsRoot(evnt) == false)
    {
      Trace.WriteLine("EventStream.UpdateStatus: {0}", evnt.ToString());

      evnt = esp.EventStream.XPathSelectElement(string.Format("./Event[@id='{0}' and @status='inprocess']", evnt.Attribute("prev").Value));
      if (evnt == null)
        break;

      List<string> next = evnt.Attribute("next").Value.Split('|').ToList().Select(s => s.Trim()).ToList();
      if (next.Count == 1 || esp.IsDone(next))
      {
        if (element.Attribute("status") != null && element.Attribute("status").Value != "completed")
          evnt.SetAttributeValue("msg", "WithError");
          
        evnt.SetAttributeValue("status", "completed");
        evnt.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());
      }
      else
      {
        break;
      }
    }
     
    // update root status
    if (esp.IsRoot(evnt))
    {
      List<string> next = evnt.Attribute("next").Value.Split('|').ToList().Select(s => s.Trim()).ToList();
      if (next.Count == 1 || esp.IsDone(next))
      {
        if (element.Attribute("status") != null && element.Attribute("status").Value != "completed")
            evnt.SetAttributeValue("msg", "WithError");
        evnt.SetAttributeValue("status", "completed");
        evnt.SetAttributeValue("timestamp", DateTime.UtcNow.ToString());
      }
    }   
  }
  return esp;
}     

如您所见,Update 方法中有一个特殊情况,即更新 status 属性。在这种情况下,此方法将更新 EventThread 中从特定 Event 到根 Event 的所有 Event 的状态。

事件状态可以具有以下值之一,其中完成、中止和取消是事件流中事件对象的最终状态。

 

为了测试 Event-Stream 的最终状态,我们可以使用IsDone方法,如下例所示

以下代码片段显示了EventStreamProcess类中IsDone方法的实现

public bool IsDone()
{
    var root = this.EventStream.Elements("Event").ElementAt(0);
    if (root.Attribute("prev") != null)
        throw new ArgumentException("Internal error: The root event must not have attribute 'prev'");

    return root.Attribute("status").Value == "completed" || root.Attribute("status").Value == "aborted" || root.Attribute("status").Value == "canceled";
}

 

CloudBlockBlobExtensions

这是对CloudBlockBlob类的扩展,用于自定义处理租用 Blob。下图显示了所有扩展方法

上述扩展类的实现非常简单,例如:以下代码片段展示了如何将 blob 检索为 XElement 返回值

public static XElement ToXml(this CloudBlockBlob blob, string leaseId)
{
    using (var ms = new MemoryStream())
    {
        blob.DownloadToStream(ms, AccessCondition.GenerateLeaseCondition(leaseId));
        ms.Position = 0;
        return XElement.Load(ms);
    }           
}

在 xaml 工作流声明中使用此扩展是通过 Assign 活动实现的,请参阅以下加载和保存EventStreamProcess对象的示例

 

 

请注意,EventStream库旨在简化对事件流中事件集合的处理。基本上,您可以使用任何声明式编程技术来获取更新放置事件流到/从租用 Blob,例如 LINQ。请记住,一旦您处于LeaseBlobScope中,业务处理必须尽快完成,因为您独占写入租用 Blob,换句话说,LeaseBlobScope会锁定其他客户端,使其无法请求相同的租用 Blob 以进行获取租用操作。

好的,让我们来看一些使用这些自定义活动和库的租用 Blob 测试用例。

 

用法与测试

首先,以下是先决条件:

AzureLeaseBlob解决方案包括 2 个项目和一个 Test 解决方案文件夹,其中存储了用于我们测试的程序集和示例。下图显示了从本文下载后的解决方案

 

 

如您所见,该解决方案包括一个WorkflowDesignerTester应用程序,这是一个重新托管在 Windows Form 上的工作流设计器,有关此小工具的更多详细信息,请参阅我的文章WF4 消息中介自定义活动。我们进行测试还需要另一个工具是Azure 存储浏览器。基本上,您可以使用任何您喜欢的 Azure 存储 Blob 浏览器。

在第一次测试之前,我们需要在您的帐户中创建一个空 blob。下图显示了公共 BlockBlob 资源,例如temp/eventstream

 

让我们从一个非常简单的基本示例开始。

 

测试 A. - 基本示例

这是LeaseBlobScope自定义活动在无限循环中的基本测试。请按照以下说明步骤进行操作

步骤 1. 启动 WorkflowDesignerTester45 程序

我们将使用 WD 4.5 版本演示 AzureLeaseBlob 自定义活动。以下屏幕截图是打开解决方案 Samples 文件夹中的 LeaseBlobScopeBasic_Test45.xaml 文件后的 WD。

 

如您所见,左侧是工作流文档大纲视图面板,工具箱侧是自定义库AzureLeaseBlob.ActivityLibrary。我们的测试序列在设计器工作区中。此序列显示在以下屏幕截图中

这就是测试循环的全部内容。DoWhile循环正在循环一个带有随机延迟(时间介于 750-2000 毫秒之间)的LeaseBlobScope自定义活动。延迟表达式是

TimeSpan.FromMilliseconds(new Random(Guid.NewGuid().GetHashCode()).Next(750, 2000))

作用域内有一个活动,例如WriteLine,用于在控制台屏幕上显示 LeaseID。

 

步骤 2. Azure 存储帐户

在运行此测试之前,我们需要在applicationStorageAccount变量中填充一个 Azure 存储帐户。如果使用另一个容器 Blob 进行测试,请同时更改EventStreamRef变量的值。下图是我们在测试中使用的变量的屏幕截图

 

步骤 3. 运行

按下Run按钮,创建控制台自托管进程并加载 xaml 文档以执行。我们的测试需要启动两个进程,以查看LeaseBlobScope如何以并发方式处理可共享的租用 Blob。 

请按两次“运行”按钮。

 

以下两个控制台程序已启动并显示 LeaseID。

在执行LeaseBlobScope自定义活动时,范围对写入租用 Blob(temp/eventstream)具有独占访问权限。此租用 Blob 状态通过显示其 LeaseID 来表示。请注意,其他LeaseBlobScope正在以轮询方式获取租用 Blob。

为了查看租用 Blob 的获取延迟,可以使用以下实用程序DebugView。日志消息显示了总获取轮询时间和轮询计数器。

如您所见,上图显示了LeaseBlobScope需要重复获取租用 Blob 请求的 3 种情况。

这就是我们同时运行两个并发进程的情况。现在我们可以运行更多进程,例如 5 个,并查看跟踪器以获取获取池值。

您可以玩转工作流进程,在作用域内添加更多活动,例如向 blob 写入一些内容等。

好的,让我们进行更高级的测试,其中LeaseBlobScope将与租用 Blob 主体一起工作。

 

测试 B. 租用 Blob 高级测试

此测试演示了LeaseBlobScopeEventStreamLibrary如何在作用域内并发处理EventStream。循环中的测试用例是找到EventStreamstatus=init的 Event 对象并将其更新为status=complete。当 EventStream 中的所有 Event 都更新为status=complete时,循环结束。要重复测试,必须在容器中删除 Blob。

首先,我们需要打开一个新测试文件。

步骤 1. 打开 LeaseBlobScope_v45.xaml 文件

单击文件/打开以从示例解决方案文件夹中选择工作流文档LeaseBlobScope_v45.xaml文件。下图显示了此循环的测试序列

基本上,测试序列分为两部分:DoWhile 循环和 Init。Init 部分负责创建包含特定数量事件(默认值为 10)且status=initEventStream。DoWhile 循环也很简单直接。

第一个活动是从租用 Blob 获取事件流

esp = new EventStreamProcess(blob.ToXml(leaseId))

然后选择 status=init 的 Event 对象

curEvent = esp.Select("./Event[@status='init']")

下一步是更新此 curEvent

esp = esp.Update(curEvent, "key", workerID).Update(curEvent, "status", "completed")

此序列的最后一步是将更新后的EventsStream存储回租用 Blob

esp.EventStream = blob.FromXml(esp.EventStream, leaseId)

以下屏幕截图显示了此序列

请注意,DoWhile 条件是一个非常简单的表达式,使用 EventStreamLibrary,例如esp.IsDone=false

在开始测试之前,我们必须设置 Azure 存储帐户。

步骤 2. Azure 存储帐户

以下突出显示的变量必须更改或修改

变量applicationStorageAccount必须设置为您的 Azure 存储帐户。其他变量是可选的,例如EventStreamRefmaxEvents。请注意,容器temp是在我们的第一次测试中手动创建的。基本上,您可以为此测试创建另一个容器。这取决于您的需求。EventStream中的事件数量设置为 10,但最大为 99。它是一个字符串值。

 

步骤 3. 创建事件流

此步骤用于创建一个包含事件流的租用 Blob,因此请按运行按钮启动进程。以下屏幕截图显示了初始化序列的结果

如您所见,我们有一个包含 10 个 Event 对象的EventStream,已准备好进行测试。如果我们在键盘上按 Enter 键,DoWhile循环将启动,并且只有此进程将更新我们不希望的EventStream。我们的测试必须通过多个运行进程来证明,因此请按照下一步操作。

步骤 4. 运行测试循环

在此步骤中,我们需要执行两个操作:在 WorkflowDesigner 上按下“运行”按钮以创建第二个进程,然后返回到第一个进程并在键盘上按下 Enter 键。这些操作必须快速完成。

下图显示了两个正在运行的进程

如您所见,每个进程都以并发方式更新了事件流中的一些 Event 对象,而没有发生任何共享租用 Blob 的冲突。 

一旦所有 Event 的EventStream都更新完毕,我们的测试就完成了。下图显示了这种情况

 

您可以看到 Event 对象中的key属性,它当时是租用 Blob 的独占所有者。

在租用 Blob 被删除后,例如使用第三方工具如Azure 存储浏览器,可以再次重复上述测试。

可以针对更多事件和/或正在运行的进程重复测试,以查看LeaseBlobScope如何与租用 Blob 一起工作。

我们的测试到此为止,当然您可以根据您的应用程序特定的EventStream需求继续。 

 

结论

本文描述了使用 Azure 存储租用 Blob 组合事件驱动的分布式业务流程。该概念基于运行时业务组合期间的事件流,其中每个业务处理器(worker)将其状态(事件)发布到称为事件流的可共享资源,用于特定的业务上下文。在分布式互联网连接模型中,事件流的所有发布者和消费者都是松散耦合的。对于此场景,租用 Blob 是存储具有独占写入操作的事件流的最佳解决方案。本文包括用于 Azure 存储租用 Blob 的自定义活动和用于处理租用 Blob 的 EventStream 库。所描述的使用事件流进行运行时业务流程组合的概念可以帮助您的应用程序在分布式云驱动架构中实现可伸缩性和并行性。希望您喜欢它。

 

参考文献:

[1] Windows Azure

[2] Windows Azure SDK 2.0 for .Net

[3] Storage Client Library for .NET (version 2.0) 中的新功能

[4] Windows Azure Storage Client Library 2.0 for .NET 和 Windows Runtime 简介

[5] 租用 Blob

[6] 新的 Blob 租约功能:无限租约、更短的租约时间等等

[7] 用于 Windows Azure 的 StreamInsight 服务

[8] Azure 中的 StreamInsight

[9] 企业集成模式和最佳实践

[10] Windows Azure 中的新增功能

[11] Azure 存储资源管理器

© . All rights reserved.