通过单元测试学习 Windows Workflow Foundation 4.5:工作流定义持久化、版本控制和 WorkflowIdentity
持久化、版本控制和 WorkflowIdentity
概述
本文档侧重于长时运行/可持久化工作流的定义持久化和版本控制。
背景
以下是与本文相关的其他文章
- 通过单元测试学习 Windows Workflow Foundation 4.5:WorkflowApplication 持久化
- 通过单元测试学习 Windows Workflow Foundation 4.5:工作流服务
对于真正长时运行的工作流,使用 WorflowServiceHost 和工作流持久化是唯一的选择。然而,工作流 4 和 4.5 对持久化提供了“较弱”的支持,并且由于 AppFabric 的弃用,工作流管理服务不再是一个可行的解决方案。我们应用程序开发人员必须编写自己的自定义解决方案来管理工作流的生命周期。
对于一些在 4.0 之前的早期版本工作流基金会中有经验的开发人员来说,这种管理工作流生命周期的“弱”支持是一个坏消息,会给额外的维护工作和相应的思维模式转变带来巨大的负担,除非开发人员仅在 Biztalk 和 Sharepoint 中进行复杂的流托管工作。
备注
我认为 WF 4 中不再内置持久化工作流定义的解决方案是一个明智的举措,因为我认为许多工作流应用程序不需要在持久化层中持久化工作流定义,而相应的函数知道在恢复工作流实例时使用哪个工作流定义,因此无需序列化和反序列化工作流定义。对于需要持久化工作流定义的场景,编写自定义解决方案(例如字典)并不困难。我认为最棘手的部分是在不使用 Sharepoint 或 Biztalk 的情况下,扫描持久化层中休眠的工作流并及时加载它们。
参考文献
Using the Code
源代码可在https://github.com/zijianhuang/WorkflowDemo找到。
必备组件
- Visual Studio 2015 Update 1 或 Visual Studio 2013 Update 4
- xUnit(包含)
- EssentialDiagnostics(包含)
- FonlowTesting(包含)
- 工作流持久化 SQL 数据库,默认本地数据库为 WF。
本文中的示例来自一个测试类:WorkflowServiceHostTests, WorkflowServiceHostPersistenceTests, WFServiceTests, WCFWithWorkflowTests, NonServiceWorkflowTests。
WFDefinitionIdentityStore
在文章 通过单元测试学习 Windows Workflow Foundation 4.5:使用 WorkflowApplication 进行持久化 中,我介绍了一个字典类,它可以通过实例 ID 来查找工作流定义。在一个服务应用程序中,可能存在同一工作流定义的多个实例,因此存储同一定义的多个副本效率低下,所以您可能需要设计一个存储(在程序逻辑或持久化中),只存储一次定义并支持版本控制。虽然设计一个不存储同一定义重复副本的工作流定义存储有很多方法,但是,如果您想对长时运行的工作流进行版本控制,使用 WorkflowIdentity 是一个不错的选择,并且定义身份默认会存储在 SQL 表 DefinitionIdentityTable 中。
public class WFDefinitionIdentityStore
{
private static readonly Lazy<WFDefinitionIdentityStore> lazy = new Lazy<WFDefinitionIdentityStore>(() => new WFDefinitionIdentityStore());
public static WFDefinitionIdentityStore Instance { get { return lazy.Value; } }
public WFDefinitionIdentityStore()
{
InstanceDefinitions = new System.Collections.Concurrent.ConcurrentDictionary<WorkflowIdentity, byte[]>();
Store = new SqlWorkflowInstanceStore("Server =localhost; Initial Catalog = WF; Integrated Security = SSPI")
{
InstanceCompletionAction = InstanceCompletionAction.DeleteAll,
InstanceEncodingOption = InstanceEncodingOption.GZip,
};
var handle = Store.CreateInstanceHandle();
var view = Store.Execute(handle, new CreateWorkflowOwnerCommand(), TimeSpan.FromSeconds(50));
handle.Free();
Store.DefaultInstanceOwner = view.InstanceOwner;
}
public System.Collections.Concurrent.ConcurrentDictionary<WorkflowIdentity, byte[]> InstanceDefinitions { get; private set; }
public System.Runtime.DurableInstancing.InstanceStore Store { get; private set; }
public bool TryAdd(WorkflowIdentity definitionIdentity, Activity a)
{
using (MemoryStream stream = new MemoryStream())
{
ActivityPersistenceHelper.SaveActivity(a, stream);
stream.Position = 0;
return InstanceDefinitions.TryAdd(definitionIdentity, stream.ToArray());
}
}
public bool TryAdd<T>(WorkflowIdentity definitionIdentity, ActivityBuilder<T> ab)
{
using (MemoryStream stream = new MemoryStream())
{
ActivityPersistenceHelper.SaveActivity(ab, stream);
stream.Position = 0;
return InstanceDefinitions.TryAdd(definitionIdentity, stream.ToArray());
}
}
public Activity this[WorkflowIdentity definitionIdentity]
{
get
{
return ActivityPersistenceHelper.LoadActivity(InstanceDefinitions[definitionIdentity]);
}
}
}
备注
您在网上找到的许多教程可能都提供了一个映射 WorkflowIdneity 和 Activity 的字典,但是 CLR 的二进制 Activity 很难真正持久化到持久化层而不是内存中,只有 XAML 表示的 Activity 才适合持久化。修改上述代码并将定义存储在 SQL 或 NoSql 中应该不难。
WFDefinitionIdentityStore 示例
IDictionary<string, object> LoadAndCompleteLongRunning(Guid instanceId, WorkflowIdentity definitionIdentity)
{
bool completed2 = false;
bool unloaded2 = false;
AutoResetEvent syncEvent = new AutoResetEvent(false);
var instance = WorkflowApplication.GetInstance(instanceId, WFDefinitionIdentityStore.Instance.Store);
var definition = WFDefinitionIdentityStore.Instance[definitionIdentity];
IDictionary<string, object> dic = null;
var app2 = new WorkflowApplication(definition, instance.DefinitionIdentity)
{
Completed = e =>
{
completed2 = true;
if (e.CompletionState== ActivityInstanceState.Closed)
{
dic = e.Outputs;
}
},
Unloaded = e =>
{
unloaded2 = true;
syncEvent.Set();
},
InstanceStore = WFDefinitionIdentityStore.Instance.Store,
};
stopwatch.Restart();
app2.Load(instance);
Trace.TraceInformation("It took {0} seconds to load workflow", stopwatch.Elapsed.TotalSeconds);
app2.Run();
syncEvent.WaitOne();
stopwatch2.Stop();
var seconds = stopwatch2.Elapsed.TotalSeconds;
Assert.True(completed2);
Assert.True(unloaded2);
return dic;
}
[Fact]
public void TestWaitForSignalOrDelayVersion1()
{
var a = new WaitForSignalOrDelay()
{
Duration=TimeSpan.FromSeconds(10),
BookmarkName="Wakeup",
};
var definitionIdentity = new WorkflowIdentity("WaitForSignalOrDelay", new Version(1, 0), null);
AutoResetEvent syncEvent = new AutoResetEvent(false);
bool completed1 = false;
bool unloaded1 = false;
var app = new WorkflowApplication(a, definitionIdentity)
{
InstanceStore = WFDefinitionStore.Instance.Store,
PersistableIdle = (eventArgs) =>
{
return PersistableIdleAction.Unload;
},
OnUnhandledException = (e) =>
{
return UnhandledExceptionAction.Abort;
},
Completed = delegate (WorkflowApplicationCompletedEventArgs e)
{
completed1 = true;
syncEvent.Set();
},
Unloaded = (eventArgs) =>
{
unloaded1 = true;
syncEvent.Set();
},
};
var id = app.Id;
app.Run();
syncEvent.WaitOne();
Assert.False(completed1);
Assert.True(unloaded1);
WFDefinitionIdentityStore.Instance.TryAdd(definitionIdentity, a);
Thread.Sleep(5000); // from 1 seconds to 9 seconds, the total time of the test case is the same.
var outputs = LoadAndCompleteLongRunning(id, definitionIdentity);
Assert.False((bool)outputs["Result"]);
}
[Fact]
public void TestWaitForSignalOrDelayVersion2()
{
var a = new WaitForSignalOrAlarm()
{
AlarmTime = DateTime.Now.AddSeconds(10),
BookmarkName = "Wakeup",
};
var definitionIdentity = new WorkflowIdentity("WaitForSignalOrDelay", new Version(2, 0), null);
AutoResetEvent syncEvent = new AutoResetEvent(false);
bool completed1 = false;
bool unloaded1 = false;
var app = new WorkflowApplication(a, definitionIdentity)
{
InstanceStore = WFDefinitionStore.Instance.Store,
PersistableIdle = (eventArgs) =>
{
return PersistableIdleAction.Unload;
},
OnUnhandledException = (e) =>
{
return UnhandledExceptionAction.Abort;
},
Completed = delegate (WorkflowApplicationCompletedEventArgs e)
{
completed1 = true;
syncEvent.Set();
},
Unloaded = (eventArgs) =>
{
unloaded1 = true;
syncEvent.Set();
},
};
var id = app.Id;
app.Run();
syncEvent.WaitOne();
Assert.False(completed1);
Assert.True(unloaded1);
WFDefinitionIdentityStore.Instance.TryAdd(definitionIdentity, a);
Thread.Sleep(5000); // from 1 seconds to 9 seconds, the total time of the test case is the same.
var outputs = LoadAndCompleteLongRunning(id, definitionIdentity);
Assert.False((bool)outputs["Result"]);
}
备注:
工作流版本控制是一个逻辑概念。如上例所示,“WaitForSignalOrDelay”的两个版本由两个不同的类实现,版本 2 的实现不一定是版本 1 的派生类。版本信息在工作流定义的实例化过程中注入。
由宿主应用程序(更准确地说,您作为应用程序开发人员)负责在恢复实例时提供正确的工作流定义,并确保映射的唯一性。
WorkflowDefinitionIdentityFactory
WorkflowIdentity 为工作流定义持久化和版本控制提供了出色的关注点分离。有时在某些应用程序中,您可能不想手动定义每个 WorkflowIdentity 对象,而可能希望使用工作流类名作为定义身份,特别是在您拥有相当清晰的 .NET 组件设计和发布控制实践的情况下。
此类具有与 WorkflowDefinitionIdentityStore 几乎相同的接口,但是,它使用 CLR 类型和程序集作为持久化介质,而不是 XAML。可以通过静态函数 GetWorkflowIdentity() 生成工作流的 DefinitionIdentity。
public class WFDefinitionIdentityFactory
{
private static readonly Lazy<WFDefinitionIdentityFactory> lazy = new Lazy<WFDefinitionIdentityFactory>(() => new WFDefinitionIdentityFactory());
public static WFDefinitionIdentityFactory Instance { get { return lazy.Value; } }
public WFDefinitionIdentityFactory()
{
InstanceDefinitions = new System.Collections.Concurrent.ConcurrentDictionary<WorkflowIdentity, Activity>();
Store = new SqlWorkflowInstanceStore("Server =localhost; Initial Catalog = WF; Integrated Security = SSPI")
{
InstanceCompletionAction = InstanceCompletionAction.DeleteAll,
InstanceEncodingOption = InstanceEncodingOption.GZip,
};
var handle = Store.CreateInstanceHandle();
var view = Store.Execute(handle, new CreateWorkflowOwnerCommand(), TimeSpan.FromSeconds(50));
handle.Free();
Store.DefaultInstanceOwner = view.InstanceOwner;
}
public System.Collections.Concurrent.ConcurrentDictionary<WorkflowIdentity, Activity> InstanceDefinitions { get; private set; }
public System.Runtime.DurableInstancing.InstanceStore Store { get; private set; }
public bool TryAdd(WorkflowIdentity definitionIdentity, Activity a)
{
return InstanceDefinitions.TryAdd(definitionIdentity, a);
}
public Activity this[WorkflowIdentity definitionIdentity]
{
get
{
Activity activity = null;
var found = InstanceDefinitions.TryGetValue(definitionIdentity, out activity);
if (found)
return activity;
var assemblyFullName = definitionIdentity.Package;
var activityTypeName = definitionIdentity.Name;
System.Diagnostics.Trace.Assert(assemblyFullName.Contains(definitionIdentity.Version.ToString()));
var objectHandle= Activator.CreateInstance(assemblyFullName, activityTypeName);//tons of exceptions needed to be handled in production
activity = objectHandle.Unwrap() as Activity;
if (activity==null)
{
throw new InvalidOperationException("You must have been crazy.");
}
InstanceDefinitions.TryAdd(definitionIdentity, activity);
return activity;
}
}
public static WorkflowIdentity GetWorkflowIdentity(Activity activity)
{
var type = activity.GetType();
var name = type.FullName;
var assembly = type.Assembly;
var package = assembly.FullName;
var version = assembly.GetName().Version;
return new WorkflowIdentity(name, version, package);
}
}
假定 WorkflowIdentity 中,Name 必须是完整的类名,Version 必须是程序集版本,Package 必须是程序集的 FullName。因此,您必须严格遵循 .NET 组件设计。对于工作流版本控制,您可以考虑以下设计:
- 在下一个版本的工作流中使用数字后缀,例如,您有一个工作流类 BuyWorkflow,那么下一个版本可以是 BuyWorkflow2。
- 修改 WFDefinitionIdentityFactory 并使用 Activity.DisplayName 而不是 Type.FullName 来表示 WorkflowIdentity.Name,并且 WorkflowIdentity.Package 将存储 Type.AssemblyQualifiedName。
示例
[Fact]
public void TestPersistenceWithDelayAndResult()
{
var a = new Fonlow.Activities.Calculation();
a.XX = 3;
a.YY = 7;
bool completed1 = false;
bool unloaded1 = false;
AutoResetEvent syncEvent = new AutoResetEvent(false);
var definitionIdentity = WFDefinitionIdentityFactory.GetWorkflowIdentity(a);
var app = new WorkflowApplication(a, definitionIdentity);
app.InstanceStore = WFDefinitionStore.Instance.Store;
app.PersistableIdle = (eventArgs) =>
{
return PersistableIdleAction.Unload;//so persist and unload
};
app.OnUnhandledException = (e) =>
{
return UnhandledExceptionAction.Abort;
};
app.Completed = delegate (WorkflowApplicationCompletedEventArgs e)
{
completed1 = true;
};
app.Aborted = (eventArgs) =>
{
};
app.Unloaded = (eventArgs) =>
{
unloaded1 = true;
syncEvent.Set();
};
var id = app.Id;
stopwatch.Restart();
stopwatch2.Restart();
app.Run();
syncEvent.WaitOne();
stopwatch.Stop();
Assert.True(stopwatch.ElapsedMilliseconds < 2500, String.Format("The first one is executed for {0} milliseconds", stopwatch.ElapsedMilliseconds));
//the ellipsed time depends on the performance of the WF runtime when handling persistence. The first case of persistence is slow.
Assert.False(completed1);
Assert.True(unloaded1);
stopwatch.Restart();
var t = WFDefinitionIdentityFactory.Instance.TryAdd(definitionIdentity, a);
stopwatch.Stop();
Trace.TraceInformation("It took {0} seconds to persist definition", stopwatch.Elapsed.TotalSeconds);
//Now to use a new WorkflowApplication to load the persisted instance.
var dic = LoadAndCompleteLongRunning(id, definitionIdentity);
var finalResult = (long)dic["Result"];
Assert.Equal(21, finalResult);
}
IDictionary<string, object> LoadAndCompleteLongRunning(Guid instanceId, WorkflowIdentity definitionIdentity)
{
bool completed2 = false;
bool unloaded2 = false;
AutoResetEvent syncEvent = new AutoResetEvent(false);
var instance = WorkflowApplication.GetInstance(instanceId, WFDefinitionIdentityStore.Instance.Store);
var definition = WFDefinitionIdentityFactory.Instance[definitionIdentity];
IDictionary<string, object> dic = null;
var app2 = new WorkflowApplication(definition, instance.DefinitionIdentity)
{
Completed = e =>
{
completed2 = true;
if (e.CompletionState== ActivityInstanceState.Closed)
{
dic = e.Outputs;
}
},
Unloaded = e =>
{
unloaded2 = true;
syncEvent.Set();
},
InstanceStore = WFDefinitionIdentityStore.Instance.Store,
};
stopwatch.Restart();
app2.Load(instance);
Trace.TraceInformation("It took {0} seconds to load workflow", stopwatch.Elapsed.TotalSeconds);
app2.Run();
syncEvent.WaitOne();
stopwatch2.Stop();
var seconds = stopwatch2.Elapsed.TotalSeconds;
Assert.True(completed2);
Assert.True(unloaded2);
return dic;
}
[Fact]
public void TestWaitForSignalOrDelay()
{
var a = new WaitForSignalOrDelay()
{
Duration=TimeSpan.FromSeconds(10),
BookmarkName="Wakeup",
};
var definitionIdentity = WFDefinitionIdentityFactory.GetWorkflowIdentity(a);
AutoResetEvent syncEvent = new AutoResetEvent(false);
bool completed1 = false;
bool unloaded1 = false;
var app = new WorkflowApplication(a, definitionIdentity)
{
InstanceStore = WFDefinitionStore.Instance.Store,
PersistableIdle = (eventArgs) =>
{
return PersistableIdleAction.Unload;
},
OnUnhandledException = (e) =>
{
return UnhandledExceptionAction.Abort;
},
Completed = delegate (WorkflowApplicationCompletedEventArgs e)
{
completed1 = true;
syncEvent.Set();
},
Unloaded = (eventArgs) =>
{
unloaded1 = true;
syncEvent.Set();
},
};
var id = app.Id;
app.Run();
syncEvent.WaitOne();
Assert.False(completed1);
Assert.True(unloaded1);
Thread.Sleep(5000); // from 1 seconds to 9 seconds, the total time of the test case is the same.
var outputs = LoadAndCompleteLongRunning(id, definitionIdentity); //at this point, the workflow definition is not yet added into the dictionary of WFDefinitionIdentityFactory.
Assert.False((bool)outputs["Result"]);
}
在第二个测试用例中,工作流的初始运行不会将工作流定义持久化到 WFDefinitionIdentityFactory 的字典中。然而,在重新加载时,WorkflowIdentity 就足以通过反射创建定义的一个实例。当工作流实例持久化时,WorkflowIdentity 对象通过工作流运行时保存在 WF SQL 数据库的 DefinitionIdentityTable 中。
Web 托管工作流服务中的并行版本控制
有很多关于并行版本控制的优秀文章
如果您希望工作流服务的多个版本并行存在,并且每个版本都接收新请求并创建新实例,您可以:
- 在项目文件夹下创建一个新文件夹,因此文件夹名将成为新版本工作流服务新终结点地址的路径段,
- 将 xamlx 文件复制到那里
- 使用相同的契约、相同的绑定,但不同的终结点地址来更新功能。
但是,您一定不能将新文件夹命名为“app_code”,因为 WF 运行时出于特殊目的使用此文件夹,如下 MSDN 所述。
MSDN
在 .NET Framework 4.5 中引入的 WorkflowServiceHost 并行版本控制提供了在单个终结点上托管工作流服务的多个版本的能力。提供的并行功能允许配置工作流服务,以便使用新的工作流定义创建工作流服务的新实例,同时运行的实例使用现有定义完成。
因此,无法为旧工作流定义创建新的工作流实例。
如果您使用自托管工作流服务,您将需要执行与上面“WFDeiniitionIdentityStore 示例”中相同的维护工作,每个版本都在不同的活动类中实现,并使用 WorkflowIdentity 来声明它们是相同的服务但处于不同版本。
如果您使用 Web 托管工作流服务,WF 运行时将为您处理大部分的维护工作。当服务实例化时,主服务的 SupportedVersions 属性将在运行时填充。在收到具有现有会话 ID 的客户端请求以获取已持久化在 SQL 数据库中的现有工作流实例时,WF 运行时将加载 ~/app_code/MyVersionedService/MyVersionedServiceVx.xamlx 中存储的旧版本工作流定义以进一步执行同一工作流实例。
备注
由于旧版本会继续存在实例而新版本会创建新实例的性质,因此无法通过自动测试来演示这些行为,因为在单个服务构建内这些行为不是可重复的。但是,您可以参考 WorkflowServiceHost 中的并行版本控制 中描述的两个版本工作流服务的开发周期来观察这些行为。
关注点
工作流定义的持久化
类 `WFDefinitionStore` 主要弥补了 WF 3.5 中存在但在 WF 4.0 和 4.5 中缺失的功能,即,将工作流定义与工作流的每个实例一起持久化。如果您有遗留的 WF 3.5 应用程序并将迁移到 WF 4.5,此类可能是您以最小的努力进行迁移的一个很好的起点。
类 `WFDefinitionIdentityStore` 允许为工作流的所有实例存储一个工作流定义。
`WFDefinitionStore` 和 `WFDefinitionIdentityStore` 都将工作流序列化为 XAML。当应用程序加载并运行持久化的工作流时,应用程序甚至不需要知道并加载工作流类型及其程序集,因为 XAML 就足够提供工作流逻辑,并且反序列化不会返回原始工作流类型而是 Activity 类型。
类 `WFDefinitionIdentityFactory` 具有与 `WFDefintionIdentityStore` 几乎相同的接口,但持久化工作流定义的方式截然不同。`WFDefinitionIdentityFactory` 使用 .NET 程序集作为最终的持久化介质,而不是 XAML,因此无需 SQL 数据库或 NoSql 进行持久化。
WorkflowIdentity 和版本控制
WF 4 中不再内置持久化工作流定义的解决方案是一个明智的举措/设计,因为许多工作流应用程序不需要在持久化层中持久化工作流定义,而相应的函数知道在恢复工作流实例时使用哪个工作流定义,因此无需序列化和反序列化工作流定义。对于需要持久化工作流定义的场景,`WFDefinitionStore`、`WFDefinitionIdentityStore` 和 `WFDefinitionIdentityFactory` 可能会给您一些启发。WorkflowIdentity 支持的版本控制的灵活性可能会在各种场景下提高工作流定义的加载性能。
工作流服务与 WCF
如果您阅读过 WCF for the Real World, Not Hello World 或类似的文章,您会发现您可以将 WCF 服务库和 WCF 服务应用程序分成独立的项目。您可以在 WCF 服务库中进行大部分开发,并将 WCF 服务应用程序作为简单的外观容器,其中包含多个 WCF 服务库程序集,以便在部署期间进行进一步配置。因此,WCF 服务库可以自由地托管在 IIS 或自托管应用程序中。因此,您可以在开发和部署之间实现非常清晰的关注点分离,也就是说,在开发过程中,您作为开发人员无需考虑部署。
对于 WCF 工作流服务,您可能需要在 WCF 工作流服务应用程序项目中进行大部分开发,并在 XAMLX 文件中设计大多数工作流逻辑。WF 中似乎没有类似 WCF 服务库的构造器或接口,后者可以同时用于 IIS 托管的部署和自托管的部署。因此,在 SDLC 中,您可能需要在早期阶段考虑工作流或工作流服务的部署。如果您有其他想法,请留言。