托管线程同步






4.83/5 (27投票s)
仅使用托管资源创建线程同步
背景
我经常使用线程,也经常使用同步对象。但除了少数例外,同步对象都是非托管资源。我用它们很长时间了,但最近我才更好地理解了如何使用 Monitor
方法、Wait
和 Pulse
/PulseAll
,并用它们创建了完全托管的同步原语。
EventWaitHandles
从一个线程向另一个线程传递消息并等待这些消息的最简单方法之一是使用某种 EventWaitHandle
,通常是带有共享变量用于数据的 AutoResetEvent
。
AutoResetEvent
有很多问题,因为它使用非托管资源,创建速度非常慢,而且如果你在有线程等待它时 Close()
它,那么这些线程将被永远挂起。
当我第一次看到 Pulse()
时,我认为它可以解决这个问题,但 MSDN 文档建议避免使用它。事实上,单独使用 Pulse()
并不能产生相同的结果,因为 Pulse()
只会释放一个已等待的线程。如果没有任何线程在等待,它不会保持“signalled
”状态,导致下一个尝试等待的线程立即被释放(就像使用 AutoResetEvent
时那样)。
但是,有一个方法可以解决这个问题。通过使用一个布尔变量。
所以,为了信号化对象,我们做类似这样的事情
lock(_lock)
{
_value = true;
Monitor.Pulse(_lock);
}
而 WaitOne()
仅在信号未被信号化时等待。如果信号已发出,它可以重置信号并立即返回。
lock(_lock)
{
while(!_signalled)
Monitor.Wait(_lock);
_value = false;
}
好的。有了这个简单的结构,我就有了 AutoResetEvent.Set()
和 AutoReset.WaitOne()
方法的等价物。
要创建一个 ManualResetEvent
,我需要将 Monitor.Pulse()
替换为 Monitor.PulseAll()
,并且必须删除 _value = false
。
当然,我仍然需要 Reset()
方法,但我还没有展示整个类。
正如我之前所说,AutoResetEvent
的问题之一是,如果 AutoResetEvent
在有线程等待它时被关闭/释放,那么 Thread
可能会永远等待。
要模拟这个问题,你可以使用这段代码
using System.Threading;
namespace ConsoleApplication1
{
public static class Program
{
private static AutoResetEvent _are = new AutoResetEvent(false);
static void Main(string[] args)
{
Thread thread = new Thread(_RunThread);
thread.Start();
Thread.Sleep(1000);
_are.Close();
}
private static void _RunThread(object state)
{
_are.WaitOne();
}
}
}
一秒钟后,AutoResetEvent
被释放,但是等待它的 Thread
没有被释放,也没有因为使用已释放的对象而抛出异常。
如果我只是用 Set()
替换 Close()
,那么等待的单个线程将被释放,它可以 Close()
事件(因为只有一个 Thread
在等待)。但是,如果我有一个以上的 Thread
在等待事件,会发生什么?
在 ManualResetEvent
上调用 Set()
会释放所有线程,但在 AutoResetEvent
上只释放一个线程。另外,如果我不知道哪个是完成的最后一个线程,我将永远无法确定我是否可以 Close()/Dispose()
,所以最好避免 Close()/Dispose()
而不是过早调用它。
我决定我的 ManagedAutoResetEvent
应该是可处置的,但是处置它只会以一种不可逆转的方式 Set
它。所以,当它不再需要时,你 Dispose()
它,所有实际等待的线程,或任何之后决定等待的线程,都将立即被释放。
所以,首先让我们用一个普通的句柄来试试
using System.Threading;
namespace ConsoleApplication1
{
public static class Program
{
private static AutoResetEvent _are = new AutoResetEvent(false);
static void Main(string[] args)
{
_are.Close();
Thread thread1 = new Thread(_RunThread);
thread1.Start();
Thread thread2 = new Thread(_RunThread);
thread2.Start();
}
private static void _RunThread(object state)
{
_are.WaitOne();
}
}
}
我在 Thread
有机会等待它之前就释放了 AutoResetEvent
。使用普通的 AutoResetEvent
会抛出 ObjectDisposedException
,消息是“Safe handle has been closed
”。
但是,将 AutoResetEvent
更改为 ManagedAutoResetEvent
,程序将正常退出
using System.Threading;
using Pfz.Threading;
namespace ConsoleApplication1
{
public static class Program
{
private static ManagedAutoResetEvent _are = new ManagedAutoResetEvent();
static void Main(string[] args)
{
_are.Dispose();
Thread thread1 = new Thread(_RunThread);
thread1.Start();
Thread thread2 = new Thread(_RunThread);
thread2.Start();
}
private static void _RunThread(object state)
{
_are.WaitOne();
}
}
}
你也可以在第一个示例中使用 ManagedAutoResetEvent
,程序将在 1 秒后退出。
为什么这更好?
好吧,我能想到的最好的例子是一个组件,它有一个 AutoResetEvent
,并且可能有任意数量的线程等待该事件。
处置组件应该立即处置其所有内部组件,而 AutoResetEvent
是其中之一。但是使用普通的 AutoResetEvent
会导致许多线程永远等待。
设置 AutoResetEvent
实际上并没有那么有用,因为它只会释放一个线程。但是拥有一个在处置时变为始终设置状态的 AutoResetEvent
将会释放所有等待的线程。当然,Thread
必须检查组件是否已被处置并妥善处理(抛出 Exception
,返回 null
或类似的东西),但那时 Dispose()
的调用将位于正确的位置,即组件的 Dispose()
。没有线程永远等待的风险,也没有复杂代码来发现这是最后一个线程,也没有等待垃圾回收。
所以,为了实现这一点,在处置 ManagedAutoResetEvent
时,我设置了 _value
、_wasDisposed
并调用了 PulseAll()
方法。然后,所有等待方法仅在事件未被处置时才重置事件。所以代码最终变成了这样
using System;
using System.Threading;
namespace Pfz.Threading
{
public sealed class ManagedAutoResetEvent:
IAdvancedDisposable,
IEventWait
{
private readonly object _lock = new object();
private bool _value;
private bool _wasDisposed;
public ManagedAutoResetEvent()
{
}
public ManagedAutoResetEvent(bool initialState)
{
_value = initialState;
}
public void Dispose()
{
lock(_lock)
{
_wasDisposed = true;
_value = true;
Monitor.PulseAll(_lock);
}
}
public bool WasDisposed
{
get
{
return _wasDisposed;
}
}
public void Reset()
{
lock(_lock)
{
if (_wasDisposed)
return;
_value = false;
}
}
public void Set()
{
lock(_lock)
{
_value = true;
Monitor.Pulse(_lock);
}
}
public void WaitOne()
{
lock(_lock)
{
while(!_value)
Monitor.Wait(_lock);
if (!_wasDisposed)
_value = false;
}
}
public bool WaitOne(int millisecondsTimeout)
{
lock(_lock)
{
while(!_value)
if (!Monitor.Wait(_lock, millisecondsTimeout))
return false;
if (!_wasDisposed)
_value = false;
}
return true;
}
public bool WaitOne(TimeSpan timeout)
{
lock(_lock)
{
while(!_value)
if (!Monitor.Wait(_lock, timeout))
return false;
if (!_wasDisposed)
_value = false;
}
return true;
}
}
}
正如你所见,Dispose()
使用了 PulseAll()
,而 WaitOne()
仅在事件未被处置时才重置该值。
有了它,在类的 Dispose()
方法中处置事件就非常安全了,并且可以确保没有线程会永远等待。
更多基于 Monitor.Wait() 和 Monitor.Pulse() 的同步类
我介绍了 AutoResetEvent
的等价物。通过仅将 Pulse()
更改为 PulseAll()
并从 WaitOne()
方法中删除 _value = false;
,我创建了一个 ManagedManualResetEvent
。我试图将两者放在同一个类中,但性能不是很好,所以我选择了保留两个完全独立的类。
但这还不是全部。利用相同的技术,我能够创建一个“真正精简”的 Semaphore
。它不使用布尔值,而是使用一个整数值作为“仍可用”。当你尝试 Wait()
(或 Enter()
……我更喜欢 Enter 这个名字)时,它会检查该值是否为 -1
(已处置)。利用相同的原理,当它被处置时,所有人都会被释放。如果不是,它会检查 availableCount
,如果为零则等待,如果大于零,则将值减一并返回。
缺点
好吧,我的代码有一些缺点。普通的 ManualResetEvent
和 AutoResetEvent
可以跨应用程序域边界。这些新组件不能。
但说实话,我只在尝试使用 MemoryMappedFile
时才使用跨应用程序的 WaitEventHandle
。这些事件在 Wait()
和 Set
上的性能相当(好吧,有时稍慢),但创建和销毁它们要快得多。事实上,我不知道为什么微软创建了 Pulse()/PulseAll()
和 Wait()
方法而不创建 WaitHandle
的托管版本。
锁、自旋锁和乐观锁
最基本的同步不是通过 WaitHandle
完成的,而是通过 lock
完成的。
虽然等待句柄等待某个事件发生(事件变为已信号化),并且在许多情况下会等待很长时间,但锁用于保护资源免受同时访问,因此如果两个线程要执行某项操作,只有一个获得锁,另一个等待。
我已经使用 lock
关键字创建了我的托管 WaitHandle
,但使用 Wait()
和 Pulse()
方法是必须的。
实际上,锁有各种类型。有 ReaderWriterLock
,其中许多读取器可以同时持有锁,但要进行任何更改,用户需要独占写入锁。还有 SpinLock
等等。
我使用锁很多年了,而且很多时候我避免使用 ReaderWriterLock
和 ReaderWriterLockSlim
。
ReaderWriterLock
有一些 bug,而精简版并没有那么精简。在很多情况下,我认为使用完全锁(lock
关键字)比使用 ReadLock
s 更好,因为它会消耗大量 CPU 来获取这样的锁。因此,在很多情况下,我花费在读取锁上的时间比等待另一个线程完成其工作(使用完全锁)的时间还要长。
但是当我做我的游戏时,我认为是时候检查新的锁定机制了。
SpinLock - 我永远不会使用实际的实现
据说 SpinLock
对于许多使用时间非常短的细粒度锁非常有用。它的优化在于它在等待时不会释放其处理器时间,它会一直“旋转”。
所以,当我认为它适用于非常短的锁定时,我非常失望。
未争用的 lock
关键字几乎是 SpinLock
的两倍快。这让我认为 SpinLock
是无用的,因为通过旋转它会快的时候远不如在获取未争用锁时慢的时候常见,毕竟,细粒度锁越多,两个线程尝试同时获取同一锁的可能性就越小。
即使它们仍然有一些优势,我游戏中的锁大多数时候都是未争用的,所以使用 SpinLock
s只会让事情变慢,并且消耗更多 CPU。所以它绝不是一个解决方案。
但我记得我编程 BeOS 的时候。他们谈论“Bemaphores”和乐观锁定。当时我不明白所有细节,但我决定尝试用 Interlocked
类自己创建一个锁。
Interlocked
对于不知道的人来说,Interlocked
类有对值进行加、减和比较交换的数学方法,这些方法是“原子”的。默认情况下,如果两个线程尝试将 1 加到同一个变量上,可能会只加一个,因为两个线程首先读取初始值(比如说 0),加 1(变成 1),然后存储 1。
Interlocked 执行这些操作,确保如果两个线程都加一,最终结果将是 2,每个线程将正确接收其操作的结果,并且它们比这样做快得多
lock(someLock)
_variable++;
所以,要使用 Interlocked
创建一个独占锁,我想到在进入时做类似这样的事情
while(true)
{
if (Interlocked.Increment(ref _value) == 1)
return;
Interlocked.Decrement(ref _value);
Thread.Yield();
}
这是一个乐观锁,因为它开始时加一,如果结果是 1,它就获得锁并可以返回。
如果它无法获取锁,那么它会减去该值并等待。在第一个版本中,我使用了 Thread.Yield()
来等待,因为我对旋转不够了解,不敢相信它,但实际实现使用了 SpinWait
结构,因为它效果更好。
我做了最初的测试,期望一个糟糕的结果,但令我惊讶的是,它的性能甚至超过了自定义的 lock
关键字。
所以,我决定更进一步,创建一个 ReaderWriterLock
。
我通过考虑将 0 到 65535(最后 16 位)的值用于读取器,而高于该值的值用于写入器来实现这一点。
所以,EnterReadLock()
方法看起来像这样
while(true)
{
if (Interlocked.Increment(ref _value) <= 65535)
return;
Interlocked.Decrement(ref _value);
Thread.Yield();
}
它增加值,如果小于写锁,那么只有读取器,所以它可以立即返回。
而 EnterWriteLock()
看起来像这样
while(true)
{
if (Interlocked.Add(ref _value, 65536) == 65536)
return;
Interlocked.Sum(ref _value, -65536);
Thread.Yield();
}
它加上 65536
,如果结果值恰好是那个,那么没有其他读取器或写入器,所以它获得了独占锁。
它成功了,当我将其与 ReaderWriterLockSlim
进行比较时,它大约快了 5 倍(这取决于 CPU 的数量,但在我所有的测试中它都更快)。
实际类更完整,因为它会给写入者一定的优先级,它不会在等待时不断增减,它使用 CompareExchange
来完成工作,它还支持可升级锁。所以,我真的替换了所有 ReaderWriterLockSlim
的用法。
缺点
当然,我的技术有缺点。
- 第一个是使用
SpinWait
。如果等待时间很长,它将消耗过多 CPU。这部分由OptimisticReaderWriterLock
(包含在示例中)解决(它适用于乐观锁,但为长时间等待做好准备……但在快速等待时速度较慢)。但是,你需要在编译时决定使用哪种锁,而在运行时发生等待,所以如果发生长时间等待,使用SpinReaderWriterLock(Slim)
会很糟糕; - 第二个是我只使用一个整数来检查一切,所以我不知道实际线程是否持有锁,并且尝试重新获取写锁会导致死锁。不会有异常告诉你你在做错事,而且递归永远不会被支持;
- 释放锁时没有检查,所以你可以进入写锁然后退出读锁。我通过将代码分成两个类来解决这个问题的一部分。
SpinReaderWriterLockSlim
是真正的锁,它是一个结构。然后SpinReaderWriterLock
是一个包含它的类,并提供返回可处置对象的释放锁的方法,因此会进行适当的退出,并且多次处置它只会退出锁一次。另外,可以使用using
关键字。它比SpinReaderWriterLockSlim
慢,但仍然比 .NET 的ReaderWriterLockSlim
快; - 它不是Abort安全的。
lock
关键字是Abort安全的,但ReaderWriterLockSlim
不是。所以我觉得我的不算太差,毕竟调用Abort()
本身就是一种坏习惯; - 当升级到
WriteLock
时,你**不能**调用EnterWriteLock
。你必须调用UpgradeToWriteLock
或UpgradeableLock()
方法返回的对象上的Upgrade
方法。如果你只是用SpinReaderWriterLock
替换ReaderWriterLockSlim
,这可能会有问题。
优点
- 到目前为止,它证明比
ReaderWriterLockSlim
快。我在单核 CPU 和 4 核 CPU 计算机上进行了测试,在这两种情况下SpinReaderWriterLock
都更快。 - 由于它只使用一个整数来完成工作,所以对象非常小。
- 它不使用非托管资源,所以你不需要
Dispose()
它(事实上,它没有Dispose
方法)。
SpinLockSlim
在最初发布很长时间后,我决定添加一个额外的类。它是 SpinLockSlim
。它与我最初的测试类似,但它使用 SpinWait
结构来等待,并且不增加/减少计数,它使用 Interlocked.CompareExchange()
方法。有了这个,如果锁未被获取,我不会有额外的缓存刷新,而且它在释放锁时也不使用任何交织方法,它简单地将变量设置为 0。很长一段时间以来,我一直认为这是不安全的,但是 .NET 提供了强烈的保证,所有写入都具有“释放”语义,所以当不需要知道其先前值时,设置一个 int
类型变量可以无需使用任何 volatile
或 Interlocked
方法,并且由于释放独占锁意味着我们知道我们拥有该锁,因此无需读取先前的值。通过这种方式,我创建了一种比 .NET 的 SpinLock
实现快得多的锁类型,并且对于许多核心处理器(它们在许多连续锁(如示例中)上可能性能不佳),SpinLockSlim
效果很好。
在我的电脑上,SpinReaderWriterLockSlim
(以及旧的 YieldReaderWriterLockSlim
)的性能比 .NET 的 ReaderWriterLockSlim
差,即使只有读取操作。显然,.NET 实现做了太多事情,并且花费太多时间来获取锁,这给了 CPU 缓存更好的工作机会,而不是这些实现,它们在太多并发情况下速度很快,但 SpinLockSlim
在这种情况下做得更好(即使它是一个完全锁而不是读/写锁)。然而,如果不存在那么多强制并发(如示例中),SpinReaderWriterLockSlim
仍然会更快。
示例
在本文中,你可以下载一个样本,它只进行锁的速度比较,并显示使用 AutoResetEvent
可能发生的死锁,而 ManagedAutoResetEvent
不会发生这种情况。
但是,如果你想要一个使用所有这些资源的完整应用程序,请参阅我的游戏,链接为 此链接 ,因为我决定创建所有这些来改进它。
版本历史
- 2014 年 5 月 30 日。修正了 OptimisticReaderWriterLock 类中可能导致死锁的 bug。
- 2013 年 5 月 3 日。用 SpinReaderWriterLockClasses 替换了 YieldReaderWriterLock 类,并添加了 SpinLockSlim。更新了样本;
- 2011 年 8 月 23 日。第一个版本。