MPSC 无锁侵入式链表队列(带状态)





5.00/5 (2投票s)
MPSC 无锁侵入式链表队列(带状态)
引言
随着可用核心数量的增加,线程间通信 (ITC) 使用队列已成为一种常见模式。 一系列问题和硬件限制为队列实现提出了要求。 目前我们可以描述以下常见的 ITC 队列模式
- 单生产者 - 单消费者队列 (SPSC)
- 多生产者 - 单消费者队列 (MPSC)
- 多生产者 - 多消费者队列 (MPMC)
自 Java 1.5 起,我们就有一些原生队列(甚至无锁队列),但它们都没有提供一种实现严格使用方式的机会。 这意味着什么?
背景
假设我们有一个事件生产者、事件消费者以及一个队列作为它们之间的传输媒介。 通常,生产者和消费者是线程。 我们的目标是避免任何无用的工作。 因此,消费者端的严格实现看起来像(伪代码)
while (1)
{
if (queue.isEmpty())
wait_not_empty();
Item item = queue.get();
process(item);
}
而生产者端的严格实现将是
queue.put(item);
if (queue_was_empty) // and only if
wakeup_consumer_thread();
所有 Java 原生队列和大多数开源项目(仅实现了 java.util.Queue 接口)都无法正确实现这一点。 使用 java.util.Queue 的生产者可能的代码将是
boolean wasEmpty = queue.isEmpty();
if (queue.offer(item) && wasEmpty)
wakeup_consumer_thread();
但它仅在单生产者情况下才能正常工作。 对于多生产者情况,我们需要队列上的原子添加项目并检查空操作。
// Return true if queue was empty
boolean put( Item item );
因此,具有上述要求的 MPSC 队列接口将如下所示:队列项目获取器显然应该类似:它应该返回 2 个值(下一个可用项目和队列的状态,即是否为空)。 Java 没有一种好的方法从方法中返回 2 个值,所以让我们使用 2 步获取
Item get();
Item get_next();
第一个方法应该在确定队列不为空时调用,第二个方法删除队列中的第一个项目并返回下一个项目(如果删除时队列变为空,则返回 null),因此消费者端的实现将如下所示
while (1)
{
sleep();
// now we defenitely know the queue is not empty
Item item = queue.get();
do
{
process(item);
item = queue.get_next();
}
while (item != null);
}
这种方法还有一个好处:队列会保持非空状态一段时间,因此我们可以减少线程唤醒的次数。
我们还能做些什么来改进队列?
C/C++ 提供了一种简单的方法来使用侵入式容器(容器不分配任何额外数据)。 Java GC 调整得很好,但减少 GC 压力的机会仍然很有吸引力。 让我们试试! 我们要求每个队列项目都有一个我们可以用来构建链表的引用,并且还需要一个用于此引用的修改器。 这看起来不是一个很大的代价!
最终版本如下所示
public class MpscIntrusiveLinkedQueue<T>
{
private final AtomicReferenceFieldUpdater<T, T> m_itemUpdater;
private T m_head;
private final AtomicReference<T> m_tail;
public MpscIntrusiveLinkedQueue( AtomicReferenceFieldUpdater<T, T> itemUpdater )
{
m_itemUpdater = itemUpdater;
m_tail = new AtomicReference<T>();
}
public final boolean put( T item )
{
assert( m_itemUpdater.get(item) == null );
for (;;)
{
final T tail = m_tail.get();
if (m_tail.compareAndSet(tail, item))
{
if (tail == null)
{
m_head = item;
return true;
}
else
{
m_itemUpdater.set( tail, item );
return false;
}
}
}
}
final T get()
{
assert( m_head != null );
return m_head;
}
final T get_next()
{
assert( m_head != null );
final T head = m_head;
T next = m_itemUpdater.get( head );
if (next == null)
{
m_head = null;
if (m_tail.compareAndSet(head, null))
return null;
while ((next = m_itemUpdater.get(head)) == null);
}
m_itemUpdater.lazySet( head, null );
m_head = next;
return m_head;
}
}
使用代码
现在我们可以使用队列实现严格的 ITC,信号量可能是线程同步的最佳选择。
class Item
{
public volatile Item nextItem; // member to be used by the queue implementation
public final int value;
public Item( int value )
{
this.value = value;
}
}
final Semaphore sema = new Semaphore();
final MpscIntrusiveLinkedQueue<Item> queue = new MpscIntrusiveLinkedQueue<Item>(
AtomicReferenceFieldUpdater.newUpdater(Item.class, Item.class, "nextItem");
生产者
if (queue.put(new Item(1))) sema.release();
消费者
try
{
for (;;)
{
sema.acquire();
Item item = m_queue.get();
do
{
process(item);
item = m_queue.get_next();
}
while (item != null);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
性能如何?
让我们测试一下(Intel Core i7-860@2.8Ghz,Win7-64):测试运行 2 个线程(生产者和消费者),生产者创建 1000000 个事件并将其分发给消费者。 测量生产者和消费者端的时间
test:
[java] **** MpscIntrusiveLinkedQueue
[java] 1000 events dispatched at 0.001243 sec (2 wake ups).
[java] 1000 events processed at 0.001243 sec.
[java] **** MpscIntrusiveLinkedQueue
[java] 1000000 events dispatched at 0.053543 sec (3 wake ups).
[java] 1000000 events processed at 0.053884 sec.
对于 1000000 个事件,生产者/消费者的时序分布在 0.05 到 0.11 秒之间(具体取决于线程唤醒的次数)。
是好是坏? 谁知道...
让我们用一些已知的东西(Disruptor)来尝试类似的测试
dtest: [java] **** Disruptor [java] 1000 events dispatched at 0.004439 sec. [java] Processed at 0.003985 sec. [java] **** Disruptor [java] Processed at 0.126224 sec. [java] 1000000 events dispatched at 0.126291 sec.
取 Disruptor 和 MpscIntrusiveLinkedQueue 的最佳时间。
源代码已附加到文章,并可在 Github 上找到:https://github.com/js-labs/mpscilq