65.9K
CodeProject 正在变化。 阅读更多。
Home

有界阻塞队列(单锁)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.57/5 (24投票s)

2004年8月18日

4分钟阅读

viewsIcon

143009

downloadIcon

1162

一个快速且灵活的有界阻塞队列。 非常适合生产者/消费者需求,例如网络队列和管道。

引言

本文包含一个 C# 实现的有界阻塞队列(即有界循环队列)。 一般来说,这是一个具有固定数量元素的队列(即有界),如果队列已满,将“阻塞”生产者;如果队列为空,将阻塞消费者。 这与标准的 .NET Queue 不同,因为 .NET 的 Queue 是无界的,并且不会阻塞。

有界阻塞队列对于生产者/消费者应用程序(如网络队列和管道)非常方便。 在生产者/消费者需求中,通常不希望队列无限增长,因为如果没有检查,它会占用所有可用内存。 此外,很多时候生产者和消费者是不平衡的,所以一个会超过另一个。 通常不希望生产者每秒生成 1000 个对象,而消费者每秒只能处理 100 个对象。 需要一种机制来限制生产者,以使消费者能够赶上(反之亦然)。 这种情况在网络应用程序中很容易看到,因为可以从网络获得比消费者可以处理的更多的包等等。 通常希望平衡这一点,以便在更多包排队之前允许消费者线程工作。 有界阻塞队列中的“阻塞”是指队列将阻塞生产者,直到有空闲槽可用,并在空闲槽可用时自动取消阻塞生产者。 相反,队列将阻塞空队列上的消费者,直到有对象可以出队。 线程调用它们各自的 EnqueueDequeue 方法,队列会自动处理阻塞和取消阻塞。

背景

我研究过许多阻塞队列的实现,例如单锁队列、双锁队列和用信号量锁定的队列。 我还把一个非阻塞队列的实现移植到了 C#。

一般来说,我发现单锁队列在性能、易于实现和正确性验证(同步原语和线程问题中最难的部分)之间取得了最佳平衡。 双锁队列将允许一个消费者和一个生产者在多 CPU 系统中同时工作。 但是,入队的对象仍然是两个线程之间的共享对象,因此需要另一个锁来添加内存屏障,以验证消费者是否正确“看到”了生成的对象。 可以在没有第三个锁的情况下执行此操作,但需要仔细考虑,并测试内存屏障问题等。 此外,添加第三个“共享”锁会消除双锁队列优于单锁队列的任何优势,并且实际上会更慢,因为每个操作现在都需要两个锁。 使用两个信号量进行元素计数也是同样的问题。 大多数信号量队列实现需要两个信号量(一个用于保护入队,一个用于出队)和一个第三个锁来保护中间的共享访问(例如,Sync-Sandwich)。 每个信号量在内部使用一个锁来同步自身,因此最终每个出队操作需要两个锁操作,每个入队操作需要两个锁操作。 非阻塞(即无锁)队列是完全不同的东西,需要*非常注意内存屏障和线程问题。 我已经在 C# 中实现了一个,但无法在 .NET 内存模型中验证其正确性,因此在我看到 CLR/.NET 中的更多信息之前,我不会推荐无锁队列。

代码

下面是单锁有界阻塞队列的完整列表。 每个方法应该在注释和代码本身之间进行大部分自我描述。 如果它们不一致,则应使用附加文件中的代码覆盖此列表。 如果需要更多说明或您有意见或问题,请 ping 我。

using System;
using System.Threading;
using System.Collections;

namespace Queues
{
    /// <summary>
    /// Author: William Stacey, MVP (staceyw@mvps.org)
    /// Modified Date: 08/03/2004
    /// One Lock Bounded Blocking Queue (e.g. Bounded Buffer).
    /// This queue is internally synchronized (thread-safe)
    /// and designed for one-many producers and one-many
    /// consumer threads.  This is ideal for pipelining
    /// or other consumer/producer needs.
    /// Fast and thread safe on single or multiple cpu machines.
    /// 
    /// Consumer thread(s) will block on Dequeue operations
    /// until another thread performs a Enqueue
    /// operation, at which point the first scheduled consumer
    /// thread will be unblocked and get the
    /// current object.  Producer thread(s) will block
    /// on Enqueue operations until another
    /// consumer thread calls Dequeue to free a queue slot,
    /// at which point the first scheduled producer
    /// thread will be unblocked to finish its Enqueue operation.
    /// No user code is needed to
    /// handle this "ping-pong" between locking/unlocking
    /// consumers and producers. 
    /// </summary>
    public sealed class BlockingQueue : ICollection
    {
        #region Fields
        // Buffer used to store queue objects with max "Size".
        private object[] buffer;
        // Current number of elements in the queue.
        private int count;
        // Max number of elements queue can hold without blocking.
        private int size;
        // Index of slot for object to remove on next Dequeue.
        private int head;
        // Index of slot for next Enqueue object.
        private int tail;
        // Object used to synchronize the queue.
        private readonly object syncRoot;
        #endregion

        #region Constructors
        /// <summary>
        /// Create instance of Queue with Bounded number of elements.
        /// After that many elements are used, another Enqueue
        /// operation will "block" or wait until a Consumer calls
        /// Dequeue to free a slot.  Likewise, if the queue
        /// is empty, a call to Dequeue will block until
        /// another thread calls Enqueue.
        /// </summary>
        /// <param name="size"></param>
        public BlockingQueue(int size)
        {
            if ( size < 1 )
                throw new ArgumentOutOfRangeException("size must" 
                                     + " be greater then zero.");
            syncRoot = new object();
            this.size = size;
            buffer = new object[size];
            count = 0;
            head = 0;
            tail = 0;
        }
        #endregion

        #region Properties
        /// <summary>
        /// Gets the object values currently in the queue.
        /// If queue is empty, this will return a zero length array.
        /// The returned array length can be 0 to Size.
        /// This method does not modify the queue,
        /// but returns a shallow copy
        /// of the queue buffer containing the objects
        /// contained in the queue.
        /// </summary>
        public object[] Values
        {
            get
            {
                // Copy used elements to a new array
                // of "count" size.  Note a simple
                // Buffer copy will not work as head
                // could be anywhere and we want
                // a zero based array.
                object[] values;
                lock(syncRoot)
                {
                    values = new object[count];
                    int pos = head;
                    for(int i = 0; i < count; i++)
                    {
                        values[i] = buffer[pos];
                        pos = (pos + 1) % size;
                    }
                }
                return values;
            }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Adds an object to the end of the queue.
        /// If queue is full, this method will
        /// block until another thread calls one
        /// of the Dequeue methods.  This method will wait
        /// "Timeout.Infinite" until queue has a free slot.
        /// </summary>
        /// <param name="value"></param>
        public void Enqueue(object value)
        {
            Enqueue(value, Timeout.Infinite);
        }

        /// <summary>
        /// Adds an object to the end of the queue.
        /// If queue is full, this method will
        /// block until another thread calls one
        /// of the Dequeue methods or millisecondsTimeout
        /// expires.  If timeout, method will throw QueueTimeoutException.
        /// </summary>
        /// <param name="value"></param>
        public void Enqueue(object value, int millisecondsTimeout)
        {
            lock(syncRoot)
            {
                while(count == size)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        // Monitor exited with exception.
                        // Could be owner thread of monitor
                        // object was terminated or timeout
                        // on wait. Pulse any/all waiting
                        // threads to ensure we don't get
                        // any "live locked" producers.
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if (count == 1) // Could have blocking Dequeue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
        }

        /// <summary>
        /// Non-blocking version of Enqueue().
        /// If Enqueue is successfull, this will
        /// return true; otherwise false if queue is full.
        /// </summary>
        /// <param name="value"></param>
        /// <returns>true if successfull,
        /// otherwise false.</returns>
        public bool TryEnqueue(object value)
        {
            lock(syncRoot)
            {
                if ( count == size )
                    return false;
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if ( count == 1 ) 
                // Could have blocking Dequeue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>
        /// Removes and returns the object
        /// at the beginning of the Queue.
        /// If queue is empty, method will block until
        /// another thread calls one of
        /// the Enqueue methods. This method will wait
        /// "Timeout.Infinite" until another
        /// thread Enqueues and object.
        /// </summary>
        /// <returns></returns>
        public object Dequeue()
        {
            return Dequeue(Timeout.Infinite);
        }

        /// <summary>
        /// Removes and returns the object
        /// at the beginning of the Queue.
        /// If queue is empty, method will block until
        /// another thread calls one of
        /// the Enqueue methods or millisecondsTimeout expires.
        /// If timeout, method will throw QueueTimeoutException.
        /// </summary>
        /// <returns>The object that is removed from
        /// the beginning of the Queue.</returns>
        public object Dequeue(int millisecondsTimeout)
        {
            object value;
            lock(syncRoot)
            {
                while(count == 0)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) )
                // Could have blocking Enqueue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return value;
        }

        /// <summary>
        /// Non-blocking version of Dequeue.
        /// Will return false if queue is empty and set
        /// value to null, otherwise will return true
        /// and set value to the dequeued object.
        /// </summary>
        /// <param name="value">The object that is removed from
        ///     the beginning of the Queue or null if empty.</param>
        /// <returns>true if successfull, otherwise false.</returns>
        public bool TryDequeue(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) )
                // Could have blocking Enqueue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>
        /// Returns the object at the beginning
        /// of the queue without removing it.
        /// </summary>
        /// <returns>The object at the beginning
        /// of the queue.</returns>
        /// <remarks>
        /// This method is similar to the Dequeue method,
        /// but Peek does not modify the queue. 
        /// A null reference can be added to the Queue as a value. 
        /// To distinguish between a null value and the end of the queue,
        /// check the Count property or
        /// catch the InvalidOperationException,
        /// which is thrown when the Queue is empty.
        /// </remarks>
        /// <exception cref="InvalidOpertionException">
        ///                The queue is empty.</exception>
        public object Peek()
        {
            lock(syncRoot)
            {
                if (count == 0)
                    throw new InvalidOperationException("The Queue is empty.");
            
                object value = buffer[head];
                return value;
            }
        }

        /// <summary>
        /// Returns the object at the beginning
        /// of the Queue without removing it.
        /// Similar to the Peek method, however this method
        /// will not throw exception if
        /// queue is empty, but instead will return false.
        /// </summary>
        /// <param name="value">The object at the beginning
        ///          of the Queue or null if empty.</param>
        /// <returns>The object at the beginning of the Queue.</returns>
        public bool TryPeek(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
            }
            return true;
        }

        /// <summary>
        /// Removes all objects from the Queue.
        /// </summary>
        /// <remarks>
        /// Count is set to zero. Size does not change.
        /// </remarks>
        public void Clear()
        {
            lock(syncRoot)
            {
                count = 0;
                head = 0;
                tail = 0;
                for(int i = 0; i < buffer.Length; i++)
                {
                    buffer[i] = null;
                }
            }
        }

        #endregion

        #region ICollection Members
        /// <summary>
        /// Gets a value indicating whether access
        /// to the Queue is synchronized (thread-safe).
        /// </summary>
        public bool IsSynchronized
        {
            get    { return true; }
        }

        /// <summary>
        /// Returns the max elements allowed
        /// in the queue before blocking Enqueue
        /// operations.  This is the size set in the constructor.
        /// </summary>
        public int Size
        {
            get { return this.size;    }
        }

        /// <summary>
        /// Gets the number of elements contained in the Queue.
        /// </summary>
        public int Count
        {
            get    { lock(syncRoot) { return count; } }
        }

        /// <summary>
        /// Copies the Queue elements to an existing one-dimensional Array,
        /// starting at the specified array index.
        /// </summary>
        /// <param name="array">
              /// The one-dimensional Array that is the destination
              /// of the elements copied from Queue.
              /// The Array must have zero-based indexing.</param>
        /// <param name="index">The zero-based index
        ///     in array at which copying begins.</param>
        public void CopyTo(Array array, int index)
        {
            object[] tmpArray = Values;
            tmpArray.CopyTo(array, index);
        }

        /// <summary>
        /// Gets an object that can be used to synchronize
        /// access to the Queue.
        /// </summary>
        public object SyncRoot
        {
            get    { return this.syncRoot; }
        }
        #endregion

        #region IEnumerable Members
        /// <summary>
        /// GetEnumerator not implemented. You can't enumerate
        /// the active queue as you would an array as it is dynamic
        /// with active gets and puts. You could
        /// if you locked it first and unlocked after
        /// enumeration, but that does not work well for GetEnumerator.
        /// The recommended method is to Get Values
        /// and enumerate the returned array copy.
        /// That way the queue is locked for
        /// only a short time and a copy returned
        /// so that can be safely enumerated using
        /// the array's enumerator. You could also
        /// create a custom enumerator that would
        /// dequeue the objects until empty queue,
        /// but that is a custom need. 
        /// </summary>
        /// <returns></returns>
        public IEnumerator GetEnumerator()
        {
            throw new NotImplementedException("Not Implemented.");
        }
        #endregion
    } // End BlockingQueue

    public class QueueTimeoutException : Exception
    {
        public QueueTimeoutException() : base("Queue method timed out on wait.")
        {
        }
    }
}

关注点

zip 文件还包含一个测试类 BlockingQueueTest.cs,它允许您使用多个线程测试队列。 这是一个使用两个队列、一个 Listener 对象和一个 Server 对象的简单网络模拟。 Listener 模拟从网络接收到的数据包并将对象放入其输出队列。 该队列用作 Server 对象的输入队列; Server 对象从队列中取出对象、处理它们,并将它们放入第二个队列(即 Server 的输出队列)。 Server 的输出队列也是 Listener 的输入队列,因为 Listener 兼作侦听器和发送器。

历史

  • 2004 年 8 月 17 日 – V1 - 首次发布。
© . All rights reserved.