65.9K
CodeProject 正在变化。 阅读更多。
Home

无死锁读/写锁和线程安全集合

starIconstarIcon
emptyStarIcon
starIcon
emptyStarIconemptyStarIcon

2.75/5 (7投票s)

2010 年 9 月 24 日

MIT

5分钟阅读

viewsIcon

59429

downloadIcon

285

一个不会死锁的读写锁,以及线程安全示例集合

引言

这是一个不会死锁的读写锁。它支持直观的升级锁,并允许高度递归。这意味着您不再需要跟踪线程锁定资源的次数和顺序,大大提高了可管理性。

ReaderWriterLock Wrapper Class 是一个关于将 .NET 的 ReaderWriterLock 类封装成可自动允许升级锁的可释放对象的 CodeProject 文章。但根据我的经验,由于该类存在 bug,使用 ReaderWriterLock 升级锁并非线程安全。

该类使用了一个从头开始设计的锁,不仅允许 ReaderWriterLock 中的所有功能,而且性能更好,并能处理原本会导致死锁的情况。

理解死锁问题

A 部分

lock(A)  // 1. Thread 1 has entered, no other threads can enter lock(A)
         // in Part A or Part B.
{
   sleep(20);  // Sleeps 20 milliseconds to demonstrate race condition failing.
   lock(B)      // 3.
   {

   }
}

B 部分

lock(B)  // 2. Thread 2 has entered, no other threads can enter lock(B)
         // in Part A or Part B.
{
   sleep(20);  // Sleeps 20 milliseconds to demonstrate race condition failing.
   lock(A)      // 4.
   {

   }
} 

以上是演示硬锁如何发生的伪代码。每个锁只允许一个线程进入。如果有两个线程进入,第一个进入的线程会通过,而其他线程会在其位置暂停。

线程 1 进入 A 部分 中的 lock(A)。然后短暂休眠。线程 2 进入 B 部分 中的 lock(B)。然后也短暂休眠。然后,在A 部分的 #3 处,线程 1 在尝试锁定 lock(B) 时被挂起,而 lock(B) 已被线程 2 锁定。然后,在B 部分的 #4 处,线程 2lock(A) 挂起,导致硬锁。

关于死锁的停止问题

一个普遍的误解是,为了停止死锁,必须解决 n-完备停止问题。然而,这是错误的。

"在死锁发生之前检测到死锁的可能性要困难得多,而且实际上是普遍不可判定的,因为停止问题可以被重新表述为死锁场景" - http://en.wikipedia.org/wiki/Deadlock

这是具有误导性的,因为它给人的印象是,在锁定机制中,我们无法控制停止的条件。 停止问题指出,“证明表明,不存在一个总可计算函数来判断任意程序 i 在任意输入 x 上是否会停止。”

分布式死锁预防在文章中表明,停止问题和锁定是互斥的,因为其中所有条件都是已知的。

因此,本文和分布式死锁预防实际上是防止死锁的一种可能的通用解决方案。

引用

"如果我摇一个带有数字文本的魔术 8 球,而且我可以在看到初始答案后更改文本,我相信“毋庸置疑,所有迹象都指向是”(鉴于我有写下这句话的空间)。" -- TamusJRoyce。

解决死锁问题

我的解决方案是允许这种硬锁几乎发生。但我会跟踪所有进入的线程。所以当进入读写锁的线程数量等于被锁定的线程数量时(在锁发生之前检查),我让该线程“按其流程进行”。

按其流程进行的意思是,我将其分配为一个超级线程后进行跟踪。如果它在成为超级线程时进入其他锁,它将阻止所有其他线程,直到这个原本会硬锁的线程完成并解锁。所以这种硬锁预防机制是递归的。

这个想法必须应用两次。一次用于读写锁本身(本地死锁预防),第二次用于所有读写锁之间的全局(分布式死锁预防)。为简单起见,下面仅显示本地死锁预防。

// Class to replace regular locks which one instance alone cannot be deadlocked.
public class RegularLock
{
  private class LockObject : IDisposable
  {
    RegularLock _lock;

    public LockObject(RegularLock __lock)
    {
      _lock = __lock;
    } // End Constructor

    public IDisposable GetLock()
    {
      // Begins locking
      _lock.LockFunction();
      return this;
    } // End GetLock() Method

    public void Dispose()
    {
      _lock.UnlockFunction();
    } // End Dispose() Method
  } // End LockObject class

  //          ManagedThreadId, Number of times each thread recursively locked.
  Dictionary <int,            int> ThreadsEntered  = new Dictionary<int,int>();

  //          ManagedThreadId, Number of times each thread recursively locked.
  Dictionary <int,             int> ThreadsLocked = new Dictionary<int,int>();

  int SuperThreadId    = -1;
  int SuperThreadCount = 0;

  LockObject lockObj = new LockObject(this);

  public IDisposable GetLock()
  {
    // Magic here is that lockObject isn't created each time it is used!
    return lockObject.GetLock();
  } // End GetLock() Method

  bool LockIsNeeded(int currentThreadId)
  {
    lock(ThreadsEntered)
    {
      lock(ThreadsLocked)
      {
        // Guarantees that a hard lock will occur.  Then lets the super thread continue.
        return SuperThreadId != currentThreadId;
      } // End lock
    } // End lock
  } // End LockIsNeeded() Function

  // This could be either read locking, write locking, or normal
  //   locking, depending on how LockIsNeeded() is setup.
  void LockFunction()
  {
    Dim CurrentThreadId As Integer = Thread.CurrentThread.ManagedThreadId;
    Dim CountValue      As Integer

    // This is part of the super thread's "Running its course."
    if (SuperThreadId == CurrentThreadId)
    {
      SuperThreadCount += 1;
    }

    lock(ThreadsEntered)
    {
      if (ThreadsEntered.TryGetValue(CurrentThreadId, CountValue))
      {
        ThreadsEntered(CurrentThreadId) = CountValue + 1;
      } else {
        ThreadsEntered.Add(CurrentThreadId, 0);
      } // End if
    } // End lock

    if (LockIsNeeded(CurrentThreadId))
    {
      lock(ThreadsLocked)
      {
        if (ThreadsLocked.TryGetValue(CurrentThreadId, CountValue))
        {
          ThreadsLocked(CurrentThreadId) = CountValue + 1;
        } else {
          ThreadsLocked.Add(CurrentThreadId, 0);
        } // End If
      } // End lock

      while (LockIsNeeded(CurrentThreadId))
      {
        lock(ThreadsEntered)
        {
          lock(ThreadsLocked)
          {
            if (SuperThreadId == -1 && ThreadsEntered.Count == ThreadsLocked.Count)
            {
              SuperThreadId = CurrentThreadId;
            } // End if
          } // End lock
        } // End lock

        if (SuperThreadId == CurrentThreadId)
        {
          break;
        } // End if

        Thread.Wait(6000);
      } // End while

      lock(ThreadsLocked)
      {
        if (ThreadsLocked.TryGetValue(CurrentThreadId, CountValue))
        {
          if (CountValue == 0)
          {
            ThreadsLocked.Remove(CurrentThreadId);
          } else {
            ThreadsLocked(CurrentThreadId) = CountValue - 1;
          } // End if
        } // End if
      } // End lock
    } // End [LockIsNeeded()] if
  } // End LockFunction() Function

  void UnlockFunction()
  {
    Dim CurrentThreadId As Integer = Thread.CurrentThread.ManagedThreadId;
    Dim CountValue        As Integer

    lock(ThreadsEntered)
    {
      if (ThreadsEntered.TryGetValue(CurrentThreadId, CountValue))
      {
        if (CountValue == 0)
        {
          ThreadsEntered.Remove(CurrentThreadId);
        } else {
          ThreadsEntered(CurrentThreadId) = CountValue - 1;
        } // End if
      } // End if
    } // End lock

    lock(ThreadsEntered)
    {
      lock(ThreadsLocked)
      {
        // Only after the super thread has ran its course is
        // another thread allowed to take its place.
        if (SuperThreadCount == 0 && SuperThreadId == CurrentThreadId)
        {
          SuperThreadId = -1;
        }

        // This is part of the super thread's "Running its course"
        if (SuperThreadId == CurrentThreadId)
        {
          SuperThreadCount -= 1;
        } // End if
      } // End lock
    } // End lock
  } // End UnlockFunction() Function
} // End class

Using the Code

下面的代码展示了使用这个读写锁的详细方法。更多示例可在此处找到。

Imports System
Imports System.Threading
Imports System.Collections.Generic

Public Module MainProgram
  Dim Lock   As New ReadWriteLocker(True)
  Dim List   As New List(Of Integer)(New Integer {0,5,0,4,0,3,0,2,0,1})
  Dim IsBusy As Integer = 0

  Public Sub main()
    InitializeThreads()
  End Sub

  Public Sub InitalizeThreads()
    For Index As Integer = 0 To 25
      Interlocked.Increment(IsBusy)
      ThreadPool.QueueUserWorkItem(FunctionToRunInMultipleThreads, Index)
    Next

    While IsBusy > 0
      Thread.Sleep(500)
    End While
  End Sub

  Public Sub FunctionToRunInMultipleThreads(threadContext As Object)
    Dim threadIndex As Integer = CInt(threadContext)
    Console.WriteLine("thread {0} started...", threadIndex);

    Using ReadLock As IDisposable = Lock.GetReadLock()
      ' Iterate through list
      For Index As Integer = 0 To List.Count - 1
        List(Index) += Index
      Next

      ' Upgrade read lock to write lock.  Yes.  It is that simple.
      Using WriteLock As IDisposable = Lock.GetWriteLock()
        For Index As Integer = 0 To List.Count - 1
          List.Add(List(Index))
        Next
      End Using

      ' Iterate through list
      For Index As Integer = 0 To List.Count - 1
        Console.WriteLine("List value: {0}, on thread {1}.", List(Index), threadIndex)
      Next
    End Using

    Interlocked.Decrement(IsBusy)
  End Sub
End Class

替换 ReaderWriterLock

包含的 NHLReaderWriterLock 设计为 .NET 的 ReaderWriterLock 的即插即用替换。NHLReaderWriterLock 通过线程安全的升级锁来确保正确性。在关闭死锁预防时(开启时性能相当),它的性能也更好。

除了将 ReaderWriterLock 替换为 NHLReaderWriterLock 并将此库包含在项目引用中之外,无需进行其他更改。

替换 ReaderWriterLockSlim

还包含了一个 NHLReaderWriterLockSlim 。但我的目标不是替换 ReaderWriterLockSlim,而是检测潜在的死锁,以便您能够修复它们。然后切换回使用 ReaderWriterLockSlim

如果将来 ReadWriteLocker 使用自旋锁进行设置,帮助其匹配 ReaderWriterLockSlim 的性能,那么它可能成为替换它的可行选项。

停止所有死锁的立场

在某些情况下,它确实会导致死锁。但这些情况是该锁的外部因素。要包含它们,我必须解决 n-完备停止问题才能保证它们不会导致死锁!

我绝没有做到这一点。这远远超出了我的能力。但就像如果您不允许使用 ReaderWriterLockSlim 进行递归锁定,您就不会遇到死锁一样;如果您对这个锁也这样做,您也不会遇到死锁。

然而,我已经将这些情况扩展到包括递归以及升级和降级锁。只要其他锁定机制与此锁互斥运行(它们可以在此锁内部非递归使用,但不能在此锁周围使用),并且每个启动的锁都会被解锁(因此,不涉及停止问题!),就不会发生死锁。

缓存枚举期间更改的集合

下面是显示具有选择是否线程安全功能的集合的代码。那么,为什么在使用这个锁时我们要关闭线程安全性呢?因为它包含事件,告诉我们在遍历列表时何时完成。

因此,这个线程锁即使在禁用线程安全性时,也会告诉我们的集合何时应该缓存修改,何时应该将这些修改直接写入集合。这意味着您可以对这个 ThreadSafeList 执行 for-each 循环并执行 insert/add/change/remove/clear,它不会在 for-each 循环完成后修改列表。

对于正在从中读取集合的多线程实例也是如此。即使允许写入,您也不希望数据在被读取时发生更改。此类是一个很好的示例,说明如何设置任何需要保持不变直到读取完成的资源。

' This example is for ThreadSafeList(Of T) Collection.
Imports NoHardLockReadWriteLocker
Imports ThreadSafeClasses

Public Module MainProgram
  Public Sub main()
    InitializeThreads()
  End Sub

  // List which has thread safety turned on, and initialized to an array of integers.
  Dim List As New ThreadSafeList(Of Integer)(True, New Integer {0,5,0,4,0,3,0,2,0,1})
  Dim IsBusy As Integer = 0

  Public Sub InitalizeThreads()
    For Index As Integer = 0 To 10
      Interlocked.Increment(IsBusy)
      ThreadPool.QueueUserWorkItem(FunctionToRunInMultipleThreads, Index)
    Next

    While IsBusy > 0
      Thread.Sleep(500)
    End While
  End Sub

  Public Sub FunctionToRunInMultipleThreads(threadContext As Object)
    Dim threadIndex As Integer = CInt(threadContext)
    Console.WriteLine("thread {0} started...", threadIndex);

    ' Thread-safety does not need to be turned on for 
    ' just the modifying while enumerating.
    For Each Item As Integer In List
      ' This normally results in a enumeration error!
      List.Add(Item)

      Console.WriteLine("Value {0} found...note that items added in _
			for-each won't appear yet.", Item);
    Next

    For Each Item As Integer In List
      Console.WriteLine("Value {0} found...note that items added in _
			previous for-each appear!", Item);
    Next

    ' This is not thread-safe!  This needs a read lock wrapped around it.
    While (List.Count > 17)
      ' This can throw an error if List.Count is grabbed, an item is removed
      ' via another thread.  Then RemoveAt is called.  That position would be gone.
      List.RemoveAt(List.Count)
    End While

    ' This is thread-safe.  Notice the read lock wrapped around it.
    '
    ' But this will loop forever, because removing an item will be cached
    ' and the count will never decrease.
    Using ReadLock As IDisposable = List.GetReadLock()
      While (List.Count > 14)
        List.RemoveAt(List.Count)
      End While
    End Using

    ' This is correctly thread-safe.
    While (List.Count > 10)
      Using ReadLock As IDisposable = List.GetReadLock()
        List.RemoveAt(List.Count)
      End Using
    End While

    Interlocked.Decrement(IsBusy)
  End Sub
End Module 

下面是使这个线程安全列表工作的实现的概述。

Imports System
Imports System.Threading
Imports System.Collections.Generic

Imports NoHardLockReadWriteLocker

Public Class ThreadSafeList(Of T)
 Implements IList(Of T)

  ' Both classes within this region are accessed within thread safe locking.
  ' Therefore, thread safety does not need to be done within them.
  #Region "Classes"
    ' This is the object that caches modifications to this collection.
    Private Class ModificationCache
      Private Enum ModificationType
        Added
        Removed
        Cleared
      End Enum

      ' This is used to tells what modification to make when applying this cache.
      Private Structure Item
        Public Type As ModificationType
        Public Item As T
        Public Index As Integer

        Public Sub New(ByVal __type As ModificationType, _
		ByVal __item As T, ByVal __index As Integer)
          Type = __type
          Item = __item
          Index = __index
        End Sub
      End Structure

      ' List to apply changes, when requested.
      Private _List As List(Of T)
      Private _Items As New List(Of Item)()

      Public Sub New(__list As List(Of T))
        _List = __list
      End Sub

      Public Sub Add(item As T)
        ' Adds a modification to the cache.
        _Items.Add(New Item(ModificationType.Added, item, -1))
      End Sub

      ...

      ' This is different than the other modification caching functions.
      '   It clears all the previously cached modifications and caches that a
      '   clear need to be performed.
      Public Sub Clear()
        _Items.Clear()
        _Items.Add(New Item(ModificationType.Cleared, Nothing, -1))
      End Sub

      ...
    End Class

    ' Wrapper around an iterator which disposes both the Iterator and ReadLock.
    Private Class ThreadSafeIterator
      Implements IEnumerator(Of T)

      Private _Iterator As IEnumerator(Of T)
      Private _ReadLock As IDisposable
      Private _IsDisposed As Boolean = False

      Public Sub New(ByVal __iterator As IEnumerator(Of T), _
		ByVal __readLockObject As IDisposable)
        _Iterator = __iterator
        _ReadLock = __readLockObject
      End Sub

      Public ReadOnly Property Current() As T _
        Implements IEnumerator(Of T).Current

        Get
          Return _Iterator.Current
        End Get
      End Property

      ...

      Public Sub Dispose()
        _Iterator.Dispose()
        _ReadLock.Dispose()
      End Sub
    End Class
  #End Region

  Private WithEvents _Locker As ReadWriteLocker
  Private _List As List(Of T)
  Private _ModificationCache As ModificationCache

  ' Thread-Safety is an option for this class. 
  ' Remember to set it to True if you need it.
  Public Sub New(Optional ByVal __isThreadSafe As Boolean = False)
    Dim List As New List(Of T)()

    _Locker = New ReadWriteLocker(__isThreadSafe)
    _List = List
    _ModificationCache = New ModificationCache(List)
  End Sub

  ...

  Public ReadOnly Property Count() As Integer _
    Implements ICollection(Of T).Count

    Get
      ' The first instance of locking so far.  As you see in the above sample,
      '   this is really meaningless in a multi-threaded environment unless
      '   you wrap your own Read Lock around this and whatever else is using Count.
      Using ReadLock As IDisposable = _Locker.GetReadLock()
        Return _List.Count

        ' Todo:  Don't Read-Lock, and throw an exception if this is called
        '        and it is not Read-Locked, and Thread-Safety is enabled!
      End Using
    End Get
  End Property

  Default Public Property Item(ByVal index As Integer) As T _
    Implements IList(Of T).Item

    Get
      Using ReadLock As IDisposable = _Locker.GetReadLock()
        Return _List.Item(index)
      End Using
    End Get
    Set(ByVal value As T)
      Using WriteLock As IDisposable = _Locker.GetWriteLock()
        If _Locker.IsReading Then
          _ModificationCache.Assign(_List(index), Value)
        Else
          _List(index) = Value
        End If
      End Using
    End Set
  End Property

  Public Function Contains(ByVal item As T) As Boolean _
    Implements ICollection(Of T).Contains

    Using ReadLock As IDisposable = _Locker.GetReadLock()
      Return _List.Contains(item)
    End Using
  End Function

  ...

  Public Function GetEnumerator() As IEnumerator(Of T) _
    Implements IEnumerable(Of T).GetEnumerator

    ' Starts Read-Locking before the ThreadSafeIterator is even made.
    Dim ReadLock As IDisposable = _Locker.GetReadLock()

    ' Returns a thread-safe iterator.
    Return New ThreadSafeIterator(_List.GetEnumerator(), ReadLock)
  End Function

  Public Sub Add(ByVal item As T) _
    Implements ICollection(Of T).Add

    Using WriteLock As IDisposable = _Locker.GetWriteLock()
      If _Locker.IsReading() Then
        ' Caching modifications if they happen while reading is occurring.
        _ModificationCache.Add(item)
      Else
        _List.Add(item)
      End If
    End Using
  End Sub

  ...

  Public Sub Clear() _
    Implements ICollection(Of T).Clear

    Using WriteLock As IDisposable = _Locker.GetWriteLock()
      If _Locker.IsReading() Then
        _ModificationCache.Clear()
      Else
        _List.Clear()
      End If
    End Using
  End Sub

  ' When iteration is complete, an event is fired that indicates
  ' a read lock has been released.
  '   When there is no more read locks, apply the changes that have been cached.
  Protected Overridable Sub OnUnlocked() _
    Handles _Locker.UnlockedForReading

    ' Doesn't raise the _Locker.UnlockedForReading event, 
    ' as write locking is being done.
    Using WriteLock As IDisposable = _Locker.GetWriteLock()
      If Not _Locker.IsReading Then
        _ModificationCache.PerformCachedModifications()
      End If
    End Using
  End Sub
End Class

历史

  • 2009 年 11 月,最初是作为 ReaderWriterLock 的包装器。
  • 然后,它于 2010 年 3 月修改为使用 ReaderWriterLockSlim
  • 最后,它使用 ReaderWriterLock Alternative 重新实现,该实现最终暴露了足够的功能,我能够准确地防止死锁,时间是 2010 年 9 月。
© . All rights reserved.