65.9K
CodeProject 正在变化。 阅读更多。
Home

通过单元测试学习 Windows Workflow Foundation 4.5:WorkflowApplication 持久化

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2016年4月8日

CPOL

4分钟阅读

viewsIcon

11485

具有 WorkflowApplication 的持久性。

背景

我之前开始学习 Windows Workflow Foundation。我更倾向于通过系统的学习来掌握主要的科技框架,而不是到处搜索。然而,我发现大多数写得好的书籍和文章都发表在 2006-2009 年之间,已经过时了,尤其缺少 .NET 4 和 4.5 的新特性;而近年来出版的关于 WF 4.0 和 4.5 的书籍则写得很糟糕。虽然我通常更喜欢系统化、枯燥和抽象的学习,但这次我会为学习准备一些实际的材料。

引言

支持长时间运行的流程是 WF 闪光点之一。从技术上讲,为了克服各种困难支持长时间运行的流程,持久化工作流至关重要。对于持久化,WF 3.5、4.0 和 4.5 有着相当大的区别。搜索可能会返回混合信息,既有旧的也有新的,非常令人困惑。在这篇文章中,我想展示一些单元测试用例,以演示持久化的行为。

这是该系列的第 5 篇文章。源代码可在 https://github.com/zijianhuang/WorkflowDemo 获取。

本系列的其他文章如下:

 

本文关于持久化的内容使用了 WorkflowApplication 和 WorkflowServiceHost,因为两者都可以完全访问 WF 运行时,特别是持久化功能。

Using the Code

源代码可在https://github.com/zijianhuang/WorkflowDemo找到。

先决条件

  1. Visual Studio 2015 Update 1 或 Visual Studio 2013 Update 3
  2. xUnit(包含)
  3. EssentialDiagnostics(包含)
  4. 工作流持久化 SQL 数据库,默认本地数据库为 WF。

本文中的示例来自一个测试类:WorkflowApplicationPersistenceTests。

带有可持久化工作流的 WorkflowApplication

书签

    public sealed class ReadLine : NativeActivity<string>
    {
        public ReadLine()
        {
        }

        public InArgument<string> BookmarkName { get; set; }

        protected override bool CanInduceIdle
        {
            get
            {
                return true;
            }
        }

        protected override void Execute(NativeActivityContext context)
        {
            string name = this.BookmarkName.Get(context);

            if (name == null)
            {
                throw new ArgumentException(string.Format("ReadLine {0}: BookmarkName cannot be null", this.DisplayName), "BookmarkName");
            }

            context.CreateBookmark(name, new BookmarkCallback(OnReadComplete));
        }

        void OnReadComplete(NativeActivityContext context, Bookmark bookmark, object state)
        {
            string input = state as string;

            if (input == null)
            {
                throw new ArgumentException(string.Format("ReadLine {0}: ReadLine must be resumed with a non-null string"), "state");
            }

            context.SetValue(base.Result, input);
        }
    }


        [Fact]
        public void TestPersistenceWithBookmark()
        {
            var x = 100;
            var y = 200;
            var t1 = new Variable<int>("t1");

            var plus = new Plus()
            {
                X = x,
                Y = y,
                Z = t1,  //So Output Z will be assigned to t1
            };
            var bookmarkName = NewBookmarkName();
            var a = new System.Activities.Statements.Sequence()
            {
                Variables =
                        {
                            t1
                        },
                Activities = {
                            new Multiply()
                            {
                                X=3, Y=7,
                            },

                            new ReadLine()
                            {
                                BookmarkName=bookmarkName,
                            },

                            plus,

                        },
            };

            bool completed1 = false;
            bool unloaded1 = false;
            bool isIdle = false;

            AutoResetEvent syncEvent = new AutoResetEvent(false);

            var app = new WorkflowApplication(a);
            app.InstanceStore = WFDefinitionStore.Instance.Store;
            app.PersistableIdle = (eventArgs) =>
            {
                return PersistableIdleAction.Unload;//so persist and unload
            };

            app.OnUnhandledException = (e) =>
            {

                return UnhandledExceptionAction.Abort;
            };

            app.Completed = delegate (WorkflowApplicationCompletedEventArgs e)
            {
                unloaded1 = true;

            };

            app.Aborted = (eventArgs) =>
            {

            };

            app.Unloaded = (eventArgs) =>
            {
                unloaded1 = true;
                syncEvent.Set();
            };

            app.Idle = e =>
            {
                Assert.Equal(1, e.Bookmarks.Count);
                isIdle = true;
            };

            //  app.Persist();//This is optional, since Workflow runtime will persist when the execution reach to ReadLine.
            var id = app.Id;
            app.Run();
            syncEvent.WaitOne();

            Assert.False(completed1);
            Assert.True(unloaded1);
            Assert.True(isIdle);
            //At this point, DB WF/InstancesTable has a new record, and the value of column BlockingBookmark contains the bookmarkName

            //Now to use a new WorkflowApplication to load the persisted instance.
            LoadWithBookmarkAndComplete(a, id, bookmarkName, "abc");
            //The record is now deleted by WF runtime.
        }

        static IDictionary<string, object> LoadWithBookmarkAndComplete(Activity workflowDefinition, Guid instanceId, string bookmarkName, string bookmarkValue)
        {
            bool completed2 = false;
            bool unloaded2 = false;
            AutoResetEvent syncEvent = new AutoResetEvent(false);
            IDictionary<string, object> outputs = null;

            var app2 = new WorkflowApplication(workflowDefinition)
            {
                Completed = e =>
                {
                    if (e.CompletionState == ActivityInstanceState.Closed)
                    {
                        outputs = e.Outputs;
                    }
                    completed2 = true;
                },

                Unloaded = e =>
                {
                    unloaded2 = true;
                    syncEvent.Set();
                },

                InstanceStore = WFDefinitionStore.Instance.Store,
            };

            app2.Load(instanceId);
            var br = app2.ResumeBookmark(bookmarkName, bookmarkValue);
            Assert.Equal(BookmarkResumptionResult.Success, br);

            syncEvent.WaitOne();

            Assert.True(completed2);
            Assert.True(unloaded2);

            return outputs;
        }

 

显式调用 app.Persist() 是可选的,因为 WF 运行时会在到达 ReadLine 活动时持久化工作流。在运行 LoadAndComplete 之前,工作流会卸载且未完成。然后,此示例中会创建另一个 WorkflowApplication,提取存储的实例,并使用书签和预期数据继续执行。

备注

在这种情况下,你会看到工作流的第二次运行获取了第一次运行之前定义的工作流定义。在现实世界的场景中,让定义在整个应用程序生命周期中都存在有时不太好。最好将定义持久化到某个地方,这样第二次运行只需要 instanceId 和 bookmarkName 就能从上次断点处恢复。

持久化工作流定义

当您使用 Visual Studio 的工作流设计器时,您可能会注意到自定义工作流是一个 XAML 文件,它在设计时被转换为 C# 代码。C# 代码在您构建项目时编译。在 WF 中,有一个函数可以将内存中的工作流定义转换为 XAML。因此,XAML 是持久化和重新加载工作流的自然方式。

一些用于序列化和反序列化工作流的辅助函数

   /// <summary>
    /// Persist activity for purpose of resuming later.
    /// </summary>
    /// <remarks>The functions here are for bookmark and long running only, not for general purpose.</remarks>
    /// <example>inspired by https://msdn.microsoft.com/en-us/library/ff458319%28v=vs.110%29.aspx</example>
    public static class ActivityPersistenceHelper
    {
        /// <summary>
        ///
        /// </summary>
        /// <param name="activity"></param>
        /// <param name="stream">External XML UTF8 stream. Caller is responsible to set position if the stream supports random seek, and to dispose.</param>
        public static void SaveActivity(Activity activity, Stream stream)
        {
            using (var streamWriter = new StreamWriter(stream, Encoding.UTF8, 512, true))
            using (var xw = ActivityXamlServices.CreateBuilderWriter(new System.Xaml.XamlXmlWriter(streamWriter, new System.Xaml.XamlSchemaContext())))
            {
                System.Xaml.XamlServices.Save(xw, activity);
            }
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="stream">XAML stream defining an Activity</param>
        /// <returns></returns>
        public static Activity LoadActivity(Stream stream)
        {
            var settings = new ActivityXamlServicesSettings()
            {
                CompileExpressions = true,
            };

            var activity = ActivityXamlServices.Load(stream, settings);
            return activity;
        }

        public static Activity LoadActivity(byte[] bytes)
        {
            var settings = new ActivityXamlServicesSettings()
            {
                CompileExpressions = true,
            };

            using (var stream = new MemoryStream(bytes))
            {
                var activity = ActivityXamlServices.Load(stream, settings);
                return activity;
            }
        }

用于将实例 ID 与工作流定义映射的字典

    public class WFDefinitionStore
    {
        private static readonly Lazy<WFDefinitionStore> lazy = new Lazy<WFDefinitionStore>(() => new WFDefinitionStore());

        public static WFDefinitionStore Instance { get { return lazy.Value; } }

        public WFDefinitionStore()
        {
            InstanceDefinitions = new System.Collections.Concurrent.ConcurrentDictionary<Guid, byte[]>();

            Store = new SqlWorkflowInstanceStore("Server =localhost; Initial Catalog = WF; Integrated Security = SSPI")
            {
                InstanceCompletionAction = InstanceCompletionAction.DeleteAll,
                InstanceEncodingOption = InstanceEncodingOption.GZip,

            };
        }

        public System.Collections.Concurrent.ConcurrentDictionary<Guid, byte[]> InstanceDefinitions { get; private set; }

提示

在实际应用中,您可能有其他结构来根据您的技术需求持久化工作流定义。此外,在服务应用程序中,可能存在多个相同工作流定义的实例,因此存储多个相同定义的副本效率低下,因此您可能正在设计一个在程序逻辑或持久化中存储一次定义并支持版本控制的存储。此字典对于演示和小型应用程序来说已经足够了。

带有延迟的工作流

此测试用例演示了

  • WorkflowApplication 的基本行为
  • 持久化的性能
  • 使用上面的演示辅助类持久化工作流定义
  • 如何从工作流中获取结果/OutArgument

 

        [Fact]
        public void TestPersistenceWithDelayAndResult()
        {
            var a = new Fonlow.Activities.Calculation();
            a.XX = 3;
            a.YY = 7;

            bool completed1 = false;
            bool unloaded1 = false;

            AutoResetEvent syncEvent = new AutoResetEvent(false);

            var app = new WorkflowApplication(a);
            app.InstanceStore = WFDefinitionStore.Instance.Store;
            app.PersistableIdle = (eventArgs) =>
            {
                return PersistableIdleAction.Unload;//so persist and unload
            };

            app.OnUnhandledException = (e) =>
            {
                Assert.True(false);
                return UnhandledExceptionAction.Abort;
            };

            app.Completed = delegate (WorkflowApplicationCompletedEventArgs e)
            {
                completed1 = true;
                Assert.True(false);
            };

            app.Aborted = (eventArgs) =>
            {
                Assert.True(false);
            };

            app.Unloaded = (eventArgs) =>
            {
                unloaded1 = true;
                syncEvent.Set();
            };

            var id = app.Id;
            stopwatch.Restart();
            stopwatch2.Restart();
            app.Run();
            syncEvent.WaitOne();

            stopwatch.Stop();
            Assert.True(stopwatch.ElapsedMilliseconds < 2500, String.Format("The first one is executed for {0} milliseconds", stopwatch.ElapsedMilliseconds));
            //the ellipsed time depends on the performance of the WF runtime when handling persistence. The first case of persistence is slow.

            Assert.False(completed1);
            Assert.True(unloaded1);

            stopwatch.Restart();
            var t = WFDefinitionStore.Instance.TryAdd(id, a);
            stopwatch.Stop();
            Trace.TraceInformation("It took {0} seconds to persist definition", stopwatch.Elapsed.TotalSeconds);

            //Now to use a new WorkflowApplication to load the persisted instance.
            LoadAndCompleteLongRunning(id);
        }

        void LoadAndCompleteLongRunning(Guid instanceId)
        {
            bool completed2 = false;
            bool unloaded2 = false;
            AutoResetEvent syncEvent = new AutoResetEvent(false);

            var app2 = new WorkflowApplication(WFDefinitionStore.Instance[instanceId])
            {
                Completed = e =>
                {
                    completed2 = true;
                    var finalResult = (long)e.Outputs["Result"];
                    Assert.Equal(21, finalResult);
                },

                Unloaded = e =>
                {
                    unloaded2 = true;
                    syncEvent.Set();
                },

                InstanceStore = WFDefinitionStore.Instance.Store,
            };

            stopwatch.Restart();
            app2.Load(instanceId);
            Trace.TraceInformation("It took {0} seconds to load workflow", stopwatch.Elapsed.TotalSeconds);

            app2.Run();
            syncEvent.WaitOne();
            stopwatch2.Stop();
            var seconds = stopwatch2.Elapsed.TotalSeconds;
            Assert.True(seconds > 3, String.Format("Activity execute for {0} seconds", seconds));//But if the long running process is fired and forgot, the late load and run may be completed immediately.

            Assert.True(completed2);
            Assert.True(unloaded2);

        }

 

备注

  • 第一次持久化运行速度很慢,可能是由于 WF 运行时或 SQL 连接的一些预热导致的。如果您知道原因,请留言。
  • 在持久化并卸载长时间运行的可持久化工作流之后,需要一个管家程序通过定期检查持久化层来唤醒它。这将是下一篇文章中要讨论的另一个重要主题。

 

无效存储引发异常

如果 InstanceStore 在 WorkflowApplication.Persist() 期间出现问题,则会抛出 System.Runtime.DurableInstancing.InstancePersistenceCommandException。
    
        [Fact]
        public void TestPersistWithWrongStoreThrows()
        {
            var a = new Multiply()
            {
                X = 3,
                Y = 7,
            };

            AutoResetEvent syncEvent = new AutoResetEvent(false);

            var app = new WorkflowApplication(a);
            app.InstanceStore = new SqlWorkflowInstanceStore("Server =localhost; Initial Catalog = WFXXX; Integrated Security = SSPI");
            app.PersistableIdle = (eventArgs) =>
            {
                Assert.True(false, "quick action no need to persist");//lazy
                return PersistableIdleAction.Persist;
            };

            //None of the handlers should be running
            app.OnUnhandledException = (e) =>
            {
                Assert.True(false);
                return UnhandledExceptionAction.Abort;
            };

            var ex = Assert.Throws<System.Runtime.DurableInstancing.InstancePersistenceCommandException>
               (() => app.Persist(TimeSpan.FromSeconds(2)));

            Assert.NotNull(ex.InnerException);
            Assert.Equal(typeof(TimeoutException), ex.InnerException.GetType());
        }


实例存储出现问题的原因可能有很多,因此您的应用程序需要很好地处理此异常。

 

© . All rights reserved.