65.9K
CodeProject 正在变化。 阅读更多。
Home

健壮的 C++:队列模板

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2020 年 6 月 14 日

GPL3

4分钟阅读

viewsIcon

9217

downloadIcon

141

实现不管理内存的可损坏队列

引言

如果您正在使用 STL 并需要队列,您可能会选择 `std::deque` 或 `std::list`。然而,对于某些系统来说,使用这些模板可能会有问题。

  1. 在有许多短队列的系统中,它们的内存开销可能很大。
  2. 它们会分配和管理内存,这可能与硬实时系统需要提供可预测响应时间的需求相冲突。
  3. 它们隐藏了队列数据结构,使得健壮的系统难以从由于不安全的临界区或内存损坏引起的队列损坏中恢复。

本文介绍了一些模板,当这些问题使得使用 STL 模板不理想时,可以使用这些模板。

背景

本文中的模板来自 Robust Services Core (RSC),这是一个用于开发健壮 C++ 应用程序的开源框架。这些模板是:

  • Q1Way,用于单向队列;
  • Q2Way,用于双向队列。

由于它们的实现类似,本文只介绍 Q1Way,因为它使用更频繁。Q2Way 仅用于性能原因,当元素需要经常从相对较长的队列的中间移除时。Q2Way 还支持反向迭代,但应用程序很少需要此功能。

Using the Code

代码已编辑,删除了与模板非核心相关的内容。这些内容出现在附加的 *.zip 文件中的完整代码版本中,如果您不使用 RSC,可以删除它们。

设计的核心是,一个可队列对象定义一个 Q1Link 成员,该成员提供指向队列中下一个对象的链接。因此,模板不需要为队列中的元素分配内存。

这是 Q1Link 的代码,除了它的析构函数。

//  Link for an item on a one-way queue.  An object that resides on a one-way
//  queue includes this as a member and implements a LinkDiff function that
//  returns the distance between the top of the object and its Q1Link member.
//
class Q1Link
{
   template<class T> friend class Q1Way;
public:
   //  Public because an instance of this class is included in objects that
   //  can be queued.
   //
   Q1Link() : next(nullptr) { }

   //  Public because an instance of this class is included in objects that
   //  can be queued.
   //
   ~Q1Link();

   //  Deleted to prohibit copying.
   //
   Q1Link(const Q1Link& that) = delete;
   Q1Link& operator=(const Q1Link& that) = delete;

   //  Returns true if the item is on a queue.
   //
   bool IsQueued() const { return next != nullptr; }

   //  Returns a string for displaying the link.
   //
   std::string to_str() const
   {
      std::ostringstream stream;
      stream << next;
      return stream.str();
   }
private:
   //  The next item in the queue.  Because Q1Way uses circular queues, a
   //  value of nullptr means that the item is not on a queue.
   //
   Q1Link* next;
};

对象在被删除之前应该被从队列中移除。但如果它仍然在队列中,Q1Link 的析构函数会移除它,以防止损坏的链接损坏队列并使后续对象成为孤儿。

Q1Link::~Q1Link()
{
   //  If the item is still queued, exqueue it.  This is a serious problem if
   //  it is the tail item, because it will leave the queue head pointing to
   //  a deleted item.
   //
   if(next == nullptr) return;

   auto prev = this;
   auto curr = next;
   auto i = INT16_MAX;

   while((i >= 0) && (curr != nullptr))
   {
      if(curr == this)
      {
         auto before = INT16_MAX - i;

         if(before == 0)
         {
            Debug::SwLog("tail item was still queued", before);
            prev->next = nullptr;
         }
         else
         {
            Debug::SwLog("item was still queued", before);
            prev->next = curr->next;
         }
         return;
      }

      --i;
      prev = curr;
      curr = curr->next;
   }

   Debug::SwLog("item is still queued", INT16_MAX - i);
}

Q1Link 禁止复制,这可能会通过复制链接来损坏队列。因为可队列对象定义了一个 Q1Link 成员,这使得对象也无法复制。如果它*必须*支持复制,可以通过调用其析构函数(以确保项未入队)然后将 `next` 初始化为 `nullptr` 来实现 Q1Link 的复制构造函数和复制运算符,这样项的新实例就可以保持未入队状态。

现在我们可以开始查看 Q1Way 模板本身了。请注意,它指向队列的*尾部*元素,这使得入队和出队操作都高效。入队的元素实际上形成一个循环队列。

//  One-way queue.  Recommended unless items are often exqueued, which
//  can be expensive.
//  o no items: tail_.next = nullptr
//  o one item: tail_.next = item, item.next = item (points to itself)
//  o two or more items: tail_.next = last item, last item.next = first
//    item, second last item.next = last item (circular queue)
//
template<class T> class Q1Way
{
public:
   //  Initializes the queue header to default values.  Before the queue
   //  can be used, Init must be invoked.
   //
   Q1Way() : tail_(nullptr), diff_(NilDiff) { }

   //  Cleans up the queue.
   //
   ~Q1Way()
   {
      if(tail_.next == nullptr) return;
      Purge();
   }

   //  Deleted to prohibit copying.
   //
   Q1Way(const Q1Way& that) = delete;
   Q1Way& operator=(const Q1Way& that) = delete;

   //  Initializes the queue so that it can be used.
   //
   void Init(ptrdiff_t diff)
   {
      tail_.next = nullptr;  // queue is empty
      diff_ = diff;          // distance from each item's vptr to its Q1Link
   }
private:
   //  For initializing diff_.
   //
   static const ptrdiff_t NilDiff = -1;

   //  The queue head, which actually points to the tail item.
   //  If the queue is empty, tail_.next is nullptr.
   //
   Q1Link tail_;

   //  The distance from an item's vptr to its Q1Link.
   //
   ptrdiff_t diff_;
};

为了操作队列链接,Q1Way 需要知道每个对象的 Q1Link 位于何处。这就是 `diff_` 的目的,它是从对象开始到其 `Q1Link` 成员的距离(以字节为单位)。队列中的所有对象必须使用相同的偏移量,因此如果不同类的对象要放在同一个队列中,就需要一个共同的基类。

`diff_` 的值是由一个丑陋的函数返回的,RSC 类总是将其命名为 `LinkDiff`。例如,RSC 的 `Thread` 类定义了一个 `msgq_` 成员,用于队列化发送给线程的消息。这些消息派生自 `MsgBuffer`,因此需要一个 `Q1Link` 成员和一个 `LinkDiff` 函数,以便 `Thread` 可以初始化 `msgq_`。然而,`MsgBuffer` 派生自 `Pooled`,它已经为所有 池化对象 提供了 `Q1Link` 成员和 `LinkDiff` 函数。因此,大纲(省略其他成员)如下所示:

class Pooled : public Object
{
public:
   static ptrdiff_t LinkDiff() const;
private:
   Q1Link link_;
};

ptrdiff_t Pooled ::LinkDiff()
{
   uintptr_t local;
   auto fake = reinterpret_cast<const Pooled*>(&local);
   return ptrdiff(&fake->link_, fake);
}

class Thread : public Permanent
{
private:
   Q1Way<MsgBuffer> msgq_;
};

msgq_.Init(Pooled::LinkDiff());  // in Thread's constructor

一旦调用了 `Init`,Q1Way 函数 `Item` 就可以找到每个元素的 `Q1Link` 的位置。此函数只需要在内部使用,因此是 `private` 的。

Q1Link* Item(const T& elem) const
{
   if(diff_ == NilDiff) return nullptr;
   if(&elem == nullptr) return nullptr;
   return (Q1Link*) getptr2(&elem, diff_);  // adds diff_ bytes to &elem
}

通常,一个项会被添加到队列的后端。

bool Enq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {                                        // then
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }
   else
   {
      item->next = item;                    // else item points to itself
   }
   tail_.next = item;                       // tail points to item
   return true;
}

一个项也可以被添加到队列的前端。

bool Henq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }                                        // tail isn't changed, so item is
   else                                     // after last (and is thus first)
   {
      item->next = item;                    // item points to itself
      tail_.next = item;                    // tail points to item
   }
   return true;
}

一个项甚至可以被添加到队列的中间。

bool Insert(T* prev, T& elem)
{
   if(prev == nullptr)                        // if nothing is previous
      return Henq(elem);                      // then put item at head
   auto item = Item(elem);
   if(item == nullptr) return false;          // error
   auto ante = (Q1Link*)
      getptr2(prev, diff_);                   // put item after previous
   if(item->next != nullptr) return false;    // item must not be queued
   if(ante->next == nullptr) return false;    // prev must be queued
   item->next = ante->next;
   ante->next = item;
   if(tail_.next == ante) tail_.next = item;  // update tail if item is last
   return true;
}

通常,一个项会从队列的前端移除。

T* Deq()
{
   if(tail_.next == nullptr)          // if tail points to nothing
      return nullptr;                 // then queue is empty
   Q1Link* item = tail_.next->next;   // set item to first element
   if(tail_.next != item)             // if item wasn't alone
      tail_.next->next = item->next;  // then last now points to second
   else
      tail_.next = nullptr;           // else queue is now empty
   item->next = nullptr;              // item has been removed
   return (T*) getptr1(item, diff_);  // location of item's vptr (subtracts
                                      //   diff_ bytes from item)
}

如果需要,可以从队列的任何位置移除一个项。

bool Exq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;  // error
   if(item->next == nullptr)          // if the item isn't queued
      return true;                    // then return immediately
   if(tail_.next == nullptr)          // if the queue is empty
      return false;                   // then the item can't be there
   if(item->next == item)             // if the item points to itself
   {
      if(tail_.next == item)          // and if the item is also last
      {
         item->next = nullptr;        // then remove the item
         tail_.next = nullptr;        // and the queue is now empty
         return true;
      }
      return false;                   // the item is on another queue
   }
   auto curr = tail_.next;            // starting at the last element,
   while(curr->next != item)          // advance until the item is
   {
      curr = curr->next;              // the next element--but
      if(curr == tail_.next)          // stop after searching the
         return false;                // entire queue
   }
   curr->next = item->next;           // curr's next becomes item's next
   if(tail_.next == item)             // if the item was the tail
      tail_.next = curr;              // then the tail has to back up
   item->next = nullptr;              // the item has been removed
   return true;
}

函数 `First` 和 `Next` 支持队列遍历。它们只是使用指向当前项的指针,而不是单独的迭代器。遍历通常的形式是:

for(auto item = queue_.First(); item != nullptr; queue_.Next(item))…
T* First() const
{
   if(diff_ == NilDiff) return nullptr;  // queue is not initialized
   Q1Link* item = tail_.next;            // set item to first element
   if(item == nullptr) return nullptr;   // queue is empty
   item = item->next;                    // advance to head
   return (T*) getptr1(item, diff_);     // location of item's vptr
}

//  Updates ELEM to the next item in the queue.  If ELEM is nullptr,
//  provides the first item.  Returns true if there was a next item.
//
bool Next(T*& elem) const
{
   if(diff_ == NilDiff) return false;    // queue is not initialized
   Q1Link* item;                         // item will hold result
   if(elem == nullptr)                   // nullptr means
   {
      item = tail_.next;                 // start at last element
      if(item == nullptr) return false;  // return if the queue is empty
   }
   else
   {
      item = (Q1Link*)
         getptr2(elem, diff_);           // start at the current item
      if(tail_.next == item)             // if that is the last element
      {
         elem = nullptr;                 // then there are no more
         return false;
      }
   }
   item = item->next;                    // advance to next element
   if(item == nullptr)                   // make sure the item was queued
   {
      elem = nullptr;
      return false;
   }
   elem = (T*) getptr1(item, diff_);     // location of item's vptr
   return true;
}

//  Returns the item that follows ELEM.
//
T* Next(const T& elem) const
{
   auto item = Item(elem);
   if(item == nullptr) return nullptr;     // error
   if(tail_.next == item) return nullptr;  // return if item was last
   item = item->next;                      // advance to the next element
   if(item == nullptr) return nullptr;     // return if item wasn't queued
   return (T*) getptr1(item, diff_);       // location of next item's vptr
}

还有用于查找队列长度的常用函数。

bool Empty() const { return (tail_.next == nullptr); }

size_t Size() const
{
   if(diff_ == NilDiff) return 0;  // queue is not initialized
   Q1Link* item = tail_.next;      // start at the last item
   if(item == nullptr) return 0;   // check for an empty queue
   size_t count = 1;               // there is at least one element
   item = item->next;              // advance to the first element
   while(item != tail_.next)       // stop when we reach the tail
   {
      item = item->next;
      ++count;
   }
   return count;                   // report the result
}

最后,还有一个相当于 `clear` 的函数。

void Purge()
{
   while(tail_.next != nullptr)
   {
      auto item = Deq();
      delete item;
   }
}

如果您想修改这些模板,NtIncrement.cpp 包含 `Q1WayCommands` 和 `Q2WayCommands`,它们允许通过 RSC 的 CLI 来测试 `Q1Way` 和 `Q2Way` 函数。

关注点

提供了以下函数来测试试图在队列损坏后恢复的软件。

//  Corrupts ELEM's next pointer for testing purposes.  If ELEM is
//  nullptr, the queue's tail pointer is corrupted instead.
//
void Corrupt(T* elem)
{
   if(elem == nullptr)                     // if no item provided
   {
      tail_.next = (Q1Link*) BAD_POINTER;  // corrupt queue header
      return;
   }
   auto item = (Q1Link*)                   // start at the current item
      getptr2(elem, diff_);
   item->next = (Q1Link*) BAD_POINTER;     // corrupt ELEM's next pointer
}

如果对象池中可用块的队列损坏了,对象池审计如何修复它的描述 在此

历史

  • 2020 年 6 月 13 日:初始版本
© . All rights reserved.