65.9K
CodeProject 正在变化。 阅读更多。
Home

创建跨平台线程无关事件循环

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.89/5 (5投票s)

2007年5月12日

6分钟阅读

viewsIcon

46720

downloadIcon

704

本文揭示了 Windows API GetMessage() 和 PostThreadMessage() 背后的机制,并使用基本的操作系统函数在 Linux 和 Windows 平台上实现了它们。

引言

事件驱动编程是一种计算机编程范例,其中程序的流程由用户操作(鼠标点击、按键)、其他硬件事件或来自其他程序的[1]消息决定。另一种是批处理编程,程序员编写固定的控制序列来决定程序的动作。在 Microsoft Windows 环境中,事件驱动程序包含一个与操作系统注册的事件循环。操作系统将描述事件的消息传递给程序;事件可能包括 GUI 操作,如鼠标和键盘点击、文件系统更改或计时器。对于每个不同的事件,程序都会执行一个事件处理程序或触发函数来处理它。事件驱动编程强调灵活性和异步性作为优点,并力求尽可能无模式。事件驱动编程是编写图形用户界面、嵌入式系统和传感器网络节点程序的流行模型。

背景

在 Windows 平台上,您可以使用 GetMessageDispatchMessage 函数为独立线程创建消息循环。GetMessage 函数从调用线程的消息队列中检索消息,并将其放入指定的结构中。此函数可以检索与指定窗口关联的消息以及通过 PostThreadMessage 函数发布的线程消息。GetMessage 会挂起线程,直到新消息放入线程的消息队列中才会返回。PeekMessage 是一个非阻塞替代方案,它仅检查队列中是否存在消息,并立即返回消息。DispatchMessage 函数将消息分派给窗口过程。

Windows 在这方面相当独特。Linux 和 UNIX 不提供这类 API 函数。

在本文中,我介绍了一种使用互斥体、事件/条件变量、线程、进程和消息队列数据结构等基本操作系统对象来创建跨平台、线程无关的消息循环的方法。

在文章末尾,我提供了一个示例,其中两个独立的播放器状态机基于相同的状态机配置文件在单独的线程上运行。

创建事件同步机制的一种方法

每个事件驱动程序都有一个循环,用于捕获接收到的事件并处理它们。事件可能由操作系统环境、其他进程或线程生成。在后面的代码中,我提出了一组 API 来构建一个满足三个要求的三种消息循环:消息循环应适用于线程无关程序,一个线程可以向消息队列发布消息,另一个线程可以读取消息,并且 API 必须是跨平台的,可以同时用于 Win32 和 Linux。

通常,在我的方法中,我为事件循环线程分配一个独立的消息池、一个事件/条件变量和一个互斥体。当新的外部事件发布时,将事件/条件变量设置为已通知状态。用于线程池的互斥体是为了同步事件发送线程和接收线程之间的消息池访问。

您可能会问:“当事件发布时计数器增加,事件消耗时计数器减少,为什么我不使用计数信号量而不是事件/条件变量?”这个问题有两个原因。信号量用于同步进程或线程,而事件/条件变量仅用于同步进程内的线程。事件/条件变量的性能优于信号量。信号量存在一个限制。在使用 CreateSemaphore() Windows 函数创建信号量时,您必须指定最大计数。在 Linux 平台上,信号量的最大计数为 SEMVMX(32767)。但是,事件在队列中的数量应该没有限制。

在我的方法中,事件同步机制的原型定义如下:

#if defined LINUX
 typedef pthread_t         XTHREADID;
 typedef pthread_t         XTHREADHANDLE;
 typedef pthread_mutex_t   XMUTEX;
 typedef pthread_cond_t    XEVENT;
#elif defined WIN32
 typedef DWORD             XTHREADID;
 typedef HANDLE            XTHREADHANDLE;
 typedef HANDLE            XMUTEX;
 typedef DWORD             XEVENT;
#endif
typedef int (*XTHREAD_SAFE_ACTION_T)(void*p);
typedef BOOL (*XIS_CONDITION_OK_T)(void*p);
int XCreateEvent(XEVENT *pEvent);
int XDestroyEvent(XEVENT *pEvent)
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MiliSec, 
        XIS_CONDITION_OK_T pIsConditionOK,  void *pCondParam,
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex, 
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)

XCreateEvent()

XCreateEvent() 函数创建一个虚拟事件,线程可以发布或等待该事件。在 Windows 平台上,此函数创建一个线程无关的消息队列,并返回一个 XSignalEvent() 函数用于发布消息的线程 ID。XSignalEvent() 将消息放入已识别线程的消息队列中。在 Linux 平台上,此函数创建一个 Linux 线程条件变量。当队列中有可用事件时,它将为 true,否则条件为 false。

XDestroyEvent()

XDestroyEvent() 函数释放事件队列的已分配资源。

XWaitForEvent()

XWaitForEvent() 函数等待事件,并在事件发生时采取操作。该操作实现为一个线程安全的由互斥体保护的回调函数。当队列中有可用事件时,条件将为 true。该操作可以是获取事件数据并将其从队列中移除的函数。

在 Windows 上,此函数调用 GetMessage() 来等待通过 Windows PostThreadMessage 函数发布到线程队列的消息。

在 Linux 上,此函数调用 pthread_cond_wait() 函数来自动解锁输入互斥体(根据 pthread_unlock_mutex),并等待条件变量被信号。线程执行被挂起,直到条件变量被信号,不会消耗任何 CPU 时间。输入互斥体在进入 pthread_cond_wait 时被调用线程锁定。

XSignalEvent()

XSignalEvent() 函数将事件发布到消息队列,然后调用输入参数中指定的函数。该函数可能会将事件数据发布到事件队列。在 Windows 上,此函数调用 PostThreadMessage 函数将 Windows 消息发布到线程消息队列。

在 Linux 上,此函数修改受互斥体保护的共享资源,满足条件并信号条件。

int XCreateEvent(XEVENT *pEvent)
{
    if (pEvent==NULL)
    return -1;
    #ifdef WIN32
    *pEvent = GetCurrentThreadId();
    return 0;
    #else
    return pthread_cond_init(pEvent, NULL);
    #endif
}
int XDestroyEvent(XEVENT *pEvent)
{
    if (pEvent==NULL)
    return -1;
    #ifdef WIN32
    return 0;
    #else
    return pthread_cond_destroy(pEvent);
    #endif
}
// Wait for an event signalled and then take some thread-safe actions.
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MilliSec, 
                XIS_CONDITION_OK_T pIsConditionOK, void *pCondParam,
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
    if (pEvent==NULL || pMutex==NULL || pIsConditionOK==NULL)
    return -1;
    #ifdef WIN32
    DWORD nRet=WAIT_OBJECT_0;

    MSG WinMsg;
    // GetMessage()
    // If the function retrieves a message other than WM_QUIT, 
    // the return value is nonzero.
    // If the function retrieves the WM_QUIT message, 
    // the return value is zero.
    while (GetMessage(&WinMsg, NULL, 0, 0))
    {
        if (WinMsg.message == WM_EXT_EVENT_ID)
        {
            // External event is triggered. App go to running state.
            if (pAction)
            {
                XMutexLock(pMutex);
                (*pAction)(pActionParam);
                XMutexUnlock(pMutex);
            }
            return 0;
        }
        // Handle Windows messages in MMI thread, for example audio messages.
        else //if (handle_win_msg_in_mmi_thread(&WinMsg) == FALSE)
        {
            DispatchMessage(&WinMsg);
        };
    }
    return 0;
    #else
    // pthread_cond_wait() atomically unlocks the mutex 
    // (as per pthread_unlock_mutex) and waits for the 
    // condition variable cond to be signalled. The thread
    // execution is suspended and does not consume any CPU time 
    // until the condition variable is signalled. 
    // The mutex must be locked by the calling thread on
    // entrance to pthread_cond_wait. Before returning to the calling thread, 
    // pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
    pthread_mutex_lock(pMutex);
    struct timespec timeout;
    time_t now;
    int rc=0;
    if (!(*pIsConditionOK)(pCondParam))
    {
        if (time(&now) < 0)
        return -1;
        timeout.tv_sec = now + (MilliSec / 1000);
        timeout.tv_nsec = 0;
        rc = pthread_cond_timedwait(pEvent, pMutex, &timeout);
    }
    if (rc != ETIMEDOUT)
    {
        if (pAction)
        (*pAction)(pActionParam);
    }
    pthread_mutex_unlock(pMutex);
    if (rc == ETIMEDOUT)
        return XWAIT_TIMEOUT; // Actually XWAIT_TIMEOUT is ETIMEDOUT
    else
        return rc;
#endif
}
// Take some thread-safe actions before signal the event.
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex, 
            XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
    int ret=0;
    if (pEvent==NULL || pMutex==NULL)
    return -1;
    #ifdef WIN32
    if (pAction)
    {
        XMutexLock(pMutex);
        (*pAction)(pActionParam); 
        // At this time, (*pIsConditionOK)() should return TRUE.
        XMutexUnlock(
        pMutex);
    }
    if (0==*pEvent) return -1;
        PostThreadMessage(*pEvent, WM_EXT_EVENT_ID, 0, 0);
        return 0;
    #else
    if (pMutex==NULL)
        return -1;
        pthread_mutex_lock(pMutex);
        // Modifications on the shared resources, 
        // which are protected by the mutex, 
        // meet the condition and should signal the if needed.
    if (pAction)
        (*pAction)(pActionParam);
        // pthread_cond_signal restarts one of the threads 
        // that are waiting on the condition variable condition.
        ret = pthread_cond_signal(pEvent);
        pthread_mutex_unlock(pMutex);
        return ret;
    #endif
}

线程无关事件队列创建和事件溢出预防

以下代码基于跨平台 API 创建一个线程无关的事件队列。每个线程都有一个独立的消息池、一个事件/条件变量和一个互斥体。
当新的外部事件发布时,事件/条件变量将被信号。用于线程池的互斥体是为了同步事件发送线程和接收线程之间的消息池访问。

有一个问题需要我们处理。如果事件处理速度慢于事件触发速度,事件缓冲区将溢出。例如,定时器事件,如果触发了过多的定时器事件而定时器处理程序无法及时处理它们,定时器事件将压倒其他更重要的事件。我们可以通过在将重复的定时器事件放入事件队列之前将其移除来解决这个问题。

typedef struct tagEXTMSG
{
    int nMsgID; // 0: stand for empty entity
    unsigned char nDataFormat ; 
    /* Flag for this event. 
    SME_EVENT_DATA_FORMAT_INT=0, SME_EVENT_DATA_FORMAT_PTR*/
    unsigned char nCategory ; /* Category of this event. */
    union SME_EVENT_DATA_T Data;
    SME_APP_T *pDestApp;
    unsigned long nSequenceNum;
    SME_THREAD_CONTEXT_T* pDestThread;
} X_EXT_MSG_T;
#define MSG_BUF_SIZE 100
typedef struct tagEXTMSGPOOL
{
    int nMsgBufHdr;
    int nMsgBufRear;
    X_EXT_MSG_T MsgBuf[MSG_BUF_SIZE];
    XEVENT EventToThread;
    XMUTEX MutextForPool;
} X_EXT_MSG_POOL_T;
/* Initialize the external event buffer at the current thread. */
BOOL XInitMsgBuf()
{
    SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    pThreadContext->pExtEventPool = (void*)malloc(sizeof(X_EXT_MSG_POOL_T));
    pMsgPool =(X_EXT_MSG_POOL_T*)(pThreadContext->pExtEventPool);
    if (NULL==pMsgPool)
        return FALSE;
        memset(pMsgPool, 0, sizeof(X_EXT_MSG_POOL_T));
        XCreateMutext(&(pMsgPool->MutextForPool));
        XCreateEvent(&(pMsgPool->EventToThread));
    return TRUE;
}
/* Free the external event buffer at the current thread. */
BOOL XFreeMsgBuf()
{
    SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
    if (NULL!=pThreadContext && NULL!=pThreadContext->pExtEventPool)
    {
        free(pThreadContext->pExtEventPool);
        pThreadContext->pExtEventPool= NULL;
        return TRUE;
    }
    return FALSE;
}
/* Is message available at the current thread event pool?*/
static BOOL XIsMsgAvailable(void *pArg)
{
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==p || NULL== p->pExtEventPool)
        return FALSE;
        SME_UNUSED_VOIDP_PARAM(pArg);
        pMsgPool = (X_EXT_MSG_POOL_T*)p->pExtEventPool;
        if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
            return FALSE; // empty buffer.
        return TRUE;
}
/* Thread-safe action to append an external event to the rear 
of the queue at the destination thread.
Timer event overflow prevention.
*/
static void XAppendMsgToBuf(void *pArg)
{
    X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
    int nHdr;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pMsg || NULL==pMsg->pDestThread || 
                NULL==pMsg->pDestThread->pExtEventPool)
    return;
    pMsgPool = (X_EXT_MSG_POOL_T*)(pMsg->pDestThread->pExtEventPool);

    if (((pMsgPool->nMsgBufRear+1) % MSG_BUF_SIZE) == pMsgPool->nMsgBufHdr)
        return; // buffer full.
        // Prevent duplicate SME_EVENT_TIMER event 
        // triggered by a timer in the queue.
        nHdr = pMsgPool->nMsgBufHdr;
        if (SME_EVENT_TIMER == pMsg->nMsgID)
        {
            while (nHdr != pMsgPool->nMsgBufRear)
            {
                if (SME_EVENT_TIMER == pMsgPool->MsgBuf[nHdr].nMsgID
                && pMsg->nSequenceNum == pMsgPool->MsgBuf[nHdr].nSequenceNum)
                return;
                nHdr = (nHdr+1) % MSG_BUF_SIZE;
            }
        }
    memcpy(&(pMsgPool->MsgBuf[pMsgPool->nMsgBufRear]),
                    pMsg,sizeof(X_EXT_MSG_T));
    pMsgPool->nMsgBufRear = (pMsgPool->nMsgBufRear+1)%MSG_BUF_SIZE;
}
/* Thread-safe action to remove an external event from the 
current thread event pool.*/
static void XGetMsgFromBuf(void *pArg)
{
    X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pMsg || NULL==p || NULL==p->pExtEventPool)
    return;
    pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
    if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
    return; // empty buffer.
    memcpy(pMsg,&(pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr]),
                        sizeof(X_EXT_MSG_T));
    pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr].nMsgID =0;
    pMsgPool->nMsgBufHdr = (pMsgPool->nMsgBufHdr+1)%MSG_BUF_SIZE;
}
int XPostThreadExtIntEvent(SME_THREAD_CONTEXT_T* pDestThreadContext, 
                    int nMsgID, int Param1, int Param2,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
    X_EXT_MSG_T Msg;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (nMsgID==0 || NULL== pDestThreadContext || 
                NULL==pDestThreadContext->pExtEventPool)
    return -1;
    Msg.nMsgID = nMsgID;
    Msg.pDestApp = pDestApp;
    Msg.pDestThread = pDestThreadContext;
    Msg.nSequenceNum = nSequenceNum;
    Msg.nDataFormat = SME_EVENT_DATA_FORMAT_INT;
    Msg.nCategory = nCategory;
    Msg.Data.Int.nParam1 = Param1;
    Msg.Data.Int.nParam2 = Param2;
    pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
    XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
                (XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
    return 0;
}
int XPostThreadExtPtrEvent(SME_THREAD_CONTEXT_T* pDestThreadContext, 
                int nMsgID, void *pData, int nDataSize,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
    X_EXT_MSG_T Msg;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (nMsgID==0 || pDestThreadContext==NULL || 
                NULL==pDestThreadContext->pExtEventPool)
    return -1;
    Msg.nMsgID = nMsgID;
    Msg.pDestApp = pDestApp;
    Msg.pDestThread = pDestThreadContext;
    Msg.nSequenceNum = nSequenceNum;
    Msg.nCategory = nCategory;
    Msg.nDataFormat = SME_EVENT_DATA_FORMAT_PTR;
    if (pData!=NULL && nDataSize>0)
    {
        #if SME_CPP
        Msg.Data.Ptr.pData = new char[nDataSize];
        #else
        Msg.Data.Ptr.pData = malloc(nDataSize);
        #endif
        memcpy(Msg.Data.Ptr.pData, pData, nDataSize);
        Msg.Data.Ptr.nSize = nDataSize;
    }
    else
    {
        Msg.Data.Ptr.pData = NULL;
        Msg.Data.Ptr.nSize = 0;
    }
    pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
    XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
                (XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
    return 0;
}
BOOL XGetExtEvent(SME_EVENT_T* pEvent)
{
    X_EXT_MSG_T NativeMsg;
    int i=0;
    int ret=0;
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pEvent || NULL==p || NULL==p->pExtEventPool)
    return FALSE;
    pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
    memset(&NativeMsg,0,sizeof(NativeMsg));
    while (TRUE)
    {
        while (i<10)
        {
            ret = XWaitForEvent(&(pMsgPool->EventToThread), 
            &(pMsgPool->MutextForPool), 10000, 
            (XIS_CODITION_OK_T)XIsMsgAvailable, NULL, 
            (XTHREAD_SAFE_ACTION_T)XGetMsgFromBuf,&NativeMsg);

            if (ret != XWAIT_TIMEOUT)
            break;
            i++;
        }
        if (ret == XWAIT_TIMEOUT)
            return FALSE;
        else
        {
            if (NativeMsg.nMsgID == SME_EVENT_EXIT_LOOP)
            {
                return FALSE; //Request Exit
            }
        #ifdef WIN32
        #else
        // Built-in call back timer on Linux
        else if (SME_EVENT_TIMER == NativeMsg.nMsgID && 
            SME_TIMER_CALLBACK == NativeMsg.Data.Int.nParam1)
        {
            // Invoke the call back function.
            SME_TIMER_PROC_T pfnCallback = (SME_TIMER_PROC_T)
                        (NativeMsg.Data.Int.nParam2);
            (*pfnCallback)(NativeMsg.pDestApp, NativeMsg.nSequenceNum);
        }
        #endif
        else
        {
            // Translate the native message to SME event.
            memset(pEvent,0,sizeof(SME_EVENT_T));
            pEvent->nEventID = NativeMsg.nMsgID;
            pEvent->pDestApp = NativeMsg.pDestApp;
            pEvent->nSequenceNum = NativeMsg.nSequenceNum;
            pEvent->nDataFormat = NativeMsg.nDataFormat;
            pEvent->nCategory = NativeMsg.nCategory;
            pEvent->bIsConsumed = FALSE;
            memcpy(&(pEvent->Data),&(NativeMsg.Data), 
                    sizeof(union SME_EVENT_DATA_T));
        }
        //printf("External message received. \n");
        return TRUE;
    }
}; // while (TRUE)
}
BOOL XDelExtEvent(SME_EVENT_T *pEvent)
{
    if (0==pEvent)
    return FALSE;
    if (pEvent->nDataFormat == SME_EVENT_DATA_FORMAT_PTR)
    {
        if (pEvent->Data.Ptr.pData)
        {
            #if SME_CPP
            delete pEvent->Data.Ptr.pData;
            #else
            free(pEvent->Data.Ptr.pData);
            #endif
            pEvent->Data.Ptr.pData=NULL;
        }
    }
    return TRUE;
}

示例:两个线程无关的状态机

基于此方法,我们提供了一个示例,其中两个独立的播放器状态机基于相同的状态机配置文件在单独的线程上运行。

此示例中有 3 个运行线程:用于 Player-1 应用程序的状态机应用程序线程-1,用于 Player-2 应用程序的线程-2,以及外部事件触发线程。Player-1 和 Player-2 应用程序是基于 Player 状态机配置文件创建的,它们独立运行。Player-1 或 Player-2 都有一个独立的控制面板。

关注点

源代码中包含一组跨平台的与操作系统相关的函数,包括

  • 进程管理
  • 线程管理
  • 互斥体
  • 时钟
  • 内置计时器
  • 线程局部存储

更多信息

您可以在 http://www.intelliwizard.com/ 下载更多信息,这是 UML StateWizard 开源项目的官方网站,遵循 LGPL 许可。

[1] http://en.wikipedia.org/wiki/Event_driven

© . All rights reserved.