一个简单的带重入的 Win32 读写锁






4.29/5 (6投票s)
2006年1月13日
3分钟阅读

84521

1136
一个实现了读写锁的简单实现,支持重入和锁升级。
引言
该代码是一个读写锁的简单实现,支持重入和锁升级,即持有读锁的线程可以请求并被授予写访问权限,前提是没有其他线程持有读锁,并且持有写锁的线程被授予读锁访问权限。
背景
Windows 同步原语不包括对锁定读者和作者的支持。 有时,允许多个线程读取数据是有用的,而无需线程读取数据,而不必仅仅因为其他线程也在读取而阻塞。 仅当数据被更改时才会出现数据损坏的风险。 写访问必须是独占的(对其他写入者和任何读者),但读访问可以在读者之间共享。 允许多个读取器线程共享锁可以提高并发性并降低死锁的风险。 我能找到的现有实现不支持重入,这对于避免我正在处理的应用程序中的死锁至关重要。
使用代码
该代码易于使用,可以调用ClaimReader/ClaimWriter,然后调用ReleaseReader/ReleaseWriter,也可以使用自动锁类AutoLockReader和AutoLockWriter
class ReadWriteLockTLSRelease { public: class TimeoutExpiredException : std::exception {}; class ImplicitEscalationException : std::exception {}; private: volatile int numReaders; int numWriters; CRITICAL_SECTION atomizer; HANDLE spinEvent; // If the timeout exipres it's not a big deal, // as the ClaimXX() function rechecks if it can // claim lock static const int WaitSpinTimeout=1000; static const int MaxSpinIterations=4000; static const int MaxSpinIterationsWrite=4000; int writerThreadId; int tlsSlot; int anyThreadWaiting; class EscalatingPolicyAllow { public: static bool AllowImplicitEscalation() { return true; } }; class EscalatingPolicyDeny { public: static bool AllowImplicitEscalation() { return false; } }; __forceinline int GetTLSReaderCount() { return (int)(INT_PTR)TlsGetValue(tlsSlot); } __forceinline void SetTLSReaderCount(int count) { TlsSetValue(tlsSlot, (LPVOID)(INT_PTR)count); } __forceinline void SpinThreads() { // Unreliable but in case of failure the // timeout rescues us if(anyThreadWaiting>0) PulseEvent(spinEvent); } template<class EscalatingPolicy> inline bool CheckUpgradingFromReaderToWriter() { if(numReaders==0) return false; int readerCount=GetTLSReaderCount(); if(readerCount>0) { // exit read lock if(!EscalatingPolicy::AllowImplicitEscalation()) throw ImplicitEscalationException(); SetTLSReaderCount(-readerCount); while(true) { int old=numReaders; if(old==InterlockedCompareExchange((LONG*)&numReaders, numReaders-readerCount, old)) break; } return true; } return false; } inline void CheckRestorePreviousReaderLock() { int previous=-GetTLSReaderCount(); if(previous>0) { SetTLSReaderCount(previous); numReaders=previous; } } inline void IncrementReaderCount() { SetTLSReaderCount(GetTLSReaderCount()+1); } __forceinline void Spin() { Sleep(0); } __forceinline void AcquireReader() { _ASSERT(numReaders>=0); InterlockedIncrement((LONG*)&numReaders); IncrementReaderCount(); _ASSERT(numWriters==0); //||(numWriters>0&&writerThreadId==myThreadId)); LeaveCriticalSection(&atomizer); } __forceinline void AcquireWriter(int myThreadId) { numWriters++; _ASSERT(numReaders==0); writerThreadId=myThreadId; LeaveCriticalSection(&atomizer); } class TimeoutIgnore { public: __forceinline void CheckExpired(int timeout) const {} }; class TimeoutChecker { unsigned long ticksAtStart; public: TimeoutChecker() { ticksAtStart=timeGetTime(); } void CheckExpired(int timeout) { if(int(timeGetTime()-ticksAtStart)>timeout) throw TimeoutExpiredException(); } }; template<class TimeoutPolicy> __forceinline void ClaimReaderInternal(int timeout) { _ASSERT(numReaders>=0); int myThreadId=GetCurrentThreadId(); // Grant read access if thread already // has write access if(myThreadId==writerThreadId) return; int old=numReaders; if(old>0) { if(old==InterlockedCompareExchange((LONG*)&numReaders, old+1, old)) { _ASSERT(numReaders>=0); IncrementReaderCount(); return; } } TimeoutPolicy t; for(int i=0; i<MaxSpinIterations; ++i) { if(numWriters==0) { EnterCriticalSection(&atomizer); if(numWriters==0) { AcquireReader(); return; } LeaveCriticalSection(&atomizer); } t.CheckExpired(timeout); Spin(); } while(true) { EnterCriticalSection(&atomizer); if(numWriters==0) break; InterlockedIncrement((LONG*)&anyThreadWaiting); LeaveCriticalSection(&atomizer); t.CheckExpired(timeout); WaitForSingleObject(spinEvent, WaitSpinTimeout); InterlockedDecrement((LONG*)&anyThreadWaiting); } AcquireReader(); } template<class TimeoutPolicy, class EscalatingPolicy> __forceinline void ClaimWriterInternal(int timeout) { _ASSERT(numReaders>=0); int myThreadId=GetCurrentThreadId(); TimeoutPolicy t; for(int i=0; i<MaxSpinIterationsWrite; ++i) { EnterCriticalSection(&atomizer); if(numWriters==1&&myThreadId==writerThreadId) { // Reentering write lock AcquireWriter(myThreadId); return; } CheckUpgradingFromReaderToWriter<EscalatingPolicy>(); if(numReaders==0&&numWriters==0) { AcquireWriter(myThreadId); return; } LeaveCriticalSection(&atomizer); t.CheckExpired(timeout); Spin(); } while(true) { EnterCriticalSection(&atomizer); CheckUpgradingFromReaderToWriter<EscalatingPolicy>(); if(numReaders==0&&numWriters==0) { AcquireWriter(myThreadId); return; } t.CheckExpired(timeout); InterlockedIncrement((LONG*)&anyThreadWaiting); LeaveCriticalSection(&atomizer); WaitForSingleObject(spinEvent, WaitSpinTimeout); InterlockedDecrement((LONG*)&anyThreadWaiting); } } public: ~ReadWriteLockTLSRelease() { anyThreadWaiting=0; DeleteCriticalSection(&atomizer); CloseHandle(spinEvent); TlsFree(tlsSlot); } ReadWriteLockTLSRelease() { InitializeCriticalSection(&atomizer); numReaders=numWriters=0; spinEvent=CreateEvent(NULL,TRUE,FALSE,NULL); // The slot default value is 0 (NULL) for each // thread (not clearly documented) tlsSlot=TlsAlloc(); if(tlsSlot==TLS_OUT_OF_INDEXES) throw std::exception("Out of TLS slots"); } void ClaimWriterAllowEscalating() { return ClaimWriterInternal<TimeoutIgnore, EscalatingPolicyAllow>(INFINITE); } void ClaimWriterAllowEscalating(int timeout) { return ClaimWriterInternal<TimeoutChecker, EscalatingPolicyAllow>(timeout); } void ClaimWriterNoEscalating() { return ClaimWriterInternal<TimeoutIgnore, EscalatingPolicyDeny>(INFINITE); } void ClaimWriterNoEscalating(int timeout) { return ClaimWriterInternal<TimeoutChecker, EscalatingPolicyDeny>(timeout); } void ClaimReader() { return ClaimReaderInternal<TimeoutIgnore>(INFINITE); } void ClaimReader(int timeout) { return ClaimReaderInternal<TimeoutChecker>(timeout); } void ReleaseWriter() { _ASSERT(numReaders>=0); EnterCriticalSection(&atomizer); _ASSERT(numWriters>0); numWriters--; if(0==numWriters) writerThreadId=0; CheckRestorePreviousReaderLock(); SpinThreads(); LeaveCriticalSection(&atomizer); } void ReleaseReader() { _ASSERT(numReaders>=0); EnterCriticalSection(&atomizer); int myThreadId=GetCurrentThreadId(); // if numWriters>0 I am also have the writer lock if(numWriters==0) { InterlockedDecrement((LONG*)&numReaders); SetTLSReaderCount(GetTLSReaderCount()-1); _ASSERT(numReaders>=0); _ASSERT(numWriters==0); SpinThreads(); } LeaveCriticalSection(&atomizer); } };
关注点
当一个线程在获得读锁后请求写锁时,读锁被释放,线程等待直到没有读者,并且授予写锁。 这是为了避免两个线程持有读锁并同时请求写锁而导致的死锁。 锁升级时的释放和重新获取可能会导致数据损坏,因为线程在持有读锁时可能获得的信息可能与授予写锁后受锁保护的数据的状态不一致,因为其他写入者线程可能在线程将其锁从读取升级到写入时获得了该锁。 为了避免这种情况,升级后的线程必须意识到,在请求写锁之后,它必须重新获取关于锁保护的数据的信息,以便进行写操作。 如果不希望出现此行为,可以使用 ClaimWriterNoEscalating(),在这种情况下,如果锁发现线程已经持有读锁,则会抛出一个 ImplicitEscalationException 异常。
性能
我已经运行了一个简单的基准程序(包含在源代码示例中),结果如下图所示

完全可重入锁比简单的 Windows 临界区慢大约 3 倍,这还不错。 它比 .NET ReaderWriterLock 快。 最大的性能提升来自于避免使用重量级操作系统同步原语,而是使用主动等待加上 Sleep(0)。
历史
- 1-10-06 
- 首次发布。
 
- 1-12-06 
- 已测量 .NET ReaderWriterLock。
 
- 已测量 .NET 
- 1-20-06 
- 使用自旋锁和线程本地存储来实现 8 倍加速、超时、在升级时释放读锁,可以选择允许或禁止隐式升级。
 
