快速读/写锁
对读者/写者锁同步对象的优化实现。
引言
读者/写者锁(RWLock)是一种用于在多线程软件中同步对资源/对象的访问的机制。更多信息可以在以下文章中找到:
背景
读者/写者锁的实现需要谨慎,因为如果处理不当,它可能会降低多线程应用程序/服务器的整体性能和稳定性。幸运的是,x86 CPU具有专门的指令,可以在多处理器系统上实现完美的同步,这使得完全稳定的RWLock实现成为可能。由于这只会带来好处,我们将使用汇编语言来完成整个实现,将其打包成一个.lib文件,并在我们的C/C++软件中使用它。
但是,仅仅用汇编编写并不是一个完整的解决方案——我们必须了解Windows是如何工作的,以及它实际上工作得有多快。我将使用运行在Sempron3000+上的Windows 2000 SP4进行基准测试。
首先,**上下文切换**。普遍认为它非常慢……但事实并非如此。实际的数字是:如果没有其他线程活动,需要322个周期;当有一个其他线程活动时,需要587个周期;当有2-80个活动线程时,需要600-700个周期。这个基准测试在运行530-610个线程(来自27-36个进程)且CPU负载为10-30%的情况下进行了多次。因此,如果我们的应用程序与另一个应用程序切换,而该应用程序只是简单地切回,那么只会有1175个周期过去。此外,切换到另一个进程的线程只需要额外的十几个周期。
应用程序可以通过SwitchToThread()
API强制进行上下文切换。然而,此函数在Win98上不可用(只提供了一个存根),因此我们必须使用Sleep(0)
。但是,Sleep(0)
会忽略比当前线程优先级高的线程,因此我们应该避免在基于NT的Windows(Win2K、XP、Server2K3、Vista)上使用它。因此,我们的代码应该在Windows 98上调用Sleep(0)
,在较新的操作系统上调用SwitchToThread()
。
- **内核事件**通常用于通知某个对象/数据已准备就绪:在这里,我们可以将它们用作通知RWLock对象已准备好进行读取或写入。Quynh Nguyen已经在这里介绍了这样一种方法。在那里,需要通知的线程调用
CreateEvent(...)
并开始等待,通过WaitForSingleObject(...)
。从那时起,线程调度程序将忽略该线程,直到指定的事件被触发。但是,CreateEvent(...)
本身需要1500-1900个周期,这相当于三次线程切换的时间。SetEvent()
,用于触发事件,至少需要600个周期。因此,基于事件的方法适用于锁定时长(例如,独占写入)至少比系统的定时器周期(通常为15ms)长 (1900+600)/700*Quantum/3 倍的情况。“Quantum”在桌面系统上为6,在服务器上最多为36。因此,在服务器上,当锁定时长通常超过514ms(在桌面系统上为85ms)时,基于事件的方法最有用。 - **临界区**——那些24字节的对象,允许对资源/对象进行同步独占访问——速度非常快。
EnterCriticalSection()
和LeaveCriticalSection()
只需要20个周期。我们将需要类似CSection
的东西来检查/修改我们RWLock的读写计数。尽管如此,24字节可能有点多……考虑到我们实际上可以用一个字节来做同样的事情,并且做得更快。x86 CPU的“xchg
”和“cmpxchg
”指令在这里非常有用。 - **其他同步**对象,如互斥体、信号量等,太慢了,不值得考虑使用:我们已经通过
SwitchToThread()
实现了快速的线程切换,并通过一个快速的锁来保护我们的numReader
和numWriter
。因此,我们开始基于RWLock的伪代码进行编码。
实现
typedef struct{ char locker1; // 0 for unlocked, 1 for locked char numWriters; // [0..1] short numReaders; // [0..65535] }RWLock; // 4 bytes void StartRead(){ do{ LockByte(); // get exclusive access to numReader // and numWriter via "locker1" if(numWriters==0){ numReaders++; UnlockByte(); // release "locker1" return; // we've gained read-only access } UnlockByte();// unsatisfactory conditions, SwitchThread();// so we'd best yield execution }while(1); }
**LockByte()
**在这里很有意思,因为它必须根据系统是单核还是多核进行不同的行为。LockByte()
-> UnlockByte()
块之间的代码通常只有几条指令,所以在多处理器系统上,我们只需要浪费几个额外的周期,如果第一次尝试失败,就重试以获得访问权。但是,如果我们对单处理器系统做同样的事情,我们将浪费30到120毫秒!所以,如果只有一个CPU,我们最好立即调用SwitchThread()
。另外,在多处理器系统上,如果一个线程在调用UnlockByte()
之前被切换出去,而下一个线程想要LockByte()
同一个对象……我们将浪费整个时间片的时间,因此需要一个自旋计数(类似于CriticalSections)。
**SwitchThread()
**也依赖于环境,因为我们在Win98上调用Sleep(0),在基于NT的Windows上调用SwitchToThread()。
接下来,我们根据伪代码实现StartWrite()
。
void StartWrite(){ do{ LockByte(); if(numWriters==0){ numWriters=1; // halt subsequent StartRead() calls if numReaders>0 do{ if(numReaders==0){ UnlockByte(); return; // we've gained read-write access } UnlockByte(); SwitchThread(); LockByte(); }while(1); } UnlockByte(); SwitchThread(); }while(1); }
我们可以让StartWrite()
允许创建比现有读者更多的读者,但这只会无限期地延迟写入操作。我们这样做可能永远无法开始写入。
EndRead()
和EndWrite()
要简单得多。
void EndRead(){ LockByte(); numReaders--; UnlockByte(); } void EndWrite(){ LockByte(); numWriters=0; UnlockByte(); } // Both procs can be unified in one, whose pseudocode would be: void EndAccess(){ LockByte(); if(numReaders)numReaders--; else numWriters=0; UnlockByte(); }
现在,我们有了访问锁定过程,但剩下的是Init()
和Free()
过程。特别是,我们应该注意到Free()
应该等待所有读者和写者离开我们的RWLock对象。代码
void Init(){ locker1=0; numReaders=0; numWriters=0; } void Free(){ do{ LockByte(); if(numReaders==0 && numWriters==0){ return; // !! leaves the object locked on purpose } UnlockByte(); SwitchThread(); }while(1); }
避免隐藏的风险
我们在Free()
之后将RWLock对象保持锁定状态,因为这样,在应用程序开发过程中,如果我们不恰当地使用RWLock对象,我们会注意到死锁。我们会在哪里出错?只有当我们对其他线程知道的、直接或间接的对象调用Free()
时。例如:
Dynamo* List1[10]={null}; class Dynamo{ RWLock locker; int ID; char* myData; Dynamo(){ myData = malloc(1000); // allocate memory for myData Init(&locker); ID = FindSuitableID_In_List1(); // returns 0..9 List1[ID] = this; // "registers" the Dynamo object } ~Dynamo(){ Free(&locker); free(myData); // release the memory, taken for myData List1[ID] = null; // "unregisters" the Dynamo object. Here's our error. } void SomeFunc(){ if(random(2)==0){ StartReading(&locker); [- read stuff from myData[] -] EndReading(&locker); }else{ StartWriting(&locker); [- write stuff into myData[] -] EndWriting(&locker); } } };
假设我们有一个多线程应用程序,并且有几个线程大部分时间都在迭代调用List1[iterator]->SomeFunc()
。如果我们删除了一个Dynamo
对象,并且在“List1[ID]=null
”之前,某个线程启动了该对象的SomeFunc()
,我们就麻烦了。为什么?因为这个Dynamo
对象及其“myData
”可能已不存在。相反,我们**必须**确保在Free(&locker)
之前“取消注册
”我们的Dynamo对象!并且,我们必须在调用free(myData)
之前使用Free(&locker)
。所以,正确的析构函数是:
~Dynamo(){ List1[ID] = null; // at this point several threads // could be using this Dynamo object Free(&locker); // wait for all threads to release this Dynamo object free(myData); // ok, we can now safely destroy stuff from this Dynamo object }
如果我们的“Init(&locker)
”行是在“List1[ID]=this
”之后,也会发生同样的麻烦。到目前为止一切顺利。但是,为了确保我们的应用程序完美稳定,我们还应该保护对List1[]
内容的访问。通过使用全局RWLock :)。否则,内存/应用程序缓存可能会给我们带来麻烦。即使是概率为1/(10^18)的微小风险也已得到处理,这使得软件稳定:最终让用户满意,让开发者更满意:)。
更改权限
到目前为止,实现似乎已经完成了……直到我们发现我们锁定的对象有某些操作会花费太长时间……让人想起高峰时段的交通堵塞。当我们需要对一个锁定的对象执行一个长的读+写操作时,最好是先获得只读访问权限,进行计算/分配,然后释放读访问权限,获得写访问权限,并修改该对象。当我们在从只读转换为读写时,我们的手指交叉,希望我们读取的数据在我们获得写访问权限之前没有被任何人修改。因此,我们需要一种方法让读者变成写者,同时又不将写访问权限授予其他人。因此,我们组合并随后实现以下伪代码:
void UpgradeToWriter(){ do{ LockByte(); if(numReaders==1 && numWriters==0){ numReaders=0; numWriters=1; UnlockByte(); return; } UnlockByte(); SwitchThread(); }while(1); } // and let's provide for completeness: void DowngradeToReader(){ LockByte(); numReaders=1; numWriters=0; UnlockByte(); }
多重锁定
如果你需要一次性写锁两个或多个对象,通常会这样做:
StartWrite(object1); StartWrite(object2); StartWrite(object3); //... now read+write from+to the three objects
但是,这样,当我们的应用程序等待获得对object1
的独占访问权时,其他线程的读者和写者可以锁定object2
和object3
。如果这是不可接受的,那么我们应该执行一个同时锁定对象的操作:
- 将对象标记为准备写入(这将阻止新的读者线程锁定
object2
和object3
), - 等待所有现有读锁被释放。这样我们可以节省多达(
NumObjects-1
)次上下文切换。
因此,我们组合并随后实现以下伪代码:
void MultiStartWrite(int NumObjects,Objects){ char credits[300]={2}; // each object starts with 2 credits. Credits = # of chores to do int TotalCredits = 2*NumObjects; // total number of chores to do while(TotalCredits){ for(int i=0;i<NumObjects;i++){ if(credits[i]){ // if this object still has something to be done on LockByte(); // this is Objects[i]->LockByte(); if(credits==2){ // if we've not marked this object for writing if(numWriters==0){ // if there's no other writer numWriters=1; // reserve this object for writing credits[i]=1; // this object has one more thing to do TotalCredits--; if(numReaders==0){ credits[i]=0; // this object is completely ready TotalCredits--; } } }else{ // credits=1 // we've marked the object for writing, now we only have // to wait for the readers to release it if(numReaders==0){ credits[i]=0; // this object is completely ready TotalCredits--; } } UnlockByte(); // this is Objects[i]->UnlockByte(); } } if(TotalCredits==0)return; // we're ready SwitchThread(); } } void MultiEndWrite(int NumObjects,Objects){ for(int i=0;i<NumObjects;i++){ LockByte(); numWriters=0; UnlockByte(); } }
已实现的(ASM)代码可以通过两种方式调用:
//------[ stack-based parameter-passing ]---------[ RWLock_MultiStartWrite(3,object1,object2,object3); ... // do stuff RWLock_MultiEndWrite(3,object1,object2,object3); //------------------------------------------------/ //or //------[ array-based parameter-passing ]-------[ RWLOCK* Array1[3]; ... // fill-in Array1 RWLock_MultiStartWrite(-3,Array1); ... // do stuff RWLock_MultiEndWrite(-3,Array1); //----------------------------------------------/
扩展视图
由于此RWLock对象仅在其结构中包含数字,而不包含任何Windows对象,因此可以将其与CreateFileMapping
+MapViewOfFile
结合使用,以创建**进程间**RWLock对象。由于RWLOCK对象**仅占用4个字节**,并且不分配任何额外内存,因此可以大大简化大型数据库式设计。可以轻松地在没有压力的情况下分配数千万到数亿个受RWLock保护的对象。对象也可以保存到磁盘,并稍后恢复。每个Start/End过程占用**30-40个周期**,为您提供了足够的CPU功率来用于您的多线程应用程序。
最后,我们完成了这个简单但快速的读者/写者锁的实现。我包含了ASM源代码、.h头文件和预编译的.lib文件。您可以立即在C/C++中使用该库。如果您想修改+重新编译ASM代码,您需要MASM:在您的Visual Studio文件夹中找到ml.exe,或者从这里获取MASM32包。检查您的ml.exe版本非常重要:8.x版本有极多的bug,请避免使用。6.1X版本是最佳选择。
历史
- 06.Feb.2007
- 首次发布。
- 06.Mar.2007
- 在RWLock.asm中修复了
cmpxchg
前缺少“lock”前缀; - 修复了
StartWrite()
中的死锁;