65.9K
CodeProject 正在变化。 阅读更多。
Home

MPSC 无锁侵入式链表队列(带状态)

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2015年2月1日

CPOL

3分钟阅读

viewsIcon

22619

downloadIcon

97

MPSC 无锁侵入式链表队列(带状态)

引言

随着可用核心数量的增加,线程间通信 (ITC) 使用队列已成为一种常见模式。 一系列问题和硬件限制为队列实现提出了要求。 目前我们可以描述以下常见的 ITC 队列模式

  • 单生产者 - 单消费者队列 (SPSC)
  • 多生产者 - 单消费者队列 (MPSC)
  • 多生产者 - 多消费者队列 (MPMC)

自 Java 1.5 起,我们就有一些原生队列(甚至无锁队列),但它们都没有提供一种实现严格使用方式的机会。 这意味着什么?

背景

假设我们有一个事件生产者、事件消费者以及一个队列作为它们之间的传输媒介。 通常,生产者和消费者是线程。 我们的目标是避免任何无用的工作。 因此,消费者端的严格实现看起来像(伪代码)

while (1)
{
    if (queue.isEmpty())
        wait_not_empty();
    Item item = queue.get();
    process(item);
}        

而生产者端的严格实现将是

queue.put(item);
if (queue_was_empty) // and only if
    wakeup_consumer_thread();

所有 Java 原生队列和大多数开源项目(仅实现了 java.util.Queue 接口)都无法正确实现这一点。 使用 java.util.Queue 的生产者可能的代码将是

boolean wasEmpty = queue.isEmpty();
if (queue.offer(item) && wasEmpty)
    wakeup_consumer_thread();

但它仅在单生产者情况下才能正常工作。 对于多生产者情况,我们需要队列上的原子添加项目并检查空操作。

// Return true if queue was empty
boolean put( Item item );

因此,具有上述要求的 MPSC 队列接口将如下所示:队列项目获取器显然应该类似:它应该返回 2 个值(下一个可用项目和队列的状态,即是否为空)。 Java 没有一种好的方法从方法中返回 2 个值,所以让我们使用 2 步获取

Item get();
Item get_next();

第一个方法应该在确定队列不为空时调用,第二个方法删除队列中的第一个项目并返回下一个项目(如果删除时队列变为空,则返回 null),因此消费者端的实现将如下所示

while (1)
{
    sleep();
    // now we defenitely know the queue is not empty
    Item item = queue.get();
    do
    {
        process(item);
        item = queue.get_next();
    }
    while (item != null);
}

这种方法还有一个好处:队列会保持非空状态一段时间,因此我们可以减少线程唤醒的次数。

我们还能做些什么来改进队列?
C/C++ 提供了一种简单的方法来使用侵入式容器(容器不分配任何额外数据)。 Java GC 调整得很好,但减少 GC 压力的机会仍然很有吸引力。 让我们试试! 我们要求每个队列项目都有一个我们可以用来构建链表的引用,并且还需要一个用于此引用的修改器。 这看起来不是一个很大的代价!

最终版本如下所示

public class MpscIntrusiveLinkedQueue<T>
{
    private final AtomicReferenceFieldUpdater<T, T> m_itemUpdater;
    private T m_head;
    private final AtomicReference<T> m_tail;

    public MpscIntrusiveLinkedQueue( AtomicReferenceFieldUpdater<T, T> itemUpdater )
    {
        m_itemUpdater = itemUpdater;
        m_tail = new AtomicReference<T>();
    }

    public final boolean put( T item )
    {
        assert( m_itemUpdater.get(item) == null );
        for (;;)
        {
            final T tail = m_tail.get();
            if (m_tail.compareAndSet(tail, item))
            {
                if (tail == null)
                {
                    m_head = item;
                    return true;
                }
                else
                {
                    m_itemUpdater.set( tail, item );
                    return false;
                }
            }
        }
    }

    final T get()
    {
        assert( m_head != null );
        return m_head;
    }

    final T get_next()
    {
        assert( m_head != null );
        final T head = m_head;
        T next = m_itemUpdater.get( head );
        if (next == null)
        {
            m_head = null;
            if (m_tail.compareAndSet(head, null))
                return null;
            while ((next = m_itemUpdater.get(head)) == null);
        }
        m_itemUpdater.lazySet( head, null );
        m_head = next;
        return m_head;
    }
}

使用代码

现在我们可以使用队列实现严格的 ITC,信号量可能是线程同步的最佳选择。

class Item
{
    public volatile Item nextItem; // member to be used by the queue implementation
    public final int value;

    public Item( int value )
    {
        this.value = value;
    }
}

final Semaphore sema = new Semaphore();
final MpscIntrusiveLinkedQueue<Item> queue = new MpscIntrusiveLinkedQueue<Item>(
        AtomicReferenceFieldUpdater.newUpdater(Item.class, Item.class, "nextItem");

生产者

if (queue.put(new Item(1)))
    sema.release();

消费者

try
{
    for (;;)
    {
        sema.acquire();
        Item item = m_queue.get();
        do
        {
            process(item);
            item = m_queue.get_next();
        }
        while (item != null);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }

性能如何?

让我们测试一下(Intel Core i7-860@2.8Ghz,Win7-64):测试运行 2 个线程(生产者和消费者),生产者创建 1000000 个事件并将其分发给消费者。 测量生产者和消费者端的时间

test:
     [java] **** MpscIntrusiveLinkedQueue
     [java] 1000 events dispatched at 0.001243 sec (2 wake ups).
     [java] 1000 events processed at 0.001243 sec.
     [java] **** MpscIntrusiveLinkedQueue
     [java] 1000000 events dispatched at 0.053543 sec (3 wake ups).
     [java] 1000000 events processed at 0.053884 sec.

对于 1000000 个事件,生产者/消费者的时序分布在 0.05 到 0.11 秒之间(具体取决于线程唤醒的次数)。
是好是坏? 谁知道...
让我们用一些已知的东西(Disruptor)来尝试类似的测试

dtest:
     [java] **** Disruptor
     [java] 1000 events dispatched at 0.004439 sec.
     [java] Processed at 0.003985 sec.
     [java] **** Disruptor
     [java] Processed at 0.126224 sec.
     [java] 1000000 events dispatched at 0.126291 sec.

取 Disruptor 和 MpscIntrusiveLinkedQueue 的最佳时间。

源代码已附加到文章,并可在 Github 上找到:https://github.com/js-labs/mpscilq

© . All rights reserved.