不可见的进程间通信
使用依赖注入,无缝地与远程服务通信。
引言
多年来,实现客户端/服务器应用程序的远程组件通信一直很麻烦。总是需要将数据序列化为某种协议消息,然后再反序列化。应用程序依赖于传输技术和格式。如果所有技术绑定都可以被移除,只关注功能契约,那该多好啊!
依赖注入多年前就已经消除了应用程序内部的技术依赖。我的想法是将这个概念扩展到应用程序之外。结果是一个名为 IOC-Talk 的开源库。
我知道 WCF 提供了一些相同的承诺。但它的配置非常混乱,并且您会与 WCF 本身紧密绑定。另一个原因是实现面向会话的双向通信场景很困难。
管道
目的是设计独立的功能接口,并以标准依赖注入场景中的方式使用/实现它们。但这还不够。我们需要一个微小的上层来管理不同的连接。IOC-Talk 为每个会话创建一份契约服务的实例。所有服务接口的导入都封装在一个自定义的契约会话类中(定义了双向通信支持的服务)。
轻量级开源库
只有当您不会陷入另一个死胡同时(依赖),才建议使用这样的库。因此,IOC-Talk 的设计旨在支持无缝替换传输技术和消息序列化层,而无需更改您的功能业务代码。唯一的接触点是选择的依赖注入容器,以及您是否需要功能会话管理。
当前可用的 IOC-Talk 库实现了以下具体的软件堆栈:
- MEF 依赖注入容器
- JSON 消息序列化
- TCP/IP 套接字通信
工作原理
以下顺序描述了会话建立过程:
- 初始化通信基础设施,并分配本地会话契约类(包含所需的本地和远程接口导入)。
- 如果连接建立成功,将创建一个自定义契约类的新实例。
- 请求依赖注入容器来满足创建的契约会话实例的所有导入。
- 如果依赖注入容器找不到本地服务实现,会自动实现一个代理实现来调用远程服务(也支持自定义代理实现)。
- 在此进程中创建的所有实例都分配给连接会话。
- 触发会话创建事件/接口方法
现在,使用会话契约实例,应用程序可以调用远程端的任何接口服务,并能够处理本地服务实现的传入调用请求。
以下顺序描述了远程服务调用的过程:
- 功能应用程序代码使用会话契约中的实例调用接口方法。
- 这将触发代理实现的方法。
- 代理实现会将调用和提供的参数重定向到通信主机。
- 然后,调用将使用序列化服务(JSON)序列化为消息,并发送到远程端点(TCP)。
- 在远程端,消息被反序列化。
- 请求依赖注入容器获取目标接口的一个实例。
- 调用方法,并将返回值发送回源。
使用代码
为了演示库的实际应用,我将一步一步地设置一个简单的客户端/服务器应用程序。该应用程序将记录多个节点的 CPU 利用率。为此,我们将实现一个客户端控制台应用程序,该应用程序定期测量 CPU 利用率,并将结果发布到一个中央服务器应用程序。
服务器控制台应用程序将记录结果。
- 首先,我们在 Visual Studio 中创建一个名为“PerformanceRecorder”的空白解决方案。
- 我们添加 C# 类库“PerformanceRecorder.Interface”来定义接口。
- 我们需要一个功能性的服务接口来为我们的中央服务。
public interface IPerformanceObserverService
{
void PublishProcessorUsage(IProcessorUsage pUsage);
}
- 以及一个数据传输对象的接口。
public interface IProcessorUsage
{
/// <summary>
/// Gets or sets the measure time
/// </summary>
DateTime Time { get; set; }
/// <summary>
/// Gets or sets the cpu usage in percent
/// </summary>
int UsagePercent { get; set; }
/// <summary>
/// Gets or sets the source machine name
/// </summary>
string Source { get; set; }
}
- 现在,我们的服务契约已经定义,我们可以开始实现服务了。
- 添加一个新的类库“PerformanceRecorder.Observer”,并添加对“PerformanceRecorder.Interface”的引用。
- 创建一个名为“PerformanceObserverService”的新类,并继承自“IPerformanceObserverService”。
public class PerformanceObserverService : IPerformanceObserverService
{
public void PublishProcessorUsage(IProcessorUsage pUsage)
{
Console.WriteLine("{0} - {1} % CPU usage on \"{2}\"", pUsage.Time.ToLongTimeString(), pUsage.UsagePercent, pUsage.Source);
//todo: serialize and write to local file
}
}
- 为了能够被依赖注入发现,我们添加对 .NET MEF 库(System.ComponentModel.Composition)的引用,并导出我们的服务接口。
using System.ComponentModel.Composition;
// ...
[Export(typeof(IPerformanceObserverService))]
public class PerformanceObserverService : IPerformanceObserverService
- 为了运行我们实现的的服务,我们创建一个新的控制台应用程序“PerformanceRecorder.ServiceConsole”,并添加对“.Interface”和“.Observer”的引用。
- 每个物理连接都由一个契约会话类表示,该类导入通信所需的所有服务接口。这包括本地和远程实现的接口。在此示例中,我们仅支持一个服务器端实现的的服务。
using System.ComponentModel.Composition;
using PerformanceRecorder.Interface;
namespace PerformanceRecorder.ServiceConsole
{
public class PerfRecordServiceContract
{
[Import]
public IPerformanceObserverService ObserverService { get; set; }
}
}
- 现在是时候向 ServiceConsole 项目添加 IOC-Talk 引用了。
我们安装了 NuGet 包“ioctalk”。
- 为了完成服务器端,我们使用服务器控制台项目中的 App.config 将所有内容连接起来。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<configSections>
<section name="IOCTalk" type="BSAG.IOCTalk.Communication.Common.Factory.AppConfigSection, BSAG.IOCTalk.Communication.Common"/>
</configSections>
<IOCTalk>
<SessionContract type="PerformanceRecorder.ServiceConsole.PerfRecordServiceContract, PerformanceRecorder.ServiceConsole" name="Sample Service">
<Communication type="BSAG.IOCTalk.Communication.Tcp.TcpCommunicationController, BSAG.IOCTalk.Communication.Tcp">
<ConnectionType>Service</ConnectionType>
<Port>50123</Port>
<Security enabled="false">
<CertificateName>YourCertificateName</CertificateName>
<ClientCertificateRequired>false</ClientCertificateRequired>
</Security>
</Communication>
<Container type="BSAG.IOCTalk.Container.MEF.TalkContainerHostMEF`1, BSAG.IOCTalk.Container.MEF" />
</SessionContract>
</IOCTalk>
<system.diagnostics>
<trace autoflush="false" indentsize="4">
<listeners>
<add name="configConsoleListener"
type="System.Diagnostics.ConsoleTraceListener" traceOutputOptions="DateTime" />
</listeners>
</trace>
</system.diagnostics>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
</configuration>
- 在 Program.cs 的 Main 方法中,我们初始化通信基础设施。
using BSAG.IOCTalk.Communication.Common.Factory; using System; namespace PerformanceRecorder.ServiceConsole { class Program { static void Main(string[] args) { Console.WriteLine("-------------------------------------------"); Console.WriteLine("Performance Recorder Server"); Console.WriteLine("-------------------------------------------"); Console.WriteLine(); var commServices = IOCTalkLoader.InitFromAppConfig(); Console.WriteLine(); Console.WriteLine("Press [Enter] to exit"); Console.ReadLine(); foreach (var commService in commServices) { commService.Shutdown(); } } } }
- 服务器现在已准备好接受端口 30123 上的客户端会话。
- 客户端实现需要类似的管道,但不需要服务实现。
- 新的控制台应用程序:“PerformanceRecorder.ClientConsole”
- 引用:“PerformanceRecorder.Interface”,MEF 和 IOC-Talk(Nuget 包)。
- 实现客户端会话契约并导入远程服务接口。
public class PerfRecordClientContract
{
[Import]
public IPerformanceObserverService ObserverService { get; set; }
}
- 扩展客户端控制台 App.config,如下所示:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<configSections>
<section name="IOCTalk" type="BSAG.IOCTalk.Communication.Common.Factory.AppConfigSection, BSAG.IOCTalk.Communication.Common"/>
</configSections>
<IOCTalk>
<SessionContract type="PerformanceRecorder.ClientConsole.PerfRecordClientContract, PerformanceRecorder.ClientConsole" name="Sample Client">
<Communication type="BSAG.IOCTalk.Communication.Tcp.TcpCommunicationController, BSAG.IOCTalk.Communication.Tcp">
<ConnectionType>Client</ConnectionType>
<Host>127.0.0.1</Host>
<Port>30123</Port>
<Security enabled="false">
<ServerName>YourCertificateName</ServerName>
<ProvideClientCertificate>false</ProvideClientCertificate>
<ClientCertificateName>YourCertificateName</ClientCertificateName>
</Security>
</Communication>
<Container type="BSAG.IOCTalk.Container.MEF.TalkContainerHostMEF`1, BSAG.IOCTalk.Container.MEF">
<CreateDebugEnabledAssembly>true</CreateDebugEnabledAssembly>
</Container>
</SessionContract>
</IOCTalk>
<system.diagnostics>
<trace autoflush="false" indentsize="4">
<listeners>
<add name="configConsoleListener"
type="System.Diagnostics.ConsoleTraceListener" traceOutputOptions="DateTime" />
</listeners>
</trace>
</system.diagnostics>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
</configuration>
- 在客户端控制台应用程序中初始化通信基础设施。
static void Main(string[] args)
{
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Performance Recorder Client");
Console.WriteLine("-------------------------------------------");
Console.WriteLine();
var commServices = IOCTalkLoader.InitFromAppConfig();
Console.WriteLine();
Console.WriteLine("Press [Enter] to exit");
Console.ReadLine();
foreach (var commService in commServices)
{
commService.Shutdown();
}
}
- 我们几乎达到了目标。唯一剩下的事情就是在客户端建立连接后,定期推送 CPU 使用率。为此,我们在“PerfRecordClientContract”类中实现“ISessionStateChanged”接口。我们使用“OnSessionCreated”方法启动一个计时器,该计时器定期将当前处理器使用率推送到远程服务器。
using System;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Timers;
using BSAG.IOCTalk.Common.Exceptions;
using BSAG.IOCTalk.Common.Interface.Session;
using PerformanceRecorder.Interface;
namespace PerformanceRecorder.ClientConsole
{
public class PerfRecordClientContract : ISessionStateChanged
{
private Timer pushTimer;
private PerformanceCounter cpuCounter;
public PerfRecordClientContract()
{
cpuCounter = new PerformanceCounter();
cpuCounter.CategoryName = "Processor";
cpuCounter.CounterName = "% Processor Time";
cpuCounter.InstanceName = "_Total";
pushTimer = new Timer(1000);
pushTimer.Elapsed += OnPushTimer_Elapsed;
}
[Import]
public IPerformanceObserverService ObserverService { get; set; }
public void OnSessionCreated(ISession session)
{
pushTimer.Start();
Console.WriteLine("Processor usage measurement started");
}
public void OnSessionTerminated(ISession session)
{
pushTimer.Stop();
pushTimer.Dispose();
cpuCounter.Dispose();
}
private void OnPushTimer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
ProcessorUsage pUsage = new ProcessorUsage()
{
Time = DateTime.Now,
UsagePercent = (int)cpuCounter.NextValue(),
Source = Environment.MachineName
};
ObserverService.PublishProcessorUsage(pUsage);
}
catch (RemoteConnectionLostException)
{
Console.WriteLine("Connection lost during remote call");
}
catch (Exception ex)
{
Console.WriteLine("Unexpected exception: " + ex);
}
}
}
}
- 看,客户端/服务器应用程序正在运行(高 CPU 负载并非由示例引起 :) IOC-Talk 是为高吞吐量场景设计和优化的)。
这是一个非常基础的示例,只有一个通信方向。会话管理仅在客户端控制台应用程序中实现。在生产项目中,这可能会使用您自己的功能会话管理接口在控制台应用程序之外实现。
如果您有兴趣,也可以尝试 IOC-Talk 项目页面上更复杂的示例应用程序。
历史
- 2016-08-23:将 PerformanceRecorder.zip 中的 Nuget 包升级到版本 1.2.13(Bugfix 会话创建)。