65.9K
CodeProject 正在变化。 阅读更多。
Home

命令传输协议(CTP) - 分布式或并行计算的新网络协议

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (31投票s)

2004年4月19日

22分钟阅读

viewsIcon

241517

downloadIcon

7608

本文介绍了一种用于分布式或并行计算的新型网络协议的改进版本。通常,它仅适用于小型消息的快速、可靠且功能丰富的交换。提供了该协议的实现和演示项目。

CTP - 序列很重要

访问项目主页.

引言

首先,这里所说的“计算集群”是什么意思。集群是为某些特定目的而形成的多个工作站的联合体。计算集群是为繁重计算而构建的集群。它是一个对网络功能提出特殊要求的特定系统。高质量集群的网络机制主要特性是

  1. 快速数据交换。
  2. 可靠的数据传输。
  3. 支持广播。通常,某些网络中的所有工作站都参与计算实验,因此广播使控制变得更容易。
  4. 支持大量数据块交换。有时,例如,实验的初始条件可以用这样的数据块表示。
  5. 点对点网络。任何工作站都可以是数据源和数据目的地,因此它们同时是客户端和服务器。

事实上,大多数并行计算软件工具包都以库的形式呈现,它们使用标准网络协议 TCP/IP [1]。使用此协议存在许多缺点

  • 数据交换速度慢。TCP 的“可靠性”和“通用性”带来了很多开销。此协议是通用协议,因此适用于在互联网等不稳定环境中工作,但在为计算开发的稳定(或相当稳定)系统中,可以获得更多好处。
  • TCP 不支持广播。UDP 支持,但它不可靠,并且 UDP 数据报的大小限制为 65467 字节 [1]。
  • 在数据交换之前创建逻辑通道的理念对于集群计算是多余的。首先因为集群通常是经过良好调优、运行良好的网络。其次因为,一些集群计算策略导致工作站之间无序交换。
  • TCP 是一个基于流的协议,但对于确定的任务,有界块交换更可取,因为它允许明确地说出何时所有用于后续操作的数据已到达。

当然,可以针对集群计算中出现的特殊要求调整专门的网络协议。因此,CTP 是一种旨在满足任意任务需求的协议,这些任务需要快速消息交换的支持,并且可以作为消息接收的反应开始繁重计算。尽管其名称中的字母“P”表示“协议”,但它不仅仅是一个规范。CTP 是一种思想和工具包,它允许使用它。因此,它能够取代 MPI 实现、PVM 等产品。

理念

大多数现有的并行计算工具包都使用所谓的“消息”作为基本抽象。CTP 中使用的基本抽象是“命令”。命令是某人向某人发出做某事的指令(在大多数情况下,集群中的工作站正是以这种方式进行通信)或对此类指令的响应。从最后一句话中,可以得出结论,命令具有以下参数

  • “某人” - 发送方。
  • “某人” - 接收方。
  • “某事” - 命令描述。

因此,首先,需要以某种方式定义发送方和接收方。为此,将使用 IP 地址。原因在于 IP 使用极其广泛,并且它完全满足要求(为所有工作站提供唯一标识符)。命令将由整数数字标识。

就所讨论的协议词汇而言,“命令”和“消息”实际上是同义词。“命令”是“消息”,但反之不总是成立。

CTP 需要满足上述集群网络要求。实现此目的的方法如下(与引言中顺序相同)

  1. 为了提高交换速度,UDP 将用作协议的基础。此外,使用 UDP,而无需深入到原始网络,将在未来使用户免受协议支持工具包安装带来的额外问题。
  2. 将实现数据交换的可靠性。每个发送的包都将存储,直到接收方确认收到数据为止。为了维护此机制,包需要提供标识符。标识将在发送方侧通过分配整数数字来执行。这些 ID 通常不能是唯一的,但对于每个发送方必须是唯一的。
  3. 广播支持是使用 UDP 作为基本协议的另一个论据。
  4. 将实现大量数据交换支持。如果将要发送的消息大于某个限制(默认情况下为 65400 字节),则需要将其分成较小的部分。这些部分将被编号并单独发送给接收方,一个接一个。在接收方侧,它们将被 объеди在一起以整理初始命令。一个重要的注意事项是,只有在所有部分都已收到后,接收方应用程序才会收到有关命令到达的信息。此类命令将被称为“大型命令”,但在实践中,大多数命令是“正常”命令(只需一个包进行传输)。
  5. 对于点对点交换,CTP 的实现应将客户端和服务器功能作为一个整体包含在内。

CTP/IP 与 OSI 模型 [2] 以及 UDP/IP 理念的关系如图 1 所示。

Fig. 1. Relationship between OSI-model, UDP/IP-model and CTP/IP-model

图 1. OSI 模型、UDP/IP 模型和 CTP/IP 模型之间的关系。

CTP 涵盖了从传输层到应用层的多个层次,这证明其责任范围从相对较低的级别开始,延伸到较高的级别。

内部世界

接下来的讨论以 CTP 基本概念、其属性和功能的混乱描述形式进行。阅读所有这些内容将形成一个关于内部正在发生什么事情的整体概念。选择这种奇怪的陈述形式,是因为系统性讨论需要同时从几个点开始,但不幸的是,这是不可能的。

CTP 的调试功能可能有助于更好地理解操作顺序。有一个功能可以将所有操作和事件的描述放置到给定的输出流。

标题

每个 CTP 数据包通常表示为报头加正文(数据)。报头结构如表 1 所示(按出现顺序)。

名称 大小(位) 注释
包大小 16 无符号整数。包的大小(包括报头)。
命令号 16 无符号整数。命令号(从 0 到 32767,最高位未设置)。如果最高位已设置,则数据包表示对具有相应命令号的消息的确认。
大型消息中的包号 32 无符号整数。对于大型命令 - 包号,从零到包总数(由报头的下一个字段给出)减一。对于正常命令 - 零。
消息的包总数 32 无符号整数。对于大型命令 - 发送所需包的总数。对于正常命令 - 零或一。
ID 32 无符号整数。包的标识符。对于每个发送方必须是唯一的。
消息大小 64(仅使用 48) 无符号整数。整个命令数据的大小(不包含任何报头)。因此,最大命令的大小是最大包数乘以最大包大小,即 232*65400。这等于 280890861158400 字节,或超过 255 TB。最后一个值允许将消息大小视为无限。显然,48 位足以存储大小值,但为了对齐分配了 64 位。
选项 8 位集合。每个位确定相应选项是否已设置。选项将在稍后讨论。

表 1. CTP 包头

可以计算出,报头的总大小为 25 字节。所有重要的传输参数,如发送方和接收方的 IP 地址和端口,都存储在 UDP 报头中。

每个数据包可以通过其发送方、接收方和 ID 完全识别。发送方以以下方式为每个接收方提供 ID 的唯一性:将要发送的下一个数据包的 ID 初始值取为伪随机数。发送每个数据包后,它将被递增。发送给接收方的第一个数据包必须用特殊选项标记(见下文),以允许接收方了解起始 ID 的值。

存储器

说明 CTP 进行数据包交换的操作序列的流程图如图 2 所示

Fig. 2. Flowchart of CTP's implementation

图 2. CTP 实现的流程图。

在流程图中,提到了所谓的“存储”。有四个数据存储,在生命周期中累积,提供功能。

  1. 会话信息存储。它存储当前工作站与之通信的每个工作站的描述。其中,下一个数据包 ID、交换超时以及从该接收方收到的数据包描述都包含在内。

    交换超时用于确定何时需要重新发送已发送但未确认的数据包。此超时是自适应的(因为集群可能是异构的,并且可能通过内网和互联网连接工作站)。最初,采用默认超时(默认情况下为 100 毫秒)。第一次交换后,其值取为所需时间乘以系数(默认情况下为 3)。

    如果在超时期间未收到数据包到达的确认,则应重新发送数据包。重新发送之间的时间间隔将呈指数增长。如果数据包在 8 次重新发送(255 次超时)后仍未确认,则将生成错误消息“命令未确认时间过长”。如果超时设置为零,则此功能将关闭。

    消息可以重新发送。因此,有必要保护用户免受多次接收同一消息的影响。这就是为什么为每个收件人存储已接收数据包的描述。它以有序列表的形式实现。第一个元素包含按顺序接收到的数据包的最大 ID。在此元素之后,可能还有更多 ID,对应于已接收但大于第一个元素的数据包。在此列表中插入每个新 ID 后,从第一个元素开始的序列将被截断。例如,假设此存储包含 {7, 9, 10, 11, 13, 14}。这意味着所有 ID 小于或等于 7 以及等于 9、10、11、13 和 14 的数据包都已收到。收到 ID 为 8 的数据包后,列表将变为 {11, 13, 14}。如果所有数据包都按顺序到达,则列表始终只包含一个元素。

    ID 的值应在一个无限循环中(232-1 后是 0)。在此阶段,确定发送方生成的起始 ID 非常重要。

    当第一条消息即将发送或已从工作站接收到,或者尚不清楚时,会向会话信息存储添加一个新条目。将有一个用于广播消息的特殊条目。

  2. 已发送命令存储。要发送命令,需要整理数据包。需要分配一些内存并填充数据包头和数据。事实是,它不会在发送后立即释放和取消分配,而是存储到已发送命令存储中。只有在所有数据包都已确认到达后,才能从已发送命令存储中删除记录。

    这种思想可以针对“每个命令”而不是“每个数据包”实现(如 CTP 1.0 中),但第一种变体更可取。在这种情况下,所谓的“智能缓冲区”可以通过在数据包数据排列时预留和保护报头所需的内存来避免冗余内存分配。

  3. 大型命令存储 用于在分部分接收整个大型消息时进行整理。它存储总数量、部分接收状态向量以及用于编译的缓冲区。消息的每个部分,除了最后一个,都具有最大数据大小,因此部分可以轻松找到它们在缓冲区中的位置,因为它们知道自己的编号。当所有部分都已收到时,消息被认为是已整理好的,并且服务器会通知应用程序数据已到达。
  4. 交付存储。整个接收到的消息或错误描述被称为“交付”。生成后,它们将被添加到交付列表。然后,特殊的交付者线程将从列表中取出它们并将其传递给应用程序。

    用面向对象编程的术语来说:实现接收方应用程序的类对象可以订阅以获取给定命令的交付。在这种情况下,相应的对象将收到有关命令到达以及与该命令相关的错误的信息。

    此外,还需要一个默认接收器,用于接收有关没有订阅者的命令以及没有相关命令的常见错误(例如套接字创建错误)的信息。

确认

确认是带有空正文(仅报头)的数据包,它与已确认数据包的报头只有三个区别。在确认报头中

  • 包大小设置为报头大小。
  • 在命令号中,最高位已设置。
  • 消息大小设置为零。

这可能被认为是一种低效的解决方案——用单独的确认来确认每个数据包,但这样做是为了通过使用选项提供更多功能。不要忘记 CTP 不仅仅是一种协议,还是一个工具包。

数据包发送后

接收方收到数据包后做的第一件事是检查是否已收到相同的包。如果已收到此类数据包,则表示发送方未能收到确认,因此必须再次发送确认,并且可以跳过此数据包的接收过程。

确认必须是“再次发送”,而不是“重发”。确认不存储在已发送数据包存储中,而是在需要时生成。

如果之前没有收到此类数据包,则需要存储其到达信息。

如果收到的数据包表示正常命令,则服务器通知应用程序数据已到达(创建交付并将其放入交付存储)。如果它是大型命令的一部分,则服务器将其存储到大型命令存储中。如果它是消息的最后一个剩余部分,则也应生成交付。

数据包找到其位置后,必须发送确认,然后接收方开始等待下一个数据包。

当发送方收到任何确认时,它将从已发送数据包存储中删除相应的记录。该机制,就像物理系统一样,渴望将其势能降至最低,以便尽快释放所有存储。

线程

协议功能的实现应该是多线程的。有三种类型的线程

  • 服务器线程 接收数据包,实现确认支持、大型命令编排等。如果数据到达或错误信息出现,线程会将其添加到交付存储中。每隔一段时间(默认情况下为 100 毫秒),此线程会检查是否存在需要重新发送的数据包,并在必要时重新发送它们。服务器线程的数量可以是任意的,具体取决于任务。
  • 交付者线程 检查交付列表,如果它不为空,则将第一个交付发送给相应的订阅者或默认接收器。如果线程长时间(默认情况下为 20 秒)不执行任何操作,它将被终止。在空闲循环中,线程可能会休眠一段时间(默认情况下为 20 毫秒)。

    每个命令都是一个指令(或响应)。它可能指令做一些困难且持久的事情。因此,将交付者实现为单独的线程允许“按指令”计算某些东西,就在请求它的地方和时刻。然而,强烈建议不要浪费此功能。例如,不要在命令接收处理程序中使用模式对话框,因为这将毫无用处地占用交付者。

  • 交付管理器线程 如果所有现有交付者都忙碌且交付列表不为空,则创建额外的交付者线程。当然,交付者的最大数量受某个值限制(默认情况下为 50)。协议机制力求减少负载。在空闲循环中,线程可能会休眠一段时间(默认情况下为 10 毫秒)。

选项

选项允许为网络添加有趣的功能。有五种可能的选项

  • DelAfterError - 如果设置,则在接收方被告知其到达未被确认(生成错误描述后)后,将从已发送数据包存储中删除有关发送此数据包的信息。因此,此错误将只交付一次,如果数据未到达,它将丢失。
  • NoResend - 如果设置,即使此数据包未被确认,也不会重新发送。
  • UniqueCommand - 如果设置,则此数据包命令的确认将确认所有具有相同命令号并发送到给定接收方的数据包。不允许对大型消息使用此选项,以防止完整性损坏。
  • Broadcast - 如果设置,则此数据包将进行广播。选项本身对网络没有任何影响。通常,对于广播,用户必须指定接收方的 IP 地址,例如:255.255.255.255。但此选项提供了使用 CTP 进行广播所需的两个重要事项。首先,发送的消息必须获取其 ID,并且必须存储在会话信息存储中对应于“广播”的条目中。其次,此消息将获得被任意工作站确认的能力,因为无法事先知道谁将收到它。如果至少一个工作站确认,则认为该命令已确认。不建议(但可能)将其用于大型命令。
  • StartSession - 如果设置,则此数据包是发送方发送给此接收方的第一个数据包。因此,它带来的 ID 可以作为与此发送方会话的最小 ID。在检查数据包是否已收到时会考虑这一点。

    请注意,“会话”的概念意味着单向通道。如果两个工作站正在交换一些数据,那么它们在会话信息存储中都为对方有条目。

这些选项可以任意组合使用:单独使用,全部共同使用等等。

例如,将选项 ErrorOnceNoResendUniqueCommand 按位或运算组合,可能对诸如“如果你还活着就回复我”(也称为“ping”)之类的命令很有用。对于频繁发送、小型且不携带信息,但表示响应或确认的命令——接收方正在工作。

Windows 实现

最初,CTP 是为 Windows 操作系统实现的,旨在成为 (Cellular Automata Modeling Environment & Library) 项目 [3] 中使用的网络机制的基础。请参见项目主页。当然,它也可以用于任何需要快速消息交换和“按指令”进行繁重计算的任意应用程序。

该协议的实现由一组类表示。实现 CTP 主要功能的类名为 CCTPNet。CTP 实现中涉及的所有类的描述如下。

IPAddr 类

IPAddr 类的对象表示工作站的 IP 地址。除了源文件,此类别不需要任何解释。

union IPAddr
{
    // Data type for solid representation
    typedef unsigned __int32 IPSolid;

    // Actual data
    struct IPBytes {
        unsigned char b1,b2,b3,b4;
    } Bytes;
    IPSolid Solid;

    // Constructors
    IPAddr() {SetLocalhost();};
    IPAddr(unsigned char b1, unsigned char b2, 
           unsigned char b3, unsigned char b4)
      {Bytes.b1=b1;Bytes.b2=b2;Bytes.b3=b3;Bytes.b4=b4;};
    IPAddr(IPSolid l) {Solid=l;};

    // Returns true is this ip-address refers to localhost (127.0.0.1)
    inline bool IsLocalhost()
      {return Bytes.b1==127&&Bytes.b2==0&&Bytes.b3==0&&Bytes.b4==1;};

    // Returns true is this ip-address refers to broadcasting address
    // (255.255.255.255)
    inline bool IsBroadcasting()
      {return Bytes.b1==255&&Bytes.b2==255&&Bytes.b3==255&&Bytes.b4==255;};

    // Returns via s and return value dotted
    // string representation of ip-address
    inline LPTSTR GetString(LPTSTR s)
      {sprintf(s,"%d.%d.%d.%d",Bytes.b1, 
               Bytes.b2,Bytes.b3,Bytes.b4); return s;};

    // Set stored ip address to value, represented with string s (in
    // dot-separated format). Returns true if succeeded and false otherwise
    bool FromString(LPTSTR s);

    // Set ip address to localhost (127.0.0.1)
    inline void SetLocalhost()
      {Bytes.b1=127;Bytes.b2=0;Bytes.b3=0;Bytes.b4=1;};

    // Set ip address to broadcasting address (255.255.255.255)
    inline void SetBroadcast()
      {Bytes.b1=255;Bytes.b2=255;Bytes.b3=255;Bytes.b4=255;};

    // Operators
    bool operator ==(unsigned long ip) {return Solid==ip;};
    bool operator ==(IPAddr ip) {return Solid==ip.Solid;};
    bool operator !=(unsigned long ip) {return Solid!=ip;};
    bool operator !=(IPAddr ip) {return Solid!=ip.Solid;};
    IPAddr& operator =(const unsigned long ip) {Solid=ip; return *this;};
    IPAddr& operator =(const IPAddr ip) {Solid=ip.Solid; return *this;};
};

SmartBuffer 类

SmartBuffer 类的对象表示“智能缓冲区”,它使 CTP 的实现免于冗余内存分配。它会根据数据的特定大小(单个数据包中的最大数据量)动态地为数据包头预留空间。因此,用户只需将数据放入智能缓冲区。然后发送函数插入报头,数据包即可发送出去。此类的定义如下

class SmartBuffer
{
public:
    // Constructor. Parameters:
    // +) datasize - amount of data to be used;
    // +) autodel - if true then this buffer will be freed automatically by
    //    protocol's implementation after it will be sent and confirmed if
    //    needed. So, working with such buffer, you will have to create it with
    //    operator new, but do not to delete it. If this parameter equals false
    //    you will have to do new and delete manually
    // +) headsize - size of header of each packet;
    // +) maxdatasize - maximum size of data in single packet (without header)
    SmartBuffer(unsigned int datasize=0, bool autodel=true, 
      unsigned int headsize=25, unsigned int maxdatasize=65400);


    // Constructor. Parameters:
    // +) fname - name of file to be stored in internal buffer as data;
    // +) datasize - amount of data to be stroed just before the file;
    // +) autodel - if true then this buffer will be freed automatically by
    //    protocol's implementation after it will be sent and confirmed if
    //    needed. So, working with such buffer, you will have to create it with
    //    operator new, but do not to delete it. If this parameter equals false
    //    you will have to do new and delete manually
    // +) headsize - size of header of each packet;
    // +) maxdatasize - maximum size of data in single packet (without header)
    SmartBuffer(LPCTSTR fname, unsigned int datasize=0, bool autodel=true, 
      unsigned int headsize=25, unsigned int maxdatasize=65400);

    // Destructor
    virtual ~SmartBuffer() {delete[] m_pBuffer;};

// Access to key values
    // Header size
    inline unsigned int GetHeadSize() {return m_uHeadSize;};

    // Data size
    inline unsigned int GetDataSize() {return m_uDataSize;};
    void SetDataSize(unsigned int datasize);

    // Maximum data size for single packet
    inline unsigned int GetMaxDataSize() {return m_uMaxDataSize;};

    // Allocated buffer size
    inline unsigned int GetBufferSize() {return m_uBufferSize;};

    // Auto deleting
    inline bool GetAutoDel() {return m_bAutoDel;}
    inline void SetAutoDel(bool autodel) {m_bAutoDel=autodel;}

// Access to key pointers
    // Returns begining of the buffer
    inline char* GetBufferBegin() {return m_pBuffer;};

    // Returns current pointer
    inline void* GetCurPtr() {return m_pCurPtr;};

    // Sets current pointer to pointer to data of i-th packet (i from zero to
    // GetPacketsCount()-1). Result false if index i is out of bounds
    inline bool CurPtrToDataPtr(unsigned int i)
      {char* res=GetDataPtr(i); 
       if (res) {m_pCurPtr=res; return true;} else return false;};
    
    // Sets current pointer to the begining of data
    inline void CurPtrToDataBegin() {m_pCurPtr=m_pBuffer+m_uHeadSize;};

// Access to packets
    // Returns amount of packets
    inline unsigned int GetPacketsCount()
      {return m_uBufferSize/(m_uHeadSize+m_uMaxDataSize)+
        ((m_uBufferSize%(m_uHeadSize+m_uMaxDataSize))?1:0);};

    // Returns pointer to header of i-th packet (i from zero to
    // GetPacketsCount()-1). Result is zero if index i is out of bounds
    inline char* GetHeadPtr(unsigned int i)
      {char* res=i*(m_uHeadSize+m_uMaxDataSize)+m_pBuffer; 
        if (res>m_pBuffer+m_uBufferSize) return NULL; else return res;};

    // Returns pointer to data of i-th packet (i from zero to
    // GetPacketsCount()-1). Result is zero if index i is out of bounds
    inline char* GetDataPtr(unsigned int i)
      {char* res=GetHeadPtr(i);
       if (res) return res+m_uHeadSize; else return NULL;};

    // Returns size of i-th packet (i from zero to GetPacketsCount()-1),
    // including header. Only last packet's size can differ from header's size
    // plus maximum data size. Result is zero if index i is out of bounds
    inline unsigned int GetPacketSize(unsigned int i)
      {if (i<(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize)) 
        return m_uHeadSize+m_uMaxDataSize; 
        else if (i==(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize))
          return (m_uBufferSize)%(m_uHeadSize+m_uMaxDataSize);
        else return 0;};

// Data and header access routines
    // Put data of size GetHeadSize() from src to the are of i-th header.
    // Returns true if data was copied successfully to the existing header etc
    // and false otherwise
    bool PutHead(void* src, unsigned int i);

    // Put byte of data bt to internal buffer from current pointer (if dest is
    // negative) or from dest-th byte.. Current pointer will be moved to the
    // end of put data (skipping headers) if movecur equals true. Returns true
    // if data was copied successfully
    bool PutDataByte(unsigned char bt, bool movecur=true, int dest=-1);

    // Put data of size size from src to internal buffer from current pointer
    // (if dest is negative) or from dest-th byte. Current pointer will be
    // moved to the end of put data (skipping headers) if movecur equals true.
    // Returns true if data was copied successfully, without truncation etc and
    // false otherwise
    bool PutData(void* src, unsigned int size, bool movecur=true, int dest=-1);

    // Put string to internal buffer from current pointer (if dest is negative)
    // or from dest-th byte. Current pointer will be moved to the end of put
    // data (skipping headers) if movecur equals true. Returns true if data was
    // copied successfully, without truncation etc and false otherwise
    inline bool PutDataString(char* str, bool movecur=true, int dest=-1)
      {return PutData(str,strlen(str)+1,movecur,dest);};

    // Put data from file fname to internal buffer from current pointer
    // (if dest is negative) or from dest-th byte. Current pointer will be
    // moved to the end of put data (skipping headers) if movecur equals true.
    // Returns true if data was copied successfully, without truncation etc and
    // false otherwise
    bool PutDataFile(LPCTSTR fname, bool movecur=true, int dest=-1);

    // Trim the buffer, by cutting the content, excluding the part of buffer
    // from current pointer to the end. It is strongly to perform this
    // operation only after all buffer's modifications
    void Trim();

protected:
    // Calculates needed buffer size
    inline unsigned int GetNeededBufferSize(unsigned int datasize, 
           unsigned int headsize, unsigned int maxdatasize)
      {return datasize?(datasize/maxdatasize*(headsize+maxdatasize)+
        ((datasize%maxdatasize>0)?(datasize%maxdatasize+ 
                                  headsize):0)):headsize;};

    // Calculate pointer by dest (if negative - then by m_pCurPtr). Result
    // pointer will be put to ptr. If prtnsize is not NULL then portion size
    // will be also calculated and put to variable, pointed by prtnsize
    inline void DestToPtr(int dest, char*& ptr, unsigned int* prtnsize);

// Data members
    bool m_bAutoDel; // Detele it automatically or not
    unsigned int m_uHeadSize; // Size of header
    unsigned int m_uDataSize; // Size of data
    unsigned int m_uMaxDataSize; // Maximum size of data in single packet

    unsigned int m_uBufferSize; // Size of allocated internal buffer
    char* m_pBuffer; // Pointer to internal buffer
    char* m_pCurPtr; // Current pointer
};

NetSender 类

NetSender 类是 CCTPNet 的基类,它实现了 CTP 的主要功能。NetSender 仅用于描述任意协议的通用网络发送类的接口。

class NetSender
{
public:
    // Send smart buffer sb to address to. Parameter command represents command
    // id. Parameter options - sending options. If parameter storeiffail is
    // true then sent command will be stored in sent packets storage even if
    // sending fails and will not overwise. Returns true if succeeded (message
    // has gone) and false otherwise
    virtual bool Send(SmartBuffer& sb, unsigned __int16 command, 
      IPAddr to, unsigned __int8 options=0, bool storeiffail=true)=0;

    // Returns true if sender is working (is not suspended and so on) and false
    // otherwise. Default implementation returns true, because implementation
    // of many protocols cannot be switched off and cannot handle errors at all
    virtual bool IsWorking() {return true;};
};

NetReceiver 类

如果存在 NetSender 类,那么也必须存在 NetReceiver 类。NetReceiver 用于描述可以订阅数据到达和错误信息交付的对象的接口。

class NetReceiver
{
public:
    // Is called when have received data pointed by data
    virtual void OnReceive(void* data)=0;

    // Is called when there was an error, described with data pointed by data
    virtual void OnError(void* data)=0;
};

如果一个对象要订阅交付,它的类必须是 NetReceiver 的后代(C++ 中多重继承的支持允许向任何类添加额外的父类)。成员函数 OnReceiveOnError 将分别在消息到达和发生错误时被调用。在第一种情况下,指针 data 将指向到达数据的描述;在第二种情况下,data 将指向由 NetSender 的后代生成的一些错误描述。

这些成员函数的参数是指向 void 的指针,而不是指向某个具体类的指针,因为 NetReceiver 类也面向任意协议。在使用 CTP 时,OnReceive 的参数将指向 CCTPReceivedData 类的对象,而 OnError 的参数将指向 CCTPErrorInfo 类的对象。

CCTPReceivedData 类

此类的对象描述并提供对接收到数据的访问。它不需要解释。

struct CCTPReceivedData {
    // Constructor. Parameters:
    // +) command - command;
    // +) size - amount of data to be stored;
    // +) from - ip address of host, that sends this data;
    // +) buf - points to buffer, which stores received data. If NULL then
    //    data copying will be skipped (only allocation performs)
    CCTPReceivedData(unsigned __int16 command, 
      unsigned __int64 size, unsigned long from, char* buf);
    // Destructor
    virtual ~CCTPReceivedData() {delete[] pBuf;};

    unsigned __int16 command; // Command
    unsigned __int64 size; // Message size (48 bit)
    IPAddr from; // Host, that had sent this data
    char* pBuf; // Data
};

CCTPErrorInfo 类

此类的对象描述了网络过程中发生的错误。

struct CTCPErrorInfo {
    // Constructor. Parameters:
    // +) type - error type. When it occurs:
    //    +) 0 - on socket creation;
    //    +) 1 - on socket binding or tuning;
    //    +) 2 - on data sending;
    //    +) 3 - on data receiving;
    // +) code - WinSock error code;
    // +) addr - address of host, which causes error (not always can be
    // interpreted, if can not the just equals localhost)
    CTCPErrorInfo(unsigned char type,int code,IPAddr addr)
      {this->type=type; this->code=code; 
       this->addr=addr; GetTimeStamp(timestamp);};

    // Put time stamp to string s and returns it
    static char* GetTimeStamp(char* s);

    unsigned char type; // Error type
    int code; // WinSock error code
    IPAddr addr; // Address of host, which causes error
    char timestamp[22]; // Time stamp, when error occurred
};

上面的一切都必须清楚,除了一个情况:为什么 timestamp 需要作为此类的一个字段?事实是,网络错误发生的时间可能非常重要,例如,用于构建日志文件。但是,错误描述交付的时间可能与发生时间大相径庭。为了避免此类错误,决定将时间戳作为 CCTPErrorInfo 类的一个字段,并将在对象构造期间填充。可以使用静态成员函数 GetTimeStamp 随时检索当前时间的时间戳,精确到千分之一秒。

CCTPNet 类

此类实现了 CTP 的主要功能(客户端和服务器同时)。以下定义描述了成员

class CCTPNet: public NetSender
{
public:
    // Data structures

    // Packet header
#pragma pack(push)
#pragma pack(1)
    struct Header {
        // Constructor
        Header() {size=0;command=0;number=0;amount=0;id=0;messize=0;options=0;}

        void ToStream(ostream& out);

        unsigned __int16 size    ; // Packet size (16)
        unsigned __int16 command ; // Command (16)
        unsigned __int32 number  ; // Packet number (from zero to amount-1) (32)
        unsigned __int32 amount  ; // Amount of packets in the command (32)
        unsigned __int32 id      ; // Packet id (32)
        unsigned __int64 messize ; // Message size (64, but 48 are used)
        unsigned __int8  options ; // Options (8)
    };
#pragma pack(pop)

    // Options bits
    enum Options {
        // Delete sent command from the storage after error was
        // generated
        DelAfterError=0x01,

        // Do not resend this packet even if it was not confirmed
        NoResend=0x02,

        // Confirmation of this packets command will confirm all packets with
        // the same command, that was sent to same recipient.
        // NB: It is not recommended to use this option with multipacket
        // messages to protect it from integrity corruption
        UniqueCommand=0x04,

        // Broadcast this message (message with this option will be confirmed
        // from arbitrary recipient)
        Broadcast=0x08,

        // Mark packet, which is first in the session (in the interchange with
        // given recipient)
        // Note: This option is used by CTP internal world and is not needed to
        // be set by user
        StartSession=0x10
    };

    // Set of options, which appropriate for ping
    static const unsigned __int8 OptPing;

    // Structures for messages and error information delivery
    enum DeliveryType {
        ReceivedData,
        ErrorInfo
    };
    struct Delivery {
        // Constructor
        Delivery(NetReceiver* target,CCTPErrorInfo* data)
          {this->target=target; this->data=data; 
           this->type=DeliveryType::ErrorInfo;};
        Delivery(NetReceiver* target,CCTPReceivedData* data)
          {this->target=target; this->data=data; 
           this->type=DeliveryType::ReceivedData;};
        Delivery() {target=NULL; data=NULL; type=(DeliveryType)NULL;};
        // Only for STL compliance

        NetReceiver* target; // Receiver
        void* data; // Data
        DeliveryType type; // Delivery type
    };
    typedef list<Delivery> DeliveriesList;

    // Time settings storage structure
    struct Times {
        // Constructor, which sets defaults
        Times() {
            uMultiplier=       3;
            uDefTimeout=       100;
            uSleepOnDestroy=   50;
            uSleepSuspended=   10;
            uSleepDelMan=      10;
            uSleepNothing=     20;
            uPeriodDestroy=    2000;
            uPeriodAutoDest=   20000;
            uPeriodCheckResend=100;
        };

        // Multiplier for the time, needed for single transfer, to determine
        // timeout
        unsigned int uMultiplier;

        // Default timeout
        unsigned int uDefTimeout;
    
        // Sleeping time during waiting for desroying
        unsigned int uSleepOnDestroy;

        // Sleeping time when suspended
        unsigned int uSleepSuspended;

        // Sleeping time in deriveries manager
        unsigned int uSleepDelMan;

        // Sleeping time when server has nothing to do
        unsigned int uSleepNothing;

        // Period while waiting for desroying, after which working threads will
        // be stopped forcedly
        unsigned int uPeriodDestroy;

        // If deliverer will do nothing during this period it will be destroyed 
        unsigned int uPeriodAutoDest;

        // Period of checking if some packets are to be resent
        unsigned int uPeriodCheckResend;
    };

    // Constructor creates server with all necessary parameter and tunes client
    // for fast data sending:
    // +) receiver - default receiver of arrived data and errors;
    // +) port - number of port, which to listen;
    // +) servers - amount of setvers to be started;
    // +) times - pointer to time settings storage structure (if NULL then
    //    defaults will be used);
    // +) log - points to output stream for gebug log building (if NULL then no
    //    output will be produced.
    // +) packetdatasize - value of maximum data size to be send in single
    //    packet (if message is bigger than it is the "large message");
    // +) maxthreads - maximum amount of deliverers threads
    CCTPNet(NetReceiver* receiver, unsigned short port, 
         unsigned short servers=1,Times* times=NULL,ostream* log=NULL, 
         unsigned __int16 packetdatasize=65400, 
         unsigned short maxthreads=50);

    // Destructor
    virtual ~CCTPNet();

    // Parameter access routines

    // Time settins routines
    const Times& GetTimes() {return m_Times;}
    void SetTimes(Times& times) {m_Times=times;}

    // Port routines
    unsigned short GetPort() {return m_uPort;}
    void SetPort(unsigned short port)
      {closesocket(m_SendSocket); closesocket(m_RecvSocket); 
       FreeSntCommands(); FreeSessions(); FreeLargeCommands(); 
       m_uPort=port; CreateSockets();}

    // Packet data size routines
    unsigned __int16 GetPacketDataSize() {return m_uPacketDataSize;}
    void SetPacketDataSize(unsigned __int16 ps)
      {delete[] m_pBuffer; m_uPacketDataSize=ps; 
       m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];}

    // Maximal threads amount routines
    void SetMaxDeliverers(unsigned short maxthreads)
      {m_uMaxDeliverers=maxthreads;};
    unsigned short GetMaxDeliverers() {return m_uMaxDeliverers;};

    // Info target routines
    NetReceiver* GetDefaultReceiver() {return m_DefReceiver;}
    void SetDefaultReceiver(NetReceiver* receiver) {m_DefReceiver=receiver;}
    // Sets special receiver receiver for command command and type type. If
    // receiver for definite command and delivery type already exists it will
    // be replaced
    void AddSpecialReceiver(unsigned __int16 command, 
                 NetReceiver* receiver, DeliveryType type);
    // Delete receiver receiver from special receivers list
    void DeleteSpecialReceiver(NetReceiver* receiver);
    // Returns receiver for command command and type type
    NetReceiver* GetReceiver(unsigned __int16 command, DeliveryType type);

    // Suspending status routines
    bool GetSuspended() {return m_bSuspended;};
    void SetSuspended(bool suspended) {m_bSuspended=suspended;};
    virtual bool IsWorking() {return !m_bSuspended;};

    // Operations

    // Returns size or datagrams header
    static unsigned __int16 GetHeaderSize() {return sizeof(Header);}

    // Send smart buffer sb to address to. Parameter command represents command
    // id. Parameter options - sending options. If parameter storeiffail is
    // true then sent command will be stored in sent packets storage even if
    // sending fails and will not otherwise. Returns true if succeeded (message
    // has gone) and false otherwise
    virtual bool Send(SmartBuffer& sb, unsigned __int16 command, 
         IPAddr to, unsigned __int8 options=0, bool storeiffail=true);

    // Send dataless packet (header only). Parameter command represents command
    // id. Parameter options - sending options. If parameter storeiffail is
    // true then sent command will be stored in sent packets storage even if
    // sending fails and will not otherwise. Returns true if succeeded (message
    // has gone) and false otherwise
    bool Send(unsigned __int16 command, IPAddr to, unsigned __int8 options=0, 
         bool storeiffail=true) {return Send(*(new SmartBuffer()), 
         command,to,options,storeiffail);};

    // Save information about packet received from from with header head.
    // Returns true if new message was received and false otherwise
    bool SaveRcvPacket(unsigned long from,Header* head);

    // Mark packet sent to to with header pointed by header as confirmed.
    // Memory will be cleared if possible
    void ConfirmSntPacket(unsigned long to,Header* header);

    // Send to to confirmation of receipt of the packet with header, pointed by
    // header
    void SendConfirmation(unsigned long to,Header header);

    // Arrange large packet received from from with hearer pointed by head.
    // Function returns true if solid message is arranged after last packet
    bool ArrangeLargeCommand(unsigned long from,Header* head);

    // Resend packets, which have not got confimational commands
    void ResendNotConfirmedData();

    // Determines is command is confirmation of command or not
    inline bool IsConfirmation(unsigned __int16 command)
      {return (command&m_iConfirm)!=0;};

    // Retrieving status information functions

    // Returns amount of entries in sent commands storage
    inline unsigned int GetSntCommandsCount() {return m_SntCommands.size();};

    // Returns amount of entries in created sessions information storege
    inline unsigned int GetSessionsCount() {return m_Sessions.size();};

    // Returns amount of entries in large messages storage
    inline unsigned int GetLrgMessagesCount() {return m_LargeCommands.size();};

    // Returns current amount of deliverer threads
    inline unsigned int GetDelThreadsCount() {return m_pDeliverTrds.size();};

    // Returns current amount of busy deliverer threads
    inline unsigned int GetBusyDelThreadsCount() {return m_uBusy;};

    // Returns current amount of deliveries
    inline unsigned int GetDelCount() {return m_Deliveries.size();};

protected:
    // Free buffers of all sent packets
    void FreeSntCommands();

    // Free information about received packets
    void FreeSessions();

    // Free buffers of large packets
    void FreeLargeCommands();

    // Free all planned deliveries
    void FreeDeliveries();

    // Actually send packet pointed with buf to recipient to. Returns true if
    // succeeded and false otherwise
    bool SendPacket(char* buf, unsigned long to);

    // Creates sending and receiving sockets. Returns true if succeeded and
    // false otherwise
    bool CreateSockets();

    // Check the options validity in the given header
    void CheckupOptions(Header& header);

public:
    // Socket for data receiving
    SOCKET m_RecvSocket;

    // Receiving buffer
    char* m_pBuffer;

    // Equals true if server and delivery threads needs to be finished
    bool m_bKill;

    // Maximal amount of delivery threads
    unsigned short m_uMaxDeliverers;

    //Output stream for log building
    ostream* m_pLog;

    // Deliveries (received messages and error information) storage
    DeliveriesList m_Deliveries;

    // Threads handles
    vector<CWinThread*> m_pServerTrds; // Server threads
    CWinThread* m_pDelManTrd; // Delivery manager thread
    vector<CWinThread*> m_pDeliverTrds; // Deliverers threads

    // Critical section for server threads access
    CCriticalSection m_csServerTrds;

    // Critical section for deliverers threads access
    CCriticalSection m_csDeliverTrds;

    // Critical section for deliveries access
    CCriticalSection m_csDeliveries;

    // Critical section for sent packets storage access
    CCriticalSection m_csSntCommands;

    // Critical section for sent recipients access
    CCriticalSection m_csSessions;

    // Critical section for sent large commands storage access
    CCriticalSection m_csLargeCommands;

    // Critical section for network access
    CCriticalSection m_csNetwork;

    // Critical section for log access
    CCriticalSection m_csLog;

    // Stores amount of busy deliverers threads
    unsigned short m_uBusy;

    // Determines is a<b or not, taking overruning in the account (0xffffffff
    // is less than zero)
    inline static bool Less(unsigned __int32 a,unsigned __int32 b)
      {if (max(a,b)-min(a,b)>0x7fffffff) 
        return !(a<b); else return a<b;}

    // Bit mask which is to be set for confirmations
    static const unsigned __int16 m_iConfirm;

protected:
    // Fills id field of header, refered by head for data, which will be sent
    // to recipient addr. If recipient's address is for broadcasting, then
    // option Broadcast is to be set set beforehand. This function will also
    // set StartSession option, if needed
    void GetNextID(Header& head, IPAddr addr);

    // Returns timeout for the session with workstation addr or for
    // broadcasting, if parameter bcast equals true
    unsigned int GetTimeout(IPAddr addr, bool bcast);

    // Sets timeout to the value of parameter timeout for the session with
    // workstation addr or for broadcasting, if parameter bcast equals true.
    // Value will be set only if current value is zero
    void SetTimeout(IPAddr addr, bool bcast, unsigned int timeout);

    // Type definitions

    // Structures for sent packets
    struct SntCommandInfo {
        // Constructors
        SntCommandInfo():sbBody(*(new SmartBuffer()))
          {CI=NULL; uCount=0;} // Only for STL compliance
        SntCommandInfo(SmartBuffer& sb, DWORD time, 
                    unsigned long to):sbBody(sb)
          {ipTo=to; uCount=sb.GetPacketsCount(); CI=new CommandInfo[uCount]; 
           for (unsigned int i=0; i<uCount; i++)
             {CI[i].dwTime=time; CI[i].dwLTime=time;}}

        // Confirms receiving of i-th packet. Returns true if this object can
        // be excluded from sent commands list and false otherwise
        bool Confirm(unsigned int i);

        // Free memory, controlled by this sent command information storage
        inline void Free() {delete[] CI; if (sbBody.GetAutoDel()) delete &sbBody;};

        // Representation of recipients IP address
        unsigned long ipTo;
        // Reference to smart buffer
        SmartBuffer& sbBody;
        // Amount of messages in this command
        unsigned __int32 uCount;

        // Structure for single command info
        struct CommandInfo {
            // Constructor
            CommandInfo() {uResend=1; dwTime=0; dwLTime=0; bConfirmed=false;};
        
            // Increment period between sendings
            void IncResend() {if (uResend<16384) uResend<<=1;}
            // Dead timeout has elapsed
            bool IsDeadTimeout() {return uResend>=256;}

            // Period between sendings
            unsigned int uResend;
            // Creation time
            DWORD dwTime;
            // Last sending (or resending) time
            DWORD dwLTime;
            // Was confirmed or not
            bool bConfirmed;
        };

        // Array of commands' information
        CommandInfo* CI;
    };
    typedef list<SntCommandInfo> SntCommandInfoList;

    // Structures for session description
    struct SessionInfo {
        // Constructor
        SessionInfo()
          {id=rand()*rand(); timeout=0; 
           received.clear(); minwasset=false;}

        // Type for received messages list
        typedef list<unsigned __int32> RcvList;

        unsigned __int32 id; // Next id
        bool minwasset; // Was minimal id already set or it was not 
        unsigned int timeout; // Timeout
        RcvList received; // Ids of received packets
    };
    typedef map<IPAddr::IPSolid,SessionInfo> SessionsInfo;

    // Structures for storing parts of large packets
    struct LargeCommandInfo {
        // Constructors. Parameters:
        // +) command - command;
        // +) size - amount of data to be stored;
        // +) from - ip address of host, that sends this data;
        // +) id - first packet's id;
        // +) amount - amount of packets left in the message
        LargeCommandInfo(unsigned __int16 command, unsigned __int64 size, 
            unsigned long from, unsigned __int32 id, unsigned __int32 amount)
          {pRD=new CCTPReceivedData(command,size,from,NULL); 
           this->id=id; uCount=amount; received=new bool[uCount];
           for (unsigned int i=0; i<uCount; i++)
             received[i]=false;};
        LargeCommandInfo()
          {id=0; uCount=0; received=NULL; 
           pRD=NULL;}; // Only for STL compliance

        // Mark i-th part of commang
        // as received one. Returns true if all parts
        // were received and false otherwise
        inline bool GotPart(unsigned int i);
    
        // Free memory, controlled by this
        // large command information storage
        // (message's body is not destroyed)
        inline void Free() {delete[] received;};

        unsigned __int32 id; // First packet's id
        unsigned __int32 uCount; // Amount of packets for command
        bool* received; // Array of flags, which shows received or not
        CCTPReceivedData* pRD; // Points to received data
    };
    typedef list<LargeCommandInfo> LargeCommandInfoList;

    // Structures for special receivers
    struct SpecialReceiver {
        // Constructor
        SpecialReceiver(unsigned __int16 command, 
              NetReceiver* receiver, DeliveryType type)
          {this->command=command; 
           this->receiver=receiver;
           this->type=type;};
        SpecialReceiver()
        {command=0;receiver=NULL;
         type=(DeliveryType)NULL;};
         // For STL compliance

        unsigned __int16 command; // Command
        NetReceiver* receiver; // Receiver
        DeliveryType type;
        // Type of delivery to be sent to the receiver
    };
    typedef list<SpecialReceiver> SpecialReceiversList;

    // Storages and other data structures

    // Sessions information storage
    SessionsInfo m_Sessions;

    // Sent commands storage
    SntCommandInfoList m_SntCommands;

    // Large packets storage
    LargeCommandInfoList m_LargeCommands;

    // Points to receiver which will get messages and error information by
    // default (if no special receiver will be present)
    NetReceiver* m_DefReceiver;

    // Special receivers
    SpecialReceiversList m_Receivers;

    // Socket for data sending
    SOCKET m_SendSocket;

    // Local address, used for data receiving
    SOCKADDR_IN m_Local;

    // Port on which to work;
    unsigned short m_uPort;

    // Size of the data in packet
    unsigned __int16 m_uPacketDataSize;

    // Time settings
    Times m_Times;

    // Determines if this workstation is offline or not
    bool m_bSuspended;

    // Returns reference to corresponding session information. Broadcasting
    // session will be returned, if parameter bcast equals true
    SessionInfo& GetSessionInfo(IPAddr addr, bool bcast);
};

CCTPStatusDlg 类

此类允许显示一个对话框,该对话框显示 CTP 负载:存储中的元素数量和交付者线程的数量。它还提供暂停服务器的功能。该类有一个构造函数,它接受一个 CCTPNet 对象的引用,以便进行监视。CCTPStatusDlg 类的定义如下

class CCTPStatusDlg : public CDialog
{
public:
    // Constructor. Parameter ctp gives
    // a reverence to CTP fuctionality class,
    // to keep an eye on it. It will be refreshed
    // each cycle milliseconds
    CCTPStatusDlg(CCTPNet& ctp, UINT cycle, 
                     CWnd* pParent = NULL);

// Dialog Data
    //{{AFX_DATA(CCTPStatusDlg)
    enum { IDD = IDD_CTPSTATUS };
    //}}AFX_DATA

// Overrides
    //{{AFX_VIRTUAL(CCTPStatusDlg)
    public:
    virtual BOOL DestroyWindow();
    protected:
    virtual void DoDataExchange(CDataExchange* pDX);
    //}}AFX_VIRTUAL

// Implementation
protected:
    // Set suspend status
    void SetSuspendStatus();

    // Reference to CTP
    CCTPNet& m_CTP;

    // Refreshment timer
    UINT m_uTimer;

    // Refreshment cycle
    UINT m_uCycle;

    // Generated message map functions
    //{{AFX_MSG(CCTPStatusDlg)
    afx_msg void OnTimer(UINT nIDEvent);
    virtual BOOL OnInitDialog();
    afx_msg void OnShowWindow(BOOL bShow, UINT nStatus);
    afx_msg void OnBsuspend();
    //}}AFX_MSG
    DECLARE_MESSAGE_MAP()
};

请记住,如果您想在您的项目中使用此对话框,您必须将对话框模板 IDD_CTPSTATUS 和字符串 IDS_CTP_SUSPEND 以及 IDD_CTP_RESUME 从演示应用程序复制到您的项目。

这一切如何使用?

首先,将文件 CTPNet.hCTPNet.cppNetBasics.hNetBasics.cpp 添加到您的项目中。然后将以下指令放入 StdAfx.h

// Include MFC multithreading support
#include <afxmt.h>

// Include STL
#pragma warning(disable: 4786)
#pragma warning(push)
// Disable STL-critical warnings
#pragma warning(disable: 4245)
#pragma warning(disable: 4100)
#pragma warning(disable: 4663)
#pragma warning(disable: 4018)
#pragma warning(disable: 4097)
#include <map>
#include <list>
#include <vector>
#include <algorithm>
#include <iostream>
using namespace std;
// Enable STL-critical warnings
#pragma warning(pop)

// CRT library includes
#include <sys/timeb.h>
#include <time.h>
#include <math.h>

// Include Windows Sockets
#include <Winsock2.h>
#include <Ws2tcpip.h>

此代码在文件 NetIncludes.h 中提供。还需要将 WinSockets 库 ws2_32.lib 链接到您的项目(选择“项目” | “设置” | “链接”,然后在“对象/库模块”编辑字段中键入“ws2_32.lib”)。

然后,启动 Winsock。为此,例如,将以下代码放入项目主窗口的初始化函数中

WSADATA wsaData;
WSAStartup(MAKEWORD(2,2),&wsaData);

此后,放置以下代码以启动 CTP 服务器

m_pCTP = new CCTPNet(m_pCTPReceiver,1515);
// Server created suspended so it needs to be started manually
m_pCTP->SetSuspended(false);

上述代码的正确性基于 m_pCTPReceiverNetReceiver 的后代这一假设。例如,您可以将 NetReceiver 添加到主窗口的父级中。

测量

本文提供的演示应用程序允许尝试所述的 CTP 实现。它还包括在相同框架下 TCP 和 UDP 的实现,因此所有这些协议可以一起使用,并且显然可以进行比较(图 3)。

Fig. 3. Time of interchange via CTP, TCP and UDP (where possible)

图 3. CTP、TCP 和 UDP(如果可能)的交换时间(微秒)与命令数据大小的关系。

为了进行此实验,两个工作站的系统时钟通过 SNTP 与同一时间服务器使用 NetTime 2.0 进行同步。图中取了交换时间的极端值。

CTP 和 UDP 的相似结果表明,CTP 的实现不会占用大量关键资源。其开销足够小,可以忽略不计。

在处理正常命令和可以通过多个数据包传输的非大型命令时,CTP 比 TCP 快两倍。这是一个了不起的结果,因为集群中绝大多数交互都是使用正常消息进行的。启动计算的命令、查询某些值的请求以及对此类查询的响应都是小的。

TCP 更适合处理大型命令。然而,在集群计算中,大型数据块很少出现,例如,在任务分离阶段(即使在这里,也并非总是如此)。一个重要的注意事项是,CTP 对于大型数据块来说并不慢,因此它可以作为集群中的网络机制,注意上一段的内容。

此外,实验是在两个节点上进行的,因为这在这里更有趣:纯协议实现的吞吐量。对于数十个节点之间快速交换的比较结果对 CTP 来说会更令人满意,因为它的活动将保持不变,而 TCP 将在通道创建和重新创建上损失很多。对于 CTP,接收方是谁并不重要。

无法超越 TCP 的原因是它在内核级别,而 CTP 由应用程序实现。一方面,这是后者的缺点,但另一方面,它绝对独立且完整。

参考文献

  1. 琼斯 A., 奥伦德 J. Windows 网络编程 - Microsoft Press. 2000。
  2. 奈梅斯 E.,斯奈德 G.,西巴斯 S.,海因 T. R. Unix 系统管理手册。第三版 - Prentice Hall PTR. 2001。
  3. 瑙莫夫 L. CAME&L - 细胞自动机建模环境与库 // 细胞自动机。第六届细胞自动机研究与工业国际会议 (ACRI-2004)。2004。可从项目主页获取。

历史

  • 2004年4月19日 - CTP v. 1.0 发布。
  • 2004年9月16日 - CTP v. 1.1 发布。

    改进

    • 支持“智能缓冲区”;
    • 计时性能略有提升;
    • 修复了一些错误。
  • 2005年1月31日 - CTP v. 1.2 发布。

    改进

    • 现在考虑会话信息;因此现在我们有了自适应超时、智能重发等等;
    • 现在无需在系统注册表中存储任何内容;
    • 确认交换得到改善;
    • 收到的数据包存储概念已替换和改进;
    • 全面支持广播;
    • 支持多服务器;
    • “智能缓冲区”更多功能;
    • 功能丰富的调试和日志接口;
    • 所有超时和延迟都变得可调;
    • 计时性能显著提高;
    • 修复了大量错误。
© . All rights reserved.