命令传输协议(CTP) - 分布式或并行计算的新网络协议






4.85/5 (31投票s)
2004年4月19日
22分钟阅读

241517

7608
本文介绍了一种用于分布式或并行计算的新型网络协议的改进版本。通常,它仅适用于小型消息的快速、可靠且功能丰富的交换。提供了该协议的实现和演示项目。
CTP - 序列很重要
引言
首先,这里所说的“计算集群”是什么意思。集群是为某些特定目的而形成的多个工作站的联合体。计算集群是为繁重计算而构建的集群。它是一个对网络功能提出特殊要求的特定系统。高质量集群的网络机制主要特性是
- 快速数据交换。
- 可靠的数据传输。
- 支持广播。通常,某些网络中的所有工作站都参与计算实验,因此广播使控制变得更容易。
- 支持大量数据块交换。有时,例如,实验的初始条件可以用这样的数据块表示。
- 点对点网络。任何工作站都可以是数据源和数据目的地,因此它们同时是客户端和服务器。
事实上,大多数并行计算软件工具包都以库的形式呈现,它们使用标准网络协议 TCP/IP [1]。使用此协议存在许多缺点
- 数据交换速度慢。TCP 的“可靠性”和“通用性”带来了很多开销。此协议是通用协议,因此适用于在互联网等不稳定环境中工作,但在为计算开发的稳定(或相当稳定)系统中,可以获得更多好处。
- TCP 不支持广播。UDP 支持,但它不可靠,并且 UDP 数据报的大小限制为 65467 字节 [1]。
- 在数据交换之前创建逻辑通道的理念对于集群计算是多余的。首先因为集群通常是经过良好调优、运行良好的网络。其次因为,一些集群计算策略导致工作站之间无序交换。
- TCP 是一个基于流的协议,但对于确定的任务,有界块交换更可取,因为它允许明确地说出何时所有用于后续操作的数据已到达。
当然,可以针对集群计算中出现的特殊要求调整专门的网络协议。因此,CTP 是一种旨在满足任意任务需求的协议,这些任务需要快速消息交换的支持,并且可以作为消息接收的反应开始繁重计算。尽管其名称中的字母“P”表示“协议”,但它不仅仅是一个规范。CTP 是一种思想和工具包,它允许使用它。因此,它能够取代 MPI 实现、PVM 等产品。
理念
大多数现有的并行计算工具包都使用所谓的“消息”作为基本抽象。CTP 中使用的基本抽象是“命令”。命令是某人向某人发出做某事的指令(在大多数情况下,集群中的工作站正是以这种方式进行通信)或对此类指令的响应。从最后一句话中,可以得出结论,命令具有以下参数
- “某人” - 发送方。
- “某人” - 接收方。
- “某事” - 命令描述。
因此,首先,需要以某种方式定义发送方和接收方。为此,将使用 IP 地址。原因在于 IP 使用极其广泛,并且它完全满足要求(为所有工作站提供唯一标识符)。命令将由整数数字标识。
就所讨论的协议词汇而言,“命令”和“消息”实际上是同义词。“命令”是“消息”,但反之不总是成立。
CTP 需要满足上述集群网络要求。实现此目的的方法如下(与引言中顺序相同)
- 为了提高交换速度,UDP 将用作协议的基础。此外,使用 UDP,而无需深入到原始网络,将在未来使用户免受协议支持工具包安装带来的额外问题。
- 将实现数据交换的可靠性。每个发送的包都将存储,直到接收方确认收到数据为止。为了维护此机制,包需要提供标识符。标识将在发送方侧通过分配整数数字来执行。这些 ID 通常不能是唯一的,但对于每个发送方必须是唯一的。
- 广播支持是使用 UDP 作为基本协议的另一个论据。
- 将实现大量数据交换支持。如果将要发送的消息大于某个限制(默认情况下为 65400 字节),则需要将其分成较小的部分。这些部分将被编号并单独发送给接收方,一个接一个。在接收方侧,它们将被 объеди在一起以整理初始命令。一个重要的注意事项是,只有在所有部分都已收到后,接收方应用程序才会收到有关命令到达的信息。此类命令将被称为“大型命令”,但在实践中,大多数命令是“正常”命令(只需一个包进行传输)。
- 对于点对点交换,CTP 的实现应将客户端和服务器功能作为一个整体包含在内。
CTP/IP 与 OSI 模型 [2] 以及 UDP/IP 理念的关系如图 1 所示。
图 1. OSI 模型、UDP/IP 模型和 CTP/IP 模型之间的关系。
CTP 涵盖了从传输层到应用层的多个层次,这证明其责任范围从相对较低的级别开始,延伸到较高的级别。
内部世界
接下来的讨论以 CTP 基本概念、其属性和功能的混乱描述形式进行。阅读所有这些内容将形成一个关于内部正在发生什么事情的整体概念。选择这种奇怪的陈述形式,是因为系统性讨论需要同时从几个点开始,但不幸的是,这是不可能的。
CTP 的调试功能可能有助于更好地理解操作顺序。有一个功能可以将所有操作和事件的描述放置到给定的输出流。
标题
每个 CTP 数据包通常表示为报头加正文(数据)。报头结构如表 1 所示(按出现顺序)。
名称 | 大小(位) | 注释 |
---|---|---|
包大小 | 16 | 无符号整数。包的大小(包括报头)。 |
命令号 | 16 | 无符号整数。命令号(从 0 到 32767,最高位未设置)。如果最高位已设置,则数据包表示对具有相应命令号的消息的确认。 |
大型消息中的包号 | 32 | 无符号整数。对于大型命令 - 包号,从零到包总数(由报头的下一个字段给出)减一。对于正常命令 - 零。 |
消息的包总数 | 32 | 无符号整数。对于大型命令 - 发送所需包的总数。对于正常命令 - 零或一。 |
ID | 32 | 无符号整数。包的标识符。对于每个发送方必须是唯一的。 |
消息大小 | 64(仅使用 48) | 无符号整数。整个命令数据的大小(不包含任何报头)。因此,最大命令的大小是最大包数乘以最大包大小,即 232*65400。这等于 280890861158400 字节,或超过 255 TB。最后一个值允许将消息大小视为无限。显然,48 位足以存储大小值,但为了对齐分配了 64 位。 |
选项 | 8 | 位集合。每个位确定相应选项是否已设置。选项将在稍后讨论。 |
表 1. CTP 包头
可以计算出,报头的总大小为 25 字节。所有重要的传输参数,如发送方和接收方的 IP 地址和端口,都存储在 UDP 报头中。
每个数据包可以通过其发送方、接收方和 ID 完全识别。发送方以以下方式为每个接收方提供 ID 的唯一性:将要发送的下一个数据包的 ID 初始值取为伪随机数。发送每个数据包后,它将被递增。发送给接收方的第一个数据包必须用特殊选项标记(见下文),以允许接收方了解起始 ID 的值。
存储器
说明 CTP 进行数据包交换的操作序列的流程图如图 2 所示
图 2. CTP 实现的流程图。
在流程图中,提到了所谓的“存储”。有四个数据存储,在生命周期中累积,提供功能。
- 会话信息存储。它存储当前工作站与之通信的每个工作站的描述。其中,下一个数据包 ID、交换超时以及从该接收方收到的数据包描述都包含在内。
交换超时用于确定何时需要重新发送已发送但未确认的数据包。此超时是自适应的(因为集群可能是异构的,并且可能通过内网和互联网连接工作站)。最初,采用默认超时(默认情况下为 100 毫秒)。第一次交换后,其值取为所需时间乘以系数(默认情况下为 3)。
如果在超时期间未收到数据包到达的确认,则应重新发送数据包。重新发送之间的时间间隔将呈指数增长。如果数据包在 8 次重新发送(255 次超时)后仍未确认,则将生成错误消息“命令未确认时间过长”。如果超时设置为零,则此功能将关闭。
消息可以重新发送。因此,有必要保护用户免受多次接收同一消息的影响。这就是为什么为每个收件人存储已接收数据包的描述。它以有序列表的形式实现。第一个元素包含按顺序接收到的数据包的最大 ID。在此元素之后,可能还有更多 ID,对应于已接收但大于第一个元素的数据包。在此列表中插入每个新 ID 后,从第一个元素开始的序列将被截断。例如,假设此存储包含 {7, 9, 10, 11, 13, 14}。这意味着所有 ID 小于或等于 7 以及等于 9、10、11、13 和 14 的数据包都已收到。收到 ID 为 8 的数据包后,列表将变为 {11, 13, 14}。如果所有数据包都按顺序到达,则列表始终只包含一个元素。
ID 的值应在一个无限循环中(232-1 后是 0)。在此阶段,确定发送方生成的起始 ID 非常重要。
当第一条消息即将发送或已从工作站接收到,或者尚不清楚时,会向会话信息存储添加一个新条目。将有一个用于广播消息的特殊条目。
- 已发送命令存储。要发送命令,需要整理数据包。需要分配一些内存并填充数据包头和数据。事实是,它不会在发送后立即释放和取消分配,而是存储到已发送命令存储中。只有在所有数据包都已确认到达后,才能从已发送命令存储中删除记录。
这种思想可以针对“每个命令”而不是“每个数据包”实现(如 CTP 1.0 中),但第一种变体更可取。在这种情况下,所谓的“智能缓冲区”可以通过在数据包数据排列时预留和保护报头所需的内存来避免冗余内存分配。
- 大型命令存储 用于在分部分接收整个大型消息时进行整理。它存储总数量、部分接收状态向量以及用于编译的缓冲区。消息的每个部分,除了最后一个,都具有最大数据大小,因此部分可以轻松找到它们在缓冲区中的位置,因为它们知道自己的编号。当所有部分都已收到时,消息被认为是已整理好的,并且服务器会通知应用程序数据已到达。
- 交付存储。整个接收到的消息或错误描述被称为“交付”。生成后,它们将被添加到交付列表。然后,特殊的交付者线程将从列表中取出它们并将其传递给应用程序。
用面向对象编程的术语来说:实现接收方应用程序的类对象可以订阅以获取给定命令的交付。在这种情况下,相应的对象将收到有关命令到达以及与该命令相关的错误的信息。
此外,还需要一个默认接收器,用于接收有关没有订阅者的命令以及没有相关命令的常见错误(例如套接字创建错误)的信息。
确认
确认是带有空正文(仅报头)的数据包,它与已确认数据包的报头只有三个区别。在确认报头中
- 包大小设置为报头大小。
- 在命令号中,最高位已设置。
- 消息大小设置为零。
这可能被认为是一种低效的解决方案——用单独的确认来确认每个数据包,但这样做是为了通过使用选项提供更多功能。不要忘记 CTP 不仅仅是一种协议,还是一个工具包。
数据包发送后
接收方收到数据包后做的第一件事是检查是否已收到相同的包。如果已收到此类数据包,则表示发送方未能收到确认,因此必须再次发送确认,并且可以跳过此数据包的接收过程。
确认必须是“再次发送”,而不是“重发”。确认不存储在已发送数据包存储中,而是在需要时生成。
如果之前没有收到此类数据包,则需要存储其到达信息。
如果收到的数据包表示正常命令,则服务器通知应用程序数据已到达(创建交付并将其放入交付存储)。如果它是大型命令的一部分,则服务器将其存储到大型命令存储中。如果它是消息的最后一个剩余部分,则也应生成交付。
数据包找到其位置后,必须发送确认,然后接收方开始等待下一个数据包。
当发送方收到任何确认时,它将从已发送数据包存储中删除相应的记录。该机制,就像物理系统一样,渴望将其势能降至最低,以便尽快释放所有存储。
线程
协议功能的实现应该是多线程的。有三种类型的线程
- 服务器线程 接收数据包,实现确认支持、大型命令编排等。如果数据到达或错误信息出现,线程会将其添加到交付存储中。每隔一段时间(默认情况下为 100 毫秒),此线程会检查是否存在需要重新发送的数据包,并在必要时重新发送它们。服务器线程的数量可以是任意的,具体取决于任务。
- 交付者线程 检查交付列表,如果它不为空,则将第一个交付发送给相应的订阅者或默认接收器。如果线程长时间(默认情况下为 20 秒)不执行任何操作,它将被终止。在空闲循环中,线程可能会休眠一段时间(默认情况下为 20 毫秒)。
每个命令都是一个指令(或响应)。它可能指令做一些困难且持久的事情。因此,将交付者实现为单独的线程允许“按指令”计算某些东西,就在请求它的地方和时刻。然而,强烈建议不要浪费此功能。例如,不要在命令接收处理程序中使用模式对话框,因为这将毫无用处地占用交付者。
- 交付管理器线程 如果所有现有交付者都忙碌且交付列表不为空,则创建额外的交付者线程。当然,交付者的最大数量受某个值限制(默认情况下为 50)。协议机制力求减少负载。在空闲循环中,线程可能会休眠一段时间(默认情况下为 10 毫秒)。
选项
选项允许为网络添加有趣的功能。有五种可能的选项
DelAfterError
- 如果设置,则在接收方被告知其到达未被确认(生成错误描述后)后,将从已发送数据包存储中删除有关发送此数据包的信息。因此,此错误将只交付一次,如果数据未到达,它将丢失。NoResend
- 如果设置,即使此数据包未被确认,也不会重新发送。UniqueCommand
- 如果设置,则此数据包命令的确认将确认所有具有相同命令号并发送到给定接收方的数据包。不允许对大型消息使用此选项,以防止完整性损坏。Broadcast
- 如果设置,则此数据包将进行广播。选项本身对网络没有任何影响。通常,对于广播,用户必须指定接收方的 IP 地址,例如:255.255.255.255。但此选项提供了使用 CTP 进行广播所需的两个重要事项。首先,发送的消息必须获取其 ID,并且必须存储在会话信息存储中对应于“广播”的条目中。其次,此消息将获得被任意工作站确认的能力,因为无法事先知道谁将收到它。如果至少一个工作站确认,则认为该命令已确认。不建议(但可能)将其用于大型命令。StartSession
- 如果设置,则此数据包是发送方发送给此接收方的第一个数据包。因此,它带来的 ID 可以作为与此发送方会话的最小 ID。在检查数据包是否已收到时会考虑这一点。请注意,“会话”的概念意味着单向通道。如果两个工作站正在交换一些数据,那么它们在会话信息存储中都为对方有条目。
这些选项可以任意组合使用:单独使用,全部共同使用等等。
例如,将选项 ErrorOnce
、NoResend
和 UniqueCommand
按位或运算组合,可能对诸如“如果你还活着就回复我”(也称为“ping”)之类的命令很有用。对于频繁发送、小型且不携带信息,但表示响应或确认的命令——接收方正在工作。
Windows 实现
最初,CTP 是为 Windows 操作系统实现的,旨在成为 (Cellular Automata Modeling Environment & Library) 项目 [3] 中使用的网络机制的基础。请参见项目主页。当然,它也可以用于任何需要快速消息交换和“按指令”进行繁重计算的任意应用程序。
该协议的实现由一组类表示。实现 CTP 主要功能的类名为 CCTPNet
。CTP 实现中涉及的所有类的描述如下。
IPAddr 类
IPAddr
类的对象表示工作站的 IP 地址。除了源文件,此类别不需要任何解释。
union IPAddr { // Data type for solid representation typedef unsigned __int32 IPSolid; // Actual data struct IPBytes { unsigned char b1,b2,b3,b4; } Bytes; IPSolid Solid; // Constructors IPAddr() {SetLocalhost();}; IPAddr(unsigned char b1, unsigned char b2, unsigned char b3, unsigned char b4) {Bytes.b1=b1;Bytes.b2=b2;Bytes.b3=b3;Bytes.b4=b4;}; IPAddr(IPSolid l) {Solid=l;}; // Returns true is this ip-address refers to localhost (127.0.0.1) inline bool IsLocalhost() {return Bytes.b1==127&&Bytes.b2==0&&Bytes.b3==0&&Bytes.b4==1;}; // Returns true is this ip-address refers to broadcasting address // (255.255.255.255) inline bool IsBroadcasting() {return Bytes.b1==255&&Bytes.b2==255&&Bytes.b3==255&&Bytes.b4==255;}; // Returns via s and return value dotted // string representation of ip-address inline LPTSTR GetString(LPTSTR s) {sprintf(s,"%d.%d.%d.%d",Bytes.b1, Bytes.b2,Bytes.b3,Bytes.b4); return s;}; // Set stored ip address to value, represented with string s (in // dot-separated format). Returns true if succeeded and false otherwise bool FromString(LPTSTR s); // Set ip address to localhost (127.0.0.1) inline void SetLocalhost() {Bytes.b1=127;Bytes.b2=0;Bytes.b3=0;Bytes.b4=1;}; // Set ip address to broadcasting address (255.255.255.255) inline void SetBroadcast() {Bytes.b1=255;Bytes.b2=255;Bytes.b3=255;Bytes.b4=255;}; // Operators bool operator ==(unsigned long ip) {return Solid==ip;}; bool operator ==(IPAddr ip) {return Solid==ip.Solid;}; bool operator !=(unsigned long ip) {return Solid!=ip;}; bool operator !=(IPAddr ip) {return Solid!=ip.Solid;}; IPAddr& operator =(const unsigned long ip) {Solid=ip; return *this;}; IPAddr& operator =(const IPAddr ip) {Solid=ip.Solid; return *this;}; };
SmartBuffer 类
SmartBuffer
类的对象表示“智能缓冲区”,它使 CTP 的实现免于冗余内存分配。它会根据数据的特定大小(单个数据包中的最大数据量)动态地为数据包头预留空间。因此,用户只需将数据放入智能缓冲区。然后发送函数插入报头,数据包即可发送出去。此类的定义如下
class SmartBuffer { public: // Constructor. Parameters: // +) datasize - amount of data to be used; // +) autodel - if true then this buffer will be freed automatically by // protocol's implementation after it will be sent and confirmed if // needed. So, working with such buffer, you will have to create it with // operator new, but do not to delete it. If this parameter equals false // you will have to do new and delete manually // +) headsize - size of header of each packet; // +) maxdatasize - maximum size of data in single packet (without header) SmartBuffer(unsigned int datasize=0, bool autodel=true, unsigned int headsize=25, unsigned int maxdatasize=65400); // Constructor. Parameters: // +) fname - name of file to be stored in internal buffer as data; // +) datasize - amount of data to be stroed just before the file; // +) autodel - if true then this buffer will be freed automatically by // protocol's implementation after it will be sent and confirmed if // needed. So, working with such buffer, you will have to create it with // operator new, but do not to delete it. If this parameter equals false // you will have to do new and delete manually // +) headsize - size of header of each packet; // +) maxdatasize - maximum size of data in single packet (without header) SmartBuffer(LPCTSTR fname, unsigned int datasize=0, bool autodel=true, unsigned int headsize=25, unsigned int maxdatasize=65400); // Destructor virtual ~SmartBuffer() {delete[] m_pBuffer;}; // Access to key values // Header size inline unsigned int GetHeadSize() {return m_uHeadSize;}; // Data size inline unsigned int GetDataSize() {return m_uDataSize;}; void SetDataSize(unsigned int datasize); // Maximum data size for single packet inline unsigned int GetMaxDataSize() {return m_uMaxDataSize;}; // Allocated buffer size inline unsigned int GetBufferSize() {return m_uBufferSize;}; // Auto deleting inline bool GetAutoDel() {return m_bAutoDel;} inline void SetAutoDel(bool autodel) {m_bAutoDel=autodel;} // Access to key pointers // Returns begining of the buffer inline char* GetBufferBegin() {return m_pBuffer;}; // Returns current pointer inline void* GetCurPtr() {return m_pCurPtr;}; // Sets current pointer to pointer to data of i-th packet (i from zero to // GetPacketsCount()-1). Result false if index i is out of bounds inline bool CurPtrToDataPtr(unsigned int i) {char* res=GetDataPtr(i); if (res) {m_pCurPtr=res; return true;} else return false;}; // Sets current pointer to the begining of data inline void CurPtrToDataBegin() {m_pCurPtr=m_pBuffer+m_uHeadSize;}; // Access to packets // Returns amount of packets inline unsigned int GetPacketsCount() {return m_uBufferSize/(m_uHeadSize+m_uMaxDataSize)+ ((m_uBufferSize%(m_uHeadSize+m_uMaxDataSize))?1:0);}; // Returns pointer to header of i-th packet (i from zero to // GetPacketsCount()-1). Result is zero if index i is out of bounds inline char* GetHeadPtr(unsigned int i) {char* res=i*(m_uHeadSize+m_uMaxDataSize)+m_pBuffer; if (res>m_pBuffer+m_uBufferSize) return NULL; else return res;}; // Returns pointer to data of i-th packet (i from zero to // GetPacketsCount()-1). Result is zero if index i is out of bounds inline char* GetDataPtr(unsigned int i) {char* res=GetHeadPtr(i); if (res) return res+m_uHeadSize; else return NULL;}; // Returns size of i-th packet (i from zero to GetPacketsCount()-1), // including header. Only last packet's size can differ from header's size // plus maximum data size. Result is zero if index i is out of bounds inline unsigned int GetPacketSize(unsigned int i) {if (i<(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize)) return m_uHeadSize+m_uMaxDataSize; else if (i==(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize)) return (m_uBufferSize)%(m_uHeadSize+m_uMaxDataSize); else return 0;}; // Data and header access routines // Put data of size GetHeadSize() from src to the are of i-th header. // Returns true if data was copied successfully to the existing header etc // and false otherwise bool PutHead(void* src, unsigned int i); // Put byte of data bt to internal buffer from current pointer (if dest is // negative) or from dest-th byte.. Current pointer will be moved to the // end of put data (skipping headers) if movecur equals true. Returns true // if data was copied successfully bool PutDataByte(unsigned char bt, bool movecur=true, int dest=-1); // Put data of size size from src to internal buffer from current pointer // (if dest is negative) or from dest-th byte. Current pointer will be // moved to the end of put data (skipping headers) if movecur equals true. // Returns true if data was copied successfully, without truncation etc and // false otherwise bool PutData(void* src, unsigned int size, bool movecur=true, int dest=-1); // Put string to internal buffer from current pointer (if dest is negative) // or from dest-th byte. Current pointer will be moved to the end of put // data (skipping headers) if movecur equals true. Returns true if data was // copied successfully, without truncation etc and false otherwise inline bool PutDataString(char* str, bool movecur=true, int dest=-1) {return PutData(str,strlen(str)+1,movecur,dest);}; // Put data from file fname to internal buffer from current pointer // (if dest is negative) or from dest-th byte. Current pointer will be // moved to the end of put data (skipping headers) if movecur equals true. // Returns true if data was copied successfully, without truncation etc and // false otherwise bool PutDataFile(LPCTSTR fname, bool movecur=true, int dest=-1); // Trim the buffer, by cutting the content, excluding the part of buffer // from current pointer to the end. It is strongly to perform this // operation only after all buffer's modifications void Trim(); protected: // Calculates needed buffer size inline unsigned int GetNeededBufferSize(unsigned int datasize, unsigned int headsize, unsigned int maxdatasize) {return datasize?(datasize/maxdatasize*(headsize+maxdatasize)+ ((datasize%maxdatasize>0)?(datasize%maxdatasize+ headsize):0)):headsize;}; // Calculate pointer by dest (if negative - then by m_pCurPtr). Result // pointer will be put to ptr. If prtnsize is not NULL then portion size // will be also calculated and put to variable, pointed by prtnsize inline void DestToPtr(int dest, char*& ptr, unsigned int* prtnsize); // Data members bool m_bAutoDel; // Detele it automatically or not unsigned int m_uHeadSize; // Size of header unsigned int m_uDataSize; // Size of data unsigned int m_uMaxDataSize; // Maximum size of data in single packet unsigned int m_uBufferSize; // Size of allocated internal buffer char* m_pBuffer; // Pointer to internal buffer char* m_pCurPtr; // Current pointer };
NetSender 类
NetSender
类是 CCTPNet
的基类,它实现了 CTP 的主要功能。NetSender
仅用于描述任意协议的通用网络发送类的接口。
class NetSender { public: // Send smart buffer sb to address to. Parameter command represents command // id. Parameter options - sending options. If parameter storeiffail is // true then sent command will be stored in sent packets storage even if // sending fails and will not overwise. Returns true if succeeded (message // has gone) and false otherwise virtual bool Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options=0, bool storeiffail=true)=0; // Returns true if sender is working (is not suspended and so on) and false // otherwise. Default implementation returns true, because implementation // of many protocols cannot be switched off and cannot handle errors at all virtual bool IsWorking() {return true;}; };
NetReceiver 类
如果存在 NetSender
类,那么也必须存在 NetReceiver
类。NetReceiver
用于描述可以订阅数据到达和错误信息交付的对象的接口。
class NetReceiver { public: // Is called when have received data pointed by data virtual void OnReceive(void* data)=0; // Is called when there was an error, described with data pointed by data virtual void OnError(void* data)=0; };
如果一个对象要订阅交付,它的类必须是 NetReceiver
的后代(C++ 中多重继承的支持允许向任何类添加额外的父类)。成员函数 OnReceive
和 OnError
将分别在消息到达和发生错误时被调用。在第一种情况下,指针 data
将指向到达数据的描述;在第二种情况下,data
将指向由 NetSender
的后代生成的一些错误描述。
这些成员函数的参数是指向 void
的指针,而不是指向某个具体类的指针,因为 NetReceiver
类也面向任意协议。在使用 CTP 时,OnReceive
的参数将指向 CCTPReceivedData
类的对象,而 OnError
的参数将指向 CCTPErrorInfo
类的对象。
CCTPReceivedData 类
此类的对象描述并提供对接收到数据的访问。它不需要解释。
struct CCTPReceivedData { // Constructor. Parameters: // +) command - command; // +) size - amount of data to be stored; // +) from - ip address of host, that sends this data; // +) buf - points to buffer, which stores received data. If NULL then // data copying will be skipped (only allocation performs) CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf); // Destructor virtual ~CCTPReceivedData() {delete[] pBuf;}; unsigned __int16 command; // Command unsigned __int64 size; // Message size (48 bit) IPAddr from; // Host, that had sent this data char* pBuf; // Data };
CCTPErrorInfo 类
此类的对象描述了网络过程中发生的错误。
struct CTCPErrorInfo { // Constructor. Parameters: // +) type - error type. When it occurs: // +) 0 - on socket creation; // +) 1 - on socket binding or tuning; // +) 2 - on data sending; // +) 3 - on data receiving; // +) code - WinSock error code; // +) addr - address of host, which causes error (not always can be // interpreted, if can not the just equals localhost) CTCPErrorInfo(unsigned char type,int code,IPAddr addr) {this->type=type; this->code=code; this->addr=addr; GetTimeStamp(timestamp);}; // Put time stamp to string s and returns it static char* GetTimeStamp(char* s); unsigned char type; // Error type int code; // WinSock error code IPAddr addr; // Address of host, which causes error char timestamp[22]; // Time stamp, when error occurred };
上面的一切都必须清楚,除了一个情况:为什么 timestamp
需要作为此类的一个字段?事实是,网络错误发生的时间可能非常重要,例如,用于构建日志文件。但是,错误描述交付的时间可能与发生时间大相径庭。为了避免此类错误,决定将时间戳作为 CCTPErrorInfo
类的一个字段,并将在对象构造期间填充。可以使用静态成员函数 GetTimeStamp
随时检索当前时间的时间戳,精确到千分之一秒。
CCTPNet 类
此类实现了 CTP 的主要功能(客户端和服务器同时)。以下定义描述了成员
class CCTPNet: public NetSender { public: // Data structures // Packet header #pragma pack(push) #pragma pack(1) struct Header { // Constructor Header() {size=0;command=0;number=0;amount=0;id=0;messize=0;options=0;} void ToStream(ostream& out); unsigned __int16 size ; // Packet size (16) unsigned __int16 command ; // Command (16) unsigned __int32 number ; // Packet number (from zero to amount-1) (32) unsigned __int32 amount ; // Amount of packets in the command (32) unsigned __int32 id ; // Packet id (32) unsigned __int64 messize ; // Message size (64, but 48 are used) unsigned __int8 options ; // Options (8) }; #pragma pack(pop) // Options bits enum Options { // Delete sent command from the storage after error was // generated DelAfterError=0x01, // Do not resend this packet even if it was not confirmed NoResend=0x02, // Confirmation of this packets command will confirm all packets with // the same command, that was sent to same recipient. // NB: It is not recommended to use this option with multipacket // messages to protect it from integrity corruption UniqueCommand=0x04, // Broadcast this message (message with this option will be confirmed // from arbitrary recipient) Broadcast=0x08, // Mark packet, which is first in the session (in the interchange with // given recipient) // Note: This option is used by CTP internal world and is not needed to // be set by user StartSession=0x10 }; // Set of options, which appropriate for ping static const unsigned __int8 OptPing; // Structures for messages and error information delivery enum DeliveryType { ReceivedData, ErrorInfo }; struct Delivery { // Constructor Delivery(NetReceiver* target,CCTPErrorInfo* data) {this->target=target; this->data=data; this->type=DeliveryType::ErrorInfo;}; Delivery(NetReceiver* target,CCTPReceivedData* data) {this->target=target; this->data=data; this->type=DeliveryType::ReceivedData;}; Delivery() {target=NULL; data=NULL; type=(DeliveryType)NULL;}; // Only for STL compliance NetReceiver* target; // Receiver void* data; // Data DeliveryType type; // Delivery type }; typedef list<Delivery> DeliveriesList; // Time settings storage structure struct Times { // Constructor, which sets defaults Times() { uMultiplier= 3; uDefTimeout= 100; uSleepOnDestroy= 50; uSleepSuspended= 10; uSleepDelMan= 10; uSleepNothing= 20; uPeriodDestroy= 2000; uPeriodAutoDest= 20000; uPeriodCheckResend=100; }; // Multiplier for the time, needed for single transfer, to determine // timeout unsigned int uMultiplier; // Default timeout unsigned int uDefTimeout; // Sleeping time during waiting for desroying unsigned int uSleepOnDestroy; // Sleeping time when suspended unsigned int uSleepSuspended; // Sleeping time in deriveries manager unsigned int uSleepDelMan; // Sleeping time when server has nothing to do unsigned int uSleepNothing; // Period while waiting for desroying, after which working threads will // be stopped forcedly unsigned int uPeriodDestroy; // If deliverer will do nothing during this period it will be destroyed unsigned int uPeriodAutoDest; // Period of checking if some packets are to be resent unsigned int uPeriodCheckResend; }; // Constructor creates server with all necessary parameter and tunes client // for fast data sending: // +) receiver - default receiver of arrived data and errors; // +) port - number of port, which to listen; // +) servers - amount of setvers to be started; // +) times - pointer to time settings storage structure (if NULL then // defaults will be used); // +) log - points to output stream for gebug log building (if NULL then no // output will be produced. // +) packetdatasize - value of maximum data size to be send in single // packet (if message is bigger than it is the "large message"); // +) maxthreads - maximum amount of deliverers threads CCTPNet(NetReceiver* receiver, unsigned short port, unsigned short servers=1,Times* times=NULL,ostream* log=NULL, unsigned __int16 packetdatasize=65400, unsigned short maxthreads=50); // Destructor virtual ~CCTPNet(); // Parameter access routines // Time settins routines const Times& GetTimes() {return m_Times;} void SetTimes(Times& times) {m_Times=times;} // Port routines unsigned short GetPort() {return m_uPort;} void SetPort(unsigned short port) {closesocket(m_SendSocket); closesocket(m_RecvSocket); FreeSntCommands(); FreeSessions(); FreeLargeCommands(); m_uPort=port; CreateSockets();} // Packet data size routines unsigned __int16 GetPacketDataSize() {return m_uPacketDataSize;} void SetPacketDataSize(unsigned __int16 ps) {delete[] m_pBuffer; m_uPacketDataSize=ps; m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];} // Maximal threads amount routines void SetMaxDeliverers(unsigned short maxthreads) {m_uMaxDeliverers=maxthreads;}; unsigned short GetMaxDeliverers() {return m_uMaxDeliverers;}; // Info target routines NetReceiver* GetDefaultReceiver() {return m_DefReceiver;} void SetDefaultReceiver(NetReceiver* receiver) {m_DefReceiver=receiver;} // Sets special receiver receiver for command command and type type. If // receiver for definite command and delivery type already exists it will // be replaced void AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type); // Delete receiver receiver from special receivers list void DeleteSpecialReceiver(NetReceiver* receiver); // Returns receiver for command command and type type NetReceiver* GetReceiver(unsigned __int16 command, DeliveryType type); // Suspending status routines bool GetSuspended() {return m_bSuspended;}; void SetSuspended(bool suspended) {m_bSuspended=suspended;}; virtual bool IsWorking() {return !m_bSuspended;}; // Operations // Returns size or datagrams header static unsigned __int16 GetHeaderSize() {return sizeof(Header);} // Send smart buffer sb to address to. Parameter command represents command // id. Parameter options - sending options. If parameter storeiffail is // true then sent command will be stored in sent packets storage even if // sending fails and will not otherwise. Returns true if succeeded (message // has gone) and false otherwise virtual bool Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options=0, bool storeiffail=true); // Send dataless packet (header only). Parameter command represents command // id. Parameter options - sending options. If parameter storeiffail is // true then sent command will be stored in sent packets storage even if // sending fails and will not otherwise. Returns true if succeeded (message // has gone) and false otherwise bool Send(unsigned __int16 command, IPAddr to, unsigned __int8 options=0, bool storeiffail=true) {return Send(*(new SmartBuffer()), command,to,options,storeiffail);}; // Save information about packet received from from with header head. // Returns true if new message was received and false otherwise bool SaveRcvPacket(unsigned long from,Header* head); // Mark packet sent to to with header pointed by header as confirmed. // Memory will be cleared if possible void ConfirmSntPacket(unsigned long to,Header* header); // Send to to confirmation of receipt of the packet with header, pointed by // header void SendConfirmation(unsigned long to,Header header); // Arrange large packet received from from with hearer pointed by head. // Function returns true if solid message is arranged after last packet bool ArrangeLargeCommand(unsigned long from,Header* head); // Resend packets, which have not got confimational commands void ResendNotConfirmedData(); // Determines is command is confirmation of command or not inline bool IsConfirmation(unsigned __int16 command) {return (command&m_iConfirm)!=0;}; // Retrieving status information functions // Returns amount of entries in sent commands storage inline unsigned int GetSntCommandsCount() {return m_SntCommands.size();}; // Returns amount of entries in created sessions information storege inline unsigned int GetSessionsCount() {return m_Sessions.size();}; // Returns amount of entries in large messages storage inline unsigned int GetLrgMessagesCount() {return m_LargeCommands.size();}; // Returns current amount of deliverer threads inline unsigned int GetDelThreadsCount() {return m_pDeliverTrds.size();}; // Returns current amount of busy deliverer threads inline unsigned int GetBusyDelThreadsCount() {return m_uBusy;}; // Returns current amount of deliveries inline unsigned int GetDelCount() {return m_Deliveries.size();}; protected: // Free buffers of all sent packets void FreeSntCommands(); // Free information about received packets void FreeSessions(); // Free buffers of large packets void FreeLargeCommands(); // Free all planned deliveries void FreeDeliveries(); // Actually send packet pointed with buf to recipient to. Returns true if // succeeded and false otherwise bool SendPacket(char* buf, unsigned long to); // Creates sending and receiving sockets. Returns true if succeeded and // false otherwise bool CreateSockets(); // Check the options validity in the given header void CheckupOptions(Header& header); public: // Socket for data receiving SOCKET m_RecvSocket; // Receiving buffer char* m_pBuffer; // Equals true if server and delivery threads needs to be finished bool m_bKill; // Maximal amount of delivery threads unsigned short m_uMaxDeliverers; //Output stream for log building ostream* m_pLog; // Deliveries (received messages and error information) storage DeliveriesList m_Deliveries; // Threads handles vector<CWinThread*> m_pServerTrds; // Server threads CWinThread* m_pDelManTrd; // Delivery manager thread vector<CWinThread*> m_pDeliverTrds; // Deliverers threads // Critical section for server threads access CCriticalSection m_csServerTrds; // Critical section for deliverers threads access CCriticalSection m_csDeliverTrds; // Critical section for deliveries access CCriticalSection m_csDeliveries; // Critical section for sent packets storage access CCriticalSection m_csSntCommands; // Critical section for sent recipients access CCriticalSection m_csSessions; // Critical section for sent large commands storage access CCriticalSection m_csLargeCommands; // Critical section for network access CCriticalSection m_csNetwork; // Critical section for log access CCriticalSection m_csLog; // Stores amount of busy deliverers threads unsigned short m_uBusy; // Determines is a<b or not, taking overruning in the account (0xffffffff // is less than zero) inline static bool Less(unsigned __int32 a,unsigned __int32 b) {if (max(a,b)-min(a,b)>0x7fffffff) return !(a<b); else return a<b;} // Bit mask which is to be set for confirmations static const unsigned __int16 m_iConfirm; protected: // Fills id field of header, refered by head for data, which will be sent // to recipient addr. If recipient's address is for broadcasting, then // option Broadcast is to be set set beforehand. This function will also // set StartSession option, if needed void GetNextID(Header& head, IPAddr addr); // Returns timeout for the session with workstation addr or for // broadcasting, if parameter bcast equals true unsigned int GetTimeout(IPAddr addr, bool bcast); // Sets timeout to the value of parameter timeout for the session with // workstation addr or for broadcasting, if parameter bcast equals true. // Value will be set only if current value is zero void SetTimeout(IPAddr addr, bool bcast, unsigned int timeout); // Type definitions // Structures for sent packets struct SntCommandInfo { // Constructors SntCommandInfo():sbBody(*(new SmartBuffer())) {CI=NULL; uCount=0;} // Only for STL compliance SntCommandInfo(SmartBuffer& sb, DWORD time, unsigned long to):sbBody(sb) {ipTo=to; uCount=sb.GetPacketsCount(); CI=new CommandInfo[uCount]; for (unsigned int i=0; i<uCount; i++) {CI[i].dwTime=time; CI[i].dwLTime=time;}} // Confirms receiving of i-th packet. Returns true if this object can // be excluded from sent commands list and false otherwise bool Confirm(unsigned int i); // Free memory, controlled by this sent command information storage inline void Free() {delete[] CI; if (sbBody.GetAutoDel()) delete &sbBody;}; // Representation of recipients IP address unsigned long ipTo; // Reference to smart buffer SmartBuffer& sbBody; // Amount of messages in this command unsigned __int32 uCount; // Structure for single command info struct CommandInfo { // Constructor CommandInfo() {uResend=1; dwTime=0; dwLTime=0; bConfirmed=false;}; // Increment period between sendings void IncResend() {if (uResend<16384) uResend<<=1;} // Dead timeout has elapsed bool IsDeadTimeout() {return uResend>=256;} // Period between sendings unsigned int uResend; // Creation time DWORD dwTime; // Last sending (or resending) time DWORD dwLTime; // Was confirmed or not bool bConfirmed; }; // Array of commands' information CommandInfo* CI; }; typedef list<SntCommandInfo> SntCommandInfoList; // Structures for session description struct SessionInfo { // Constructor SessionInfo() {id=rand()*rand(); timeout=0; received.clear(); minwasset=false;} // Type for received messages list typedef list<unsigned __int32> RcvList; unsigned __int32 id; // Next id bool minwasset; // Was minimal id already set or it was not unsigned int timeout; // Timeout RcvList received; // Ids of received packets }; typedef map<IPAddr::IPSolid,SessionInfo> SessionsInfo; // Structures for storing parts of large packets struct LargeCommandInfo { // Constructors. Parameters: // +) command - command; // +) size - amount of data to be stored; // +) from - ip address of host, that sends this data; // +) id - first packet's id; // +) amount - amount of packets left in the message LargeCommandInfo(unsigned __int16 command, unsigned __int64 size, unsigned long from, unsigned __int32 id, unsigned __int32 amount) {pRD=new CCTPReceivedData(command,size,from,NULL); this->id=id; uCount=amount; received=new bool[uCount]; for (unsigned int i=0; i<uCount; i++) received[i]=false;}; LargeCommandInfo() {id=0; uCount=0; received=NULL; pRD=NULL;}; // Only for STL compliance // Mark i-th part of commang // as received one. Returns true if all parts // were received and false otherwise inline bool GotPart(unsigned int i); // Free memory, controlled by this // large command information storage // (message's body is not destroyed) inline void Free() {delete[] received;}; unsigned __int32 id; // First packet's id unsigned __int32 uCount; // Amount of packets for command bool* received; // Array of flags, which shows received or not CCTPReceivedData* pRD; // Points to received data }; typedef list<LargeCommandInfo> LargeCommandInfoList; // Structures for special receivers struct SpecialReceiver { // Constructor SpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type) {this->command=command; this->receiver=receiver; this->type=type;}; SpecialReceiver() {command=0;receiver=NULL; type=(DeliveryType)NULL;}; // For STL compliance unsigned __int16 command; // Command NetReceiver* receiver; // Receiver DeliveryType type; // Type of delivery to be sent to the receiver }; typedef list<SpecialReceiver> SpecialReceiversList; // Storages and other data structures // Sessions information storage SessionsInfo m_Sessions; // Sent commands storage SntCommandInfoList m_SntCommands; // Large packets storage LargeCommandInfoList m_LargeCommands; // Points to receiver which will get messages and error information by // default (if no special receiver will be present) NetReceiver* m_DefReceiver; // Special receivers SpecialReceiversList m_Receivers; // Socket for data sending SOCKET m_SendSocket; // Local address, used for data receiving SOCKADDR_IN m_Local; // Port on which to work; unsigned short m_uPort; // Size of the data in packet unsigned __int16 m_uPacketDataSize; // Time settings Times m_Times; // Determines if this workstation is offline or not bool m_bSuspended; // Returns reference to corresponding session information. Broadcasting // session will be returned, if parameter bcast equals true SessionInfo& GetSessionInfo(IPAddr addr, bool bcast); };
CCTPStatusDlg 类
此类允许显示一个对话框,该对话框显示 CTP 负载:存储中的元素数量和交付者线程的数量。它还提供暂停服务器的功能。该类有一个构造函数,它接受一个 CCTPNet
对象的引用,以便进行监视。CCTPStatusDlg
类的定义如下
class CCTPStatusDlg : public CDialog { public: // Constructor. Parameter ctp gives // a reverence to CTP fuctionality class, // to keep an eye on it. It will be refreshed // each cycle milliseconds CCTPStatusDlg(CCTPNet& ctp, UINT cycle, CWnd* pParent = NULL); // Dialog Data //{{AFX_DATA(CCTPStatusDlg) enum { IDD = IDD_CTPSTATUS }; //}}AFX_DATA // Overrides //{{AFX_VIRTUAL(CCTPStatusDlg) public: virtual BOOL DestroyWindow(); protected: virtual void DoDataExchange(CDataExchange* pDX); //}}AFX_VIRTUAL // Implementation protected: // Set suspend status void SetSuspendStatus(); // Reference to CTP CCTPNet& m_CTP; // Refreshment timer UINT m_uTimer; // Refreshment cycle UINT m_uCycle; // Generated message map functions //{{AFX_MSG(CCTPStatusDlg) afx_msg void OnTimer(UINT nIDEvent); virtual BOOL OnInitDialog(); afx_msg void OnShowWindow(BOOL bShow, UINT nStatus); afx_msg void OnBsuspend(); //}}AFX_MSG DECLARE_MESSAGE_MAP() };
请记住,如果您想在您的项目中使用此对话框,您必须将对话框模板 IDD_CTPSTATUS
和字符串 IDS_CTP_SUSPEND
以及 IDD_CTP_RESUME
从演示应用程序复制到您的项目。
这一切如何使用?
首先,将文件 CTPNet.h、CTPNet.cpp、NetBasics.h 和 NetBasics.cpp 添加到您的项目中。然后将以下指令放入 StdAfx.h
// Include MFC multithreading support #include <afxmt.h> // Include STL #pragma warning(disable: 4786) #pragma warning(push) // Disable STL-critical warnings #pragma warning(disable: 4245) #pragma warning(disable: 4100) #pragma warning(disable: 4663) #pragma warning(disable: 4018) #pragma warning(disable: 4097) #include <map> #include <list> #include <vector> #include <algorithm> #include <iostream> using namespace std; // Enable STL-critical warnings #pragma warning(pop) // CRT library includes #include <sys/timeb.h> #include <time.h> #include <math.h> // Include Windows Sockets #include <Winsock2.h> #include <Ws2tcpip.h>
此代码在文件 NetIncludes.h 中提供。还需要将 WinSockets 库 ws2_32.lib 链接到您的项目(选择“项目” | “设置” | “链接”,然后在“对象/库模块”编辑字段中键入“ws2_32.lib”)。
然后,启动 Winsock。为此,例如,将以下代码放入项目主窗口的初始化函数中
WSADATA wsaData; WSAStartup(MAKEWORD(2,2),&wsaData);
此后,放置以下代码以启动 CTP 服务器
m_pCTP = new CCTPNet(m_pCTPReceiver,1515); // Server created suspended so it needs to be started manually m_pCTP->SetSuspended(false);
上述代码的正确性基于 m_pCTPReceiver
是 NetReceiver
的后代这一假设。例如,您可以将 NetReceiver
添加到主窗口的父级中。
测量
本文提供的演示应用程序允许尝试所述的 CTP 实现。它还包括在相同框架下 TCP 和 UDP 的实现,因此所有这些协议可以一起使用,并且显然可以进行比较(图 3)。
图 3. CTP、TCP 和 UDP(如果可能)的交换时间(微秒)与命令数据大小的关系。
为了进行此实验,两个工作站的系统时钟通过 SNTP 与同一时间服务器使用 NetTime 2.0 进行同步。图中取了交换时间的极端值。
CTP 和 UDP 的相似结果表明,CTP 的实现不会占用大量关键资源。其开销足够小,可以忽略不计。
在处理正常命令和可以通过多个数据包传输的非大型命令时,CTP 比 TCP 快两倍。这是一个了不起的结果,因为集群中绝大多数交互都是使用正常消息进行的。启动计算的命令、查询某些值的请求以及对此类查询的响应都是小的。
TCP 更适合处理大型命令。然而,在集群计算中,大型数据块很少出现,例如,在任务分离阶段(即使在这里,也并非总是如此)。一个重要的注意事项是,CTP 对于大型数据块来说并不慢,因此它可以作为集群中的网络机制,注意上一段的内容。
此外,实验是在两个节点上进行的,因为这在这里更有趣:纯协议实现的吞吐量。对于数十个节点之间快速交换的比较结果对 CTP 来说会更令人满意,因为它的活动将保持不变,而 TCP 将在通道创建和重新创建上损失很多。对于 CTP,接收方是谁并不重要。
无法超越 TCP 的原因是它在内核级别,而 CTP 由应用程序实现。一方面,这是后者的缺点,但另一方面,它绝对独立且完整。
参考文献
- 琼斯 A., 奥伦德 J. Windows 网络编程 - Microsoft Press. 2000。
- 奈梅斯 E.,斯奈德 G.,西巴斯 S.,海因 T. R. Unix 系统管理手册。第三版 - Prentice Hall PTR. 2001。
- 瑙莫夫 L. CAME&L - 细胞自动机建模环境与库 // 细胞自动机。第六届细胞自动机研究与工业国际会议 (ACRI-2004)。2004。可从项目主页获取。
历史
- 2004年4月19日 - CTP v. 1.0 发布。
- 2004年9月16日 - CTP v. 1.1 发布。
改进
- 支持“智能缓冲区”;
- 计时性能略有提升;
- 修复了一些错误。
- 2005年1月31日 - CTP v. 1.2 发布。
改进
- 现在考虑会话信息;因此现在我们有了自适应超时、智能重发等等;
- 现在无需在系统注册表中存储任何内容;
- 确认交换得到改善;
- 收到的数据包存储概念已替换和改进;
- 全面支持广播;
- 支持多服务器;
- “智能缓冲区”更多功能;
- 功能丰富的调试和日志接口;
- 所有超时和延迟都变得可调;
- 计时性能显著提高;
- 修复了大量错误。