65.9K
CodeProject 正在变化。 阅读更多。
Home

QUDP - 一种可靠的 UDP 协议

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2021 年 11 月 13 日

CPOL

3分钟阅读

viewsIcon

11035

一种基于队列的 UDP 协议

引言

我最近的任务是设计一个基于 UDP 的可靠协议。 它是在 C++ 中开发的,并且是从一个简单队列的想法演变而来的。 它的设计是分步的,每一步都以某种方式降低了底层通信机制的可靠性,直到达到 UDP 级别的可靠性(丢失、重复和重新排序的数据包)。 每一步都经过单元测试和集成/压力测试。 在大部分开发过程中,我使用 FIFO 来模拟底层网络。 我喜欢感受数据的行为方式,并且在用户空间中看到这些数据是一种很好的方法。

我描述了我采取的每一步,以及与该步骤相关的代码链接。 请随时获取任何步骤中的代码,进行尝试并将其朝着其他方向发展。 我在文章末尾列出了一些潜在的改进。

流程

步骤 1

通过线程安全队列打包/解包在两个线程之间发送的数据 - 基准行为。

第二步

将代码重构为发布者和订阅者。 发布者和订阅者之间的两个队列将模拟它们之间的双向网络通信。 访问这些内部队列将能够模拟网络错误。

步骤 3

修改消费者以容忍模拟网络上的重复和乱序数据包。 这是在解决数据丢失问题之前完成的,因为客户端重发可能会导致重新排序和重复的数据包,因此首先解决此问题。 消费者确认最后交付的帧,但生产者暂时放弃这些确认。

重构了“内部网络”,为嘈杂的模拟和 UDP 实现做好准备。

步骤 4

添加了对丢失数据的支持。 生产者维护一个待处理的帧队列,等待消费者确认。 最旧的未确认帧在超时后重新发送。 ACK 用于清除消费者已交付的帧的待处理列表。 生产者侧的待处理帧队列的大小是固定的,因此引入了生产者和消费者之间的基本形式的流量控制。

添加了压力测试。 单元测试中存在时间依赖性 -> 应该重构生产者以公开内部结构以进行更好的测试

将实体重新排列到它们自己的文件中。

步骤 5

在生产者和消费者之间添加了日志记录和 UPD 插件。

Using the Code

以下是 QUDP 如何使用的一个示例。 客户端代码创建一个 UDPNetwork 对象,并在构造函数中使用其端口号,然后在 QConsumer 的构造函数中使用它。

auto consumer = std::thread([]()
{
    std::shared_ptr<INetwork> qudp(new UdpNetwork(31415));
    auto qConsumer = std::make_unique<QConsumer<SignalData>>(qudp);
    while (true)
    {
        SignalData data;
        qConsumer->DeQ(data);
        printf("Time stamp %f \t\t Signal %f\n", data.mTimeStamp_sec, data.mValue);
    }
});

服务器端代码创建一个 UDPNetwork 对象,并在构造函数中使用客户端的 IP 地址和端口号。 然后在 QProducer 的构造函数中使用它。

auto producer = std::async(std::launch::async, []()
{
     auto processStart = system_clock::now();
     duration<int, std::milli> sleepTime_ms(10);
     std::shared_ptr<INetwork> qudp(new UdpNetwork("127.0.0.1", 31415));
     auto qProducer = std::make_unique<QProducer<SignalData>>(qudp);
     while (true)
     {
         std::this_thread::sleep_for(sleepTime_ms); 
         const auto uSecSinceStart = 
               duration_cast<microseconds>(system_clock::now() - processStart);
         const auto signal = GenerateSignal(uSecSinceStart);
         const auto secSinceStart = static_cast<double>(uSecSinceStart.count()) / uSecInASec;
         SignalData data{signal, secSinceStart};
         qProducer->EnQ(data);
      }
});

完整的演示代码可以在这里这里找到。

改进

一些我不满意的地方

  • 生产者和消费者之间没有初始化握手。 如果生产者在消费者之前启动,则初始数据包会丢失并且必须重新发送。 从这种情况中恢复需要一段时间。
  • 生产者侧的待处理队列大小和超时需要调整。 在高发送速率下,丢失的帧往往会短暂地阻止后续数据包的传输。
  • 协议的 ACK 确认部分不足以维持数据帧丢失情况下的稳定数据吞吐量。 来自消费者的 NACK(否定确认)是否可以减少生产者侧的停顿?

但这是一个有趣的小项目,我期待进一步开发它。 :)

历史

  • 2021 年 11 月 14 日 - 初始版本
© . All rights reserved.