65.9K
CodeProject 正在变化。 阅读更多。
Home

Clifton 方法 - 第 IV 部分

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2016 年 8 月 25 日

CPOL

18分钟阅读

viewsIcon

12411

downloadIcon

215

语义发布/订阅者

文章系列

引言

在前面关于模块管理器和服务管理器的文章中,我通过使用接口描述了依赖倒置,并通过实现服务的模块动态配置应用程序。这产生了以下高级应用程序架构:

这种架构的问题在于中间的“接口”气泡。在我看来,由于通用接口的依赖关系,应用程序与服务之间的耦合仍然太紧密。

服务通常分为三类:

  1. 请求信息,该请求要么立即返回,要么如果需要一些时间,应用程序可能会围绕该请求实现一个 Task 包装器。
  2. 启动一个“计算”(我将使用这个词而不是“进程”),其中应用程序不关心它何时返回,或者服务在计算完成时“触发一个事件”。
  3. 监视某个设备、端口或其他异步活动,当满足某个条件时,“触发一个事件”以启动对该数据进行“计算”,无论是发送给应用程序还是其他服务。

虽然有许多满足第一类的简单服务,但真正我想在这里讨论的是最后两类。对于更复杂的服务,我发现最灵活的架构是用发布/订阅组件替换“接口”气泡。

在这里,通过服务管理器通过依赖倒置(接口)访问服务当然仍然是可能的,但在许多情况下,发布/订阅机制成为主要的机制。与其拥有通用的接口规范,不如使用共享的语义“字典”。

使用发布/订阅者可以使应用程序和服务不必知道另一个服务的确切接口规范。此外,不同版本的服务可以实现不同的语义消息,这增加了应用程序对变化的弹性。

我在此提出的发布/订阅者实现具有以下特点:

  • 是语义化的——通过使用类型化的消息“信封”来实现到订阅者的路由。
  • 实现消息的自动记录,由你选择的记录器服务处理。
  • 订阅者异常由发布-订阅机制处理,并可以路由到异常处理服务。
  • 实现订阅者调用,可以是同步调用或异步调用。
  • 利用线程池进行异步处理,将调用排队到具有最少待处理订阅者调用的工作线程上。
  • 完全类型安全。
  • 未处理的消息(无订阅者)将被丢弃。
  • 订阅者(通常)为每个消息实例化,从而提高线程安全性。
  • 将消息发布者和订阅者隔离在“膜”中。
  • 它本身被实现为一个服务,因此所有其他服务都可以访问它。
  • 多个订阅者可以接收相同的消息。
  • 为实现 IDisposable 的无状态订阅者自动调用 IDispose

用例

我已非常成功地将此架构用于:

  • 实现 Web 服务器(事实上,它现在是我事实上的 Web 服务器实现的骨干。)
  • 处理 RabbitMQ 消息。
  • 处理来自硬件的异步事件,如信用卡读卡器、密码键盘、ID 扫描仪、 iButton 读卡器等。
  • 实现 ATM 交易处理。
  • 实现 CefSharp 和 .NET 的浏览器控件作为可交换服务,以创建 WinForm/WPF 托管的 Web 应用程序。

模块管理器、服务管理器和发布/订阅者组合允许我做的事情简要列表:

  • 模拟通信接口、硬件、数据库 I/O 等。
  • 模拟来自协议和硬件的输入,这极大地促进了工作流程测试。
  • 快速配置应用程序以适应各种硬件和计算配置。

语义发布/订阅者(虽然是早期版本)也是 Higher Order Programming 项目的核心,我之前写过

关于膜计算

发布/订阅者中的一个关键概念是“膜”。你可以将膜看作是一个容器、通道、囊泡等。但它会将消息保留在该膜的“空间”中,除非设置了渗透性规则(我不会在本篇文章中讨论)。“膜”一词来源于膜计算的概念。

膜计算处理分布式和并行计算模型,以局部方式处理符号对象的集合。因此,演化规则允许将演化对象封装到由膜定义的隔间中。隔间之间以及与环境的通信在过程中起着至关重要的作用。

膜概念背后的直觉是生物学中的三维囊泡。然而,概念本身更通用,膜被视为两个区域的分隔符。膜提供了两个区域之间的选择性通信。根据 Gheorghe Păun 的说法,分隔是将欧几里得空间划分为有限的“内部”和无限的“外部”。选择性通信是计算的来源。

来自生物学的各种建议以及定义基于膜的多集处理设备的架构和功能的可能性范围几乎是无限的。事实上,膜计算文献包含大量的模型。因此,MC 不仅仅是与特定模型相关的理论,它还是设计隔离模型的框架。

如果这个词很奇怪,那就接受吧。

一个简单的“Hello World”示例

最好从一个语义化的“hello world”示例开始。这是代码(不包括引导程序,它与前一篇文章中使用的相同)

using System;

using Clifton.Core.Semantics;

namespace SemanticPublisherSubscriberDemo
{
  static partial class Program
  {
    static void Main(string[] args)
    {
      InitializeBootstrap();
      Bootstrap((e) => Console.WriteLine(e.Message));

      ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();
      semProc.Register<SurfaceMembrane, Subscriber>();
      semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);
    }
  }

  public class ST_Message : ISemanticType
  {
    public string Text { get; set; }

    public ST_Message()
    {
      Console.WriteLine("Message Instantiated.");
    }
  }

  public class Subscriber : IReceptor
  {
    public Subscriber()
    {
      Console.WriteLine("Subscriber Instantiated.");
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)
    {
      Console.WriteLine(msg.Text);
    }
  }
}

并且 *modules.xml* 文件(参见前一篇文章)如下所示:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
</Modules>;

输出是:

剖析 Hello World 示例

以下是具体情况:

获取发布/订阅者

引导后,我们要做的第一件事就是获取语义发布/订阅者单例。

ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();

注册订阅者

订阅者被称为“Receptors”。记住这一点。

semProc.Register<SurfaceMembrane, Subscriber>();

SurfaceMembrane 是一个内置膜,用于方便地发送到订阅者的消息。

发布消息

最后一行

semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);

发布了一条消息。在此示例中,消息参数由发布/订阅者实例化消息后调用的 Action<T> 初始化。

在这里,我们提供可选值 true 来指示消息应在调用者的线程上处理。如果我们不这样做,我们简单的控制台应用程序将在没有给线程池足够时间处理消息的情况下结束!

消息类

所有消息都必须派生自 ISemanticType

public class ST_Message : ISemanticType

这仅仅是一个占位符,它提供了对注册和发布中使用的泛型参数的编译时类型检查。

public interface ISemanticType { }

所有接收者都收到相同的消息实例,因此消息应被视为不可变。

订阅者

所有订阅者都必须派生自 IReceptor(记住订阅者也称为“Receptors”),再次用于注册中使用的泛型参数的编译时类型检查。

public interface IReceptor { }

我倾向于在我的语义类型前加上“ST_”前缀,以将它们与其它类型区分开。

ISemanticType 一样,IReceptor 接口实际上只是一个用于编译时类型检查的占位符。

public interface IReceptor { }

由于订阅者(通常)为每个消息实例化,因此订阅者使用的任何非静态字段都特定于处理该消息的实例。当订阅者执行需要内部状态管理的复杂任务时,这对于线程安全非常有帮助。

订阅消息

每条消息都在重载的 Process 方法中接收。

public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)

还提供了发布/订阅者(语义处理器)以及发送消息的膜。这为响应者提供了在同一膜上发布消息的必要信息(如果它愿意)。

实现 Process 方法的类将仅接收在其声明的膜中发布的 P消息。虽然消息可以渗透膜,但我们不会在本篇文章中讨论此功能。

幕后

接下来,我们看看发布/订阅者是如何工作的。

注册订阅者

注册订阅者有多种方式,但两种常见的方式是带初始化程序或不带初始化程序。

public void Register<M, T>()
  where M : IMembrane, new()
  where T : IReceptor
{
  Register<T>();
  IMembrane membrane = RegisterMembrane<M>();
  membraneReceptorTypes[membrane].Add(typeof(T));
}

public void Register<M, T>(Action<IReceptor> receptorInitializer)
  where M : IMembrane, new()
  where T : IReceptor
{
  Register<T>();
  Type receptorType = typeof(T);
  IMembrane membrane = RegisterMembrane<M>();
  membraneReceptorTypes[membrane].Add(receptorType);
  receptorInitializers[new MembraneReceptor() 
     { Membrane = membrane, ReceptorType = receptorType }] = 
     new ReceptorInitializer() { Initializer = receptorInitializer };
}

如果您提供初始化程序,它将在 Receptor(subscriber 类)实例化时被调用,其中 IReceptor 是正在实例化的类的实例。初始化程序特定于包含 Receptor(subscriber)的膜,从而能够根据与其关联的膜(通道)以不同的参数初始化相同的 Receptor。

消息发布

当使用 ProcessInstance 发布消息时,可以提供消息的初始化程序,正如我们在上面的示例中看到的。

/// <summary>
/// Process a semantic type, allowing the caller to specify an initializer 
/// before processing the instance.
/// </summary>
public void ProcessInstance<M, T>(Action<T> initializer, bool processOnCallerThread = false)
  where M : IMembrane, new()
  where T : ISemanticType, new()
{
  T inst = new T();
  initializer(inst);
  ProcessInstance<M, T>(inst, processOnCallerThread);
}

public void ProcessInstance<M, T>(bool processOnCallerThread = false)
  where M : IMembrane, new()
  where T : ISemanticType, new()
{
  T inst = new T();
  ProcessInstance<M, T>(inst, processOnCallerThread);
}

另外请注意调用订阅者在调用者线程上执行的选项,该选项默认为 false

消息处理

核心消息处理器识别膜中的所有 Receptor(订阅者),实例化它们,然后立即执行调用或将其排队。

protected void ProcessInstance<T>
(IMembrane membrane, IMembrane caller, T obj, bool processOnCallerThread)
  where T : ISemanticType
{
  // We get the source object type.
  Type tsource = obj.GetType();

  // Then, for each target type that is interested in this source type, 
  // we construct the target type, then invoke the correct target's Process method.
  // Constructing the target type provides us with some really interesting abilities.
  // The target type can be treated as an immutable object. We can, for instance, execute
  // the Process call on a separate thread. Constructing the target type ensures that the
  // target is stateless -- state must be managed external of any type!

  // Stateless receptors:

  List<Type> receptors = GetReceptors(membrane, tsource);
  Log(membrane, obj);

  foreach (Type ttarget in receptors)
  {
    // We can use dynamic here because we have a <T> generic to resolve the call parameter.
    // If we instead only have the interface ISemanticType, 
    // dynamic does not downcast to the concrete type --
    // therefore it can't locate the call point because it implements the concrete type.
    dynamic target = Activator.CreateInstance(ttarget);

  ReceptorInitializer receptorInitializer;

  if (receptorInitializers.TryGetValue(new MembraneReceptor() 
     { Membrane = membrane, ReceptorType = ttarget }, out receptorInitializer))
  {
    receptorInitializer.Initializer(target);
  }

  // Call immediately?
  if (processOnCallerThread)
  {
    Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                             Proc = () => target.Process(this, membrane, obj) });
  }
  else
  {
    // Pick a thread that has the least work to do.
    threadPool.MinBy(tp => tp.Count).Enqueue(
      new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                          Proc = () => target.Process(this, membrane, obj) });
  }
}

  // Also check stateful receptors
  List<IReceptor> sreceptors = GetStatefulReceptors(membrane, tsource);

  foreach (IReceptor receptor in sreceptors)
  {
    dynamic target = receptor;
    // Call immediately?
    if (processOnCallerThread)
    {
      Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
        Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
    }
    else
    {
      threadPool.MinBy(tp => tp.Count).Enqueue(new DynamicCall() 
                      { SemanticInstance = obj, Receptor = target, 
        Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
    }
  }

  ProcessInnerTypes(membrane, caller, obj, processOnCallerThread);
  PermeateOut(membrane, caller, obj, processOnCallerThread);
}

乍一看这似乎很复杂,但实际上非常简单。

  1. 首先,获取指定膜中处理该消息的所有 Receptor(订阅者)。
  2. 记录消息,这可以由您实现的任何记录服务处理。
  3. 每个 Receptor(订阅者类)都被实例化,可选地初始化,然后执行回调或排队。
  4. 有状态的 Receptor(稍后讨论)也会被调用。
  5. 执行两个结束步骤:
    1. 实现内部语义类型的消息也会被发布。这是一个奇怪但有趣的功能,它允许应用程序订阅包装消息体内的语义消息。例如,一个地址消息可能包含邮政编码等语义类型。订阅者可能对其自身的目的感兴趣处理邮政编码。此机制有助于创建内部语义类型的自动处理。
    2. 与膜计算一样,如果配置得当,消息可以渗透到其他膜。此功能允许应用程序通过特定消息桥接膜(通道/容器),以便其他膜中的订阅者可以对这些消息执行额外的计算。

处理内部类型

由...处理

protected void ProcessInnerTypes(IMembrane membrane, IMembrane caller, 
                                 ISemanticType obj, bool processOnCallerThread)
{
  var properties = obj.GetType().GetProperties(BindingFlags.Instance | 
                   BindingFlags.Public).Where(
      pi => pi.PropertyType.GetInterfaces().Contains(typeof(ISemanticType)));

  properties.ForEach(pi =>
  {
    ISemanticType prop = (ISemanticType)pi.GetValue(obj);
    prop.IfNotNull((p) => ProcessInstance(membrane, caller, p, processOnCallerThread));
  });
}

简要了解渗透膜

来自维基百科 膜计算

在膜计算中,数据(或在我们的例子中,消息)可以向内渗透到内部膜,向外渗透到包含膜。在上图中,“环境”由 SurfaceMembrane 类型表示。

渗透膜

由...处理

protected void PermeateOut<T>(IMembrane membrane, IMembrane caller, 
                              T obj, bool processOnCallerThread)
  where T : ISemanticType
{
  List<IMembrane> pmembranes = ((Membrane)membrane).PermeateTo(obj);
  pmembranes.Where(m=>m != caller).ForEach((m) => ProcessInstance
                  (m, membrane, obj, processOnCallerThread));
}

通过忽略原始调用中的膜来防止无限递归。

入站和出站渗透性

膜(通道/容器)对外部膜和任何内部膜都具有渗透性。

/// <summary>
/// Given this membrane's outbound list, what membranes are inbound permeabe to the ST as well?
/// </summary>
public List<IMembrane> PermeateTo(ISemanticType st)
{
  List<IMembrane> ret = new List<IMembrane>();
  Type sttype = st.GetType();

  if (outboundPermeableTo.Contains(sttype))
  {
    // Can we traverse to the parent?
    if ((parent != null) && (parent.inboundPermeableTo.Contains(sttype)))
    {
      ret.Add(parent);
    }

    // Can we traverse to children?
    foreach (Membrane child in childMembranes)
    {
      if (child.inboundPermeableTo.Contains(sttype))
      {
        ret.Add(child);
      }
    }
  }

  return ret;
}
膜类型

自定义膜(通道)类型从不实现任何内容,它仅用于“通道”可以作为泛型类型指定。但它必须派生自 Membrane,例如:

public class LoggerMembrane : Membrane { }

基类处理渗透功能。

设置订阅者调用

在前面说明的代码中,对订阅者的调用是立即执行的:

Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, 
  Proc = () => target.Process(this, membrane, obj) });

或者排队在工作线程上:

// Pick a thread that has the least work to do.
threadPool.MinBy(tp => tp.Count).Enqueue(
  new DynamicCall() { SemanticInstance = obj, Receptor = target, 
                      Proc = () => target.Process(this, membrane, obj) });

调用被实现为一个由 DynamicCall 类包装的 Action

public class DynamicCall : ProcessCall
{
  public Action Proc { get; set; }

  public DynamicCall()
  {
    AutoDispose = true;
  }

  public override void MakeCall()
  {
    Proc();
  }
}

目标类型为 dynamic。我们使用此类型是为了将调用路由到订阅者中正确的重载 Process 方法。

出队工作

消息的出队由线程池中的线程处理。

protected void ProcessPoolItem(object state)
{
  ThreadSemaphore<ProcessCall> ts = (ThreadSemaphore<ProcessCall>)state;

  while (true)
  {
    ts.WaitOne();
    ProcessCall rc;

    if (ts.TryDequeue(out rc))
    {
      Call(rc);
    }
  }
}

请注意,我们没有使用 .NET 的线程池或 Task 机制,因为它们会引入处理工作的延迟,并且仅用于短暂的进程。因为应用程序的订阅者可能是长期运行的进程,所以线程池实际上是 Thread 实例的集合。

/// <summary>
/// Setup thread pool to for calling receptors to process semantic types.
/// Why do we use our own thread pool? Because .NET's implementation (and
/// particularly Task) is crippled and non-functional for long running threads.
/// </summary>
protected void InitializePoolThreads()
{
  for (int i = 0; i < MAX_WORKER_THREADS; i++)
  {
    Thread thread = new Thread(new ParameterizedThreadStart(ProcessPoolItem));
    thread.IsBackground = true;
    ThreadSemaphore<ProcessCall> ts = new ThreadSemaphore<ProcessCall>();
    threadPool.Add(ts);
    thread.Start(ts);
  }
}

进行调用

调用本身被包装在一个异常处理器中。

protected void Call(ProcessCall rc)
{
  try
  {
    rc.MakeCall();
  }
  catch (Exception ex)
  {
    Exception ex2 = ex;
    // Prevent recursion if the exception process itself throws an exception.
    if (!(rc.SemanticInstance is ST_Exception))
    {
      ProcessInstance(Logger, new ST_Exception(ex), true);
    }

    while (ex2.InnerException != null)
    {
      ex2 = ex2.InnerException;
      // Prevent recursion if the exception process itself throws an exception.
      if (!(rc.SemanticInstance is ST_Exception))
      {
        ProcessInstance(Logger, new ST_Exception(ex2), true);
      }
    }
  }
  finally
  {
    if ( (rc.Receptor is IDisposable) && (rc.AutoDispose) )
    {
      ((IDisposable)rc.Receptor).Dispose();
    }
  }
}

异常(包括内部异常)会在语义处理器为我们创建的 Logger 膜上发布。

public IMembrane Logger { get; protected set; }
...
Logger = RegisterMembrane<LoggerMembrane>();

请注意,在 finally 块中,如果 Receptor(订阅者)实现了 IDisposable 并且是无状态订阅者(由语义处理器(发布/订阅者)实例化的订阅者),则会调用其 Dispose 方法。

有状态订阅者

当需要维护订阅者整体状态时,有状态订阅者可能很有用(为了避免冗余)。一些用例包括管理连接的订阅者,我们希望保持连接打开,而不是每次处理涉及连接的消息时都打开和关闭它。

有状态 Receptor 的初始化示例如下:

semProc.Register<SurfaceMembrane>(new StatefulSubscriber());

请注意,订阅者类型的泛型参数被省略了,而是将订阅者的实例传递给了 Register 方法。

有状态订阅者的简单实现如下所示:

public class StatefulSubscriber : IReceptor
{
  protected int counter = 0;

  public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message2 msg)
  {
    Console.WriteLine(counter + ": " + msg.Text);
    ++counter;
  }
}

请注意,它正在处理类型为 ST_Message2 的消息,这仅仅是为了演示目的,是一种区分两个示例的方法。消息发布完全相同:

semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Hello World", true);
semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Goodbye World", true);

在这里,我们发布两条消息,结果是:

我们观察到订阅者实例被保留了。

内部魔法

发布/订阅者通过反射提取有状态订阅者处理的消息。

/// <summary>
/// Register a stateful receptor contained within the specified membrane.
/// </summary>
public void Register(IMembrane membrane, IReceptor receptor)
{
  statefulReceptors.Add(receptor);
  Type ttarget = receptor.GetType();

  MethodInfo[] methods = ttarget.GetMethods();

  foreach (MethodInfo method in methods)
  {
    // TODO: Use attribute, not specific function name.
    if (method.Name == "Process")
    {
      ParameterInfo[] parameters = method.GetParameters();
      InstanceNotify(receptor, parameters[2].ParameterType);
    }
  }

  membranes[membrane.GetType()] = membrane;
  membraneReceptorInstances[membrane].Add(receptor);
}

这会检查类中的每个 Process 方法,并假定这些方法将具有预期的签名(此代码可以改进)。提取消息类型是为了之后确定有状态订阅者是否处理已发布的消息。

性能

从上面的代码中,您可能已经注意到,在典型情况下,会为每条消息实例化(并可选地释放)一个订阅者,并执行一个可选的初始化程序。此外,还使用了 dynamic 订阅者实例来隐藏内部反射。还有嵌套级别的调用以及创建调用的开销,这些调用要么立即执行,要么排队到工作线程。复杂的配置,包括膜渗透和内部消息发布,增加了性能开销。

相反,此发布/订阅者的实现非常灵活、线程安全,并且应用程序对订阅者可能引发的异常具有弹性。在实现通用组件时,这始终是权衡——有用的功能以牺牲原始性能为代价。通常,发布/订阅者处理的消息来自低带宽事件,无论是来自硬件的用户输入,甚至是页面或 Web 服务请求。如果您确实需要高性能,那么有更简单的发布/订阅者实现,或者您甚至可能不想使用这种模式。

整合起来 - 一个例子

让我们使用模块管理器、服务管理器和发布/订阅者编写一个 Web 服务器!

首先,一个记录器服务

我们从 *modules.xml* 文件中定义的几个模块开始:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
</Modules>

能够记录事情(尤其是异常!)是任何开发过程都应该开始的第一件事。记录器服务应该能够处理作为服务对其进行的调用,以及记录发布/订阅者发布的异常消息。这是我们将要使用的记录器服务:

using System;

using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using ServiceInterfaces;

namespace ConsoleLoggerService
{
  public class LoggerModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IConsoleLoggerService, LoggerService>();
    }
  }

  public class LoggerService : ServiceBase, IConsoleLoggerService, IReceptor
  {
    public override void FinishedInitialization()
    {
      ISemanticProcessor semProc = ServiceManager.Get<ISemanticProcessor>();
      semProc.Register<LoggerMembrane, GenericTypeLogger>();
      semProc.Register<LoggerMembrane, LoggerService>();
    }

    public void Log(string msg)
    {
      Console.WriteLine(msg);
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Exception msg)
    {
      Log(msg.Exception.Message);
      Log(msg.Exception.StackTrace);
    }

    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Log msg)
    {
      Log(msg.Message);
    }
  }

  public class GenericTypeLogger : IReceptor
  {  
    public void Process(ISemanticProcessor semProc, IMembrane membrane, ISemanticType t)
    {
      if ( (!(t is ST_Log)) && (!(t is ST_Exception)) )
      {
        Console.WriteLine("Publishing type: " + t.GetType().Name);
      }
    }
  }
}

这个记录器有三点有趣之处:

  1. 它是一个服务,所以我们可以像对待服务一样对待它。
  2. 但是,该服务也实现了 IReceptor,使其能够处理应用程序发布的 ST_Log 消息以及发布/订阅者在其内部 Logger“通道”上发出的 ST_Exception 消息。
  3. 一个通用类型的记录器被实例化为一个单独的 Receptor,它(得益于发布/订阅者能够为基类型/接口发出消息的能力)始终记录消息类型。我们忽略了日志和异常消息类型,因为记录类型,然后是实际的日志或异常消息,似乎很愚蠢。

上面是一个有趣的实现,因为注册了一个单例服务,但发布/订阅者为每个日志消息创建一个实例!

一个测试应用程序显示了这一切是如何工作的:

static partial class Program
{
  static void Main(string[] args)
  {
    InitializeBootstrap();
    Bootstrap((e) => Console.WriteLine(e.Message));

    TestLogging();
    Console.WriteLine("Press ENTER to exit the server.");
    Console.ReadLine();
  }

  static void TestLogging()
  {
    serviceManager.Get<IConsoleLoggerService>().Log("Foobar");
    serviceManager.Get<ISemanticProcessor>().ProcessInstance<LoggerMembrane, 
                   ST_Log>(l => l.Message = "Hi there!", true);
    serviceManager.Get<ISemanticProcessor>().Register<SurfaceMembrane, ExceptionProcess>();
    serviceManager.Get<ISemanticProcessor>().
                   ProcessInstance<SurfaceMembrane, ST_TestException>();
  }
}

public class ST_TestException : ISemanticType { }

public class ExceptionProcess : IReceptor
{
  public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_TestException msg)
  {
    throw new ApplicationException("I Broke!");
  }
}

这是输出:

输出说明:

  1. 将记录器用作服务
  2. 发布一条日志消息
  3. 记录通用类型消息
  4. 发布/订阅者处理异常并将其发送给我们记录器

现在,我们完成了记录器模块。

接下来,一个 Web 服务器

我更喜欢使用自己的技术而不是 IIS、ASP.NET、Razor、MVC 等。所以我们将编写一个简单的 Web 服务器,它由几个服务构建而成。

HTTP 侦听服务

这是一个非常简单的 HTTP 侦听器的实现。它接收请求并将请求作为消息发布到发布/订阅者。

using System.IO;
using System.Net;
using System.Threading.Tasks;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace WebServerService
{
  public class WebServerModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IWebServerService, WebServer>();
    }
  }

  public class WebServer : ServiceBase, IWebServerService
  {
    protected HttpListener listener;
    protected ILoggerService logger;
    protected ISemanticProcessor semProc;
    protected bool httpOnly;

    public virtual void Start(string ip, int port)
    {
      logger = ServiceManager.Get<ILoggerService>();
      semProc = ServiceManager.Get<ISemanticProcessor>();
      listener = new HttpListener();
      string url = IpWithPort(ip, port);
      logger.Log("Listening on " + ip + ":" + port);
      listener.Prefixes.Add(url);
    }

    listener.Start();
    // Yes, this is a long running task. One of them isn't a problem.
    Task.Run(() => WaitForConnection(listener));
  }

    protected virtual void WaitForConnection(object objListener)
    {
      HttpListener listener = (HttpListener)objListener;

      while (true)
      {
        // Wait for a connection. Return to caller while we wait.
        HttpListenerContext context = listener.GetContext();
        string verb = context.Request.HttpMethod;
        string path = context.Request.RawUrl.LeftOf("?").RightOf("/");
        string parms = context.Request.RawUrl.RightOf("?");
        logger.Log(verb + ": " + path);

        string data = new StreamReader(context.Request.InputStream, 
                                       context.Request.ContentEncoding).ReadToEnd();
        ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane, 
                           ST_HttpRequest>(r =>
        {
	  r.Context = context;
          r.Verb = verb;
          r.Path = path;
          r.Parameters = parms;
          r.Data = data;
        });
      }
    }

    /// <summary>
    /// Returns the url appended with a / for port 80, otherwise, 
    /// the [url]:[port]/ if the port is not 80.
    /// </summary>
    protected string IpWithPort(string ip, int port)
    {
      string ret;

      if (port == 80)
      {
        ret = "http://" + ip + "/";
      }
      else
      {
        ret = "http://" + ip + ":" + port.ToString() + "/";
      }

      return ret;
    }
  }
}

消息在单独的线程上处理,让侦听器循环立即返回以等待另一个连接。

我们将此模块添加到 *modules.xml* 文件中:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
</Modules>

我们可以在应用程序中添加一行来测试服务器:

static void Main(string[] args)
{
  InitializeBootstrap();
  Bootstrap((e) => Console.WriteLine(e.Message));

  serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);

  Console.WriteLine("Press ENTER to exit the server.");
  Console.ReadLine();
}

即使没有订阅者,我们的记录器也会告诉我们它已收到 Web 请求消息。

当然,浏览器耐心地等待响应,而我们却没给它!

语义路由器服务

让我们添加一个语义路由器。它将负责将请求动词和路径映射到一个语义类型,而不是一个处理路径的方法。语义类型是动态实例化的,并且要么用 URL 本身上的查询值填充,要么用数据流中的 JSON 数据填充。我将组合一个相当简单的:

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Reflection;

using Newtonsoft.Json;

using Clifton.Core.Utils;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace SemanticWebRouterService
{
  // Struct, so it's key-able.
  public struct Route
  {
    public string Verb { get; set; }
    public string Path { get; set; }
  }

  public class SemanticWebRouterModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<ISemanticWebRouterService, WebRouterService>();
    }
  }

  public class WebRouterService : ServiceBase, ISemanticWebRouterService
  {
    protected Dictionary<Route, Type> semanticRoutes;

    public WebRouterService()
    {
      semanticRoutes = new Dictionary<Route, Type>();
    }

    public override void FinishedInitialization()
    {
      base.FinishedInitialization2();
      ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, RouteProcessor>();
    }

    public void Register<T>(string verb, string path) where T : ISemanticRoute
    {
      semanticRoutes[new Route() { Verb = verb.ToLower(), Path = path.ToLower() }] = typeof(T);
    }

    public void RouteRequest(ST_HttpRequest req)
    {
      Route route = new Route() { Verb = req.Verb.ToLower(), Path = req.Path.ToLower() };
      Type routeHandler;
      bool found = semanticRoutes.TryGetValue(route, out routeHandler);
      ISemanticRoute semanticRoute = null;

      if (found)
      {
        semanticRoute = InstantiateRouteHandler(routeHandler, req);
        semanticRoute.Context = req.Context;
        ServiceManager.Get<ISemanticProcessor>().
                       ProcessInstance<WebServerMembrane>(semanticRoute);
      }
      else
      {
        ServiceManager.Get<ILoggerService>().Log("Route not found.");
      }
    }

    protected ISemanticRoute InstantiateRouteHandler(Type routeHandler, ST_HttpRequest req)
    {
      ISemanticRoute semanticRoute = (ISemanticRoute)Activator.CreateInstance(routeHandler);

      if (!string.IsNullOrEmpty(req.Data))
      {
        // We assume data will be in JSON format.
        JsonConvert.PopulateObject(req.Data, semanticRoute);
      }
      else if (req.Verb.ToLower() == "get")
      {
        PopulateFromQueryString(req, semanticRoute);
      }

      return semanticRoute;
    }

    protected void PopulateFromQueryString(ST_HttpRequest req, ISemanticRoute semanticRoute)
    {
      NameValueCollection nvc = req.Context.Request.QueryString;

      foreach (string key in nvc.AllKeys)
      {
        PropertyInfo pi = semanticRoute.GetType().GetProperty(key, 
          BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);

        if (pi != null)
        {
          object valOfType = Converter.Convert
          (Uri.UnescapeDataString(nvc[key].Replace('+', ' ')), pi.PropertyType);
          pi.SetValue(semanticRoute, valOfType);
        }
      }
    }
  }

  public class RouteProcessor : IReceptor
  {
    public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_HttpRequest req)
    {
      semProc.ServiceManager.Get<ISemanticWebRouterService>().RouteRequest(req);
    }
  }
}

这里的模式应该显而易见:

  1. 创建一个实现 IModule 的类。
  2. 注册服务。
  3. 在这种情况下,服务注册了一个 ST_HttpRequest 消息的订阅者。
  4. 消息订阅者只需回调服务进行处理。

测试

将其添加到 *modules.xml* 文件中:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
</Modules>

接下来,我们将编写注册一个路由来记录设置属性,以便我们至少可以测试查询参数初始化过程。这是我们注册路由的方式:

ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
router.Register<ST_Foobar>("get", "foobar");

请注意,我们没有注册一个方法来处理路由,而是注册了一个语义类型,它会在特定路由时被实例化。

这是一个简单的测试类型:

public class ST_Foobar : SemanticRoute
{
  public string Test
  {
    get { return Test; }
    set
    {
      Program.serviceManager.Get<ILoggerService>().Log
              ("test parameter set to: " + value.Quote());
    }
  }
}

现在我们可以尝试一下:

当然,这只是一个最基本示例。没有身份验证、授权、会话管理等。而且我们仍然没有响应浏览器请求!

一些有趣的点:

  • 路由器线程在找到(或未找到)路由后将退出,并且路由处理程序消息以新线程将接收调用的方式发布。
  • 因为我们使用的是发布/订阅者,所以路由可以由多个订阅者处理。为什么你想要这样做,我不确定。
  • 因为我们将路由与一个 *语义类型*(好吧,一个类的花哨名称)关联起来,所以我们可以将参数反序列化到该类中,并且因为它实际上是一个语义消息,所以它完美地契合发布/订阅者概念。
  • 理想情况下,我们将路由处理程序实现为模块本身,使应用程序免于进行路由注册。
    • 这样做的好处是可以随时通过添加定义给定路由的语义类型并实现这些消息的订阅者的模块来为 Web 服务器添加新行为。

响应者

最后,我们的 Web 服务器需要一些简单的响应者,同样作为模块实现。请注意,在这种情况下,服务接口是一个占位符,因为没有直接可调用的 public 方法。

using System.Text;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;

namespace WebResponseService
{
  // Here, we create a placeholder, because this service is not actually exposed.
  // All activities are handled as a subscriber.
  public interface IWebResponseService : IService { }

  public class WebResponseModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<IWebResponseService, WebResponseService>();
    }
  }

  public class WebResponseService : ServiceBase, IWebResponseService
  {
    public override void FinishedInitialization()
    {
      base.FinishedInitialization2();
      ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, WebResponder>();
    }
  }

  public class WebResponder : IReceptor
  {
    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_JsonResponse resp)
    {
      resp.Context.Response.StatusCode = resp.StatusCode;
      resp.Context.Response.ContentType = "text/json";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      byte[] byteData = resp.Json.to_Utf8();
      resp.Context.Response.ContentLength64 = byteData.Length;
      resp.Context.Response.OutputStream.Write(byteData, 0, byteData.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_HtmlResponse resp)
    {
      byte[] utf8data = resp.Html.to_Utf8();
      resp.Context.Response.ContentType = "text/html";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_CssResponse resp)
    {
      byte[] utf8data = resp.Css.to_Utf8();
      resp.Context.Response.ContentType = "text/css";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process
    (ISemanticProcessor proc, IMembrane membrane, ST_JavascriptResponse resp)
    {
      byte[] utf8data = resp.Javascript.to_Utf8();
      resp.Context.Response.ContentType = "text/javascript";
      resp.Context.Response.ContentEncoding = Encoding.UTF8;
      resp.Context.Response.ContentLength64 = utf8data.Length;
      resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
      resp.Context.Response.Close();
    }

    public void Process(ISemanticProcessor proc, IMembrane membrane, ST_RouteNotFound resp)
    {
      resp.Context.Response.StatusCode = 404;	// respond with page not found 404 error code.
      resp.Context.Response.Close();
    }
  }
}

现在让我们回到语义路由器,并为未定义的路由发布 ST_RouteNotFound 消息。

ServiceManager.Get<ILoggerService>().Log("Route not found.");
ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane, 
               ST_RouteNotFound>(r=>r.Context=req.Context);

我们将此模块添加到 *modules.xml* 文件中:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
  <Module AssemblyName='WebResponseService.dll'/>
</Modules>

现在我们有了对浏览器的第一个成功响应!

不再旋转!

响应来自文件的数据(HTML、CSS 等)

我们将再添加一个服务来响应来自 HTML/CSS 文件的数据。谁知道呢,您可能希望将其替换或扩展为从服务器返回的数据,因此将其制作成 YAM - Yet Another Module 是有意义的。同样,服务接口只是一个占位符,因为没有暴露的服务方法。我们目前只处理三种文件类型:HTML、CSS 和 JavaScript。

using System;
using System.IO;
using System.Net;

using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;

using Semantics;
using ServiceInterfaces;

namespace WebFileResponseService
{
  // Here, we create a placeholder, because this service is not actually exposed.
  // All activities are handled as a subscriber.
  public interface IFileResponseService : IService { }

  public class FileResponseModule : IModule
  {
    public void InitializeServices(IServiceManager serviceManager)
    {
      serviceManager.RegisterSingleton<FileResponseService, FileResponseService>();
    }
  }

    public class FileResponseService : ServiceBase, IFileResponseService
    {
      public override void FinishedInitialization()
      {
        base.FinishedInitialization2();
        ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, FileResponder>();
      }
    }
 
    public class FileResponder : IReceptor
    {
      public void Process(ISemanticProcessor proc, IMembrane membrane, ST_FileResponse resp)
      {
        ProcessFileRequest(proc, resp.Context);
      }

      protected void ProcessFileRequest(ISemanticProcessor semProc, HttpListenerContext context)
      {
        bool handled = false;
        string path = context.Request.RawUrl.LeftOf("?").RightOf("/").LeftOfRightmostOf('.');
        string ext = context.Request.RawUrl.RightOfRightmostOf('.');

        if (String.IsNullOrEmpty(path))
        {
          path = "index";
        }

        if (String.IsNullOrEmpty(ext))
        {
          ext = "html";
        }

        path = path + "." + ext;
        // Hardcoded folder path for the website!
        path = Path.Combine("Website", path);

        if (File.Exists(path))
        {
          switch (ext)
          {
            case "html":
              semProc.ProcessInstance<WebServerMembrane, ST_HtmlResponse>(r =>
              {
                r.Context = context;
                r.Html = ReadTextFile(path);
              });
              break;

            case "js":
              semProc.ProcessInstance<WebServerMembrane, ST_JavascriptResponse>(r =>
              {
                r.Context = context;
                r.Javascript = ReadTextFile(path);
              });
              break;

            case "css":
              semProc.ProcessInstance<WebServerMembrane, ST_CssResponse>(r =>
              {
                r.Context = context;
                r.Css = ReadTextFile(path);
              });
              break;
          }

        handled = true;
      }

      if (!handled)
      {
        semProc.ServiceManager.Get<ILoggerService>().Log("Route not found.");
        semProc.ProcessInstance<WebServerMembrane, ST_RouteNotFound>(r => r.Context = context);
      }
    }

    protected string ReadTextFile(string fn)
    {
      string text = File.ReadAllText(fn);
  
      return text;
    }

    protected byte[] ReadBinaryFile(string fn)
    {
      FileStream fStream = new FileStream(fn, FileMode.Open, FileAccess.Read);
      BinaryReader br = new BinaryReader(fStream);
      byte[] data = br.ReadBytes((int)fStream.Length);
      br.Close();
      fStream.Close();

      return data;
    }
  }
}

测试

再次,我们将此模块添加到 *modules.xml* 文件中:

<?xml version="1.0" encoding="utf-8" ?>
<Modules>
  <Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
  <Module AssemblyName='ConsoleLoggerService.dll'/>
  <Module AssemblyName='WebServerService.dll'/>
  <Module AssemblyName='SemanticWebRouterService.dll'/>
  <Module AssemblyName='WebResponseService.dll'/>
  <Module AssemblyName='WebFileResponseService.dll'/>
</Modules>

现在让我们对应用程序做一个简单的更改。我们将 ST_FileResponse 语义消息映射到路由“foobar”,以便加载特定的页面。这是整个程序:

static partial class Program
{
  static void Main(string[] args)
  {
    InitializeBootstrap();
    Bootstrap((e) => Console.WriteLine(e.Message));

    serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);

    ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
    router.Register<ST_FileResponse>("get", "foobar");

    Console.WriteLine("Press ENTER to exit the server.");
    Console.ReadLine();
  }
}

粗体行

router.Register<ST_FileResponse>("get", "foobar");

执行路由映射。

现在,让我们将一个 HTML 文件添加到我们的 Website 文件夹中,该文件夹位于 *bin\Debug*(是的,我在上面的代码中硬编码了网站路径,通常我会从 *app.config* 文件中检索它,使用您猜到的,我在此系列前一篇文章中描述的 AppConfigService)。

该文件很简单:

<h1>Foobar!</h1>

这是结果:

结论

好了,现在够了。我最初打算使用所有技术来编写一个简单的游戏,但我认为仅用 Web 服务器组件演示发布/订阅者就足以写这篇文章了!在演示代码中,您会注意到 Web 服务器演示实际上在 HuntTheWumpus 项目中。那就是我打算用来演示发布/订阅者的游戏,这很可能也是下一篇文章的内容。

回顾一下已经取得的成就:

  • 进一步解耦服务。除了核心服务(如记录器)和用于注册路由的路由服务的直接访问外,通信完全由发布/订阅者处理。事实上,一些服务甚至不实现服务方法,接口只是占位符。
  • 消息在独立线程上处理,这对于此类应用程序来说非常理想。
  • 为每条消息实例化订阅者,这在线程安全性至关重要的情况下是一个理想的功能。
  • 发布/订阅者实现了日志记录和异常处理。

因此,这四篇文章展示了构成 Clifton 方法的四个基础组件。无论我是在编写 Web 应用程序还是客户端应用程序,我几乎总是从这些组件以及我能够即插即用的不断增长的服务库开始。您可以在 GitHub 上查看整个工具包。那里提供的实现与这里介绍的内容存在细微差别。通过撰写这四篇文章的过程,我更新了 GitHub 存储库中的代码以反映错误修复和一些小改进。

历史

  • 2016 年 8 月 25 日:初始版本
© . All rights reserved.