65.9K
CodeProject 正在变化。 阅读更多。
Home

基于堆或共享内存的互斥锁池管理器

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.41/5 (14投票s)

2006年9月24日

CPOL

3分钟阅读

viewsIcon

36458

downloadIcon

500

一篇关于基于堆或共享内存的互斥量池管理器的文章。

引言

本文开发了一个互斥量池管理器。该管理器用于管理具有大量互斥量的互斥量池。互斥量池基于指定的管理器名称参数,从堆或共享内存中分配。未命名的管理器从堆中分配池。具有未命名管理器的互斥量可以同步单个进程内的线程。另一方面,命名的管理器从共享内存中分配池。具有命名管理器的互斥量可以同步进程之间的线程。管理器可以根据运行时需求自动增长互斥量池。

池中的互斥量使用 Windows API InterlockedExchange 实现;仅在存在争用时才使用 Windows 事件对象来同步线程。

如果你有一个基于页面的内存缓冲区,页数巨大,并且每个页面都需要一个互斥量;或者如果你有一个内存中的哈希表,哈希桶的数量巨大,并且每个桶都需要一个互斥量,那么互斥量池管理器可能会非常适合你。

如何使用互斥量池管理器

第一步是创建一个互斥量池管理器对象。互斥量管理器类的构造函数如下

CMutexManager::CMutexManager(DWORD dwInitCount, DWORD dwIncCount, const wchar_t *pszName)

dwInitCount 指定管理器创建的初始互斥量计数。dwIncCount 指定用于扩展互斥量池的内存分配粒度。随着池的增长,内存以 dwIncCount 个互斥量为单位分配。pszName 指定管理器名称,限制为 128 个字符。名称比较区分大小写;如果该名称与现有命名互斥量管理器的名称匹配,则忽略 dwInitCountdwIncCount,因为它们已被最初创建管理器的进程设置。如果 pszNameNULL,则创建的互斥量管理器没有名称。具有命名管理器的互斥量可以同步进程之间的线程,具有未命名管理器的互斥量可以同步单个进程内的线程。

互斥量管理器公开了三个公共方法供应用程序调用,它们是

bool CMutexManager::CreateMutex(DWORD& dwIndex)

此方法从互斥量池创建一个互斥量,将互斥量引用计数设置为 1,并返回创建的互斥量的索引。每次创建互斥量时,线程都必须调用 CloseMutex

bool CMutexManager::CreateMutex(DWORD& dwIndex)
{
    //Check if the manager is initialized

    if (m_bInitialized == false)
    {
        return false;
    }

    //Lock the manager

    CAutoLock<CSysMutex> lockAuto(m_pSysMutex);

    //Get the shared manager header

    MutexSharedManagerHeader *pSharedHeader = 
                      m_sManagerHeader.pSharedHeader;
    //Check if there is free mutex

    if (pSharedHeader->dwMutexNextFree == INVALID_MUTEX_ID) 
    {
        //Check if we can add more mutex

        if (pSharedHeader->dwMutexCount == MAX_MUTEX_COUNT)
        {
            return false;
        }
        //Determine how many new mutex to add

        DWORD dwCount = MAX_MUTEX_COUNT - pSharedHeader->dwMutexCount;
        dwCount = (dwCount >= pSharedHeader->dwIncCount) ? 
                                pSharedHeader->dwIncCount : dwCount;
        //Create one more mutex block

        if (!CreateMutexBlock(dwCount, m_strName.c_str()))
        {
            return false;
        }
    }

    //Get the free mutex ID

    dwIndex = pSharedHeader->dwMutexNextFree;
    //Get the mutex address

    Mutex *pMutex = NULL;
    while (!GetMutex(dwIndex, &pMutex))
    {
        //Create more mutex block

        if (!CreateMutexBlock(pSharedHeader->dwIncCount, 
                              m_strName.c_str()))
        {
            return false;
        }
    }
    //Set the next free mutex index

    pSharedHeader->dwMutexNextFree = pMutex->dwMutexNextFree;
    //Decrease the free mutex count

    --pSharedHeader->dwMutexFree;
    //Increase the inuse mutex count

    ++pSharedHeader->dwMutexInUse;
    //Update the new max inuse mutex count

    if (pSharedHeader->dwMutexInUse > pSharedHeader->dwMutexInUseMax)
    {
        pSharedHeader->dwMutexInUseMax = pSharedHeader->dwMutexInUse;
    }

    //Set 0 to the mutex created

    memset(pMutex, 0, sizeof(pMutex));
    //Assign the index

    pMutex->dwIndex = dwIndex;
    //Increase the mutex reference count

    pMutex->dwMutexRefCount++;

    return true;
}

bool CMutexManager::OpenMutex(DWORD dwIndex)

此方法打开之前创建的互斥量,并将互斥量引用计数增加 1。每次打开互斥量时,线程都必须调用一次 CloseMutex

bool CMutexManager::OpenMutex(DWORD dwIndex)
{
    //Check if the manager is initialized

    if (m_bInitialized == false)
    {
        return false;
    }

    //Lock the manager

    CAutoLock<CSysMutex> lockAuto(m_pSysMutex);

    //Get the shared manager header

    MutexSharedManagerHeader *pSharedHeader = 
                        m_sManagerHeader.pSharedHeader;
    //Check if the mutex index is within the range

    if (dwIndex >= pSharedHeader->dwMutexCount)
    {
        return false;
    }

    Mutex *pMutex = NULL;
    //Get the mutex address

    while (!GetMutex(dwIndex, &pMutex))
    {
        //Create more mutex block

        if (!CreateMutexBlock(pSharedHeader->dwIncCount, 
                              m_strName.c_str()))
        {
            return false;
        }
    }

    //Check if the mutex has been created

    if (pMutex->dwMutexRefCount == 0)
    {
        return false;
    }
    //Increase the mutex reference count

    pMutex->dwMutexRefCount++;

    return true;
}

bool CMutexManager::CloseMutex(DWORD dwIndex)

对于之前创建或打开的互斥量,此方法将互斥量引用计数减少 1。如果引用计数达到 0,互斥量管理器会将互斥量标记为可用,并将其释放回互斥量池。每次创建或打开互斥量时,线程都必须调用一次 CloseMutex

bool CMutexManager::CloseMutex(DWORD dwIndex)
{
    //Check if the manager is initialized

    if (m_bInitialized == false)
    {
        return false;
    }

    //Lock the manager

    CAutoLock<CSysMutex> lockAuto(m_pSysMutex);

    //Get the shared manager header

    MutexSharedManagerHeader *pSharedHeader = 
                         m_sManagerHeader.pSharedHeader;
    //Check if the mutex index is within the range

    if (dwIndex >= pSharedHeader->dwMutexCount)
    {
        return false;
    }

    Mutex *pMutex = NULL;
    //Get the mutex address

    while (!GetMutex(dwIndex, &pMutex))
    {
        //Create more mutex block

        if (!CreateMutexBlock(pSharedHeader->dwIncCount, m_strName.c_str()))
        {
            return false;
        }
    }

    //Decrease the mutex reference count

    pMutex->dwMutexRefCount--;
    //If the reference count is 0, put the mutex back to the free list

    if (pMutex->dwMutexRefCount == 0)
    {
        pMutex->dwMutexNextFree = pSharedHeader->dwMutexNextFree;
        pSharedHeader->dwMutexNextFree = pMutex->dwIndex;
        //Increase the free mutex count

        ++pSharedHeader->dwMutexFree;
        //Decrease the in use mutex count

        --pSharedHeader->dwMutexInUse;
    }

    return true;
}

提供了辅助类 CMutex 来锁定和解锁互斥量。该类的构造函数如下

CMutex::CMutex(CMutexManager *pManager, DWORD dwIndex)

pManager 指定互斥量池管理器,dwIndex 指定互斥量索引。

该类公开了两个公共方法供应用程序调用,它们是 Lock()Unlock()

bool CMutex::Lock()

此方法等待指定互斥量对象的所有权。当调用线程被授予所有权时,该方法返回。线程拥有互斥量对象的所有权后,它可以多次调用 Lock() 而不会阻塞其执行。这可以防止线程在等待它已经拥有的互斥量时死锁。线程每次调用 Lock() 都必须调用一次 Unlock()

Lock() 的实现

bool CMutex::Lock()
{
    if ((m_pMutex == NULL) || (m_pManager == NULL))
    {
        return false;
    }

    //Get the process and thread IDs

    DWORD dwProcessId = GetCurrentProcessId();
    DWORD dwThreadId  = GetCurrentThreadId();
    //The event object

    CSysEvent *pEvent = NULL;

    while (true)
    {
        //Enter the critical section

        EnterCriticalSection();

        //Check if the mutex is locked

        if (m_pMutex->bLocked == false)
        {
            //Set the lock flag

            m_pMutex->bLocked = true;
            //Increase the lock reference count

            m_pMutex->dwLockRefCount++;
            //Assign the process and thread IDs

            m_pMutex->dwProcessId = dwProcessId;
            m_pMutex->dwThreadId  = dwThreadId;
            //Free the event

            delete pEvent;
            //Leave the critical section

            LeaveCriticalSection();
            return true;
        } 
        
        //Check if the same thread calls the lock again

        if ((m_pMutex->dwProcessId == dwProcessId) &&
            (m_pMutex->dwThreadId  == dwThreadId))
        {
            //Increase the lock reference count

            m_pMutex->dwLockRefCount++;
            //Free the event

            delete pEvent;
            //Leave the critical section

            LeaveCriticalSection();
            return true;
        }

        //Increase the waiter count

        m_pMutex->dwThreadsWaiting++;
        //Create the event if it's not created before, then wait on the event

        if (pEvent == NULL)
        {
            //Get the event name

            std::wstring strName;
            if (!GetEventName(strName))
            {
                //Leave the critical section

                LeaveCriticalSection();
                return false;
            }
            //Create the event

            pEvent = new (std::nothrow) CSysEvent(false, false, 
                                           strName.c_str(), NULL);
            if (pEvent == NULL)
            {
                //Leave the critical section

                LeaveCriticalSection();
                return false;
            }
        }

        //Leave the critical section

        LeaveCriticalSection();
        //Wait on the event

        if (!pEvent->Lock(INFINITE))
        {
            delete pEvent;
            return false; 
        }
    }

    //Free the event

    delete pEvent;
    //Leave the critical section

    LeaveCriticalSection();

    return true;
}

bool CMutex::Unlock()

此方法释放指定互斥量对象的所有权。线程使用 Lock 方法来获取互斥量对象的所有权。要释放其所有权,线程必须每次调用 Lock() 方法都调用一次 Unlock()

Unlock() 的实现

bool CMutex::Unlock()
{
    if ((m_pMutex == NULL) || (m_pManager == NULL))
    {
        return false;
    }

    //Enter the critical section

    EnterCriticalSection();

    //Get the process and thread IDs

    DWORD dwProcessId = GetCurrentProcessId();
    DWORD dwThreadId  = GetCurrentThreadId();

    //Check if the thread is the owner

    if ((m_pMutex->dwProcessId != dwProcessId) ||
        (m_pMutex->dwThreadId  != dwThreadId))
    {
        LeaveCriticalSection();
        return false;
    }

    //Check if the mutex is locked

    if (m_pMutex->bLocked == false)
    {
        LeaveCriticalSection();
        return false;
    }

    //Decrease the lock reference count

    m_pMutex->dwLockRefCount--;
    //Check if the lock reference count is not 0

    if (m_pMutex->dwLockRefCount > 0)
    {
        LeaveCriticalSection();
        return true;
    }

    //Release the lock

    m_pMutex->bLocked = false;
    //Reset the process and thread IDs

    m_pMutex->dwProcessId = 0;
    m_pMutex->dwThreadId  = 0;
   
    //Check if there is a thread waitting on the lock

    if (m_pMutex->dwThreadsWaiting > 0)
    {
        //Get the event name

        std::wstring strName;
        if (!GetEventName(strName))
        {
            LeaveCriticalSection();
            return false;
        }
        //Create the event

        CSysEvent *pEvent = new (std::nothrow)CSysEvent(false, false, 
                                 strName.c_str(), NULL);
        if (pEvent == NULL)
        {
            LeaveCriticalSection();
            return false;
        }
        //Set the event state to signaled

    if (!pEvent->SetEvent ())
        {
            LeaveCriticalSection();
            delete pEvent;
            return false;
        }
        //Free the event

    delete pEvent;
        //Decrease the waiter count

        m_pMutex->dwThreadsWaiting--;
    }
    //Leave the critical section

    LeaveCriticalSection();

    return true;
}

Lock()Unlock() 使用的自旋锁方法是

bool CMutex::EnterCriticalSection()

bool CMutex::EnterCriticalSection()
{
    if (m_pMutex == 0)
    {
        return false;
    }

    //Spin and get the spin lock. Each thread only owns the spin lock 

    //for a very short of period

    while (InterlockedExchange(&(m_pMutex->lSpinLock), 1) != 0)
    {
        //Yield execution to another thread that is ready to run on 

        //the current processor.

        SwitchToThread();
    }

    return true;
}

bool CMutex::LeaveCriticalSection()

bool CMutex::LeaveCriticalSection()
{
    if (m_pMutex == NULL)
    {
        return false;
    }
    //Rlease the spin lock

    InterlockedExchange(&(m_pMutex->lSpinLock), 0);

    return true;
}

参考文献

  • Dan Chou. "一种快速且通用的同步对象" (MSDN library, Technical articles)。
© . All rights reserved.