同步、线程安全和性能
关于线程安全同步问题及解决方案的讲解。
介绍
本文概述了通往线程安全和同步智慧的愚者之路。虽然我主要使用 C# 编码,但本文可能对跨语言和平台都有益处。
目录
- 背景
促使我写下这篇文章的原因。 - 基本同步
简单同步问题的示例。 - 超时!
通过使锁超时简单且防错,改进锁定模式。 - 读/写锁
简化ReaderWriterLockSlim
以便更常用。 - 双重检查锁定 (DCL) 模式 = 不好的做法?
乍听起来是个好主意,但通常会遇到问题。 - 结合 DCL 和读/写
当您确实需要时,并且准备好进行编码时,使用 DCL 可以提高读/写锁定的性能。 - 结论/总结
经验教训...
背景
我花了大量时间尝试创建能减轻我担心线程安全的类。这可能看起来是愚蠢的企图,但我认为我在代码和理解方面都取得了很大的进步。
我开始时编写了一个类来帮助我解决这些难题:线程安全同步更容易(在本文发布后部分已过时)。感谢评论中许多知识渊博的参与者,我能够发现许多问题,解决了其中大部分,并理解了剩余的。
如果您只是想为 .NET 寻找一种快速简便的同步解决方案,那么 System.Collections.Concurrent
命名空间通常可以满足您的需求。但有很多情况下,您会收到一个本身并非线程安全的集合,并且您需要处理给定的对象而不是利用已有的线程安全版本。有些类提供了同步版本,可以帮助减轻一些担忧。例如 .NET 的 HashTable
有一个 Synchronized
成员,它是一个包装器。但最终,它可能无法满足您的同步需求,并且实际上只是防止在读写时抛出错误。
基本同步
我们都希望代码能神奇般地工作,但有时会出现意想不到的问题。并发就是其中一个令人头疼的问题。如果您以前从未为线程安全编写过代码,请注意,可能会在代码行之间发生意想不到的情况。在异步条件下,常见操作根本不是线程安全的。
// Synchronus example:
dictionary.Add(key, newValue);
// ... Sometime later ...
object value = dictionary[key];
上面的示例 在隔离到本地函数和本地字典时应该可以正常工作。但当在异步条件下以高频率重复时,错误会相当频繁地发生。
// Thread A:
dictionary.Add(key, newValue);
// Thread B:
object value = dictionary[key];
希望我们大多数人看到上面的示例时,都会清楚它不是线程安全的。假设添加操作会在值已存在时失败并引发异常,并且按键查找操作在键不存在时也会失败……让我们一步一步地尝试修复它……
// Thread A:
if(!dictionary.ContainsKey(key))
dictionary.Add(key, newValue);
// Thread B:
object value = dictionary.ContainsKey(key) ?
dictionary[key] : null;
我们纠正问题的第一个尝试是对异常的反应。通过尝试使用 ContainsKey
方法,我们希望阻止异常发生,并允许后续代码处理这些情况。
但立即就出现了问题。线程 A 可能会经常失败,因为当本地添加操作发生时,另一个线程可能已经在“代码行之间”添加了该值。另一方面,线程 B 不应该抛出异常,但下面的代码即使值实际存在,也可能返回 null。
所以您会想,“看起来我需要同步这些操作。” 好的,我们来做……
// Thread A:
lock(dictionary)
if(!dictionary.ContainsKey(key))
dictionary.Add(key, newValue);
// Thread B:
object value;
lock(dictionary)
value = dictionary.ContainsKey(key) ?
dictionary[key] : null;
现在上面的示例是线程安全的,并且性能可以接受,只要这些是字典上唯一的操作。但在读取时锁定字典似乎有点大材小用。读取操作本身难道不是线程安全的吗? 答案是否定的。很遗憾,几乎总是否定的。大多数时候,您只是可以侥幸逃脱。假设我们已经解决了添加新值的所有问题,我们如何改进上面的示例?
// Thread B:
object value = null;
dictionary.TryGetValue(key, out value);
对于线程 B,这样看起来更好,对吧? 摆脱 Contains
调用,而只是通过字典的 TryGetValue
方法获取值应该可以消除所有担忧,对吧? 嗯,是也不是。是的,读取值时的锁消失了,这将提高性能,但现在添加操作可能在尝试获取值的过程中发生。
这里的问题是我们无法确定在写入操作进行过程中 TryGetValue
的行为。
如果您走到这里,那么您在这个过度简化的场景中基本上是幸运的。对于 .NET 的 Dictionary<TKey,TValue>
,文档指出在没有任何写入操作的情况下,多次读取是安全的。如果在多次读取过程中发生写入,可能会导致数据损坏。
很遗憾,为了确保安全,我们必须这样做……
// Thread B:
object value = null;
lock(dictionary)
dictionary.TryGetValue(key, out value);
到目前为止,似乎唯一真正的线程安全同步是锁定写入和读取。
总得有更好的方法,对吧? 让我们深入研究……
超时!
如果您处理的是隔离的集合,那么对于大多数情况,lock
关键字就足够了。但随着您进入复杂性,使用带超时的锁定是更安全、更具揭示性的方法。就我而言,我几乎总是使用超时值。锁不应该持续很长时间,如果您有长时间运行的操作,强烈建议使用超时来揭示可能的死锁或性能不佳的代码。
/// <summary>
/// Applies a lock on the syncObject before executing the provided Action with a timeout.
/// Throws a TimeoutException if throwsOnTimeout is true (default) and a lock could not be aquired.
/// </summary>
/// <param name="lockObject">Object used for synchronization.</param>
/// <param name="closure">The action to execute once a lock is acquired.</param>
/// <param name="millisecondsTimeout">Maximum time allowed to wait for a lock.</param>
/// <param name="throwsOnTimeout">If true and a timeout is reached, then a TimeoutException is thrown.
/// If false and a timeout is reached, then it this method returns false
/// and allows the caller to handle the failed lock.</param>
/// <returns>
/// True if a lock was acquired and the Action executed.
/// False if throwsOnTimeout is false and could not acquire a lock.
/// </returns>
public static bool Lock(object lockObject,
Action closure, int millisecondsTimeout, bool throwsOnTimeout = true)
{
Contract.Requires<ArgumentNullException>(lockObject != null);
Contract.Requires<ArgumentNullException>(closure != null);
Contract.Requires<ArgumentOutOfRangeException>(millisecondsTimeout >= 0);
bool lockTaken = false;
try
{
Monitor.TryEnter(lockObject, millisecondsTimeout, ref lockTaken);
if (!lockTaken)
{
if (throwsOnTimeout)
throw new TimeoutException("Could not gain a lock within the timeout specified.");
return false;
}
closure();
}
finally
{
if (lockTaken) Monitor.Exit(lockObject);
}
return true;
}
上面的静态函数可用于简化带超时的锁定过程。您可以轻松地替换此……
lock(dictionary)
if(!dictionary.ContainsKey(key))
dictionary.Add(key, newValue);
用这个……
Lock(dictionary,()=>{
if(!dictionary.ContainsKey(key))
dictionary.Add(key, newValue);
}, 1000); // If a lock could be acquired, a timeout exeption is thrown.
或者这个……
if(
!Lock(
dictionary,
()=> if(!dictionary.ContainsKey(key)) dictionary.Add(key, newValue),
1000, false)
) // If a lock could not be acquired...
doSomethingElse();
扩展和委托函数在管理我们的同步代码方面非常有帮助。此扩展程序可以正确且安全地使用 Monitor.TryEnter
方法并确保正确退出。它与 lock
关键字在后台执行的操作非常相似,但带有超时。因此,无论委托的 Action 执行什么操作,我们都可以避免打开锁,并提供一种处理无法获取锁的方式。
但是,如果我们必须在读取和写入时都锁定,这是否仍然存在性能问题?
读/写锁
当您有一系列可能导致不安全同步的操作时,使用读/写锁可以获得一些性能优势。读/写锁通常允许多个读取和一个写入。因此,基本上,在没有活动写入操作时,多个读取可以进出而不会被阻止。一旦请求写入操作,写入操作就会被阻止,直到之前的读取完成,并且后续的读取会被阻止,直到写入操作完成。
让我们开始通过一些 ReaderWriterLockSlim
的扩展来简化和防错我们的代码。
/// <summary>
/// ReaderWriterLockSlim extension for synchronizing read access.
/// </summary>
/// <param name="target">ReaderWriterLockSlim to execute on.</param>
/// <param name="closure">Action to execute once a lock is acquired.</param>
/// <param name="millisecondsTimeout">Indicates if and for how long a timeout is used to acquire a lock.</param>
/// <param name="throwsOnTimeout">If this parameter is true,
/// then if a timeout value is reached, an exception is thrown.</param>
/// <returns>Returns false if a timeout is reached.</returns>
public static bool Read(this ReaderWriterLockSlim target,
Action closure, int? millisecondsTimeout = null, bool throwsOnTimeout = false)
{
Contract.Requires<ArgumentNullException>(closure != null);
Contract.Requires<ArgumentOutOfRangeException>(
millisecondsTimeout==null || millisecondsTimeout >= 0
);
bool lockHeld = false;
try
{
if(millisecondsTimeout==null)
target.EnterReadLock();
else if (!target.TryEnterReadLock(millisecondsTimeout.Value))
{
if (throwsOnTimeout)
throw new TimeoutException(
"Could not gain a read lock within the timeout specified. "+
"(millisecondsTimeout=" + millisecondsTimeout.Value + ") ");
return false;
}
lockHeld = true;
closure();
}
finally
{
if (lockHeld)
target.ExitReadLock();
}
return lockHeld;
}
/// <summary>
/// ReaderWriterLockSlim extension for synchronizing write access.
/// </summary>
/// <param name="target">ReaderWriterLockSlim to execute on.</param>
/// <param name="closure">Action to execute once a lock is acquired.</param>
/// <param name="millisecondsTimeout">Indicates if and for how long a timeout is used to acquire a lock.</param>
/// <param name="throwsOnTimeout">If this parameter is true,
/// then if a timeout value is reached, an exception is thrown.</param>
/// <returns>Returns false if a timeout is reached.</returns>
public static bool Write(this ReaderWriterLockSlim target,
Action closure, int? millisecondsTimeout = null, bool throwsOnTimeout = false)
{
Contract.Requires<ArgumentNullException>(closure != null);
Contract.Requires<ArgumentOutOfRangeException>(
millisecondsTimeout==null || millisecondsTimeout >= 0
);
bool lockHeld = false;
try
{
if (millisecondsTimeout == null)
target.EnterWriteLock();
else if (!target.TryEnterWriteLock(millisecondsTimeout.Value))
{
if (throwsOnTimeout)
throw new TimeoutException(
"Could not gain a read lock within the timeout specified. "+
"(millisecondsTimeout=" + millisecondsTimeout.Value + ") ");
return false;
}
lockHeld = true;
closure();
}
finally
{
if (lockHeld)
target.ExitWriteLock();
}
return lockHeld;
}
上述扩展的作用是提供一种简单的机制,用于在持有相应锁的情况下执行读取和写入操作。 它们还允许像我们在上一节中演示的那样进行超时。
现在有了我们新的扩展,让我们尝试改进之前的示例……
// Some permanent instance in respect to the dictionary.
ReaderWriterLockSlim sync = new ReaderWriterLockSlim();
// Thread A:
sync.Write( ()=>
if(!dictionary.ContainsKey(key))
dictionary.Add(key, newValue)
);
// Thread B:
object value;
sync.Read( ()=>
value = dictionary.ContainsKey(key) ?
dictionary[key] : null
);
这个改进的示例确实是同步的。 它不会违反“写入时允许多次读取”的规则。因此,如果准确的同步很重要,那么这就是可行的方法。如其本身而言,它不会像简单地使用 lock
关键字进行添加那样高效。但如果您使用的是不同类型的集合/字典,或者有更复杂的同步操作,这可能会更好,尤其是当读取的比例远大于写入的比例时。
我在以下位置提供了一个有用的扩展列表:ReaderWriterLockSlim
扩展
注意:请记住,使用 ReaderWriterLockSlim
,您可以指定允许递归,但不能在拥有读取锁时简单地进入写入锁,反之亦然。但您可以升级或降级您的锁定,但一次只能有一个升级的锁……有关更多信息,请参阅 ReaderWriterLockSlim 文档。
双重检查锁定 (DCL) 模式 = 不好的做法?
为了提高常见同步操作的性能,我们可以采用在实际锁定之前先检查是否需要锁定的概念。
首先,让我们看看如何同步单个属性:
private readonly object _countLock = new object(); private int _count; public int Count { get { return _count; } } public int IncrementCount() { lock(_countLock) return _count++; }
在上面的示例中,我们可以查询我们的内部属性,并通过单个公共接口更改它。读取部分无需同步,因为在退出只读访问器后,值可能已经更改。如果对这个属性有更大的同步需求,那么我们需要公开一个 SyncRoot,外部代码可以从中进行同步。
就本身而言,这段代码没有固有的错误。 真正的问题在于接口本身。 如果准确性很重要,那么这段简单的代码可能不是我们所需要的一切。 我们可能期望当我们调用 count 时,我们得到的是那一刻的值。 没有已经持有锁,这是不可能实现的。
接下来,让我们将 DCL 应用于单个属性:
private bool _hasValue; public T HasValueAlready { get { return _hasValue; } } private readonly object _lock = new object(); private T _value; public T Value { get { if(!_hasValue) { lock(_lock) { if(!_hasValue) { // The important part is NOT TO CHANGE THE DCL CONDITION UNTIL THE END. // placing _hasValue = true here would break the pattern // and return inconsistent (null) results. _value = /* some code that generates the value */; // Place the condition update after the real work is done. _hasValue = true; } } } return _value; } }
上面的示例假设一旦值建立,它就不会回滚。
这是一个 DCL 的安全示例。 布尔值是同步值的“门”,所有同步都发生在 Value
访问器内。 如果您使用 _value==null
或类似的东西作为检查,您仍然需要确保仅在模式的最后才更新 _value
。 但使用值作为“门”存在一些有趣的风险,这超出了本文的范围。
尽可能使用 LazyInitializer.EnsureInitialized
如果您正在考虑使用 DCL 来初始化属性,请考虑改用 LazyInitializer.EnsureInitialized
。 它很简单,并为您完成了所有魔术,但有一个小小的注意事项:它必须是一个对象,并且不能初始化为 null。 如果您需要不符合此描述的内容,请小心,并确保您正确实现了 DCL。
接下来,我们将 DCL 应用于字典。
object value;
if(!dictionary.TryGetValue(key, out value))
lock(dictionary)
if(!dictionary.TryGetValue(key, out value))
dictionary.Add(key, value = newValue);
上面的示例基本上是一个 GetOrAdd
方法。它尝试获取现有值,如果不存在,则添加它。概念上,这简化了过程,因此只有添加需要锁定。您必须检查两次,因为当获取锁时,另一个线程可能已经添加了该值。如果上述示例是 100% 隔离的,并且没有其他代码可以访问字典,那么它是线程安全的。当允许在模式之外访问字典时,线程安全性就会中断。
但性能如何呢? 这取决于。在大多数情况下,基于您第一次获取值与后续获取值的频率,您可以期望这有助于提高性能。现在让我们看另一个示例。
object value;
if(!dictionary.TryGetValue(key, out value))
lock(dictionary)
if(!dictionary.TryGetValue(key, out value))
dictionary.Add(key, value = newValueFactory());
在这个示例中,我们不是简单地添加值,而是执行一个值工厂来生成该值。虽然在隔离下是线程安全的,但这可能导致性能非常差。newValueFactory
返回值所需的时间越长,此操作的性能就越差。它唯一的优点是读取现有值不会被阻止。问题在于,在读取缺失值时,不仅可能存在长时间的等待,而且还有一队其他线程在等待轮流向集合添加其他值。
如果我们把乐观主义应用到我们的公式中……
object value;
if(!dictionary.TryGetValue(key, out value)) {
value = newValueFactory();
lock(dictionary)
if(!dictionary.TryGetValue(key, out value))
dictionary.Add(key, value);
}
这个示例解决了一个问题,但创造了另一个。我们不是在生成值时锁定,而是在应用锁之前生成值。这意味着我们消除了长时间的锁定时间,但我们很可能执行冗余过程并浪费 CPU 周期。
ConcurrentDictionary
的 GetOrAdd
方法存在同样的问题。您可以传递一个值工厂,它会有效地管理值的添加和获取,但值工厂可能会被多次执行,并且您直到方法完成后才能确定哪个是获胜者。这对于您可能只需要初始化轻量级对象的情况来说性能很好。但在您进行一些繁重或耗时较长的操作时,您将浪费周期,因为线程会争夺成为第一个生成和添加值的线程。
DCL 坏? 是的,假设它很坏,几乎总是……
DCL 模式的真正问题在于,大多数时候它的实现方式不能保证不违反“写入时允许多次读取”规则。再次,如果您走到这里,您
- 可能很幸运,
- 没有看到潜在问题的症状,
- 或者实际上并没有将您的代码暴露给异步环境。
结合 DCL 和读/写
当您在黑暗的线程和同步的森林中蜿蜒前行时,最终会发现需要使用读/写锁来最大化性能并保持可靠性。
机制 1:基本集合/字典访问
再次,我们查看 System.Collections.Concurrent
,发现它可能正是我们需要的。但有时我们并没有使用这些类的奢侈,或者它们非阻塞的方式也无法胜任。
首先,我们必须同步对数据的读/写访问,以避免违反“无写入时允许多次读取”的规则。我将在此示例中使用 SortedDictionary
,因为它不仅未同步,而且还提供了保持条目排序的额外好处。我将使用我之前提供的ReaderWriterLockSlim
扩展来简化示例……
// Need a single instance to control read/write access to our collection.
private readonly ReadWriterLockSlim _syncLock
= new ReadWriterLockSlim();
private readonly SortedDictionary<string,object> _myDictionary
= new SortedDictionary<string,object>();
protected object GetEntryByKeyInternal(string key)
{
object result = null;
!_myDictionary.TryGetValue(key, out result);
return result;
}
public object GetEntryByKey(string key)
{
return _syncLock.ReadValue(()=>GetEntryByKeyInternal(key));
}
public void AddEntry(string key, object value)
{
_syncLock.Write(()=>_myDictionary.Add(key,value));
}
public object GetOrAddEntry(string key, object addValue)
{
object value = null;
// This incorporates the DCL within a cascade of read, read upgradable,
// and then write locks.
_syncLock.ReadWriteConditionalOptimized(
(isWriteLock) => (value = GetEntryByKeyInternal(key))==null,
() => _myDictionary.Add(key,value = addValue)
);
return value;
}
在上面的示例中,我们提供了一些接口方法来同步内部字典。这是相当基本的读/写锁定,并带有一个 GetOrAddEntry
的特殊示例。如果您查看ReaderWriterLockSlim
扩展的源代码,您将看到
ReadWriteConditionalOptimized
方法执行以下操作:
- 获取读取锁。
- 然后在读取锁存在时,尝试从字典中获取值。
- 如果不存在值,则获取可升级的读取锁。
- 然后,在可升级的读取锁存在时,尝试从字典中获取值。
- 如果不存在值,则在可升级的读取锁内获取写入锁。
- 然后,在写入锁存在时,它尝试从字典中获取值,如果不存在值,则更新值并添加
addValue
。 - 返回该值。
此过程优化了 DCL 和 ReaderWriterLockSlim
的使用。一次只能有一个可升级的读取锁,因此在第一次传递时执行标准读取锁更好。
您可能注意到该扩展在升级到写入锁后重新检查值。 虽然对于 ReaderWriterLockSlim 的后期版本来说这是不必要的,但我的看法是,我想确保安全,因为它似乎可能条件函数的返回值在一次锁定和下一次锁定之间会发生变化。
此时,您中的一些人可能会说,“废话”,或者“当然”,或者“好的,接下来是什么”。但这就提供了一个同步集合/字典使用读/写锁的重要初始过程。您确实无法避免同步添加值和获取值。要么使用 lock
语句,要么使用读/写机制。
那么我们完成了吗? 只有当您只关心更新集合内容的性能时。但当您需要同步单个键/值时怎么办? 如果您正在使用值工厂并且不想不必要地滥用 CPU 周期怎么办?
机制 2:在创建键/值时阻止线程
您可能想在这里停止。这个有点难以理解。人们(包括我)编写了整个类来处理这个问题。您可能想重构您的代码以使用并发类并擦干双手。但同样,您可能会面临一种情况,即您没有这种奢侈。
好的实践表明,如果可以避免锁定,就避免。获取任何类型的锁都需要额外的时间,然后可能会阻止其他线程。因此,只要有可能,避免锁定是好的。其中一个真实案例是在添加值之前创建值。如果您正在生成一个值,并且该操作需要一些时间来执行,如果您只使用机制 1,那么您可能会不必要地阻止所有线程。您只需要阻止那些需要访问您即将更新的值的线程。我们该怎么做?
您仍然需要利用之前的机制,但现在我们必须为字典中的每个条目重复该模式! 在实际应用程序中,我有一个基类,它会定期清理和处置未使用的 ReaderWriterLockSlim
对象。但这会偏离主题太远,涉及到处理模式以及清理线程和优化清理。所以为了简单起见,让我们假设在这个示例中,我们的 ReaderWriterLockSlim
注册表只是保留锁,直到父类被丢弃。
首先,让我们准备条目锁注册表。
private readonly ConcurrentDictionary<string, ReaderWriterLockSlim> _entryLockRegistry
= new ConcurrentDictionary<string, ReaderWriterLockSlim>();
protected ReaderWriterLockSlim GetLock(string key)
{
ReaderWriterLockSlim result;
{
// Since more than one ReaderWriterLockSlim can created,
// we need to dispose of the unneeded one.
ReaderWriterLockSlim created = null;
result = Locks.GetOrAdd(key, k => created = new ReaderWriterLockSlim());
if (created != null && created != result)
created.Dispose();
}
return result;
}
为了安全起见,让我们更新我们的公共 GetEntryByKey
和 AddEntry
方法……
public object GetEntryByKey(string key)
{
return GetLock(key).ReadValue(()=> // Always lock on key first to prevent deadlocks.
_syncLock.ReadValue(()=>GetEntryByKeyInternal(key))
);
}
public void AddEntry(string key, object value)
{
GetLock(key).Write(()=> // Always lock on key first to prevent deadlocks.
_syncLock.Write(()=>_myDictionary.Add(key,value))
);
}
现在让我们添加我们的工厂 GetOrAddEntry
方法……
public object GetOrAddEntry(string key, Func<object> addValueFactory)
{
object value = null;
GetLock(key).ReadWriteConditionalOptimized(
(isWriteLock) => (value = GetEntryByKeyInternal(key))==null,
() => value = GetOrAddEntry(key,addValueFactory())
);
return value;
}
上面的代码添加了一个通过工厂添加条目的新方法。它看起来与之前的 GetOrAddEntry
方法基本相同,除了几个不同之处。
它正在使用该特定键的唯一 ReaderWriterLockSlim
。并且在持有该键的锁时,它利用之前的 GetOrAddEntry
方法来添加值。
这种两级嵌套锁链确保了集合的实际读/写操作尽可能少。值的创建和添加会阻止后续线程尝试,直到第一个线程完成。因此,真正地以最佳性能实现了同步。
明显的批评:缺点是,正如我之前所说,ReaderWriterLockSlim
对象不会被清理,而是闲置在那里,但这是一个另一个话题。需要记住的重要一点是,如果您过于积极地清理和处置它们,您可能会对性能产生负面影响。
结论/总结
我非常感谢所有以前批评和审查过我的样本和代码的人。要做到正确并不容易。但我真的觉得,从你们的建议中,我尽了我最大的努力去理解并创造出一些真正有效的东西。 这段旅程很艰难。我确定很多人都同意,线程和同步不是一个容易的话题。但有解决方案。有些可能很简单,有些可能有些大材小用。但总有一个答案。
重要的技巧/要点……
- 重复的轻量级操作实际上可能在并行执行时变慢。
- 如果您不需要,请远离多线程和同步。
- 如果需要,请使用并发类,但要记住它们底层的工作原理。
- 使用扩展编写更简单、更安全的锁定和同步代码。
lock
关键字 (Monitor.Enter
) 实际上相当快,并且绝对比ReaderWriterLockSlim
快。在原型同步代码时,不要害怕使用它。- 使用超时! 在大多数情况下,如果您需要锁,就没有理由不设置超时。当您认为您不需要它时,可能实际上是您最需要它的时候。如果您预计您的锁会很快完成,那么任何超时都会帮助揭示死锁或代码更深层次的问题。
- 双重检查锁定模式不是线程安全的,除非检查部分在所有条件下都保证线程安全。最终,DCL 对于在正确读取锁定且写入操作可能需要一些时间才能完成时,在性能方面有其优点。