65.9K
CodeProject 正在变化。 阅读更多。
Home

XML 服务器套件

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2016 年 3 月 23 日

CPOL

11分钟阅读

viewsIcon

12142

XmlServer 是一个多线程服务器应用程序,它监听客户端请求,调用“运算符”,并响应客户端请求。

XmlServer Suite

我写这篇文章的初衷是出于自私。  我想更多地了解 TCP 监听器、TCP 客户端和多线程。    有什么比创建一个“Hello World”应用程序更能学习编程概念呢?  这是我的“Hello World”应用程序,它帮助我了解了 TCP/IP 通信和多线程/线程池的内部原理。  该应用程序名为“XML Server Suite”,它包括一个 XML 服务器、客户端、管理工具、单元测试、Windows 服务和可插拔的“运算符”。  

完整的源代码可在 https://xmlserver.codeplex.com/ 找到

“方法总比困难多”

所以你可能会问自己,为什么还要花时间做这个,因为很多事情都可以用 WCF 和 netTcpBinding 绑定、Task Parallel Library (TPL) 或 MSMQ 消息来完成。  你还会在代码中注意到,我创建了自己的线程池和池管理器。  正如我之前所说,我想为我的个人学习创建一个“Hello World”应用程序,并提高我的技能。 

应用程序做什么?

在审查代码之前,我认为有必要向您展示应用程序的工作原理。 

XmlServer 监听以 XML 形式的客户端请求。  客户端 XML 请求包括“运算符”的名称和一个“事实”集合。  以下是来自客户端的示例 XML 请求

<?xml version="1.0" encoding="utf-16"?>
<Request Type="Math Operator" Version="1.0">
<FirstNumber>10</FirstNumber>
<SecondNumber>20</SecondNumber>
<Operation>add</Operation>
</Request>

请注意,这是一个来自客户端的“请求”,因为 XML 的第一个节点是 <Request />。  客户端请求 XmlServer 执行 Math Operator 的 1.0 版本。  客户端正在传入 3 个事实

  1. FirstNumber = 10
  2. SecondNumber = 20
  3. Operation = add

XmlServer 收到此请求,从线程池获取一个空闲线程,并使用反射调用已暴露给 XmlServer 的 DLL 中的方法。  XmlServer 返回给客户端的响应如下

<?xml version="1.0" encoding="utf-16"?>
<Response Type="Math Operator" Version="1.0" Success="true">
<Calculation>30.00</Calculation>
</Response>

这是来自 XmlServer 的成功响应,因为 XML 中的第一个节点是 <Response />,并且它包含属性“Success”的值“true”。  XmlServer 还向客户端返回了一个“事实”集合。  在这种情况下,只有一个名为 <Calculation /> 的响应事实,其中包含所请求操作的结果:10 + 20 = 30

这就是我的“Hello World”应用程序的基本原理

源代码包含什么?

源代码中总共包含 19 个项目。  我试图将项目组织成逻辑组。  以下是项目的概述

服务器解决方案文件夹
  • XmlServer.ServiceContracts

    这包含服务器和运算符使用的所有接口。  要创建自己的运算符,您的类需要实现此项目中找到的 IOperator 接口。

  • XmlServer.ServiceImplementation

    此项目包含实现 XmlServer.ServiceContracts 项目中大多数接口的类。  例如,IFact 接口包含 4 个属性:Name、Value、Description 和 IsRequired。  XmlServer.ServiceImplementation 项目包含一个实现 IFact 接口的 Fact 类。

  • XmlServer

    这是主要的服务器代码。  它包含 2 个监听器(用于监控的处理监听器和优先级监听器;稍后将详细介绍)。  它还包含客户端处理程序、池服务程序和运算符调用程序。

运算符解决方案文件夹

我创建了许多示例运算符来展示这种“可插拔”架构的灵活性,包括

  • DBQuery Operator

    我没有完成这个运算符,但意图是查询数据库并返回结果。

  • Delay Operator

    此运算符使线程休眠指定的时长。  此运算符对于通过模拟客户端的不同耗时请求来对服务器进行压力测试非常有用。

  • Echo Operator

    从客户端传递到服务器的所有事实都会被“回显”回客户端。

  • Email Operator

    同样,由于时间关系,我没有完成这个运算符,但我的意图是让客户端将一些 SMTP 配置设置作为事实发送,然后服务器发送电子邮件。

  • Exception Operator

    此运算符会导致运算符出现异常,该异常在 XmlServer 中被捕获。  用于单元测试。

  • Log Operator

    将消息写入事件查看器。

  • Math Operator

    允许 XmlServer 对 2 个数字执行基本算术运算。

  • MSMQ Operator

    未完成;将消息写入 MSMQ 服务器/队列。

  • QuickReturn Operator

    此运算符模拟运算符 DLL 中的长时间运行进程。  创建一个线程来模拟长时间运行的处理器,但立即向客户端返回响应。

  • Time Operator

    客户端没有为此运算符传递任何事实。  它返回 XmlServer 计算机上注册的所有时区的当前日期/时间。

注意:XmlServer 中还有一个内置的运算符。  它是状态运算符,它返回监听器的统计信息,例如等待的客户端数量、已处理的客户端数量、总处理时间、平均处理时间以及估计的清理时间。

杂项解决方案文件夹
  • XmlServer.Helper

    此项目包含许多用于处理 XML、响应、请求和事实的实用程序。

托管
  • XmlServer.Host.Console

    此控制台应用程序允许您快速将 XmlServer 托管在一个简单的控制台应用程序中。  这主要用于运行单元测试。

  • XmlServer.Host.Service

    这是一个托管 XmlServer 的 Windows 服务。  要安装服务,请从命令行运行 install.bat。  转到计算机管理/服务来启动服务。

工具
  • XmlServer.Admin

    这是一个 WinForm 应用程序,用于监视正在运行的 XmlServer。  它对优先级监听器进行状态运算符调用,以便在处理监听器积压时立即获得结果。  此应用程序使用了 Simple Performance Chart。  感谢发布了出色的图表控件!

应用程序架构

XmlServer 中的主类是 Server 类。  Server 类允许您启动/停止或暂停/恢复监听器。  服务器实际上使用了 2 个监听器

  1. 处理监听器:这是执行客户端请求处理的监听器,通过调用请求的运算符来处理。  客户端应用程序应使用此监听器的端口进行请求。
  2. 优先级监听器:此监听器由 XmlServer.Admin 应用程序使用。  它主要用于请求状态运算符。  优先级监听器知道处理监听器的状态;也就是说,它知道有多少客户端正在等待处理,有多少客户端已被处理,处理一个客户端的平均时间以及清理所有等待的客户端的时间。

这是服务器类及其两个监听器的图示

这是创建 Server 类并配置监听器的方式。  此代码也是单元测试的一部分

// create the server class
var server = new XmlServer.Server();

server.ProcessingListener.Configuration.IPAddress = "192.168.0.8";
server.ProcessingListener.Configuration.Port = 8095;
server.ProcessingListener.Configuration.OperatorsFolder = operatorFolder;
server.ProcessingListener.Configuration.NumberOfThreads = 50;

server.PriorityListener.Configuration.IPAddress = "192.168.0.8";
server.PriorityListener.Configuration.Port = 8096;
server.PriorityListener.Configuration.OperatorsFolder = operatorFolder;
server.PriorityListener.Configuration.NumberOfThreads = 5;

server.StartListening();

Server 类还包含许多事件/委托,用于获取对日志记录和跟踪有用的附加信息。

server.ProcessingListener.ListenerDebugMessage += Listener_ListenerDebugMessage;
server.ProcessingListener.ListenerException += Listener_ListenerException;

server.ProcessingListener.ListenerStarted += Listener_ListenerStarted;
server.ProcessingListener.ListenerStopped += Listener_ListenerStopped;

server.ProcessingListener.ListenerRequestCompleted += Listener_ListenerRequestCompleted;

这是监听器启动后的流程

  1. 监听器启动“客户端消息泵”,这基本上是一个等待客户端请求的循环。  消息泵方法称为 Listener.AcceptClients()。

    快速顺便说一句,根据维基百科,“消息泵”是“一种编程构造,它等待并分派程序中的事件或消息”。

  2. 在 AcceptClients() 方法收到客户端请求后,它会立即创建一个 ClientHandler 对象并将其添加到 ClientPool 类中。

    ClientPool 是另一个累积 ClientHandler 请求的类。  这是 ClientPool 的构造函数

            public ClientPool(int Capacity)
            {
                pool = new Queue< clienthandler >(Capacity);
            }
    

    请注意,该类使用通用 Queue集合,以便请求可以先入先出 (FIFO) 地处理。

    ClientPool 类非常简单,只包含一个 Count 属性以及 Add()、NextClient() 和 Clear() 方法。  当然,ClientPool 类需要是线程安全的。  这是 NextClient() 的代码

            public ClientHandler NextClient()
            {
                if (Count == 0) return null;
    
                ClientHandler client = null;
                try
                {
                    Monitor.Enter(this);
                    client = pool.Dequeue();
                }
                finally
                {
                    Monitor.Exit(this);
                }
                return client;
            }
            

    需要注意的是,当调用 NextClient() 时,由于 Dequeue() 方法,ClientHandler 类实际上会从池/队列中移除。

  3. 到目前为止,我们是这样的:在这个编程逻辑点,我们已经收到了一个客户端请求,为该请求创建了一个 ClientHandler 对象,并将 ClientHandler 对象添加到了 ClientPool 中。

    另一个重要的注意点:此时我们还没有从客户端请求中读取任何数据。

  4. ClientPool 类现在在池/队列中包含 ClientHandler 请求。

    PoolServicer 类负责处理 ClientHandler 请求。  PoolServicer 还有一个名为“Process()”的消息泵,它不断查找需要处理的 ClientHandler。  PoolServicer 通过不断调用 NextClient() 来获取下一个要处理的 ClientHandler 来实现这一点。

    PoolServicer 获取 ClientHandler 对象后要做的第一件事是请求 ClientHandler 从客户端读取请求。  数据从客户端读取,然后 ClientHandler 对象被放回 ClientPool。

    流程重新开始:PoolServicer 获取下一个需要处理的 ClientHandler。  ClientHandler 读取来自 Tcp Socket 的传入请求,并将 ClientHandler 放回池中。

    假设我们一次收到 100 个并发请求。  使用上述方法(仅读取部分传入消息并将请求放回池中),我们可以“一点一点地”开始处理所有请求。

    当 ClientHandler 最终完成读取请求的所有传入数据后,我们就可以实际处理该请求了。  如果“bytesRead”大于零,则 ClientHandler 仍被视为“Alive”,因为还需要从套接字读取更多数据。

           public void Process() {
             int bytesRead = this.networkStream.Read(bytes, 0, (int)bytes.Length);
                    if (bytesRead > 0)
                    {
                        // there might be more data, so store the data received so far.
                        sbRequest.Append(Encoding.ASCII.GetString(bytes, 0, bytesRead));
                    }
                    else
                    {
                        // all of the data has been recevied
                        ProcessDataReceived();
                    }
            

    这是 PoolServicer 的部分源代码,它获取下一个 ClientHandler,开始处理请求,并根据名为 ClientHandler.Alive 的属性将 ClientHandler 放回池中。

    // get a client handler to be processed
    ClientHandler client = null;
    try
    {
       // get the next ClientHandler to be processed from the pool/queue.
       // IMPORTANT: This will remove the ClientHandler from the queue/pool
       client = clientPool.NextClient();
    }
    catch
    {
       client = null;
    }
    
    // see if we got a client to process
    if (client != null)
    {
       IncrementWorkingThreads();
       try
       {
          client.Process();
       }
       catch { }
    
       // if the client is still connected, schedule it for later processing (by adding it back into the pool/queue)
       // if the client is not alive anymore, we are done processing it....DO NOT ADD BACK TO POOL/QUEUE
       if (client.Alive)
       {
          // the client still needs to be processed; add it back to the pool/queue
          clientPool.Add(client);
       }
       else
       {
          try
          {
             // the client finished processing
             IncrementClientsProcessed();
             IncrementTotalClientPoolTime(client.ProcessingTime);
           }
           catch { }
        }
    
        DecrementWorkingThreads();
    
    }
            
  5. 此算法的最后一步发生在 ClientHandler 读取完所有传入数据并且 PoolServicer 已将 ClientHandler 从池中移除之后。  现在我们准备好让 ClientHandler 处理请求,这通过 ClientHandler.ProcessDataRecieved() 来完成。

    ProcessDataRecieved() 方法评估从客户端收到的 XML 并执行以下步骤

    1. 它从 XML 的请求节点中获取运算符名称和版本。  例如,这是 Echo Operator 的客户端请求
      <?xml version="1.0" encoding="utf-16"?>
      <Response Type="Echo Operator" Version="1.0" Success="true">
      <GuidValue1>f4991a9b-7dd9-437c-9e75-4e227b0f4c23</GuidValue1>
      <GuidValue2>9fc1095e-8247-4bbe-81cb-9f31a24336bf</GuidValue2>
      <GuidValue3>d2846483-b93c-49c2-a10d-8f6e3cd866bf</GuidValue3>
      </Response>
                      

      在此示例中,客户端请求的是“Echo Operator”的 1.0 版本。

    2. ClientHandler 反序列化 XML 请求并获取请求中的所有“事实”。  例如,以上请求中的事实是
      {
          new Fact { Name = "GuidValue1", Value = "f4991a9b-7dd9-437c-9e75-4e227b0f4c23" },
          new Fact { Name = "GuidValue2", Value = "9fc1095e-8247-4bbe-81cb-9f31a24336bf" },
          new Fact { Name = "GuidValue3", Value = "d2846483-b93c-49c2-a10d-8f6e3cd866bf: }
      }
      

      有关解析请求/响应 XML 和事实的大部分代码可以在 XmlServer.Helper.Utilities 中找到。

    3. 接下来,ClientHandler 使用反射查找实现 IOperator 接口且名称和版本等于“Echo Server_1.0”的 DLL。

      ClientHandler 调用运算符中的 ProcessRequest() 方法。  运算符中的 ProcessRequest() 方法返回一个 IResponse 对象。

    4. 最后,ClientHandler 将 IResponse 对象序列化为 XML,并将响应写回等待的客户端。  这是上面 Echo 请求的响应
      <?xml version="1.0" encoding="utf-16"?>
      <Response Type="Echo Operator" Version="1.0" Success="true">
      <GuidValue1>f4991a9b-7dd9-437c-9e75-4e227b0f4c23</GuidValue1>
      <GuidValue2>9fc1095e-8247-4bbe-81cb-9f31a24336bf</GuidValue2>
      <GuidValue3>d2846483-b93c-49c2-a10d-8f6e3cd866bf</GuidValue3>
      </Response>
      	                

运算符

创建运算符涉及创建一个 .Net 类库,实现 IOperator 接口,并将编译后的 DLL 复制到 XmlServer 文件夹。

这是我创建 Time Operator 的方法

  1. 我创建了一个名为 Operator 的新类
        public class Time : IOperator
        {
            public const string SERVER_NAME = "Time Operator";
            public const string SERVER_VERSION = "1.0";
    
            public string Name
            {
                get { return SERVER_NAME; }
            }
    
            public string Version
            {
                get { return SERVER_VERSION; }
            }
            
  2. ClientHandler 将调用 ProcessRequest()
            public IResponse ProcessRequest(IRequest request)
            {
                Response response = new Response();
                response.Request = request;     // return the original request object
                response.Success = true;        // assume a success
    
                try
                {
    
                    List facts = new List();
    
                    // get all the time zones on this computer
                    var timeZones = TimeZoneInfo.GetSystemTimeZones();
    
                    foreach (TimeZoneInfo timeZone in timeZones)
                    {
                        var dateTime = TimeZoneInfo.ConvertTime(DateTime.Now, timeZone);
                        facts.Add(new Fact { Name = timeZone.StandardName, Value = String.Format("{0} {1}", dateTime.ToLongDateString(), dateTime.ToShortTimeString()) });                    
                    }
    
                    response.Facts = facts;
    
                }
                catch (Exception ex)
                {
                    response.Facts = XmlServer.Helper.Utilities.CreateException(ex);
                    response.Success = false;
                }
    
                response.StopProcessing = System.Environment.TickCount;
                return response;
    
            }
            
  3. 然后 ClientHandler 将从 IOperator.ProcessRequest() 返回的 IResponse 对象序列化给客户端。

这是 Time Operator 的示例请求和响应

    <?xml version="1.0" encoding="utf-16"?><Request Type="Time Operator" Version="1.0" />

这是 Time Operator 返回的响应

<?xml version="1.0" encoding="utf-16"?><Response Type="Time Operator" Version="1.0" Success="true">
<DatelineStandardTime>Wednesday, March 23, 2016 3:48 AM</DatelineStandardTime>
<HawaiianStandardTime>Wednesday, March 23, 2016 5:48 AM</HawaiianStandardTime>
<AlaskanStandardTime>Wednesday, March 23, 2016 7:48 AM</AlaskanStandardTime>
<PacificStandardTimeMexico>Wednesday, March 23, 2016 7:48 AM</PacificStandardTimeMexico>
<PacificStandardTime>Wednesday, March 23, 2016 8:48 AM</PacificStandardTime>
<USMountainStandardTime>Wednesday, March 23, 2016 8:48 AM</USMountainStandardTime>
<MountainStandardTimeMexico>Wednesday, March 23, 2016 8:48 AM</MountainStandardTimeMexico>
<MountainStandardTime>Wednesday, March 23, 2016 9:48 AM</MountainStandardTime>
<CentralAmericaStandardTime>Wednesday, March 23, 2016 9:48 AM</CentralAmericaStandardTime>
<CentralStandardTime>Wednesday, March 23, 2016 10:48 AM</CentralStandardTime>
<CentralStandardTimeMexico>Wednesday, March 23, 2016 9:48 AM</CentralStandardTimeMexico>
<CanadaCentralStandardTime>Wednesday, March 23, 2016 9:48 AM</CanadaCentralStandardTime>
...
</Response>

运行 XmlServer

有几种不同的运行 XmlServer 的方法。  最简单的方法是运行单元测试“服务器和客户端测试”。  这些单元测试是独立的;也就是说,它们创建一个 XmlServer,启动服务器,发出请求,然后解析响应。  它们对于测试新运算符很有用。

或者,您可以使用控制台应用程序 XmlServer.Host.Console 启动 XmlServer,或安装 Windows 服务 XmlServer.Host.Service。  Stress Test 模拟多个客户端发出多个请求。  它使用随机数和 Delay Operator 来给 XmlServer 施加负载。  此时可以看到 Priority Listener 的真正好处。  处理监听器正忙于处理数百个客户端请求。  如果我们请求处理监听器的状态,我们将在队列末尾等待处理。  使用优先级监听器,我们的状态请求会立即得到处理,即使处理端口有积压。  这使我们能够创建一个漂亮的监视和图形应用程序,以获取 XmlServer 的实时统计信息。

未来的增强

我希望您喜欢 XmlServer。  我非常希望能收到您对该应用程序的任何反馈。  就未来增强功能而言,我曾考虑过负载均衡器。  您会注意到代码中有一个方法可以通过在请求中包含 RelayIPAddress 和 RelayPort 来将请求从一个 XmlServer 中继到另一个 XmlServer。  我认为创建一个“Relay XmlServer”会很酷,该服务器会请求多个 XmlServer 的状态,并将客户端请求中继到“Estimated Cleanup Time”最低的 XmlServer。

© . All rights reserved.