65.9K
CodeProject 正在变化。 阅读更多。
Home

Castle Dynamic Proxy Interceptors 构建可重启动态流

starIconstarIconstarIconstarIconstarIcon

5.00/5 (7投票s)

2020年10月31日

CPOL

10分钟阅读

viewsIcon

10529

使用 Castle Dynamic Proxy 框架的代理和拦截器的另一种方法

您可以在以下链接找到关于动态代理的第一篇文章

引言

大家好,欢迎阅读另一个关于代理和拦截器的故事。

引用

我必须感谢Larry Ross向 Pro Coders 团队介绍了面向切面编程。

如果您对我之前关于模型更改跟踪和规则执行的博客文章不感兴趣,我将尝试在第二次让您印象深刻。让我们假设您拥有一种神奇的技术,可以中断一个 .NET 方法,保存其状态,然后从保存点稍后或在另一台机器上重新启动。您可能认为可以使用 Windows 休眠功能、将 Windows 映像保存到一个大文件、复制它,然后在另一个虚拟机上恢复它来做到这一点。但不!我谈论的是一个大约一百字节的小状态,可以保存到数据库中,由另一台机器读取,然后从保存点重新启动。

您感兴趣吗?

那么,让我们写一个用户故事。

用户故事 #4:在另一个 AppDomain 流程中创建可中断和可重启的流程

  • 流程步骤的逻辑和顺序应在 C# 方法中
  • 每个步骤都应具有中断流程执行的能力
  • 中断流程和流程代码的状态应足以在另一个应用程序域中重启流程

实现 - 通用流程

首先,我创建了一个新的 .NET Core 3.1 项目DemoCatleProxy.RestartableFlow并添加了Castle.Core NuGet 包。

在架构上,我以这样的方式思考这个问题:我们可能有许多不同的流程和一个引擎,该引擎将用于执行这些流程中的任何一个,并在需要时从保存点重新启动它们。如果从业务文档流转的角度来考虑——每种类型的文档都将有一个独特的流程,并且它将包含不同的步骤,如提交、审查、批准、二次批准等,并且这些步骤的顺序也不同。

现在我想为我们将要实现的流程定义一个模板。首先,我们需要一个接口来定义流程,我们将在流程引擎中使用它来引用流程

    public interface IFlow
    {
        object UntypedModel { get; }
        void Execute();
        internal void SetModel(object model);
    }

Execute方法是流程逻辑的容器,其余的声明用于设置我们的模型。

您可以将模型视为文档本身(在流转中)以及所有相关的支持文档——签名、签名日期、当前状态、下一个交付地址等。

我们执行的每个新流程都应该有自己的模型。因此,我的下一个接口是一个带有模型参数的通用模板

    public interface IFlow<M> : IFlow
        where M : class
    {
        M Model { get; }
    }

因此,每次为新文档定义新流程时,您都会提供一个模型参数,例如

public class MyFlow : IFlow<MyModel>

现在为了简化未来的流程定义,我们创建一个实现基本功能的基类

    public abstract class Flow<M> : IFlow<M>
        where M : class, new()
    {
        protected M _model;
        public M Model => _model;
        public object UntypedModel => _model;

        public Flow()
        {
            _model = new M();
        }

        void IFlow.SetModel(object model)
        {
            _model = model as M;
        }

        public abstract void Execute();
    }

基类允许使用类型化的Model,并在构造函数中创建Model的实例。

最后,我们可以尝试定义我们的第一个流程(和模型),看看它是如何工作的,我像往常一样想在单元测试中完成它。我创建了一个新的 xUnit 项目DemoCatleProxy.RestartableFlow.Tests并向主项目添加了项目引用。现在让我们添加流程代码

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class Model1 
    {
        public string ReceivedMessage { get; set; }
        public string Signature { get; set; }

        public bool IsSubmitted { get; set; }
        public bool IsLoaded { get; set; }
    }

    public interface IDemoDataService
    {
        string LoadReceivedMessage();
        bool IsMessageApproved(string message);
        string GetSignature(string message);
        bool Submit(string message, string signature);
    }

    public class DemoFlow1 : Flow<Model1>
    {
        private readonly IDemoDataService _dataService;

        public DemoFlow1()
        {
        }

        public DemoFlow1(IDemoDataService dataService)
        {
            _dataService = dataService;
        }

        public override void Execute()
        {
            LoadData();

            CheckIfApproved();

            AddDigitalSignature();

            SubmitData();
        }

        public virtual void LoadData()
        {
            if (Model.IsLoaded)
            {
                throw new FlowFatalTerminateException();
            }

            Model.ReceivedMessage = _dataService.LoadReceivedMessage();
            Model.IsLoaded = true;
        }

        public virtual void CheckIfApproved()
        {
            if (!_dataService.IsMessageApproved(Model.ReceivedMessage))
            {
                throw new FlowStopException();
            }
        }

        public virtual void AddDigitalSignature()
        {
            Model.Signature = _dataService.GetSignature(Model.ReceivedMessage);
        }

        public virtual void SubmitData()
        {
            if (!_dataService.Submit(Model.ReceivedMessage, Model.Signature))
            {
                throw new FlowStopException();
            }
        }
    }
}

我定义了Model1DemoFlow1,为了更好地真实性,我还添加了IDemoDataService契约(接口),流程将使用它与外部世界通信。

如果你看一下Execute方法,你会发现它是一个包含四个方法的序列,每个方法([LoadDataCheckIfApprovedAddDigitalSignatureSubmitData)都必须是虚拟的,我将解释原因。

我们将使用代理拦截器来拦截每个所用方法的执行。当我们拦截对代理对象的调用时,我们可以决定是否允许该调用或跳过它。如果我们要重新启动流程,我们应该能够跳过所有调用,直到我们到达上一个停止点。因此,我们从Execute调用的方法必须是虚拟的。

如果你看一下LoadData,它检查Model是否以前没有加载过,以确保我们从一个干净且新鲜的Model开始流程。这里还有一个额外的检查:如果Model表示它已经加载,该方法会抛出FlowFatalTerminateException(我们稍后会定义异常)——这个流程将被标记为损坏,但如果Model尚未加载,我们从数据服务读取消息并设置ModelIsLoaded标志。

序列中的下一步是CheckIfApproved方法。它询问数据服务消息是否被批准,如果没有,它会抛出另一个异常FlowStopException或继续执行。如果流程被停止(通过FlowStopException),流程引擎将返回状态IsStopped,并且此流程可以稍后重新启动。

重要提示:当我谈论可重启流程时,我指的是我们创建并运行一次的实例可以成功完成,或者它可以返回状态 [IsStopped],然后我们可以将这个已停止的实例保存在数据库中,并尝试稍后重新启动它。

我们在流程中使用异常,因为这是在特定点停止执行的最有效方法,并且异常将会在调用堆栈中冒泡,直到有人捕获它。

让我们将异常代码添加到主项目

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowStopException : Exception
    {
    }

    public class FlowFatalTerminateException : Exception
    {
    }
}

我们将使用FlowStopException来通知流程引擎流程正常停止,可以稍后重新启动,我们将使用FlowFatalTerminateException来通知引擎流程已损坏。

实现 - 流程引擎

第一次流程运行的逻辑如下(以DemoFlow1为例)

  1. 我们创建一个DemoFlow1类的新实例(Model由我们继承的Flow基类自动创建),并将其提供给流程引擎。
  2. 流程引擎创建DemoFlow1实例的流程代理对象和流程数据——这是发生在DemoFlow1实例内的所有更改的完整历史记录。
  3. 然后,流程引擎执行流程代理Execute方法,该方法依次调用LoadDataCheckIfApprovedAddDigitalSignatureSubmitData
  4. 每次调用上述方法时都会被拦截,我们都会将方法调用时以及调用方法后获得的Model副本保存到流程数据中。
  5. 如果任何被调用的方法抛出异常,Execute方法将被中断,流程引擎返回流程数据。
  6. 如果Execute方法在没有任何中断的情况下完成,流程引擎将返回状态为IsFinished = true流程数据

重新启动流程的逻辑是

  1. 我们创建一个DemoFlow1的新实例,并根据上次运行的流程数据,将流程实例和流程数据提供给流程引擎。
  2. 流程引擎创建DemoFlow1实例的流程代理对象并使用提供的流程数据
  3. 然后,流程引擎执行流程代理Execute方法,该方法依次调用LoadDataCheckIfApprovedAddDigitalSignatureSubmitData
  4. 每次调用所述方法时都会被拦截,并且我们会检查被调用的方法是否已记录在流程数据历史中。如果已记录,则跳过此调用,并将当前的DemoFlow1实例模型替换为流程数据历史中的模型,使其与首次运行时调用此方法后的模型相同。
  5. 如果在流程数据历史中未找到该调用,我们将继续该调用并将其以及生成的Model存储到历史中,以与首次运行时相同的方式继续流程执行。

对于流程引擎,我们需要一个接口

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public interface IFlowEngine
    {
        FlowData RunFlow(IFlow flow);
        FlowData RestartFlow(IFlow flow, FlowData flowData);
    }
}

和流程数据类

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowData
    {
        public bool IsFinished { get; set; }
        public List<string> CallHistory { get; set; } = new List<string>();
        public List<object> ModelHistory { get; set; } = new List<object>();
        public bool IsStopped { get; set; }
        public Exception LastException { get; set; }
    }
}

现在我们可以展示流程引擎

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Text;
using System.Xml.Schema;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowEngine : IFlowEngine, IInterceptor
    {
        private readonly IProxyGenerator _proxyGenerator;
        private FlowData _flowData;
        private IFlow _flow;
        private int _counter;

        public FlowEngine(IProxyGenerator proxyGenerator)
        {
            _proxyGenerator = proxyGenerator;
        }

        public FlowData RunFlow(IFlow flow)
        {
            _flowData = new FlowData();
            return ProcessFlow(flow);
        }

        public FlowData RestartFlow(IFlow flow, FlowData flowData)
        {
            _flowData = flowData;
            return ProcessFlow(flow);
        }

        private FlowData ProcessFlow(IFlow flow)
        {
            var options = new ProxyGenerationOptions(new FreezableProxyGenerationHook(flow));
            var flowProxy = _proxyGenerator.CreateClassProxyWithTarget(flow.GetType(), 
                            flow, options, new IInterceptor[] { this }) as IFlow;
            _flow = flow;

            try
            {
                // clear previous statuses
                _counter = 0;
                _flowData.IsStopped = false;
                _flowData.LastException = null;

                // run flow
                flowProxy.Execute();
                _flowData.IsFinished = true;
            }
            catch (FlowStopException e)
            {
                _flowData.IsStopped = true;
            }
            catch (Exception e)
            {
                _flowData.LastException = e;
            }

            return _flowData;
        }

        public void Intercept(IInvocation invocation)
        {
            var method = invocation.Method.Name;
            _counter++;
            var historyRecord = $"{_counter}:{method}";

            var index = _flowData.CallHistory.IndexOf(historyRecord);

            if (index == -1)
            {
                // new call, proceed and update histories if no exceptions thrown
                invocation.Proceed();
                _flowData.CallHistory.Add(historyRecord);

                // Clone Model to store new independednt instance
                _flowData.ModelHistory.Add(_flow.UntypedModel.CloneObject());
            }
            else
            {
                // replay in vacuum: don't proceed call and substitute model for next call
                _flow.SetModel(_flowData.ModelHistory[index]);
            }
        }
    }
}

正如你所看到的,FlowEngine类在private字段中保持内部状态,所以它不能被声明为单例,每次需要时我们都需要创建引擎的新实例,并且它不能在多个线程之间共享。

如果你仔细阅读引擎的实现,你会发现它完成了我们上面描述的所有工作。

流程引擎使用CloneObject扩展来创建Model对象的新实例并复制其所有属性。请添加Newtonsoft.Json NuGet 包和此代码

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public static class ObjectExtension
    {
        public static T CloneObject<T>(this T source)
        {
            var jsonSerializerSettings = new JsonSerializerSettings
            {
                TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
                TypeNameHandling = TypeNameHandling.Objects
            };

            var json = JsonConvert.SerializeObject(source, jsonSerializerSettings);
            var result = JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
            return result;
        }
    }
}

也许这不是最有效的方法,但我不想让这篇文章过于复杂。

现在要添加的最后一点是钩子,它需要提供给代理生成方法选项。它设置了行为,当从其他方法([Execute调用LoadData])调用的所有方法都将在代理级别(而不是原始对象级别)执行时,这样我们将拦截所有调用。我从 Castle Dynamic Proxy 教程中获取了这一点。钩子代码是

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FreezableProxyGenerationHook : IProxyGenerationHook
    {
        private IFlow _flow;

        public FreezableProxyGenerationHook(IFlow flow)
        {
            _flow = flow;
        }

        public override int GetHashCode()
        {
            return _flow.GetHashCode();
        }

        public override bool Equals(object obj)
        {
            return _flow == (obj as FreezableProxyGenerationHook)._flow;
        }

        public bool ShouldInterceptMethod(Type type, MethodInfo memberInfo)
        {
            return memberInfo.Name != "Execute" && memberInfo.Name != "SetModel";
        }

        public void NonVirtualMemberNotification(Type type, MemberInfo memberInfo)
        {
        }

        public void MethodsInspected()
        {
        }

        public void NonProxyableMemberNotification(Type type, MemberInfo memberInfo)
        {
        }
    }
}

现在我们可以编译代码并开始测试它。

实现 - 单元测试

让我们实现一个单元测试来验证一个简单的场景

  1. 我们创建并首次运行流程,演示服务IsMessageApproved方法返回False,因此流程应在CheckIfApproved方法处中断。
  2. 在第一次运行后保留流程数据,我们尝试重新启动流程,但此时IsMessageApproved返回True,因此我们期望流程成功完成。

我已将代码添加到DemoFlowTests.cs文件

using Castle.DynamicProxy;
using System;
using Xunit;
using Moq;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class DemoFlowTests
    {
        [Fact]
        public void RunStopRestartFlowTest()
        {
            var flowEngine = new FlowEngine(new ProxyGenerator());

            var demoService = new Mock<IDemoDataService>();
            var flow = new DemoFlow1(demoService.Object);
            int approveTimes = 0;

            demoService.Setup(s => s.LoadReceivedMessage()).Returns("Important message 1");
            demoService.Setup(s => s.GetSignature(It.IsAny<string>())).Returns("0xAABBEFA7");
            demoService.Setup(s => s.Submit(It.IsAny<string>(), 
                                   It.IsAny<string>())).Returns(true);
           
            // the first time it returns false, the second time it returns true
            demoService.Setup(s => s.IsMessageApproved(It.IsAny<string>()))
                .Returns(() => 
                {
                    approveTimes++;
                    return approveTimes == 2; 
                });

            var flowData = flowEngine.RunFlow(flow);
            Assert.True(flowData.IsStopped);
            Assert.False(flowData.IsFinished);
            Assert.Single(flowData.ModelHistory);
            Assert.True((flowData.ModelHistory[0] as Model1)?.IsLoaded);

            // assume we saved flowData to a database and rerun the flow one day after
            var clonedFlowData = flowData.CloneObject();
            var newFlow = new DemoFlow1(demoService.Object);
            clonedFlowData = flowEngine.RestartFlow(newFlow, clonedFlowData);
            Assert.False(clonedFlowData.IsStopped);
            Assert.True(clonedFlowData.IsFinished);
        }
    }
}

当您使用依赖注入时,可以在单元测试中节省大量时间和精力。我使用Moq框架(请安装 Moq NuGet 包)通过接口IDemoDataService生成了一个演示数据服务。实际的演示数据服务尚未实现——我们不需要它进行测试。

我设置了所有四个方法,其中三个返回常量值,但IsMessageApproved第一次返回False,下一次返回True

我的测试第一次创建并运行了一个流程,并检查了结果流程数据,它应该有IsStopped = TrueIsFinished = False。流程数据包含中断流程的状态。这个状态是重新启动流程所需的一切。

我们克隆了流程数据(如你所记,我将其序列化为 JSON 并反序列化回来),以演示流程数据是可传输的,不依赖于AppDomain

最后,我调用了RestartFlow。现在测试期望IsStopped = FalseIsFinished = True

如果您在RestartFlow调用那一行设置断点并使用Step Into直到进入Intercept方法,您可以看到流程引擎的跳过逻辑是如何工作的

 

 

我建议使用调试器来查看流程是如何首次执行以及如何重新启动以完成的。

摘要

本文演示了使用 Castle Dynamic Proxy 框架中的代理和拦截器的另一种方法。我们实现了一个可中断和可重启的流程,它在概念上类似于 Microsoft Workflow Foundation 流程,但它非常轻量级、可调试,并且流程逻辑在 C# 方法中。此外,我们使用 Moq 进行单元测试,我们再次证明,为了验证和调试您的代码,您不需要花费时间创建和运行应用程序。

通过单元测试节省时间,感谢您的阅读。

附注

我从朋友那里得到了关于这篇博客的一些反馈,实际上,他们对为什么包含四个方法序列的简单直接流程以这种复杂的方式实现感兴趣? 

事情是这样的,这篇博客中的流程示例为了更好地理解而简化。在实际场景中,您的流程将包含if条件、goto和可能循环,并且即使对于复杂的流程,该方法也将继续有效。这是来自 Pro Coders 团队项目之一的流程示例

public override async Task Execute()
{
    await BeginAsync();
    await PopulateData();
    await FlowTask(typeof(InsuranceClaimValidationRulesTask));
            
dataEntry:
    if (TaskExecutionValidationIssues.Any())
    {
        await UserInput(typeof(InsuranceClaimEntryForm));
    }

    if (Model.Claim.ClaimAmount.Amount > 1000)
    {
        await UserInputReview(typeof(InsuranceClaimEntryForm));

        if (Model.ClaimRejected)
        {
            await SaveRejectedClaim();
            await GenerateRejectLetter();
            goto postDocuments;
        }

        if (TaskExecutionValidationIssues.Any())
        {
            goto dataEntry;
        }
    }

    if (Model.Claim.PaymentOptionCode == "EFT")
    {
        await MakeTransfer();
    }
    else if (Model.Claim.PaymentOptionCode == "CHQ")
    {
        await PrintBankCheque();
    }
    else if (Model.Claim.PaymentOptionCode == "FUT_CONTR")
    {
        await BuyOptions();
    }
    else
    {
        Fail($"Invalid Payment Option {Model.Claim.PaymentOptionCode}", _logStreamer);
    }

    await SaveClaim();
    await GenerateSuccessLetter();

postDocuments:
    await PostProducedDocuments();
    await EndAsync();
}

正如你所看到的,这个流程引擎也支持异步操作async-await和动态表单中的用户输入,但原理是相同的——流程可以在某个时候停止甚至失败,并在一天或一个月后在另一台机器上重新启动。

历史

  • 2020年10月31日:初始版本
© . All rights reserved.