65.9K
CodeProject 正在变化。 阅读更多。
Home

快速读/写锁

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.70/5 (20投票s)

2007 年 2 月 26 日

CPOL

7分钟阅读

viewsIcon

99508

downloadIcon

945

对读者/写者锁同步对象的优化实现。

引言

读者/写者锁(RWLock)是一种用于在多线程软件中同步对资源/对象的访问的机制。更多信息可以在以下文章中找到:

背景

读者/写者锁的实现需要谨慎,因为如果处理不当,它可能会降低多线程应用程序/服务器的整体性能和稳定性。幸运的是,x86 CPU具有专门的指令,可以在多处理器系统上实现完美的同步,这使得完全稳定的RWLock实现成为可能。由于这只会带来好处,我们将使用汇编语言来完成整个实现,将其打包成一个.lib文件,并在我们的C/C++软件中使用它。

但是,仅仅用汇编编写并不是一个完整的解决方案——我们必须了解Windows是如何工作的,以及它实际上工作得有多快。我将使用运行在Sempron3000+上的Windows 2000 SP4进行基准测试。

首先,**上下文切换**。普遍认为它非常慢……但事实并非如此。实际的数字是:如果没有其他线程活动,需要322个周期;当有一个其他线程活动时,需要587个周期;当有2-80个活动线程时,需要600-700个周期。这个基准测试在运行530-610个线程(来自27-36个进程)且CPU负载为10-30%的情况下进行了多次。因此,如果我们的应用程序与另一个应用程序切换,而该应用程序只是简单地切回,那么只会有1175个周期过去。此外,切换到另一个进程的线程只需要额外的十几个周期。

应用程序可以通过SwitchToThread() API强制进行上下文切换。然而,此函数在Win98上不可用(只提供了一个存根),因此我们必须使用Sleep(0)。但是,Sleep(0)会忽略比当前线程优先级高的线程,因此我们应该避免在基于NT的Windows(Win2K、XP、Server2K3、Vista)上使用它。因此,我们的代码应该在Windows 98上调用Sleep(0),在较新的操作系统上调用SwitchToThread()

  • **内核事件**通常用于通知某个对象/数据已准备就绪:在这里,我们可以将它们用作通知RWLock对象已准备好进行读取或写入。Quynh Nguyen已经在这里介绍了这样一种方法。在那里,需要通知的线程调用CreateEvent(...)并开始等待,通过WaitForSingleObject(...)。从那时起,线程调度程序将忽略该线程,直到指定的事件被触发。但是,CreateEvent(...)本身需要1500-1900个周期,这相当于三次线程切换的时间。SetEvent(),用于触发事件,至少需要600个周期。因此,基于事件的方法适用于锁定时长(例如,独占写入)至少比系统的定时器周期(通常为15ms)长 (1900+600)/700*Quantum/3 倍的情况。“Quantum”在桌面系统上为6,在服务器上最多为36。因此,在服务器上,当锁定时长通常超过514ms(在桌面系统上为85ms)时,基于事件的方法最有用。
  • **临界区**——那些24字节的对象,允许对资源/对象进行同步独占访问——速度非常快。EnterCriticalSection()LeaveCriticalSection()只需要20个周期。我们将需要类似CSection的东西来检查/修改我们RWLock的读写计数。尽管如此,24字节可能有点多……考虑到我们实际上可以用一个字节来做同样的事情,并且做得更快。x86 CPU的“xchg”和“cmpxchg”指令在这里非常有用。
  • **其他同步**对象,如互斥体、信号量等,太慢了,不值得考虑使用:我们已经通过SwitchToThread()实现了快速的线程切换,并通过一个快速的锁来保护我们的numReadernumWriter。因此,我们开始基于RWLock的伪代码进行编码。

实现

typedef struct{
    char  locker1; // 0 for unlocked, 1 for locked

    char  numWriters; // [0..1]

    short numReaders; // [0..65535]

}RWLock; // 4 bytes


void StartRead(){
    do{
        LockByte();
        // get exclusive access to numReader
        // and numWriter via "locker1"

        if(numWriters==0){
            numReaders++;
            UnlockByte(); // release "locker1"

            return; // we've gained read-only access

        }
        UnlockByte();// unsatisfactory conditions,

        SwitchThread();// so we'd best yield execution

    }while(1);
}

**LockByte()**在这里很有意思,因为它必须根据系统是单核还是多核进行不同的行为。LockByte() -> UnlockByte()块之间的代码通常只有几条指令,所以在多处理器系统上,我们只需要浪费几个额外的周期,如果第一次尝试失败,就重试以获得访问权。但是,如果我们对单处理器系统做同样的事情,我们将浪费30到120毫秒!所以,如果只有一个CPU,我们最好立即调用SwitchThread()。另外,在多处理器系统上,如果一个线程在调用UnlockByte()之前被切换出去,而下一个线程想要LockByte()同一个对象……我们将浪费整个时间片的时间,因此需要一个自旋计数(类似于CriticalSections)。

**SwitchThread()**也依赖于环境,因为我们在Win98上调用Sleep(0),在基于NT的Windows上调用SwitchToThread()。

接下来,我们根据伪代码实现StartWrite()

void StartWrite(){
    do{
        LockByte();
        if(numWriters==0){
            numWriters=1; // halt subsequent StartRead() calls if numReaders>0

            do{
                if(numReaders==0){
                    UnlockByte();
                    return; // we've gained read-write access

                }
                UnlockByte();
                SwitchThread();
                LockByte();
            }while(1);
        }
        UnlockByte();
        SwitchThread();
    }while(1);
}

我们可以让StartWrite()允许创建比现有读者更多的读者,但这只会无限期地延迟写入操作。我们这样做可能永远无法开始写入。

EndRead()EndWrite()要简单得多。

void EndRead(){
    LockByte();
    numReaders--;
    UnlockByte();
}
void EndWrite(){
    LockByte();
    numWriters=0;
    UnlockByte();
}
// Both procs can be unified in one, whose pseudocode would be:

void EndAccess(){
    LockByte();
    if(numReaders)numReaders--;
    else numWriters=0;
    UnlockByte();
}

现在,我们有了访问锁定过程,但剩下的是Init()Free()过程。特别是,我们应该注意到Free()应该等待所有读者和写者离开我们的RWLock对象。代码

void Init(){
    locker1=0;
    numReaders=0;
    numWriters=0;
}
void Free(){
    do{
        LockByte();
        if(numReaders==0 && numWriters==0){
            return; // !! leaves the object locked on purpose

        }
        UnlockByte();
        SwitchThread();
    }while(1);
}

避免隐藏的风险

我们在Free()之后将RWLock对象保持锁定状态,因为这样,在应用程序开发过程中,如果我们不恰当地使用RWLock对象,我们会注意到死锁。我们会在哪里出错?只有当我们对其他线程知道的、直接或间接的对象调用Free()时。例如:

Dynamo* List1[10]={null};

class Dynamo{
    RWLock locker;
    int ID;
    char* myData;
    
    Dynamo(){
        myData = malloc(1000); // allocate memory for myData

        Init(&locker);
        ID = FindSuitableID_In_List1(); // returns 0..9

        List1[ID] = this; // "registers" the Dynamo object

    }
    ~Dynamo(){
        Free(&locker); 
        free(myData); // release the memory, taken for myData

        List1[ID] = null;
        // "unregisters" the Dynamo object. Here's our error.

    }    
    
    void SomeFunc(){
        if(random(2)==0){
            StartReading(&locker);
            [- read stuff from myData[] -]
            EndReading(&locker);
        }else{
            StartWriting(&locker);
            [- write stuff into myData[] -]
            EndWriting(&locker);
        }
    }
};

假设我们有一个多线程应用程序,并且有几个线程大部分时间都在迭代调用List1[iterator]->SomeFunc()。如果我们删除了一个Dynamo对象,并且在“List1[ID]=null”之前,某个线程启动了该对象的SomeFunc(),我们就麻烦了。为什么?因为这个Dynamo对象及其“myData”可能已不存在。相反,我们**必须**确保在Free(&locker)之前“取消注册”我们的Dynamo对象!并且,我们必须在调用free(myData)之前使用Free(&locker)。所以,正确的析构函数是:

    ~Dynamo(){
        List1[ID] = null;
        // at this point several threads
        // could be using this Dynamo object

        Free(&locker);
        // wait for all threads to release this Dynamo object

        free(myData);
        // ok, we can now safely destroy stuff from this Dynamo object
    }

如果我们的“Init(&locker)”行是在“List1[ID]=this”之后,也会发生同样的麻烦。到目前为止一切顺利。但是,为了确保我们的应用程序完美稳定,我们还应该保护对List1[]内容的访问。通过使用全局RWLock :)。否则,内存/应用程序缓存可能会给我们带来麻烦。即使是概率为1/(10^18)的微小风险也已得到处理,这使得软件稳定:最终让用户满意,让开发者更满意:)。

更改权限

到目前为止,实现似乎已经完成了……直到我们发现我们锁定的对象有某些操作会花费太长时间……让人想起高峰时段的交通堵塞。当我们需要对一个锁定的对象执行一个长的读+写操作时,最好是先获得只读访问权限,进行计算/分配,然后释放读访问权限,获得写访问权限,并修改该对象。当我们在从只读转换为读写时,我们的手指交叉,希望我们读取的数据在我们获得写访问权限之前没有被任何人修改。因此,我们需要一种方法让读者变成写者,同时又不将写访问权限授予其他人。因此,我们组合并随后实现以下伪代码:

void UpgradeToWriter(){
    do{
        LockByte();
        if(numReaders==1 && numWriters==0){
            numReaders=0;
            numWriters=1;
            UnlockByte();
            return;
        }
        UnlockByte();
        SwitchThread();
    }while(1);
}

// and let's provide for completeness:

void DowngradeToReader(){
    LockByte();
    numReaders=1;
    numWriters=0;
    UnlockByte();
}

多重锁定

如果你需要一次性写锁两个或多个对象,通常会这样做:

StartWrite(object1); 
StartWrite(object2);
StartWrite(object3);
//... now read+write from+to the three objects

但是,这样,当我们的应用程序等待获得对object1的独占访问权时,其他线程的读者和写者可以锁定object2object3。如果这是不可接受的,那么我们应该执行一个同时锁定对象的操作:

  1. 将对象标记为准备写入(这将阻止新的读者线程锁定object2object3),
  2. 等待所有现有读锁被释放。这样我们可以节省多达(NumObjects-1)次上下文切换。

因此,我们组合并随后实现以下伪代码:

void MultiStartWrite(int NumObjects,Objects){
    char credits[300]={2};
    // each object starts with 2 credits. Credits = # of chores to do

    int TotalCredits = 2*NumObjects; // total number of chores to do

    
    while(TotalCredits){
        for(int i=0;i<NumObjects;i++){
            if(credits[i]){
            // if this object still has something to be done on

                LockByte(); // this is Objects[i]->LockByte();

                if(credits==2){
                // if we've not marked this object for writing

                    if(numWriters==0){
                    // if there's no other writer

                        numWriters=1; // reserve this object for writing

                        credits[i]=1; // this object has one more thing to do

                        TotalCredits--;
                        if(numReaders==0){
                            credits[i]=0; // this object is completely ready

                            TotalCredits--;
                        }
                    }
                }else{ // credits=1

                    // we've marked the object for writing, now we only have

                    // to wait for the readers to release it

                    if(numReaders==0){
                        credits[i]=0; // this object is completely ready

                        TotalCredits--;
                    }
                }
                UnlockByte(); // this is Objects[i]->UnlockByte();

            }
        }
        if(TotalCredits==0)return; // we're ready

        SwitchThread();
    }
}
void MultiEndWrite(int NumObjects,Objects){
    for(int i=0;i<NumObjects;i++){
        LockByte();
        numWriters=0;
        UnlockByte();
    }
}

已实现的(ASM)代码可以通过两种方式调用:

//------[ stack-based parameter-passing ]---------[

RWLock_MultiStartWrite(3,object1,object2,object3);
... // do stuff

RWLock_MultiEndWrite(3,object1,object2,object3);
//------------------------------------------------/


//or

//------[ array-based parameter-passing ]-------[

RWLOCK* Array1[3];
... // fill-in Array1


RWLock_MultiStartWrite(-3,Array1);
... // do stuff

RWLock_MultiEndWrite(-3,Array1);
//----------------------------------------------/

扩展视图

由于此RWLock对象仅在其结构中包含数字,而不包含任何Windows对象,因此可以将其与CreateFileMapping+MapViewOfFile结合使用,以创建**进程间**RWLock对象。由于RWLOCK对象**仅占用4个字节**,并且不分配任何额外内存,因此可以大大简化大型数据库式设计。可以轻松地在没有压力的情况下分配数千万到数亿个受RWLock保护的对象。对象也可以保存到磁盘,并稍后恢复。每个Start/End过程占用**30-40个周期**,为您提供了足够的CPU功率来用于您的多线程应用程序。

最后,我们完成了这个简单但快速的读者/写者锁的实现。我包含了ASM源代码、.h头文件和预编译的.lib文件。您可以立即在C/C++中使用该库。如果您想修改+重新编译ASM代码,您需要MASM:在您的Visual Studio文件夹中找到ml.exe,或者从这里获取MASM32包。检查您的ml.exe版本非常重要:8.x版本有极多的bug,请避免使用。6.1X版本是最佳选择。

历史

  • 06.Feb.2007
    • 首次发布。
  • 06.Mar.2007
    • RWLock.asm中修复了cmpxchg前缺少“lock”前缀;
    • 修复了StartWrite()中的死锁;
© . All rights reserved.