健壮的 C++:队列模板
实现不管理内存的可损坏队列
引言
如果您正在使用 STL 并需要队列,您可能会选择 `std::deque` 或 `std::list`。然而,对于某些系统来说,使用这些模板可能会有问题。
- 在有许多短队列的系统中,它们的内存开销可能很大。
- 它们会分配和管理内存,这可能与硬实时系统需要提供可预测响应时间的需求相冲突。
- 它们隐藏了队列数据结构,使得健壮的系统难以从由于不安全的临界区或内存损坏引起的队列损坏中恢复。
本文介绍了一些模板,当这些问题使得使用 STL 模板不理想时,可以使用这些模板。
背景
本文中的模板来自 Robust Services Core (RSC),这是一个用于开发健壮 C++ 应用程序的开源框架。这些模板是:
Q1Way
,用于单向队列;Q2Way
,用于双向队列。
由于它们的实现类似,本文只介绍 Q1Way
,因为它使用更频繁。Q2Way
仅用于性能原因,当元素需要经常从相对较长的队列的中间移除时。Q2Way
还支持反向迭代,但应用程序很少需要此功能。
Using the Code
代码已编辑,删除了与模板非核心相关的内容。这些内容出现在附加的 *.zip 文件中的完整代码版本中,如果您不使用 RSC,可以删除它们。
设计的核心是,一个可队列对象定义一个 Q1Link
成员,该成员提供指向队列中下一个对象的链接。因此,模板不需要为队列中的元素分配内存。
这是 Q1Link
的代码,除了它的析构函数。
// Link for an item on a one-way queue. An object that resides on a one-way
// queue includes this as a member and implements a LinkDiff function that
// returns the distance between the top of the object and its Q1Link member.
//
class Q1Link
{
template<class T> friend class Q1Way;
public:
// Public because an instance of this class is included in objects that
// can be queued.
//
Q1Link() : next(nullptr) { }
// Public because an instance of this class is included in objects that
// can be queued.
//
~Q1Link();
// Deleted to prohibit copying.
//
Q1Link(const Q1Link& that) = delete;
Q1Link& operator=(const Q1Link& that) = delete;
// Returns true if the item is on a queue.
//
bool IsQueued() const { return next != nullptr; }
// Returns a string for displaying the link.
//
std::string to_str() const
{
std::ostringstream stream;
stream << next;
return stream.str();
}
private:
// The next item in the queue. Because Q1Way uses circular queues, a
// value of nullptr means that the item is not on a queue.
//
Q1Link* next;
};
对象在被删除之前应该被从队列中移除。但如果它仍然在队列中,Q1Link
的析构函数会移除它,以防止损坏的链接损坏队列并使后续对象成为孤儿。
Q1Link::~Q1Link()
{
// If the item is still queued, exqueue it. This is a serious problem if
// it is the tail item, because it will leave the queue head pointing to
// a deleted item.
//
if(next == nullptr) return;
auto prev = this;
auto curr = next;
auto i = INT16_MAX;
while((i >= 0) && (curr != nullptr))
{
if(curr == this)
{
auto before = INT16_MAX - i;
if(before == 0)
{
Debug::SwLog("tail item was still queued", before);
prev->next = nullptr;
}
else
{
Debug::SwLog("item was still queued", before);
prev->next = curr->next;
}
return;
}
--i;
prev = curr;
curr = curr->next;
}
Debug::SwLog("item is still queued", INT16_MAX - i);
}
Q1Link
禁止复制,这可能会通过复制链接来损坏队列。因为可队列对象定义了一个 Q1Link
成员,这使得对象也无法复制。如果它*必须*支持复制,可以通过调用其析构函数(以确保项未入队)然后将 `next` 初始化为 `nullptr` 来实现 Q1Link
的复制构造函数和复制运算符,这样项的新实例就可以保持未入队状态。
现在我们可以开始查看 Q1Way
模板本身了。请注意,它指向队列的*尾部*元素,这使得入队和出队操作都高效。入队的元素实际上形成一个循环队列。
// One-way queue. Recommended unless items are often exqueued, which
// can be expensive.
// o no items: tail_.next = nullptr
// o one item: tail_.next = item, item.next = item (points to itself)
// o two or more items: tail_.next = last item, last item.next = first
// item, second last item.next = last item (circular queue)
//
template<class T> class Q1Way
{
public:
// Initializes the queue header to default values. Before the queue
// can be used, Init must be invoked.
//
Q1Way() : tail_(nullptr), diff_(NilDiff) { }
// Cleans up the queue.
//
~Q1Way()
{
if(tail_.next == nullptr) return;
Purge();
}
// Deleted to prohibit copying.
//
Q1Way(const Q1Way& that) = delete;
Q1Way& operator=(const Q1Way& that) = delete;
// Initializes the queue so that it can be used.
//
void Init(ptrdiff_t diff)
{
tail_.next = nullptr; // queue is empty
diff_ = diff; // distance from each item's vptr to its Q1Link
}
private:
// For initializing diff_.
//
static const ptrdiff_t NilDiff = -1;
// The queue head, which actually points to the tail item.
// If the queue is empty, tail_.next is nullptr.
//
Q1Link tail_;
// The distance from an item's vptr to its Q1Link.
//
ptrdiff_t diff_;
};
为了操作队列链接,Q1Way
需要知道每个对象的 Q1Link
位于何处。这就是 `diff_` 的目的,它是从对象开始到其 `Q1Link` 成员的距离(以字节为单位)。队列中的所有对象必须使用相同的偏移量,因此如果不同类的对象要放在同一个队列中,就需要一个共同的基类。
`diff_` 的值是由一个丑陋的函数返回的,RSC 类总是将其命名为 `LinkDiff`。例如,RSC 的 `Thread` 类定义了一个 `msgq_` 成员,用于队列化发送给线程的消息。这些消息派生自 `MsgBuffer`,因此需要一个 `Q1Link` 成员和一个 `LinkDiff` 函数,以便 `Thread` 可以初始化 `msgq_`。然而,`MsgBuffer` 派生自 `Pooled`,它已经为所有 池化对象 提供了 `Q1Link` 成员和 `LinkDiff` 函数。因此,大纲(省略其他成员)如下所示:
class Pooled : public Object
{
public:
static ptrdiff_t LinkDiff() const;
private:
Q1Link link_;
};
ptrdiff_t Pooled ::LinkDiff()
{
uintptr_t local;
auto fake = reinterpret_cast<const Pooled*>(&local);
return ptrdiff(&fake->link_, fake);
}
class Thread : public Permanent
{
private:
Q1Way<MsgBuffer> msgq_;
};
msgq_.Init(Pooled::LinkDiff()); // in Thread's constructor
一旦调用了 `Init`,Q1Way
函数 `Item` 就可以找到每个元素的 `Q1Link` 的位置。此函数只需要在内部使用,因此是 `private` 的。
Q1Link* Item(const T& elem) const
{
if(diff_ == NilDiff) return nullptr;
if(&elem == nullptr) return nullptr;
return (Q1Link*) getptr2(&elem, diff_); // adds diff_ bytes to &elem
}
通常,一个项会被添加到队列的后端。
bool Enq(T& elem)
{
auto item = Item(elem);
if(item == nullptr) return false; // error
if(item->next != nullptr) return false; // if item is queued, do nothing
if(tail_.next != nullptr) // if queue isn't empty
{ // then
item->next = tail_.next->next; // item points to first element
tail_.next->next = item; // last element points to item
}
else
{
item->next = item; // else item points to itself
}
tail_.next = item; // tail points to item
return true;
}
一个项也可以被添加到队列的前端。
bool Henq(T& elem)
{
auto item = Item(elem);
if(item == nullptr) return false; // error
if(item->next != nullptr) return false; // if item is queued, do nothing
if(tail_.next != nullptr) // if queue isn't empty
{
item->next = tail_.next->next; // item points to first element
tail_.next->next = item; // last element points to item
} // tail isn't changed, so item is
else // after last (and is thus first)
{
item->next = item; // item points to itself
tail_.next = item; // tail points to item
}
return true;
}
一个项甚至可以被添加到队列的中间。
bool Insert(T* prev, T& elem)
{
if(prev == nullptr) // if nothing is previous
return Henq(elem); // then put item at head
auto item = Item(elem);
if(item == nullptr) return false; // error
auto ante = (Q1Link*)
getptr2(prev, diff_); // put item after previous
if(item->next != nullptr) return false; // item must not be queued
if(ante->next == nullptr) return false; // prev must be queued
item->next = ante->next;
ante->next = item;
if(tail_.next == ante) tail_.next = item; // update tail if item is last
return true;
}
通常,一个项会从队列的前端移除。
T* Deq()
{
if(tail_.next == nullptr) // if tail points to nothing
return nullptr; // then queue is empty
Q1Link* item = tail_.next->next; // set item to first element
if(tail_.next != item) // if item wasn't alone
tail_.next->next = item->next; // then last now points to second
else
tail_.next = nullptr; // else queue is now empty
item->next = nullptr; // item has been removed
return (T*) getptr1(item, diff_); // location of item's vptr (subtracts
// diff_ bytes from item)
}
如果需要,可以从队列的任何位置移除一个项。
bool Exq(T& elem)
{
auto item = Item(elem);
if(item == nullptr) return false; // error
if(item->next == nullptr) // if the item isn't queued
return true; // then return immediately
if(tail_.next == nullptr) // if the queue is empty
return false; // then the item can't be there
if(item->next == item) // if the item points to itself
{
if(tail_.next == item) // and if the item is also last
{
item->next = nullptr; // then remove the item
tail_.next = nullptr; // and the queue is now empty
return true;
}
return false; // the item is on another queue
}
auto curr = tail_.next; // starting at the last element,
while(curr->next != item) // advance until the item is
{
curr = curr->next; // the next element--but
if(curr == tail_.next) // stop after searching the
return false; // entire queue
}
curr->next = item->next; // curr's next becomes item's next
if(tail_.next == item) // if the item was the tail
tail_.next = curr; // then the tail has to back up
item->next = nullptr; // the item has been removed
return true;
}
函数 `First` 和 `Next` 支持队列遍历。它们只是使用指向当前项的指针,而不是单独的迭代器。遍历通常的形式是:
for(auto item = queue_.First(); item != nullptr; queue_.Next(item))…
T* First() const
{
if(diff_ == NilDiff) return nullptr; // queue is not initialized
Q1Link* item = tail_.next; // set item to first element
if(item == nullptr) return nullptr; // queue is empty
item = item->next; // advance to head
return (T*) getptr1(item, diff_); // location of item's vptr
}
// Updates ELEM to the next item in the queue. If ELEM is nullptr,
// provides the first item. Returns true if there was a next item.
//
bool Next(T*& elem) const
{
if(diff_ == NilDiff) return false; // queue is not initialized
Q1Link* item; // item will hold result
if(elem == nullptr) // nullptr means
{
item = tail_.next; // start at last element
if(item == nullptr) return false; // return if the queue is empty
}
else
{
item = (Q1Link*)
getptr2(elem, diff_); // start at the current item
if(tail_.next == item) // if that is the last element
{
elem = nullptr; // then there are no more
return false;
}
}
item = item->next; // advance to next element
if(item == nullptr) // make sure the item was queued
{
elem = nullptr;
return false;
}
elem = (T*) getptr1(item, diff_); // location of item's vptr
return true;
}
// Returns the item that follows ELEM.
//
T* Next(const T& elem) const
{
auto item = Item(elem);
if(item == nullptr) return nullptr; // error
if(tail_.next == item) return nullptr; // return if item was last
item = item->next; // advance to the next element
if(item == nullptr) return nullptr; // return if item wasn't queued
return (T*) getptr1(item, diff_); // location of next item's vptr
}
还有用于查找队列长度的常用函数。
bool Empty() const { return (tail_.next == nullptr); }
size_t Size() const
{
if(diff_ == NilDiff) return 0; // queue is not initialized
Q1Link* item = tail_.next; // start at the last item
if(item == nullptr) return 0; // check for an empty queue
size_t count = 1; // there is at least one element
item = item->next; // advance to the first element
while(item != tail_.next) // stop when we reach the tail
{
item = item->next;
++count;
}
return count; // report the result
}
最后,还有一个相当于 `clear` 的函数。
void Purge()
{
while(tail_.next != nullptr)
{
auto item = Deq();
delete item;
}
}
如果您想修改这些模板,NtIncrement.cpp 包含 `Q1WayCommands` 和 `Q2WayCommands`,它们允许通过 RSC 的 CLI 来测试 `Q1Way` 和 `Q2Way` 函数。
关注点
提供了以下函数来测试试图在队列损坏后恢复的软件。
// Corrupts ELEM's next pointer for testing purposes. If ELEM is
// nullptr, the queue's tail pointer is corrupted instead.
//
void Corrupt(T* elem)
{
if(elem == nullptr) // if no item provided
{
tail_.next = (Q1Link*) BAD_POINTER; // corrupt queue header
return;
}
auto item = (Q1Link*) // start at the current item
getptr2(elem, diff_);
item->next = (Q1Link*) BAD_POINTER; // corrupt ELEM's next pointer
}
如果对象池中可用块的队列损坏了,对象池审计如何修复它的描述 在此。
历史
- 2020 年 6 月 13 日:初始版本