有界阻塞队列(单锁)






4.57/5 (24投票s)
2004年8月18日
4分钟阅读

143009

1162
一个快速且灵活的有界阻塞队列。 非常适合生产者/消费者需求,例如网络队列和管道。
引言
本文包含一个 C# 实现的有界阻塞队列(即有界循环队列)。 一般来说,这是一个具有固定数量元素的队列(即有界),如果队列已满,将“阻塞”生产者;如果队列为空,将阻塞消费者。 这与标准的 .NET Queue
不同,因为 .NET 的 Queue
是无界的,并且不会阻塞。
有界阻塞队列对于生产者/消费者应用程序(如网络队列和管道)非常方便。 在生产者/消费者需求中,通常不希望队列无限增长,因为如果没有检查,它会占用所有可用内存。 此外,很多时候生产者和消费者是不平衡的,所以一个会超过另一个。 通常不希望生产者每秒生成 1000 个对象,而消费者每秒只能处理 100 个对象。 需要一种机制来限制生产者,以使消费者能够赶上(反之亦然)。 这种情况在网络应用程序中很容易看到,因为可以从网络获得比消费者可以处理的更多的包等等。 通常希望平衡这一点,以便在更多包排队之前允许消费者线程工作。 有界阻塞队列中的“阻塞”是指队列将阻塞生产者,直到有空闲槽可用,并在空闲槽可用时自动取消阻塞生产者。 相反,队列将阻塞空队列上的消费者,直到有对象可以出队。 线程调用它们各自的 Enqueue
或 Dequeue
方法,队列会自动处理阻塞和取消阻塞。
背景
我研究过许多阻塞队列的实现,例如单锁队列、双锁队列和用信号量锁定的队列。 我还把一个非阻塞队列的实现移植到了 C#。
一般来说,我发现单锁队列在性能、易于实现和正确性验证(同步原语和线程问题中最难的部分)之间取得了最佳平衡。 双锁队列将允许一个消费者和一个生产者在多 CPU 系统中同时工作。 但是,入队的对象仍然是两个线程之间的共享对象,因此需要另一个锁来添加内存屏障,以验证消费者是否正确“看到”了生成的对象。 可以在没有第三个锁的情况下执行此操作,但需要仔细考虑,并测试内存屏障问题等。 此外,添加第三个“共享”锁会消除双锁队列优于单锁队列的任何优势,并且实际上会更慢,因为每个操作现在都需要两个锁。 使用两个信号量进行元素计数也是同样的问题。 大多数信号量队列实现需要两个信号量(一个用于保护入队,一个用于出队)和一个第三个锁来保护中间的共享访问(例如,Sync-Sandwich)。 每个信号量在内部使用一个锁来同步自身,因此最终每个出队操作需要两个锁操作,每个入队操作需要两个锁操作。 非阻塞(即无锁)队列是完全不同的东西,需要*非常注意内存屏障和线程问题。 我已经在 C# 中实现了一个,但无法在 .NET 内存模型中验证其正确性,因此在我看到 CLR/.NET 中的更多信息之前,我不会推荐无锁队列。
代码
下面是单锁有界阻塞队列的完整列表。 每个方法应该在注释和代码本身之间进行大部分自我描述。 如果它们不一致,则应使用附加文件中的代码覆盖此列表。 如果需要更多说明或您有意见或问题,请 ping 我。
using System;
using System.Threading;
using System.Collections;
namespace Queues
{
/// <summary>
/// Author: William Stacey, MVP (staceyw@mvps.org)
/// Modified Date: 08/03/2004
/// One Lock Bounded Blocking Queue (e.g. Bounded Buffer).
/// This queue is internally synchronized (thread-safe)
/// and designed for one-many producers and one-many
/// consumer threads. This is ideal for pipelining
/// or other consumer/producer needs.
/// Fast and thread safe on single or multiple cpu machines.
///
/// Consumer thread(s) will block on Dequeue operations
/// until another thread performs a Enqueue
/// operation, at which point the first scheduled consumer
/// thread will be unblocked and get the
/// current object. Producer thread(s) will block
/// on Enqueue operations until another
/// consumer thread calls Dequeue to free a queue slot,
/// at which point the first scheduled producer
/// thread will be unblocked to finish its Enqueue operation.
/// No user code is needed to
/// handle this "ping-pong" between locking/unlocking
/// consumers and producers.
/// </summary>
public sealed class BlockingQueue : ICollection
{
#region Fields
// Buffer used to store queue objects with max "Size".
private object[] buffer;
// Current number of elements in the queue.
private int count;
// Max number of elements queue can hold without blocking.
private int size;
// Index of slot for object to remove on next Dequeue.
private int head;
// Index of slot for next Enqueue object.
private int tail;
// Object used to synchronize the queue.
private readonly object syncRoot;
#endregion
#region Constructors
/// <summary>
/// Create instance of Queue with Bounded number of elements.
/// After that many elements are used, another Enqueue
/// operation will "block" or wait until a Consumer calls
/// Dequeue to free a slot. Likewise, if the queue
/// is empty, a call to Dequeue will block until
/// another thread calls Enqueue.
/// </summary>
/// <param name="size"></param>
public BlockingQueue(int size)
{
if ( size < 1 )
throw new ArgumentOutOfRangeException("size must"
+ " be greater then zero.");
syncRoot = new object();
this.size = size;
buffer = new object[size];
count = 0;
head = 0;
tail = 0;
}
#endregion
#region Properties
/// <summary>
/// Gets the object values currently in the queue.
/// If queue is empty, this will return a zero length array.
/// The returned array length can be 0 to Size.
/// This method does not modify the queue,
/// but returns a shallow copy
/// of the queue buffer containing the objects
/// contained in the queue.
/// </summary>
public object[] Values
{
get
{
// Copy used elements to a new array
// of "count" size. Note a simple
// Buffer copy will not work as head
// could be anywhere and we want
// a zero based array.
object[] values;
lock(syncRoot)
{
values = new object[count];
int pos = head;
for(int i = 0; i < count; i++)
{
values[i] = buffer[pos];
pos = (pos + 1) % size;
}
}
return values;
}
}
#endregion
#region Public Methods
/// <summary>
/// Adds an object to the end of the queue.
/// If queue is full, this method will
/// block until another thread calls one
/// of the Dequeue methods. This method will wait
/// "Timeout.Infinite" until queue has a free slot.
/// </summary>
/// <param name="value"></param>
public void Enqueue(object value)
{
Enqueue(value, Timeout.Infinite);
}
/// <summary>
/// Adds an object to the end of the queue.
/// If queue is full, this method will
/// block until another thread calls one
/// of the Dequeue methods or millisecondsTimeout
/// expires. If timeout, method will throw QueueTimeoutException.
/// </summary>
/// <param name="value"></param>
public void Enqueue(object value, int millisecondsTimeout)
{
lock(syncRoot)
{
while(count == size)
{
try
{
if (!Monitor.Wait(syncRoot, millisecondsTimeout))
throw new QueueTimeoutException();
}
catch
{
// Monitor exited with exception.
// Could be owner thread of monitor
// object was terminated or timeout
// on wait. Pulse any/all waiting
// threads to ensure we don't get
// any "live locked" producers.
Monitor.PulseAll(syncRoot);
throw;
}
}
buffer[tail] = value;
tail = (tail + 1) % size;
count++;
if (count == 1) // Could have blocking Dequeue thread(s).
Monitor.PulseAll(syncRoot);
}
}
/// <summary>
/// Non-blocking version of Enqueue().
/// If Enqueue is successfull, this will
/// return true; otherwise false if queue is full.
/// </summary>
/// <param name="value"></param>
/// <returns>true if successfull,
/// otherwise false.</returns>
public bool TryEnqueue(object value)
{
lock(syncRoot)
{
if ( count == size )
return false;
buffer[tail] = value;
tail = (tail + 1) % size;
count++;
if ( count == 1 )
// Could have blocking Dequeue thread(s).
Monitor.PulseAll(syncRoot);
}
return true;
}
/// <summary>
/// Removes and returns the object
/// at the beginning of the Queue.
/// If queue is empty, method will block until
/// another thread calls one of
/// the Enqueue methods. This method will wait
/// "Timeout.Infinite" until another
/// thread Enqueues and object.
/// </summary>
/// <returns></returns>
public object Dequeue()
{
return Dequeue(Timeout.Infinite);
}
/// <summary>
/// Removes and returns the object
/// at the beginning of the Queue.
/// If queue is empty, method will block until
/// another thread calls one of
/// the Enqueue methods or millisecondsTimeout expires.
/// If timeout, method will throw QueueTimeoutException.
/// </summary>
/// <returns>The object that is removed from
/// the beginning of the Queue.</returns>
public object Dequeue(int millisecondsTimeout)
{
object value;
lock(syncRoot)
{
while(count == 0)
{
try
{
if (!Monitor.Wait(syncRoot, millisecondsTimeout))
throw new QueueTimeoutException();
}
catch
{
Monitor.PulseAll(syncRoot);
throw;
}
}
value = buffer[head];
buffer[head] = null;
head = (head + 1) % size;
count--;
if ( count == (size - 1) )
// Could have blocking Enqueue thread(s).
Monitor.PulseAll(syncRoot);
}
return value;
}
/// <summary>
/// Non-blocking version of Dequeue.
/// Will return false if queue is empty and set
/// value to null, otherwise will return true
/// and set value to the dequeued object.
/// </summary>
/// <param name="value">The object that is removed from
/// the beginning of the Queue or null if empty.</param>
/// <returns>true if successfull, otherwise false.</returns>
public bool TryDequeue(out object value)
{
lock(syncRoot)
{
if ( count == 0 )
{
value = null;
return false;
}
value = buffer[head];
buffer[head] = null;
head = (head + 1) % size;
count--;
if ( count == (size - 1) )
// Could have blocking Enqueue thread(s).
Monitor.PulseAll(syncRoot);
}
return true;
}
/// <summary>
/// Returns the object at the beginning
/// of the queue without removing it.
/// </summary>
/// <returns>The object at the beginning
/// of the queue.</returns>
/// <remarks>
/// This method is similar to the Dequeue method,
/// but Peek does not modify the queue.
/// A null reference can be added to the Queue as a value.
/// To distinguish between a null value and the end of the queue,
/// check the Count property or
/// catch the InvalidOperationException,
/// which is thrown when the Queue is empty.
/// </remarks>
/// <exception cref="InvalidOpertionException">
/// The queue is empty.</exception>
public object Peek()
{
lock(syncRoot)
{
if (count == 0)
throw new InvalidOperationException("The Queue is empty.");
object value = buffer[head];
return value;
}
}
/// <summary>
/// Returns the object at the beginning
/// of the Queue without removing it.
/// Similar to the Peek method, however this method
/// will not throw exception if
/// queue is empty, but instead will return false.
/// </summary>
/// <param name="value">The object at the beginning
/// of the Queue or null if empty.</param>
/// <returns>The object at the beginning of the Queue.</returns>
public bool TryPeek(out object value)
{
lock(syncRoot)
{
if ( count == 0 )
{
value = null;
return false;
}
value = buffer[head];
}
return true;
}
/// <summary>
/// Removes all objects from the Queue.
/// </summary>
/// <remarks>
/// Count is set to zero. Size does not change.
/// </remarks>
public void Clear()
{
lock(syncRoot)
{
count = 0;
head = 0;
tail = 0;
for(int i = 0; i < buffer.Length; i++)
{
buffer[i] = null;
}
}
}
#endregion
#region ICollection Members
/// <summary>
/// Gets a value indicating whether access
/// to the Queue is synchronized (thread-safe).
/// </summary>
public bool IsSynchronized
{
get { return true; }
}
/// <summary>
/// Returns the max elements allowed
/// in the queue before blocking Enqueue
/// operations. This is the size set in the constructor.
/// </summary>
public int Size
{
get { return this.size; }
}
/// <summary>
/// Gets the number of elements contained in the Queue.
/// </summary>
public int Count
{
get { lock(syncRoot) { return count; } }
}
/// <summary>
/// Copies the Queue elements to an existing one-dimensional Array,
/// starting at the specified array index.
/// </summary>
/// <param name="array">
/// The one-dimensional Array that is the destination
/// of the elements copied from Queue.
/// The Array must have zero-based indexing.</param>
/// <param name="index">The zero-based index
/// in array at which copying begins.</param>
public void CopyTo(Array array, int index)
{
object[] tmpArray = Values;
tmpArray.CopyTo(array, index);
}
/// <summary>
/// Gets an object that can be used to synchronize
/// access to the Queue.
/// </summary>
public object SyncRoot
{
get { return this.syncRoot; }
}
#endregion
#region IEnumerable Members
/// <summary>
/// GetEnumerator not implemented. You can't enumerate
/// the active queue as you would an array as it is dynamic
/// with active gets and puts. You could
/// if you locked it first and unlocked after
/// enumeration, but that does not work well for GetEnumerator.
/// The recommended method is to Get Values
/// and enumerate the returned array copy.
/// That way the queue is locked for
/// only a short time and a copy returned
/// so that can be safely enumerated using
/// the array's enumerator. You could also
/// create a custom enumerator that would
/// dequeue the objects until empty queue,
/// but that is a custom need.
/// </summary>
/// <returns></returns>
public IEnumerator GetEnumerator()
{
throw new NotImplementedException("Not Implemented.");
}
#endregion
} // End BlockingQueue
public class QueueTimeoutException : Exception
{
public QueueTimeoutException() : base("Queue method timed out on wait.")
{
}
}
}
关注点
zip 文件还包含一个测试类 BlockingQueueTest.cs,它允许您使用多个线程测试队列。 这是一个使用两个队列、一个 Listener 对象和一个 Server 对象的简单网络模拟。 Listener 模拟从网络接收到的数据包并将对象放入其输出队列。 该队列用作 Server 对象的输入队列; Server 对象从队列中取出对象、处理它们,并将它们放入第二个队列(即 Server 的输出队列)。 Server 的输出队列也是 Listener 的输入队列,因为 Listener 兼作侦听器和发送器。
历史
- 2004 年 8 月 17 日 – V1 - 首次发布。