远程框架的架构






4.95/5 (38投票s)
本文介绍了如果您想创建一个真正可扩展的 Remoting 框架,在架构上必须考虑的重要决策,而这些决策也可能适用于任何类型的框架。
背景
我曾用多种语言写过 Remoting 库和框架。通常我必须处理非常受限的环境,这些环境无法使用大型库、过多的内存,或者处理性能关键的场景,而那些常见的框架通常不适合这些环境。
我不会撒谎,我的框架并非总是完美(如果真的存在完美的话)。有时我过于注重功能实现,导致代码中产生错误的依赖。有时我过度关注性能,从而剥离了解决方案中的许多功能。现在,考虑到我之前的文章都关于架构和可扩展的解决方案,我将介绍一个 Remoting 框架的架构,它既小巧又非常可扩展,对于那些需要最小内存和性能开销的应用程序非常有益,并且在未来需求发生变化需要处理每秒数千次请求的服务器时也非常适用。
重要提示
在本文中,我将讨论我做出的一些重要决策,这些决策使得最终解决方案可以在不修改现有代码的情况下进行扩展。
尽管如此,本文并未提供所有可能的扩展或性能最佳的代码,仅是基本实现。我认为这是文章的最佳呈现方式,因为目的不是解释如何实现最佳性能或每个扩展如何工作,而是给出重要的提示,以便解决方案能够“封闭”以供修改,同时仍能实现大量扩展。
所以,如果您对 WCF 满意,不想尝试这个不完整的解决方案,我理解,但请记住,这里提供的代码不如我在生产环境中使用的代码完整(但这里提供的部分已经为生产环境做好了准备)。同样,如果您认为编写(甚至理解)另一个 Remoting/通信框架是浪费时间,因为它永远无法像其他解决方案那样完整,那么我希望它对于那些有兴趣学习其工作原理的人仍然有用,因为这里所做的架构决策适用于许多类型的解决方案。
定义基本接口
在第一步中,我将仅定义解决方案的接口。我们可以说这非常类似于使用 UML 创建类图,但实际上,当完成 UML 图后,人们通常会开始编写类,而我真的希望将事物保持为接口。众所周知,我们应该面向接口编程,而不是实现,但人们通常将接口理解为类型的公共方法,并认为我们可以重新实现这些方法而不改变客户端。
但我想提供一个可扩展甚至可替换的封闭解决方案,所以我首先提供接口,这些接口可以被实现或重新实现,而无需更改接口本身(或它们所在的库)。
为此,我首先考虑我对库的预期最基本的使用方式。我期望用户执行类似的操作:
ICalculator calculator = remotingClient.GetService<ICalculator>();
decimal result = calculator.Sum(55, 2);
如果他们实际上在客户端应用程序中有一个 ICalculator
接口,或者如果没有这样的接口,我期望他们做这样的事情:
object result = remotingClient.Invoke("Calculator", "Sum", 55, 2);
基于此,我可以轻松提取一个 IRemotingClient
接口,如下所示:
public interface IRemotingClient
{
T GetService<T>();
object Invoke(string serviceName, string method, params object[] parameters);
}
拥有这样的接口,我已经可以编写许多不同的实现。我可以编写一个返回本地对象的假实现,并可能使用反射调用本地方法;我可以编写一个通过 TCP/IP、命名管道通信的实现,使用 XML 序列化、二进制序列化等。
但是,这样一个简单的接口无法帮助接口的实现者正确地编写类。很容易有人编写一个仅支持 TCP/IP 的实现,仅支持按直接名称调用方法而不支持重载,可能使用特定的序列化器,例如 BinaryFormatter
,而没有更改它的可能性。
当然,我们可以将此视为实现错误。接口只告诉我们它需要这两个方法,但我想要的是一个尽可能可扩展的。实际的接口仅意味着我可以有不同的实现,但它并没有说明对现有实现的扩展。
其他接口
我可以看到,当我执行 GetService
调用时,代码必须创建一个代理。也就是说,代码必须创建一个实现给定接口的本地对象,然后重定向到远程服务器。
我还看到,要调用远程方法,代码必须创建某种消息,以某种格式(二进制、XML 等)对其进行序列化,并通过某种通信协议(tcp/ip、命名管道等)发送。
我认为这是很多职责,最好的做法是将这些职责委托给它们自己的接口。我们不希望 Remoting 客户端负责代理生成、消息序列化和传输。这将导致“类爆炸”,如果我们需要所有组合,或者更可能的是,这将使我们的解决方案仅限于某些情况,例如支持方法重载的二进制实现,使用命名管道,或者不支持方法重载的 XML 实现,并通过 TCP/IP 工作。
因此,我看到的职责如下:
- MessagePorts - 发送/接收消息;
- Serialization - 将消息转换为/从字节数组;
- Transport - 将字节数组发送到远程地址/从该地址接收数据;
- Proxy generation/loading - 请求服务时,必须返回一个本地代理,无论是通过加载预编译的代理库还是在运行时创建;
- Name provider - 服务名称的发现也必须委托。那么,它是通过名称
ICalculator
、Calculator
还是像“CalculatorService”这样的名称发现的ICalculator
? - Remoting Client 本身 - 其职责是通过消息端口创建要发送的消息并处理结果,将所有其他工作委托出去。
如果注意的话,我没有说 messageports 发送字节。它们发送消息(对象)。我不是强迫实现者先序列化数据然后再使用消息端口,我是故意这样做的,因为在同一应用程序域内的通信中,我们可以简单地将消息对象从一个线程传递到另一个线程,而无需实际序列化任何内容。
同样,序列化过程可以通过多种方式实现。某些序列化算法需要提供流,它们写入该流,并且根据情况,这实际上是直接发送数据。但这对 TCP/IP 来说是很糟糕的。对于 TCP/IP,最好在发送数据之前缓冲数据(这可以通过使用装饰器来完成,但这不是理想的方式)。序列化可以简单地将数据写入内存,然后返回一个字节数组。它甚至可以直接写入非托管内存。
因此,我没有强迫消息端口使用这些选项中的任何一个,但我提供了一个我认为最常见的版本的接口:内存序列化。我认为它是最常见的,因为远程方法调用通常大小有限。如果我们想传输 500MB 的数据,我们应该编写代码一次传输小块数据(所以我们可以说每次请求都是一个像 GetBlock(blockNumber)
这样的调用)。
我并没有真正强迫消息端口使用该接口,但如果它们使用了,我就可以编写大量的消息端口,使用标准的序列化方式,而无需指定使用哪个序列化框架。
将名称提供者与 WCF 属性进行比较
此时,我认为与 WCF 进行比较非常重要。我使用接口作为名称提供者。我没有使用 [OperationContract]
、[ServiceContract]
或类似的属性。
这样做有很多原因:
- 即使我还没有展示服务器,服务也必须在服务器上注册,说明它们通过哪个接口被找到。因此,没有必要在接口上放置
[ServiceContract]
,因为它需要显式注册; - 如果您不想发布接口中的所有方法,您始终可以创建一个包含较少方法的新接口,或者使用一个返回
null
名称的名称提供者来表示应发布的方法,但这些名称提供者可以自由使用任何客户端特定的规则来实现这一点,而不仅仅是属性; - 名称提供者还允许客户端使用相同的服务接口连接到两个或多个提供相同服务但使用不同命名约定的服务器。例如,一个服务器可能在所有方法前都使用前缀,而另一个服务器可能使用不同的前缀或根本不使用前缀;
- 最终,名称提供者仍然可以使用那些
[OperationContract]
和[ServiceContract]
属性,它只是不需要它们; - 从逻辑上讲,接口就是契约。我不认为在契约上放置契约有什么意义。我可以认为在接口方法上放置
[OperationContract]
就像在[OperationContract]
方法上放置另一个属性来表示它只对本地连接或外部连接可见,例如[WanPublished]
或[LanPublished]
属性一样糟糕。
我的结论是,通过委派服务和操作名称的发现方式,我们使客户端接口摆脱了属性的束缚,同时也使框架能够更好地处理不同的用例,这是一个好现象。
客户端接口
因此,到目前为止,客户端接口在此:
public interface IClientNameProvider
{
string GetServiceName(Type serviceType);
string GetActionName(MethodInfo method);
}
public interface IMemorySerializer:
IDisposable
{
byte[] Serialize(object data, int headerSize, out int offset, out int length);
object Deserialize(byte[] buffer, int offset, int length);
}
public interface IMessagePortClient
{
object SendMessage(object message);
}
public interface IRemotingProxyProvider
{
Type TryGetServiceType(IRemotingClient remotingClient, object proxyInstance);
object GetProxy(IRemotingClient remotingClient, Type serviceType);
T GetProxy<T>(IRemotingClient remotingClient);
}
public interface IRemotingClient
{
IClientNameProvider NameProvider { get; }
object GetService(Type serviceType);
T GetService<T>();
object InvokeAction(string serviceName, string actionName, params object[] parameters);
Type TryGetServiceType(object proxyInstance);
// This is the only method I don't like in this interface,
// but I think it is really useful and it allows me to
// forbid users from using the ProxyProvider directly.
}
工厂
所有接口最终都会被类实现。但我不希望强制用户了解这些类。在某些应用程序中,用户可能只想说“我想要一个 RemotingClient 连接到某个地址”。
为了做到这一点,并帮助大多数实现正常工作,我通过工厂来实现。IRemotingClient
的工厂如下所示:
public static class RemotingClientFactory
{
public static IRemotingClient Create(string remoteAddress)
{
return Create(remoteAddress, null, null);
}
public static IRemotingClient Create(string remoteAddress, IClientNameProvider nameProvider=null, IRemotingProxyProvider proxyProvider=null)
{
var messagePort = MessagePortClientFactory.Create(remoteAddress);
return Create(messagePort, nameProvider, proxyProvider);
}
public static IRemotingClient Create(IMessagePortClient messagePort, IClientNameProvider nameProvider=null, IRemotingProxyProvider proxyProvider=null)
{
if (messagePort == null)
throw new ArgumentNullException("messagePort");
var handler = RemotingClientFactoryConfiguration.Delegate;
if (handler == null)
throw new InvalidOperationException("The RemotingClient factory is not configured correctly.");
if (nameProvider == null)
nameProvider = ClientNameProviderFactory.Create();
if (proxyProvider == null)
proxyProvider = RemotingProxyProviderFactory.Create();
return handler(messagePort, nameProvider, proxyProvider);
}
}
public delegate IRemotingClient RemotingClientFactoryDelegate
(
IMessagePortClient messagePort,
IClientNameProvider nameProvider,
IRemotingProxyProvider proxyProvider
);
public static class RemotingClientFactoryConfiguration
{
public static RemotingClientFactoryDelegate Delegate { get; set; }
}
通过这种方法,我们有了一个起点。希望开始通信的用户可以简单地使用 RemotingClientFactory.Create()
并提供远程地址,或者如果他们想要更多的控制,可以提供代理提供程序、特定的名称提供者或消息端口(而无需提供所有这些),他们不需要知道实际实现 IRemotingClient
接口的类型。
拥有这样的参数也表明 IRemotingClient
的实现预期会支持它们。我们并没有真正强制实现者使用它们,但我们展示了一个好的实现应该支持什么,这就是我们的扩展保证。任何人都可以轻松地编写一个新的消息端口,它将能够与现有的 Remoting Client 一起工作,这要归功于此工厂提供的契约。
分析实际设计
此时,我们的解决方案大部分由接口和一些委托组成。我们拥有的类都是静态类,它们充当可配置的工厂。有些人认为静态类不好,但考虑到它们可以通过委托完全配置,我认为它们并不坏,我们只是为用户提供了一个起点,但他们可以自由地替换实现来创建用于测试的假对象或其他任何原因。
我还将工厂分成两个静态类,因为我期望用户经常只使用 RemotingClientFactory
。RemotingClientFactoryConfiguration
只应在整个应用程序生命周期中配置一次,因此不应经常看到它。
这种完全可替换的设计,在我看来,已经比许多大型且全球知名的解决方案要好,这些解决方案通常以一个无法配置的静态工厂或一个只有一些扩展点但大部分工作(如代理生成)都无法让用户提供更好实现的实际实现开始。
此外,这些接口可以看作是:
- 遵循 *单一职责原则* (SRP)。考虑到类一次只实现一个接口,它们将有一个非常有限的任务。TCP/IP 通信的更改不会影响负责获取服务名称或如何生成代理的类;
- 遵循 *开放-封闭原则* (OCP)。我不需要修改这些接口。我可以提供它们在一个编译过的库中,无论是带有默认实现还是没有。然而,您可以自由添加新的消息端口类型,创建自己的命名规则等,并将其与使用这些接口的任何 Remoting Client 实现一起使用;
- 遵循 DRY (Don't Repeat Yourself) 原则:如果我能够创建一个新的命名规则并将其应用于我所有现有的 Remoting Client、消息端口等,我就避免了重复代码(例如,创建单独的接口和适配器以遵守不同的命名规则)。此外,如果我将所有方法放在一个接口中,那么这些接口的每个“组合”都可以成为一个全新的实现,因此我避免了实现数量的“爆炸”;
- 遵循 KISS (Keep It Simple, Stupid) 原则。好吧,遵循 *单一职责原则* 有助于实现这一原则,因为每个类都执行“最少的工作”来完成任务。
可能还有许多其他原则遵循了实际设计。而且,也许这个设计也违反了一些原则……原则太多了,以至于实际上有些原则相互矛盾,因此不可能完全遵循所有原则。
需要考虑(或不考虑)的许多事项
需要考虑的事项包括性能、线程安全、生命周期管理、无状态或有状态、异步操作、重新连接等等。
性能
通过使用接口使一切都变得虚拟(通过使用接口)总会影响性能,但我不能说它有多么大的问题。有许多事情对性能的影响更大,例如使用的消息端口类型、数据序列化、生成的消息类型(如果我们能将方法名替换为数字,我们可能序列化更少的数据,并在另一端更快地找到方法)等等。实际上,每个实现都可以有不同的优先级(例如,更快或更易读),通过使一切都虚拟化,我们可以用更快的解决方案替换现有的解决方案,因此虚拟调用的轻微开销并不是什么大问题。
线程安全
接口本身并没有说明线程安全,我确实认为它们不应该说明。简单的客户端(即使是大型应用程序有时也是简单的客户端)可能永远不需要并发远程访问。因此,非线程安全的实现可以适用于这类应用程序,并且它们可以通过不尝试处理线程同步来更快。然而,如果需要,可以应用许多线程安全的解决方案。我们可以为消息端口编写装饰器,有效地在 SendMessage
调用上加锁(但在这种情况下,所有调用都将被序列化,而不是并行),或者我们可以拥有能够并行发送数据的消息端口。
但线程安全也必须存在于服务器上……我还没有解释服务器的架构,但所有服务都将被简单地注册为“单例”,其线程安全由服务本身负责。并行调用将由服务器并行执行,考虑到消息可以并行到达。
有状态或无状态
实际上,GetService
调用期望始终返回相同的单例实例,因此我们可以说这基本上是无状态的。然而,对服务的调用仍然可能返回有状态的对象。如果发生这种情况,它将由 RemotingClient
实现来处理(甚至可能由序列化实现来处理,因为我们可能希望支持返回一个可序列化的对象,该对象引用一个有状态对象……这很复杂,但可能)。
因此,总而言之,默认情况下,我确实期望服务是无状态的,并且由于所有服务都是单例,因此没有生命周期管理。一旦调用结束,调用过程中使用的所有对象都可以被垃圾回收。
连接丢失和重新连接
有两种处理重新连接的方法。要么应用程序重新创建 Remoting Client,要么消息端口必须实现来处理它。
我个人更喜欢一个能够根据需要重新连接的消息端口。如果我们确实是无状态调用,那么在连接丢失后重新连接就没有问题。
异步
唯一真正令人困扰的是异步问题。到目前为止,提供的接口具有同步签名。我确实认为Async/Await Could Be Better,如果签名可以保持同步签名,但就像它今天工作一样,我们需要更改所有返回类型以使方法异步。那么,我们应该支持异步调用吗?
如果我们仅仅更改所有返回类型,我们当然会支持异步,但仅仅为了没有理由而使一切异步都会降低性能。同步方法异步化会增加开销,因为为同步结果生成任务成本很高(相信我,对于一些大型循环来说,这是一个巨大的差异)。另外,那些拥有专用线程的用户可能更喜欢阻塞线程直到工作完成,而不是处理异步调用。因此,在客户端,我认为我们必须同时支持同步和异步调用。
另一方面,我认为服务器必须是异步的。事实上,如果服务器只预期处理少量客户端,那么使用同步调用和专用线程可能会更快。但服务器更复杂,因为消息端口可能以同步或异步方式接收消息,而处理消息可能是同步或异步的,所以我将把这个问题留到以后。
我的决定是将 IRemotingClient
接口更改为如下所示:
public interface IRemotingClient
{
IClientNameProvider NameProvider { get; }
object GetService(Type serviceType);
Task<object> GetServiceAsync(Type serviceType);
T GetService<T>();
Task<T> GetServiceAsync<T>();
Type TryGetServiceTypeFromProxyInstance(object instance);
object InvokeAction(string serviceName, string actionName, params object[] parameters);
Task<object> InvokeActionAsync(string serviceName, string actionName, params object[] parameters);
}
这也让我更改了 IRemotingProxyProvider
:
public interface IRemotingProxyProvider
{
Type TryGetServiceTypeFromProxyInstance(IRemotingClient remotingClient, object instance);
object GetProxy(IRemotingClient remotingClient, Type serviceType);
Task<object> GetProxyAsync(IRemotingClient remotingClient, Type serviceType);
T GetProxy<T>(IRemotingClient remotingClient);
Task<T> GetProxyAsync<T>(IRemotingClient remotingClient);
}
以及 IMessagePortClient
:
public interface IMessagePortClient
{
object SendMessage(object message);
Task<object> SendMessageAsync(object message);
}
我知道有太多的“重复”方法。在某些情况下,我们将有“重复”的实现(一个调用同步方法,另一个调用异步方法)或一个实现重定向到另一个。
有人可能认为将同步和异步 API 作为完全独立的事物更好,并且在需要时编写适配器。但我确实期望 Remoting Client 能够处理这两种类型的调用。如果一个调用重定向到另一个,则由它们自己负责处理。
实现
在查看服务器之前,我想先介绍一下实现。
例如,IClientNameProvider
的一个非常简单的实现可以如下所示:
public sealed class ClientNameProvider:
IClientNameProvider
{
public string GetServiceName(Type serviceType)
{
if (serviceType == null)
throw new ArgumentNullException("serviceType");
return serviceType.Name;
}
public string GetActionName(MethodInfo method)
{
if (method == null)
throw new ArgumentNullException("method");
return method.Name;
}
}
它只会返回接口的名称和方法的名称。它不会尝试处理重载、泛型或类似的东西。
对于一个简单的 TCP/IP 消息端口,我们可以像这样实现它(更准确地说,它是基于任何流的实现,这对 TCP/IP 也适用):
public sealed class MessagePortClientOverStream:
IMessagePortClient
{
private Stream _stream;
private IMemorySerializer _serializer;
private readonly object _disposeLock = new object();
public MessagePortClientOverStream(Stream stream, IMemorySerializer serializer=null)
{
if (stream == null)
throw new ArgumentNullException("stream");
if (serializer == null)
serializer = MemorySerializerFactory.Create();
_stream = stream;
_serializer = serializer;
}
public object SendMessage(object message)
{
if (message == null)
throw new ArgumentNullException("message");
int offset;
int length;
var bytes = _serializer.Serialize(message, 4, out offset, out length);
BitConverterEx.FillBytes(bytes, offset, length-4);
_stream.Write(bytes, offset, length);
_stream.FullRead(bytes, offset, 4);
length = BitConverter.ToInt32(bytes, offset);
if (length > bytes.Length)
bytes = new byte[length];
_stream.FullRead(bytes, 0, length);
var result = _serializer.Deserialize(bytes, 0, length);
return result;
}
private bool _inAsyncCall;
public async Task<object> SendMessageAsync(object message)
{
if (message == null)
throw new ArgumentNullException("message");
if (_inAsyncCall)
throw new NotSupportedException("This message port doesn't support two pending async calls. Await the first call before doing the second async call.");
try
{
_inAsyncCall = true;
int offset;
int length;
var bytes = _serializer.Serialize(message, 4, out offset, out length);
BitConverterEx.FillBytes(bytes, offset, length-4);
await _stream.WriteAsync(bytes, offset, length);
await _stream.FullReadAsync(bytes, offset, 4);
length = BitConverter.ToInt32(bytes, offset);
if (length > bytes.Length)
bytes = new byte[length];
await _stream.FullReadAsync(bytes, 0, length);
var result = _serializer.Deserialize(bytes, 0, length);
return result;
}
finally
{
_inAsyncCall = false;
}
}
}
实际上,我认为最困难的部分是代理的创建。事实上,我有一个个人解决方案,它在运行时实现接口,并且 *非常* 快,但 .NET 提供的解决方案,即使它没有那么快,也相对容易使用,但文档不够完善。
我们必须实现一个 RealProxy
类,它有一个 Invoke
方法重定向到 IRemotingClient.InvokeAction
,然后为它获取一个透明代理。实现这一点的代码如下:
// Note: This is not the final implementation.
public sealed class RemotingProxy:
RealProxy
{
private IRemotingClient _remotingClient;
public RemotingProxy(IRemotingClient remotingClient, Type serviceType):
base(serviceType)
{
_remotingClient = remotingClient;
}
public override IMessage Invoke(IMessage msg)
{
// Personally I don't understand why the message comes with the wrong cast.
var message = (IMethodCallMessage)msg;
var result = _remotingClient.Invoke(typeof(T).FullName, message.MethodBase.Name, message.InArgs);
// In fact, if the method is async we should be using the _remotingClient.InvokeAsync, but
// I will let you look at the real implementation by downloading the source code.
return new ReturnMessage(result, null, 0, null, null);
}
}
有了这样的代理,我们就可以创建本地实现来重定向调用到远程服务器。这样做就足够了:
var realProxy = new RemotingProxy(remotingClient, typeof(ISomeInterface));
ISomeInterface objectToUseForRemoting = (ISomeInterface)realProxy.GetTransparentProxy();
从那时起,只需通过其接口使用 objectToUseForRemoting
。
并实现 IRemotingProxyProvider
,我们这样做:
public sealed class RemotingProxyProvider:
IRemotingProxyProvider
{
public Type TryGetServiceTypeFromProxyInstance(IRemotingClient remotingClient, object instance)
{
var realProxy = RemotingServices.GetRealProxy(instance);
var ourRealProxy = realProxy as RemotingProxy;
if (ourRealProxy == null)
return null;
if (remotingClient != ourRealProxy._remotingClient)
return null;
return ourRealProxy._serviceType;
}
public object GetProxy(IRemotingClient remotingClient, Type serviceType)
{
var realProxy = new RemotingProxy(remotingClient, serviceType);
return realProxy.GetTransparentProxy();
}
public Task<object> GetProxyAsync(IRemotingClient remotingClient, Type serviceType)
{
return Task.Factory.StartNew(() => GetProxy(remotingClient, serviceType));
}
public T GetProxy<T>(IRemotingClient remotingClient)
{
return (T)GetProxy(remotingClient, typeof(T));
}
public Task<T> GetProxyAsync<T>(IRemotingClient remotingClient)
{
return Task.Factory.StartNew(() => GetProxy<T>(remotingClient));
}
}
最后,RemotingClient 只是将事物粘合在一起……好吧,做这件事实际上是很多工作,但至少它没有做其他事情。我不会在这里展示它的代码,所以如果你想看它的代码,请加载示例。
服务器
我通过编写 IRemotingServer
接口开始服务器。我的最初想法是用户会像这样调用它:
remotingServer.RegisterService(typeof(IInterface), new RealImplementation());
所以我立即向接口添加了这样一个注册方法。我还考虑过使该方法泛型,以便在编译时验证给定实例的类型,但我实际上不喜欢仅为了进行验证而创建泛型方法,而且我总是喜欢拥有一个非类型化的解决方案,因为在某些情况下,在不了解编译时类型的情况下进行调用会更容易。
然后,通过使用我在 IRemotingClient
接口中使用的相同原理,我想到“用户是否可以注册一个 *动态* 服务?”我的意思是,而不是有一个接口和一个实际实现它的实例,用户是否应该能够通过简单地提供服务名称、操作名称和委托来在运行时构建服务?
在我看来,这应该是可能的,因为我不知道这个库的用户真正需要做什么。也许用户可以通过读取包含方法名称和非常基本表达式的文本文件来构建服务,而我不想强迫他们只为做这件事而在运行时创建真实类型。所以我向接口添加了 RegisterAction 方法。
现在重要的问题是:服务器将如何开始接收请求?
我一直在犹豫是在接口中放置 ProcessMessage 还是向接口放置 AddListener()。
AddListener()
似乎是最自然的方式。ProcessMessage()
使我有可能模拟调用而无需实际客户端,因此它更加松散耦合。
然而,不放置 AddListener()
对我来说似乎太奇怪了,因为它看起来就像服务器类根本不是服务器一样。所以我决定应该放置一个 AddListener()
,因此 ProcessMessage()
就变得不必要了。
因此,基本服务器接口最终如下所示:
public interface IRemotingServer:
IRemotingDisposable
{
void RegisterService(Type serviceType, object implementation);
void RegisterAction(string serviceName, string actionName, RemotingAction action);
void AddListener(IMessagePortListener listener);
void RegisterImplementationSearcher(ServiceImplementationSearcher searcher, decimal priority);
void RegisterActionSearcher(ServiceActionSearcher searcher, decimal priority);
}
IMessagePortListener
在 IMessagePortClient 的服务器端,我们同时拥有 IMessagePortServer 和 IMessagePortListener。这是因为一个服务器可以从同一个监听器接收多个连接。
在我看来,监听器应该是异步的。我最初想放置一个 AcceptAsync()
方法,该方法返回一个 Task<IMessagePortServer>
,但我放弃了这样的想法,因为这会将接受连接的循环的责任推给客户端。
所以我做了一些更异步的事情。它最初接收一个用于处理消息的委托,以及一个可选的委托,用于在客户端连接时发出通知。
有了这样的 processMessage
,消息何时或如何接收并不重要。如果内部实现是同步的还是异步的,它只需要调用 processMessage
委托,它就会完成工作。
但是,如果处理消息最终调用异步方法怎么办?
这种想法让我将 processMessage
委托更改为始终返回一个可以 await 的 Task<object>
,所以我将其声明如下:
public delegate Task<object> ProcessMessageHandler(IMessagePortServer server, object message);
稍后我做了一些测试,当它真正是异步的时候效果很好,但我的第一次测试如果消息端口实现为同步的话,它太慢了。事实是我在做一个糟糕的实现。processMessage
处理程序不需要是异步的就可以创建任务。通过使用 Task.FromResult
进行同步调用,我能够获得非常好的速度。要提供一些真实信息,通过使用异步消息端口,我的单线程通信速度已经是 WCF 的两倍……使用同步则快了近四倍(所以,比 WCF 快了近 8 倍)。
注意:使用同步消息端口不是可扩展的,但如果您的需求是一个非常快速的本地服务器,那么它效果很好。而且令人难以置信的是,许多服务器实际上只处理本地连接,因为在 SOA 架构中,一个服务器接收外部调用(因此异步是必须的),然后其他服务器只接收来自该服务器的本地连接,因此对于那些其他服务器来说,每个连接一个线程并不是那么糟糕。
其他细节
在服务器方面,我们还有其他细节需要处理。例如,如果客户端调用了一个服务器上不存在的操作,会发生什么?
我们应该只返回一个错误吗?嗯,如果您查看文章 Software Architecture,您会看到我认为我们应该始终提供一个允许执行操作的事件,而不是失败。只有当事件无法完成操作时,我们才应该允许失败。
即使这看起来像一个实现细节,但我还是想清楚地说明,实现应该支持调用一个尚未注册的操作。但事实是:这种未注册的操作可能是名称提供者而不是 IRemotingServer
的职责。
这可能发生,例如,如果我们有一个泛型方法,它在参数中不使用泛型类型(例如 Get<T>()
方法,我们无法从用法中推断出泛型参数)。在这种情况下,我们可能没有为所有有效的 T
可能性注册一个有效操作,但名称提供者可能能够识别 Get<int>
实际上正在查找方法 Get<T>
的 int
实现。
因此,我决定名称提供者应该分为客户端和服务器,从而有效地创建 IServerNameProvider
,它也必须能够从未注册的服务名称返回一个服务类型,或者返回未注册操作的方法信息。但是,这只应该适用于用户尝试注册的类型,并且可能具有替代名称的类型,而不是内存中加载的任何类型。
最后,如果名称提供者找不到结果,那么 IRemotingServer
中的最后一个机会事件,它仍然可以为请求的 actionName
提供一个操作。
注意: RegisterSearcher
实际上是注册事件。我稍后会解释为什么它不是一个普通事件。
此时,IRemotingServer
和 IServerNameProvider
接口如下所示:
public interface IRemotingServer
{
void RegisterService(Type serviceType, object implementation);
void RegisterAction(string serviceName, string actionName, RemotingAction action);
void AddListener(IMessagePortListener listener);
void RegisterImplementationSearcher(ServiceImplementationSearcher searcher, decimal priority);
void RegisterActionSearcher(ServiceActionSearcher searcher, decimal priority);
}
public interface IServerNameProvider
{
string RegisterServiceType(Type serviceType);
string RegisterServiceMethod(Type serviceType, MethodInfo method);
Type TryGetServiceType(string serviceName);
MethodInfo TryGetMethod(Type serviceType, object serviceInstance, string actionName);
}
至于实现,我将让您在 zip 文件中查看。我不认为它很难,因为每个类做的很少,但它不像客户端实现那么简单。
接口、实现和库
我一直认为创建太多库是一件坏事。在大多数项目中,人们宁愿将一个库放入他们的应用程序中,而不是放入 2、3 个或更多的库。在某些情况下,开发人员宁愿将一个 .cs 文件放入他们的项目中,以避免 DLL 引用,即使该文件包含许多类。
同时,我知道有些人讨厌在只想要一两个类时引用一个大库。
我个人是这样看待的:
- 客户端应用程序不需要服务器接口;
- 服务器应用程序不需要客户端接口;
- 编写所有客户端接口实现的人不需要默认的客户端实现;
- 编写所有服务器接口实现的人不需要默认的服务器实现。
如果我真的遵守这一点,我将得到 4 个库(实际上是 5 个,因为我的实现实际上依赖于另一个库)。
但我决定稍微减少一下。客户端和服务器接口在同一个库中。客户端和服务器实现也在同一个库中。但是实现和接口被隔离在它们自己的库中。
这确实是为了表明,如果您不喜欢我当前的实现,您可以完全重写它……但客户端可以继续依赖接口,一切都会正常工作。
IRemotingDisposable
到目前为止,唯一缺少的“停止”服务器、“关闭”消息端口等功能。
为了支持这一点,实现 IDisposable
接口非常容易,我确实认为所有重要的接口都应该是可处置的。
这意味着 IRemotingClient
、IMessagePortClient
、IRemotingServer
、IMessagePortListener
和 IMessagePortServer
都应该是可处置的。
但是,特别是关闭消息端口时,我想能够检测到这一点,并且还停止连接到它的 Remoting Client,或者如果消息端口监听器在没有实际处置服务器的情况下被处置,则停止监听。我还知道许多用户可能有自己的理由在发生这种情况时得到通知。
为了做到这一点,我想要一个可观察的 Disposable,也就是说,一个在处置时通知对象的对象。长期以来,我的基本库中一直有 IObservableDisposable
接口,但我不想将这样的引用添加到接口中,因为我理解其他开发人员可能不想使用我的其他库,而简单地引用一个接口将需要将该库放在一起。
因此,为了正确支持这一点,我创建了 IRemotingDisposable
接口,所有 Remoting 接口都必须依赖它。它是一个 IDisposable
,具有额外的 IsDisposed
属性和 Disposed
事件,因此您可以在其处置时收到通知。
接口本身没有说明的一件重要事情是,如果对象已经被处置,则注册到 Disposed
事件必须立即调用处理程序,而不是真正注册它。
在实现中,由于我实际上使用了我的基础库,我同时支持 IRemotingDisposable
和来自我的基础库的 IObservableDisposable
,但当您在代码中使用它时,如果您不想,则无需引用我的其他库。
注意: Pfz 库现在非常小。它只是一个基础类库,用于支持 Managed Thread Synchronization,一些有用的集合(如 ThreadSafeHashSet 和 ThreadSafeDictionary,它类似于 ConcurrentDictionary 但没有其问题并且还有一些额外的方法 [它在 Dictionary + Locking versus ConcurrentDictionary 中介绍])和 ReflectionHelper 类,如果您想创建非常快的委托来访问 MethodInfo,它非常有用,而且,所有这些都被实际实现所使用。
同步通过异步,异步通过同步
消息端口同时支持同步和异步调用,但我们如何调用具有同步签名的作为异步的方法(反之亦然)?
好吧,第一个解决方案是使用 InvokeActionAsync
方法。不幸的是,在这种情况下,您必须知道 serviceName
和 actionName
将如何编码,并且您还必须填写正确的参数数量,而没有 IntelliSense 支持。
我曾考虑过再添加一个帮助类,通过提供表达式来调用异步方法。我甚至让它工作了,但我不喜欢结果。每次调用都创建和解析表达式会导致严重的性能损失。
我能想到的最佳解决方案是创建另一个具有相同名称但位于不同命名空间中的接口,并具有所有相同的签名,只是将返回类型从 void
更改为 Task
,将其他类型更改为 Task<ThatOtherType>
。
因此,通过创建如下所示的 ICalculator
:
public interface ICalculator
{
Task<decimal> Sum(decimal a, decimal b);
}
我们将能够为此获取一个服务,然后进行一次干净的异步调用,如下所示:
await asyncCalculator.Sum(55, 2);
唯一重要的是,您应该使用一个能够为同步和异步接口生成相同名称的名称提供者。
由于默认名称提供者只使用类型名称,而不是命名空间,因此在 Async 命名空间中创建同名接口可以工作,但您可以使用一个移除类型名称中 Async 的名称提供者,这样您就可以将两个接口放在同一个命名空间中,或者您甚至可以使用一个移除方法名称中 Async 的名称提供者,这样您就可以将同步和异步方法放在同一个客户端接口中,而它们仍然调用服务器上的相同方法。
“搜索事件”
此库中的事件非常特别。普通事件使用 event
关键字,它在内部由 add
和 remove
方法组成,它们没有优先级,如果我们遵循微软的指南,事件处理程序以 object sender
参数开头。
那么,我为什么不做这个呢?我的事件实际上是通过那些 RegisterSearcher
方法实现的,处理程序不接收 sender
,参数不继承自 EventArgs
类型,并且我使用优先级。这难道不是对指南的严重违反吗?
好吧,它们违反了指南,但我考虑了许多其他因素。例如,大多数事件只是通知事件,它们不期望生成结果,因此处理程序的执行顺序并不重要。
但搜索者是一种特殊的事件。它们应该尝试提供结果。当考虑到在初始化期间可以提供一个通用搜索器时,我必须考虑有人可能想在之后注册一个更具体的搜索器,但它应该先运行。仅仅按照注册的相反顺序执行它们也没用……我不知道稍后进行的注册是否比第一次注册更具体。
因此,我的解决方案是使用优先级。
为什么没有 sender 参数
我使用接口,因为我期望实现可以被替换和 *装饰*。但当我将一个事件处理程序给一个装饰器时会发生什么?
装饰器可能会将事件处理程序直接传递给真实对象。但如果它这样做,那么在调用事件时,sender
将不是装饰器,而是真实实例。我们可以认为这是正确的(因为它将是真实的发送者),但这可能会很奇怪,将事件放在一个线程安全的装饰器上,然后将线程不安全的实现作为 sender
接收。
当然,装饰器可以以不同的方式实现,有效地将自己注册到被装饰对象的事件上,并拥有自己的事件,所以当事件被调用时,装饰器会拦截它并调用自己的事件实现,纠正发送者。但是,如果我们这样做,我们将使装饰器的生成变得更加困难,因为它们也应该处理优先级。另外,装饰器处理程序的优先级是什么?如果我们认为装饰器将其自己的处理程序放入主对象,而主对象可能已经具有一些优先级,那么我们必须使用正确的优先级。
我当然可以为每个处理程序编写一个装饰器,但与其复杂化事情,不如避免 sender
。如果拥有 sender
非常重要,用户总是可以创建另一个对象,该对象拥有对正确“发送者”的引用,然后注册该对象的一个方法作为处理程序,该方法将可以访问“发送者”。
为什么没有 Unregister 方法?
我个人认为 Unregister
方法带来了太多的麻烦,但几乎没有好处。例如,如果我注册了一个执行、给出结果(已缓存)然后被注销的搜索器。生成的注册信息是否应该被注销?如果答案是肯定的,我们如何控制这一点?
即使我知道如何做到这一点,我们也应该为这种情况使代码复杂化吗?老实说,多久会有人想注销一个搜索器?
所以,我可以这样说,这里有两个想法。一是使库完整。作为一个库,我们永远不知道用户的需求,他们可能总是想要更多。我能想象一个服务器管理器决定暂时注销一些服务。
另一方面,我试图先提供接口,尝试帮助其他开发者轻松地完成工作,所以我不希望事情复杂化。因此,我可以这样说,我遵循 *YAGNI - You aren't gonna need it* 原则。注销东西太不常见了,我们可以很容易地通过创建一个新对象(如 RemotingServer
)并注册所有必需的项(除了我们想要“删除”的项)来解决。当然,这将强制所有人断开连接,而不是只影响那些实际使用已删除服务的用户,但这是一个变通方法,而不是解决方案。
最后,我们总是可以创建另一个接口,其中包含额外的所有方法,并创建一个实际实现这些额外方法的实现。所以,最终,我认为这是一个选择问题,我的选择是让事情更简单。
示例
示例是一个非常简单的应用程序,分为一个带有 Calculator 服务的服务器和一个实际上使用 ICalculator
接口数千次的客户端,以同步模式和异步模式运行,还提供了使用 WCF 的这些测试版本。
根据您运行此示例的方式,您会看到 WCF 的运行速度比我的实现快。但老实说,我在这篇文章中提供的代码根本没有优化,它只是实现了接口以使其功能化和易于理解,而不是为了速度。通过简单地将 .NET 序列化替换为另一个二进制序列化,可以使其速度大大提高(如果您查看 MemorySerializer
源代码,您会看到我注释掉了一些使用了另一个序列化器的代码)。
示例代码真正重要的部分是证明该框架正在工作,并为您提供一个测试它的起点。通过查看它,您会注意到 ICalculator
没有属性,而 WcfCalculator
需要属性。另外,我没有使用它,但您可以使用 RegisterAction()
方法,提供服务名称和操作名称以及委托来注册整个服务,您可以通过 RemotingClient
的 InvokeAction
或 InvokeMethodAsync
直接调用。我不知道如何在 WCF 中轻松做到这一点。
我可以使用这个库替换 WCF 吗?
正如我多次说过的,我在这篇文章中提供的实现非常简单,没有任何优化。我确实相信它已为生产环境做好准备,但仅限于某些非常有限的场景。
考虑到它实际上没有超时、支持重新连接、从多个线程调用方法或任何类型的安全性,我只能说它适用于本地通信,如果需要由多个线程使用,则每个线程需要创建一个 Remoting Client。当然,它可以改进以处理更复杂的场景,并且通过正确的实现,它可以超越 WCF,但您必须在生产环境中使用它之前进行尝试,并且您还应该理解主库的接口。我相信,了解这些简单的接口如何提供如此多的可能性,至少会很有趣。