分布式语义计算






4.90/5 (14投票s)
在多线程、类型优先开发 (TFD) 系统中进行分布式语义计算
引言
我之前在《高阶编程环境》(HOPE) 系列文章中写过关于语义计算的内容。
高阶编程环境
APOD 网页爬虫,HOPE 的一个演示
在 HOPE 中实现的狩猎温普斯游戏
一个正在运行的语义数据库
语义网与自然语言处理
在本文中,我将使用类型优先开发 (TFD) 方法来演示分布式语义计算。TFD 是 Tomas Petricek 在其博客文章《为什么类型优先开发很重要》中首次提出的一个术语。
在本文中,我重写了 HOPE 引擎以利用“类型声明式编程”。这是一种编程风格,它严重依赖泛型来声明式地描述应该做什么,而不是怎么做。这是 TFD 的另一面——除了首先开发类型之外,我们还实现操作泛型的进程,特别是那些实现特定接口的进程。“类型声明式编程”类似于事件、委托、回调等的用法,用于实现程序行为的控制反转,但它是一种用于实例化对象的控制反转。与在运行时在 XML 中声明类型并编译的 HOPE 不同,在这里我们使用代码本身实现的类型。由于 .NET 丰富的反射和程序集加载功能,这种差异对 HOPE 的总体目标无关紧要,但对开发人员来说差异却非常显著,特别是对于类型化语言在运行时提供的安全性,以及在开发过程中(Intellisense 和编译时检查)类型化语言的易用性。
源代码
代码可在 GitHub 上找到: https://github.com/cliftonm/SemanticProcessor
类型优先编程基础
基于类型的编程的核心原则是它是声明式的。使用泛型,我们描述“我们想要实例化什么”而不是“如何/何时实例化”。正如 Petricek 先生关于类型优先编程所说的:“…在设计程序时,您开始考虑代码处理的数据的(数据)类型……开发不是由类型驱动的。它以类型开始……”这种区别在技术上非常简单。
如何
Foo foo = new Foo();
什么
Foo foo = Proggy.Make<Foo>();
虽然上面的“什么”示例看起来微不足道,但考虑到在一个简单的日志记录示例中,这为您提供了什么。
public static class Proggy { public static T Make<T>() where T : new() { Console.WriteLine("Making " + typeof(T).Name); return new T(); } } class Program { static void Main(string[] args) { Proggy.Make<StringBuilder>(); } }
语义计算基础
语义计算也归结为两个非常简单的概念:
- 类型是“语义”的——它不仅描述,而且限定了其结构的含义。
- 计算与语义类型相关联。
这与面向对象编程是正交的。在 OOP 中,对象会携带一组方法,这些方法实现对其他(通常不是语义的,而是本地的)类型的计算!例如,在一个简单的 OOP 类中
非语义示例
public class Receipt { public decimal Total(decimal amount, decimal taxes) { return amount * (1 + taxes); } } class Program { static void Main(string[] args) { Console.WriteLine("Non-semantic: " + new Receipt().Total(1M, .07M)); } }
语义示例
要将其转换为适合语义计算的内容,我们需要引入几个概念:作为语义类型的类,以及处理语义类型的类。
语义处理器实际上只是一个复杂的发布/订阅系统。
语义类型类
我们通过使用接口和具体类来实现语义类型。
public interface ISemanticType { } public class Purchase : ISemanticType { public decimal Total { get; set; } public decimal Taxes { get; set; } }
从技术上讲,甚至
Total
和 Taxes
也应该是/可能成为语义类型,为它们的语言原生类型提供语义意义。
语义类型
- 没有显式构造函数。
- 不实现计算方法。
- 实现一个声明该类为语义类型的接口——原因稍后会说明。
- 该接口没有任何方法或属性,它只是描述“这个东西是一个语义类型”的一种方式。
处理语义类型:接收器
我们需要实现“某个东西”来处理语义类型。借鉴 HOPE,“某个东西”被称为“接收器”。
public interface IReceptor { } public interface IReceptor<T> : IReceptor where T : ISemanticType { void Process(T semanticType); } public class Computation : IReceptor<Purchase> { public void Process(Purchase p) { Console.WriteLine("Semantic:" + p.Total * (1 + p.Taxes)); } }
我们注意到这里有几个关键点(我应该使用注释图标还是关键图标?)
- 我们有一个没有成员的
IReceptor
接口。 - 我们提供一个
IReceptor<T>
接口来声明一个具有ISemanticType
参数的特定Process
方法。虽然不是必需的,但这对于需要实现Process
方法的具体接收器来说是一个有用的声明。 - 我们实现一个具体接收器来处理
Purchase
类型。
介绍语义处理器
我们需要一个东西,当语义类型被实例化时,它会调用接收器上的 Process
方法来接收语义类型。
public class SemanticProcessor { protected Dictionary<Type, List<Type>> typeReceptors; public SemanticProcessor() { typeReceptors = new Dictionary<Type, List<Type>>(); } public void Register<T, R>() where T : ISemanticType where R : IReceptor { List<Type> receptors; Type ttype = typeof(T); Type rtype = typeof(R); if (!typeReceptors.TryGetValue(ttype, out receptors)) { receptors = new List<Type>(); typeReceptors[ttype] = receptors; } receptors.Add(rtype); } public void ProcessInstance<T>(Action<T> initializer) where T : ISemanticType, new() { Type ttype = typeof(T); T semType = new T(); initializer(semType); foreach (Type rtype in typeReceptors[ttype]) { dynamic receptor = Activator.CreateInstance(rtype); receptor.Process(semType); } } }
整合
使用 SemanticProcessor
包括一个两步过程:
- 将语义类型注册到一个或多个接收器。
- 当我们需要对语义类型执行某些计算时,调用
ProcessInstance
方法,并使用Action<T>
初始化器来初始化语义类型的属性。
看起来是这样的:
static void Main(string[] args) { // non-semantic computation: Console.WriteLine("Non-semantic: " + new Receipt().Total(1M, .07M)); // semantic computing: SemanticProcessor sp = new SemanticProcessor(); sp.Register<Purchase, Computation>(); sp.Process<Purchase>((t) => { t.Total = 1M; t.Taxes = 0.07M; }); }
我们观察到:
- 接收器是按需实例化的。这是一个巨大的优势,因为我们不再需要自己管理实例集合——扔掉你的依赖注入器!
- 我们利用
dynamic
关键字,让 C# 处理反射来调用所需语义类型实例的正确Process
方法。 - 我们的程序本身不实例化任何东西。
接收器由
SemanticProcessor
实例化这一事实,使我们能够在稍后更复杂的实现中:
- 将
Process
调用包装在 try-catch 块中,以提供统一的异常处理机制。 - 记录所有处理过程。
- 调用完成后,自动调用实现
IDisposable
的接收器的Dispose
方法。 - 异步执行调用——调用接收器来处理语义类型可以放到其他线程上。
- 接收器(由语义处理器专门为处理语义类型而构建)本质上是无状态的(有时我们需要有状态的接收器——语义处理器的实际实现支持这一点)。
- 将调用分发到网络上的其他接收器进行处理。
最后一点,“将调用分发到网络上的其他接收器”,为分布式语义计算开启了巨大的潜力!
实现真正的语义处理器
上述代码对于开发真正的语义应用程序来说过于简单。我们需要:
- 处理无状态(由语义处理器实例化)和有状态(由应用程序实例化)接收器的能力。
- 语义类型应在工作线程上异步(默认行为)以及在调用者的线程上处理,以满足同步需求。
- 除了语义类型本身,其子类型也应被任何感兴趣的方(接收器)处理。这使我们能够在保留使用子类型实现的行为的同时,创建新的语义类型。
- 为了管理语义类型在接收器之间的交换,我们需要“容纳”接收器之间通信的容器。在 HOPE 中,这些容器被称为膜,我在这里也会使用这个术语,借鉴于膜计算领域。
我们还希望该实现提供上一节末尾所述的功能。
- 将
Process
调用包装在 try-catch 块中,以提供统一的异常处理机制。 - 记录所有处理过程。
- 调用完成后,自动调用实现
IDisposable
的接收器的Dispose
方法。 - 异步执行调用——调用接收器来处理语义类型可以放到其他线程上。
- 接收器(由语义处理器专门为处理语义类型而构建)本质上是无状态的(有时我们需要有状态的接收器——语义处理器的实际实现支持这一点)。
- 将调用分发到网络上的其他接收器进行处理。
集成测试是说明语义处理器功能的绝佳方式——我将使用 NUnit 来运行集成测试。我还会时不时地深入研究一些更具吸引力的代码。
为什么我称它们为集成测试?因为它们演示了配置和测试特定的场景,而不是单个方法。这更有用,因为:
- 它说明了一个现实生活中的用例。
- 它锻炼了整个系统,而不是离散的方法。
- 在现实生活中,大多数有用的测试实际上都是集成测试。
膜注册
膜是接收器系统的容器,我称之为计算岛。膜也具有一些高级功能,我们稍后会讨论。但就目前而言,重要的是要知道,在任何一个语义处理器“系统”中,每种膜类型在该系统中只能存在一次。
在后台,实现看起来是这样的:
public IMembrane RegisterMembrane<M>() where M : IMembrane, new() { IMembrane membrane; Type m = typeof(M); if (!membranes.TryGetValue(m, out membrane)) { membrane = new M(); membranes[m] = membrane; membraneReceptorTypes[membrane] = new List<Type>(); membraneReceptorInstances[membrane] = new List<IReceptor>(); } return membrane; }
请注意,膜实例会立即实例化并存储在与类型关联的集合中。还会初始化其他集合,用于管理膜内的无状态接收器类型和有状态实例。
我们可以测试膜按类型是不同的。
/// <summary> /// Registering a membrane creates an instance of that membrane. /// </summary> [Test] public void RegisterMembraneType() { SemanticProcessor sp = new SemanticProcessor(); IMembrane membrane = sp.RegisterMembrane<TestMembrane>(); Assert.That(sp.Membranes.Contains(membrane), "Expected membrane instance."); } /// <summary> /// Registering the same membrane type returns the same instance. /// </summary> [Test] public void RegisterSameMembraneType() { SemanticProcessor sp = new SemanticProcessor(); IMembrane membrane1 = sp.RegisterMembrane<TestMembrane>(); IMembrane membrane2 = sp.RegisterMembrane<TestMembrane>(); Assert.That(membrane1 == membrane2, "Expected the same membrane instance."); }
无状态接收器
当语义类型被“发布”时,接收器会处理它。无状态接收器由语义处理器按需创建和销毁(考虑到它们通常在自己的线程上运行,这避免了有状态、持久接收器中可能出现的任何可变、跨线程问题——换句话说,系统会保护你免受自身的影响)。为了测试无状态接收器的一些基本操作,我们需要:
- 一个测试膜
- 一个测试语义类型
- 一个测试接收器
在我们的测试夹具中,接收器将实现 IDisposable,以便我们也可以测试调用后是否已处理该接收器。我们还将实现一个接口和一个子类化的接收器,以测试传递实现接口的对象。以下是基本组件:
public class TestMembrane : Membrane { } public class TestSemanticType : ISemanticType { } public interface ITestSemanticType { }; public class InterfaceTestSemanticType : ISemanticType, ITestSemanticType { } public class TestReceptor : IReceptor, IDisposable { public bool AFlag { get; set; } public TestReceptor() { constructorCalled = true; } public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t) { callSuccess = true; } public void Dispose() { disposeCalled = true; } } public class TestReceptor2 : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t) { callSuccess2 = true; } } public class DerivedTestReceptor : TestReceptor { } // IReceptor type is optional, but good practice to make sure you implement Process on the semantic type. public class InterfaceTestReceptor : IReceptor<ITestSemanticType> { public void Process(ISemanticProcessor proc, IMembrane membrane, ITestSemanticType t) { callSuccess = true; } }
集成测试的注释应充分说明测试的作用。在每个测试中,请检查膜、语义类型和接收器的设置方式。请注意,所有这些测试都是在“立即执行”模式下进行的,而不是将处理附加到线程上。这使得集成测试容易得多。
/// <summary> /// Given a receptor in a membrane, a semantic type put into that membrane is received by that receptor. /// </summary> [Test] public void ReceptorReceivesSemanticTypeOnItsMembrane() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected TestReceptor.Process to be called."); } /// <summary> /// Given a semantic type put into one membrane, the receptor in another membrane does not receive it. /// </summary> [Test] public void ReceptorDoesNotReceiveSemanticTypeOnAnotherMembrane() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane2, TestSemanticType>(true); Assert.That(!callSuccess, "Expected TestReceptor.Process to NOT be called."); } /// <summary> /// Test that when we remove a semantic type from a membrane's receptor, the receptor no longer gets Process calls. /// </summary> [Test] public void RemoveType() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.RemoveTypeNotify<TestMembrane, TestReceptor, TestSemanticType>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(!callSuccess, "Expected TestReceptor.Process to NOT be called."); } /// <summary> /// Verify that when processing a semantic type, the receptor, registered by type, is created and destroyed. /// </summary> [Test] public void ReceptorTypeCreateDestroy() { constructorCalled = false; disposeCalled = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(constructorCalled, "Expected constructor to be called."); Assert.That(disposeCalled, "Expected Dispose to be called."); } /// <summary> /// Test that a semantic instance initializer is called when the semantic type is constructed. /// </summary> [Test] public void InitializerCalledForSemanticTypeConstruction() { bool initializerCalled = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane, TestSemanticType>((t) => initializerCalled = true, true); Assert.That(initializerCalled, "Expected semantic type initializer to be called."); } /// <summary> /// Test that the base class' Process method gets called for a type that it handles, /// even though we instantiated a sub-class. /// </summary> [Test] public void BaseClassProcessCalled() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, DerivedTestReceptor>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected TestReceptor.Process to be called."); } /// <summary> /// Test that a receptor that implements Process on an interface gets called. /// </summary> [Test] public void ReceptorOfInterfaceTypeCalled() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, InterfaceTestReceptor>(); sp.ProcessInstance<TestMembrane, InterfaceTestSemanticType>(true); Assert.That(callSuccess, "Expected TestReceptor.Process to be called."); }
/// <summary> /// Verify that more than one receptor (but of different types in the same membrane) receives the Process call for the same semantic type. /// </summary> [Test] public void MultipleProcessCalls() { callSuccess = false; callSuccess2 = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>(); sp.Register<TestMembrane, TestReceptor2>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected TestReceptor.Process to be called."); Assert.That(callSuccess2, "Expected TestReceptor2.Process to be called."); } /// <summary> /// Verify that the receptor initializer is called when a stateless receptor is instantiated. /// </summary> [Test] public void ReceptorInitialization() { receptorInitializerCalled = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, TestReceptor>((ir) => { // Unfortunately, a cast is required, because ir is type declared as IReceptor // and I don't think it's possible to fix that because of the late callback. TestReceptor r = (TestReceptor)ir; r.AFlag = true; receptorInitializerCalled = true; }); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(receptorInitializerCalled, "Expected TestReceptor initializer to be called to be called."); }
幕后有一些有趣的事情在发生。首先是
ProcessInstance
方法本身(我只展示了该方法的一部分)。
protected void ProcessInstance<T>(IMembrane membrane, IMembrane caller, T obj, bool processOnCallerThread) where T : ISemanticType { Type tsource = obj.GetType(); List<Type> receptors = GetReceptors(membrane, tsource); Log(membrane, obj); foreach (Type ttarget in receptors) { dynamic target = Activator.CreateInstance(ttarget); ReceptorInitializer receptorInitializer; if (receptorInitializers.TryGetValue(new MembraneReceptor() { Membrane = membrane, ReceptorType = ttarget }, out receptorInitializer)) { receptorInitializer.Initializer(target); } // Call immediately? if (processOnCallerThread) { Call(new DynamicCall() { SemanticInstance = obj, Receptor = target, Proc = () => target.Process(this, membrane, obj) }); } else { // Pick a thread that has the least work to do. threadPool.MinBy(tp => tp.Count).Enqueue(new DynamicCall() { SemanticInstance = obj, Receptor = target, Proc = () => target.Process(this, membrane, obj) }); } }
另一半是调用本身,该调用要么立即执行,要么排队到一个拥有最少工作量的线程上。调用被包装在一个 try-catch 块中,如果接收器实现了 IDisposable
,则在处理完成后立即调用 Dispose
方法。
protected void Call(ProcessCall rc) { try { rc.MakeCall(); } catch (Exception ex) { // Prevent recursion if the exception process itself throws an exception. if (!(rc.SemanticInstance is ST_Exception)) { ProcessInstance(Logger, new ST_Exception(ex), true); } } finally { if ( (rc.Receptor is IDisposable) && (rc.AutoDispose) ) { ((IDisposable)rc.Receptor).Dispose(); } } }
如您所见,异常处理使用语义处理器——异常被包装成一个语义类型,并放入 Logger
膜中。Logger
膜是语义处理器为您创建的两个膜之一(另一个是 Surface
)。
日志记录接收器
既然我在上一节的结尾提到了异常日志记录,那么演示基本日志记录和异常日志记录的集成测试似乎是合理的。集成测试的脚手架包括一个测试接收器来抛出异常,以及另外两个接收器,一个用于正常事件日志记录,另一个用于异常日志记录。
public static bool stLogged; public static bool exLogged; public class TestMembrane : Membrane { } public class TestSemanticType : ISemanticType { } public class TypeThrowsException : ISemanticType { } public class TestReceptor : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t) { } public void Process(ISemanticProcessor proc, IMembrane membrane, TypeThrowsException t) { throw new ApplicationException("Receptor exception"); } } public class LoggerReceptor : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, ISemanticType t) { stLogged = t is TestSemanticType; } } public class ExceptionReceptor : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, ST_Exception ex) { exLogged = true; } }
标准日志记录器处理
ISemanticType
——由于所有语义类型都继承自这个接口,日志记录器的 Process 方法将收到对每个实例化到语义处理器中的语义类型的通知。
我们有两个集成测试,一个用于正常事件日志记录,一个用于异常日志记录。
/// <summary> /// Verify the a process call is logged. /// </summary> [Test] public void ProcessCallIsLogged() { stLogged = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<LoggerMembrane, LoggerReceptor>(); sp.Register<LoggerMembrane, ExceptionReceptor>(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(stLogged, "Expected Process call to be logged."); } /// <summary> /// Verify that an exception log is generated when a receptor process creates an exception. /// </summary> [Test] public void ExceptionIsLogged() { exLogged = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<LoggerMembrane, LoggerReceptor>(); sp.Register<LoggerMembrane, ExceptionReceptor>(); sp.Register<TestMembrane, TestReceptor>(); sp.ProcessInstance<TestMembrane, TypeThrowsException>(true); Assert.That(exLogged, "Expected Exception call to be logged."); }
有状态接收器
有状态接收器是处理单元,它们不是由语义处理器实例化的,而是由您实例化的。有状态接收器有很多好的理由:
- 日志记录——创建和销毁日志事件接收器会开始影响性能。
- 必须持久化以接收来自外部源消息的服务——我们将在分布式语义过程测试中看到一个例子。
- 复杂的初始化和/或有状态要求。
- etc
有状态接收器的集成测试与无状态接收器的集成测试本质上是相同的,所以我只展示一个来说明语法上的差异。
/// <summary> /// Given a receptor in a membrane, a semantic type put into that membrane is received by that receptor. /// </summary> [Test] public void ReceptorReceivesSemanticTypeOnItsMembrane() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane>(new TestReceptor()); sp.ProcessInstance<TestMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected TestReceptor.Process to be called."); }
注意 Register
函数有一个膜类型泛型参数,但传递的是一个接收器实例。
复杂类型处理
语义类型可以由其他语义类型组成。与 HOPE 一样,当一个复杂类型被实例化到膜空间时,不仅应该调用处理该基本类型的接收器,还应该调用处理其组成类型的接收器。这使我们能够在处理复杂类型所组成的较低级别类型的同时,创建复杂的类型系统。
为了测试这一点,我们需要一些脚手架:
public static bool simpleTypeProcessed; public static bool complexTypeProcessed; public class TestMembrane : Membrane { } public class SimpleType : ISemanticType { } public class ComplexType : ISemanticType { public SimpleType ASimpleType { get; set; } public ComplexType() { ASimpleType = new SimpleType(); } } public class ComplexReceptor : IReceptor<ComplexType> { public void Process(ISemanticProcessor pool, IMembrane membrane, ComplexType obj) { complexTypeProcessed = true; } } public class SimpleReceptor : IReceptor<SimpleType> { public void Process(ISemanticProcessor pool, IMembrane membrane, SimpleType obj) { simpleTypeProcessed = true; } }
我们有一个单一的集成测试来验证当复杂类型被实例化到膜中时,内部的“简单”类型也会被处理。
[Test] public void ComplexTypePropertyProcessing() { simpleTypeProcessed = false; complexTypeProcessed = false; SemanticProcessor sp = new SemanticProcessor(); sp.Register<TestMembrane, ComplexReceptor>(); sp.Register<TestMembrane, SimpleReceptor>(); sp.ProcessInstance<TestMembrane, ComplexType>(true); Assert.That(complexTypeProcessed, "Expected ComplexReceptor.Process to be called."); Assert.That(simpleTypeProcessed, "Expected SimpleReceptor.Process to be called."); }
在后台,使用反射来发现实现 ISemanticType 的公共属性。
/// <summary> /// Any public properties that are of ISemanticType type and not null are also emitted into the membrane. /// </summary> protected void ProcessInnerTypes(IMembrane membrane, IMembrane caller, ISemanticType obj, bool processOnCallerThread) { var properties = obj.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public). Where(pi => pi.PropertyType.GetInterfaces().Contains(typeof(ISemanticType))); properties.ForEach(pi => { ISemanticType prop = (ISemanticType)pi.GetValue(obj); if (prop != null) { ProcessInstance(membrane, caller, prop, processOnCallerThread); } }); }
膜渗透性
虽然我之前说过膜是接收器的容器,但它们也是语义类型的分层过滤器。语义类型可以渗透到一个膜中,或者它可以渗透出去,进入另一个膜。我们将在讨论分布式语义计算时使用这种行为,但首先,我们有一些测试来确保膜渗透性按照我们期望的方式工作。一如既往,我们有一些脚手架:
public static bool callSuccess; class TestMembrane : Membrane { } class OuterMembrane : Membrane { } class InnerMembrane : Membrane { } class InnerMembrane2 : Membrane { } public class TestSemanticType : ISemanticType { } public class TestReceptor : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, TestSemanticType t) { callSuccess = true; } }
我在这里添加了一些图片来解释正在发生的事情。
即使是分层的,您也可以认为膜是三维的,这样即使内部膜对类型具有向外渗透性,外部膜(它所在的膜)也必须对该类型具有向内渗透性!
向外渗透
/// <summary> /// Verify that, when the inner membrane is permeable outbound to a type, /// that a receptor in the outer membrane, permeable inbound to that type, receive the type. /// </summary> [Test] public void TypePermeatesOut() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>(); sp.InboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.AddChild<OuterMembrane, InnerMembrane>(); sp.Register<OuterMembrane, TestReceptor>(); sp.ProcessInstance<InnerMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected receptor in outer membrane to process the ST placed in the inner membrane."); }
向内渗透
/// <summary> /// Verify that, when the inner membrane is permeable inbound to a type, /// that a receptor in the inner membrane receives the type. /// </summary> [Test] public void TypePermeatesIn() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.OutboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.InboundPermeableTo<InnerMembrane, TestSemanticType>(); sp.AddChild<OuterMembrane, InnerMembrane>(); sp.Register<InnerMembrane, TestReceptor>(); sp.ProcessInstance<OuterMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected receptor in inner membrane to process the ST placed in the outer membrane."); }
横向渗透
鉴于向外渗透和向内渗透有效,横向渗透,通过另一个膜,也应该有效。
/// <summary> /// Verify that a type issued in one inner membrane can cross over to /// an adjacent inner membrane via outbound permeability on the source /// and inbound permeability on the target membrane. /// </summary> [Test] public void TypePermeatesAcross() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>(); sp.InboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.OutboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.InboundPermeableTo<InnerMembrane2, TestSemanticType>(); sp.AddChild<OuterMembrane, InnerMembrane>(); sp.AddChild<OuterMembrane, InnerMembrane2>(); sp.Register<InnerMembrane2, TestReceptor>(); sp.ProcessInstance<InnerMembrane, TestSemanticType>(true); Assert.That(callSuccess, "Expected receptor in inner membrane to process the ST placed in the adjacent inner membrane."); }
非渗透性测试
最后,我们要确保如果未建立向外或向内渗透性,语义类型不会从其膜渗透到外部或内部膜。防止渗透的代码已被注释掉。
/// <summary> /// Outer membrane does not receive semantic type if inner membrane is not outbound permeable to it. /// </summary> [Test] public void NotPermeableOut() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); // sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>(); sp.InboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.AddChild<OuterMembrane, InnerMembrane>(); sp.Register<OuterMembrane, TestReceptor>(); sp.ProcessInstance<InnerMembrane, TestSemanticType>(true); Assert.That(!callSuccess, "Expected receptor in outer membrane to NOT receive the ST placed in the inner membrane."); } /// <summary> /// Outer membrane does not receive semantic type if it is not inbound permeable to it. /// </summary> [Test] public void NotPermeableIn() { callSuccess = false; SemanticProcessor sp = new SemanticProcessor(); sp.OutboundPermeableTo<InnerMembrane, TestSemanticType>(); // sp.InboundPermeableTo<OuterMembrane, TestSemanticType>(); sp.AddChild<OuterMembrane, InnerMembrane>(); sp.Register<OuterMembrane, TestReceptor>(); sp.ProcessInstance<InnerMembrane, TestSemanticType>(true); Assert.That(!callSuccess, "Expected receptor in outer membrane to NOT receive the ST placed in the inner membrane."); }
在后台,整个渗透性问题由一个小方法处理。
/// <summary> /// Traverse permeable membranes without calling back into the caller. While membranes should not be bidirectionally /// permeable, this does stop infinite recursion if the user accidentally (or intentionally) configured the membranes thusly. /// </summary> protected void PermeateOut(IMembrane membrane, IMembrane caller, ISemanticType obj, bool processOnCallerThread) { List<IMembrane> pmembranes = ((Membrane)membrane).PermeateTo(obj); pmembranes.Where(m => m != caller).ForEach((m) => ProcessInstance(m, membrane, obj, processOnCallerThread)); }
为了阻止反弹(A 对 B 具有渗透性,B 对 A 具有渗透性),我们跟踪调用者是谁,以便当我们遍历膜层次结构向上或向下时,我们不会遍历回自己!
当然,真正的功臣是 PermeateTo 方法。
/// <summary> /// Given this membrane's outbound list, what membranes are inbound permeabe to the ST as well? /// </summary> public List<IMembrane> PermeateTo(ISemanticType st) { List<IMembrane> ret = new List<IMembrane>(); Type sttype = st.GetType(); if (outboundPermeableTo.Contains(sttype)) { // Can we traverse to the parent? if ((parent != null) && (parent.inboundPermeableTo.Contains(sttype))) { ret.Add(parent); } // Can we traverse to children? foreach (Membrane child in childMembranes) { if (child.inboundPermeableTo.Contains(sttype)) { ret.Add(child); } } } return ret; }
分布式语义计算
DSC 膜:分布式语义计算膜
DCR:分布式计算接收器
ST:语义类型
重头戏是创建一种实现 Web 服务器的有状态接收器的能力(在本例中是之前我写过的基础 Web 服务器)。使用Newtonsoft的 Json.NET 序列化,我们可以轻松地将语义类型序列化为 JSON 或从 JSON 反序列化。虽然不是最高效的序列化格式,但我选择这种格式是因为它让你了解我的下一步方向——Web 上的语义计算。但就目前而言,回到演示我们如何分布式计算语义类型的集成测试。
同样,我们需要为膜和接收器准备脚手架。
public static string received; public class TestMembrane : Membrane { } public class TestReceptor : IReceptor { public void Process(ISemanticProcessor proc, IMembrane membrane, TestDistributedSemanticType t) { received = t.Message; } } public class DistributedProcessMembrane : Membrane { } // For unit test support. Normally, each distributed system would either declare its own types // or share types through a common assembly. public class TestDistributedSemanticType : ISemanticType { public string Message { get; set; } }
这里没什么特别的。然而,测试的设置有些复杂。
/// <summary> /// Verify that a semantic type is received on a "remote" semantic processor. /// </summary> [Test] public void DistributedComputation() { SemanticProcessor spOut = new SemanticProcessor(); SemanticProcessor spIn = new SemanticProcessor(); received = ""; OutboundDistributedComputingReceptor dcrOut = new OutboundDistributedComputingReceptor(4002); InboundDistributedComputingReceptor dcrIn = new InboundDistributedComputingReceptor(4002, spIn); // Create an "emitter" in which a semantic type emitted on the TestMembrane permeates // into the inner DistributedProcessMembrane for our test type. spOut.AddChild<TestMembrane, DistributedProcessMembrane>(); spOut.OutboundPermeableTo<TestMembrane, TestDistributedSemanticType>(); spOut.InboundPermeableTo<DistributedProcessMembrane, TestDistributedSemanticType>(); // The stateful DCR out lives in the distributed process membrane. spOut.Register<DistributedProcessMembrane>(dcrOut); // Create a "receiver" in which a semantic type is received on the inner DistributedProcessMembrane // and the test type permeates out to a "handler" receptor. spIn.AddChild<TestMembrane, DistributedProcessMembrane>(); spIn.OutboundPermeableTo<DistributedProcessMembrane, TestDistributedSemanticType>(); spIn.InboundPermeableTo<TestMembrane, TestDistributedSemanticType>(); // The stateful DCR in lives in the distributed process membrane. spIn.Register<DistributedProcessMembrane>(dcrIn); // The responding receptor lives in the TestMembrane spIn.Register<TestMembrane, TestReceptor>(); // Put a semantic type instance on the outbound side. spOut.ProcessInstance<TestMembrane, TestDistributedSemanticType>((t) => { t.Message = "Hello World"; }); // Wait a bit for threads to do their thing and Http posts to do their things. // !*!*!*!* Sometimes this wait must be longer -- the unit test engine can really slow things down. // !*!*!*!* This is particularly true when running the test in the debugger! // !*!*!*!* If this delay isn't long enough for the server's message to be processed, you will get // !*!*!*!* errors related to accessing objects on an unloaded AppDomain. // !*!*!*!* In real life this woudn't happen -- this is an artifact of unit testing a complex // !*!*!*!* multi-threaded process. //Thread.Sleep(500); // Because we know it works, we could actually do this, which is particularly useful when we're // debugging and single stepping through code -- we do not want the test in this AppDomain // to exit prematurely! while (String.IsNullOrEmpty(received)) { Thread.Sleep(0); } Assert.That(received == "Hello World", "Expected to receive 'Hello World'"); }
这里有很多内容。
- 创建两个语义处理器,一个用于出站消息,一个用于入站消息。
- 在出站处理器上,声明:
- 两个膜
- 出站分布式计算接收器
- 两个膜的渗透性
- 在入站处理器上,声明:
- 两个膜
- 出站分布式计算接收器
- 处理语义类型的测试接收器。
- 两个膜的渗透性
后台是分布式计算接收器。
出站分布式语义接收器
此接收器负责序列化语义类型并将其发布到我们的服务器。
public class OutboundDistributedComputingReceptor : IReceptor<ISemanticType> { protected int outboundPort; public OutboundDistributedComputingReceptor(int outboundPort) { this.outboundPort = outboundPort; } public void Process(ISemanticProcessor proc, IMembrane membrane, ISemanticType obj) { string url = String.Format("https://:{0}/semanticType", outboundPort); string json = JsonConvert.SerializeObject(obj); // Insert our type name: json = "{\"_type_\":\"" + obj.GetType().FullName + "\"," + json.Substring(1); HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url); request.Method = "POST"; request.ContentType = "application/json"; request.ContentLength = json.Length; Stream st = request.GetRequestStream(); byte[] bytes = Encoding.UTF8.GetBytes(json); st.Write(bytes, 0, bytes.Length); st.Close(); } }
我们采用了一些技巧:
- 接收器接收所有语义类型对象,因此我们依赖膜过滤将我们想要远程处理的语义类型仅传递到我们的内部膜。
- 我们将“_type_”注入 JSON,以便在另一端知道要反序列化成什么类型。
入站分布式语义接收器
在入站端,我们将服务器设置为监听“/semanticType”路径并重新水化语义类型。它被发布到第二个语义处理器的内部膜,我们再次依赖膜过滤将所需类型渗透到外部膜,我们的测试接收器就在那里,等待适当的语义类型。
public class InboundDistributedComputingReceptor : IReceptor { protected SemanticProcessor sp; // the processor for the inbound types. protected Server server; protected int outboundPort; public InboundDistributedComputingReceptor(int inboundPort, SemanticProcessor sp) { this.sp = sp; server = new Server(); server.OnRequest = (session, context) => { session.Authenticated = true; session.UpdateLastConnectionTime(); }; server.AddRoute(new Route() { Verb = Router.POST, Path = "/semanticType", Handler = new AnonymousRouteHandler(server, ProcessInboundSemanticType) }); server.Start("", inboundPort); } protected ResponsePacket ProcessInboundSemanticType(Session session, Dictionary<string, object> parms) { string json = parms["Data"].ToString(); JObject jobj = JObject.Parse(json); string type = jobj["_type_"].ToString(); // strip off the _type_ so we can then instantiate the semantic type. json = "{" + json.RightOf(','); // Requires that the namespace also matches the remote's namespace. Type ttarget = Type.GetType(type); ISemanticType target = (ISemanticType)Activator.CreateInstance(ttarget); JsonConvert.PopulateObject(json, target); sp.ProcessInstance<DistributedProcessMembrane>(target); ResponsePacket ret = new ResponsePacket() { Data = Encoding.UTF8.GetBytes("OK"), ContentType = "text" }; return ret; } }
因为太神奇了,所以这是通过的集成测试。
结论
类型优先开发(由 Tomas Petricek命名)既适用于命令式语言也适用于函数式语言。我们可以使用 C# 的类型系统来创建丰富的类型,并声明式地建立类型与处理这些类型的方法之间的关系。我们还可以创建容器(膜)来创建计算岛,并控制类型实例在计算岛之间的流动。通过使用语义处理器,在“语义系统”中声明的膜、类型和接收器成为一个富有表现力的计算单元。专用接收器,如本文所示的分布式接收器,演示了创建分布式语义计算系统是多么容易。
好了,说了这么多高大上的话。这玩意儿太酷了!