65.9K
CodeProject 正在变化。 阅读更多。
Home

C++ std::thread 事件循环,带消息队列和定时器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.78/5 (37投票s)

2017年2月5日

CPOL

4分钟阅读

viewsIcon

193805

downloadIcon

4464

使用 C++11 线程支持库创建一个带有事件循环、消息队列和定时器的工作线程

引言

事件循环,有时也称为消息循环,是一个等待和分发传入事件的线程。该线程会阻塞等待请求到达,然后将事件分发给事件处理函数。消息队列通常由循环用于保存传入的消息。每条消息都会被顺序地出队、解码,然后执行相应的操作。事件循环是实现进程间通信的一种方式。

所有操作系统都支持多线程应用程序。每个操作系统都有创建线程、消息队列和定时器的独特函数调用。随着 C++11 线程支持库的出现,现在可以创建可移植的代码,并避免使用特定于操作系统的函数调用。本文提供了一个简单的示例,说明如何在仅依赖 C++ 标准库的情况下创建线程事件循环、消息队列和定时器服务。任何支持线程库的 C++11 编译器都应该能够编译附带的源代码。

使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。

查看 GitHub 以获取最新源代码

背景

通常,我需要一个线程作为事件循环运行。传入的消息由线程出队,数据根据唯一的 message ID 分发到适当的函数处理程序。能够调用函数的定时器支持对于低速轮询或在预期时间内未发生某事时生成超时非常有用。很多时候,工作线程在启动时创建,直到应用程序终止才被销毁。

实现的一个关键要求是,传入的消息必须在同一个线程实例上执行。而例如 std::async 可能会使用线程池中的临时线程,这个类确保所有传入的消息都使用同一个线程。例如,一个子系统可以用非线程安全的代码来实现。使用单个 WorkerThread 实例可以安全地将函数调用分派到子系统中。

乍一看,C++ 线程支持似乎缺少一些关键功能。是的,std::thread 可用于启动一个线程,但没有线程安全的队列,也没有定时器——这些是大多数操作系统提供的服务。我将展示如何使用 C++ 标准库来创建这些“缺失”的功能,并提供一个许多程序员熟悉的事件处理循环。

WorkerThread

WorkerThread 类封装了所有必要的事件循环机制。一个简单的类接口允许线程创建、向事件循环发送消息以及最终的线程终止。接口如下所示:

class WorkerThread
{
public:
    /// Constructor
    WorkerThread(const char* threadName);

    /// Destructor
    ~WorkerThread();

    /// Called once to create the worker thread
    /// @return True if thread is created. False otherwise. 
    bool CreateThread();

    /// Called once a program exit to exit the worker thread
    void ExitThread();

    /// Get the ID of this thread instance
    /// @return The worker thread ID
    std::thread::id GetThreadId();

    /// Get the ID of the currently executing thread
    /// @return The current thread ID
    static std::thread::id GetCurrentThreadId();

    /// Add a message to the thread queue
    /// @param[in] data - thread specific message information
    void PostMsg(std::shared_ptr<UserData> msg);

private:
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    /// Entry point for the worker thread
    void Process();

    /// Entry point for timer thread
    void TimerThread();

    std::unique_ptr<std::thread> m_thread;
    std::queue<std::shared_ptr<ThreadMsg>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_timerExit;
    const char* THREAD_NAME;
};

首先要注意的是,std::thread 用于创建主工作线程。主工作线程函数是 Process()

bool WorkerThread::CreateThread()
{
    if (!m_thread)
        m_thread = new thread(&WorkerThread::Process, this);
    return true;
}

事件循环

下面是 Process() 事件循环。该线程依赖于 std::queue<ThreadMsg*> 作为消息队列。std::queue 不是线程安全的,因此所有对队列的访问都必须受到互斥锁的保护。std::condition_variable 用于挂起线程,直到收到通知表明队列中已添加新消息。

void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->id)
        {
            case MSG_POST_USER_DATA:
            {
                ASSERT_TRUE(msg->msg != NULL);

                auto userData = std::static_pointer_cast<UserData>(msg->msg);
                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                break;
            }

            case MSG_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case MSG_EXIT_THREAD:
            {
                m_timerExit = true;
                timerThread.join();
                return;
            }

            default:
                ASSERT();
        }
    }
}

PostMsg() 在堆上创建一个新的 ThreadMsg,将消息添加到队列,然后使用条件变量通知工作线程。

void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));

    // Add user data msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

循环将继续处理消息,直到收到 MSG_EXIT_THREAD 并退出线程。

void WorkerThread::ExitThread()
{
    if (!m_thread)
        return;

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));

    // Put exit thread message into the queue
    {
        lock_guard<mutex> lock(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }

    m_thread->join();
    m_thread = nullptr;
}

事件循环 (Win32)

下面的代码片段将上面的 std::thread 事件循环与使用 Windows API 的类似的 Win32 版本进行对比。请注意,GetMessage() API 被用于替代 std::queue。消息使用 PostThreadMessage() 发送给操作系统消息队列。最后,使用 timerSetEvent()WM_USER_TIMER 消息放入队列。所有这些服务都由操作系统提供。本文介绍的 std::thread WorkerThread 实现避免了原始的操作系统调用,但其功能与 Win32 版本相同,并且仅依赖于 C++ 标准库。

unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;

    // Start periodic timer
    MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired, 
                       reinterpret_cast<DWORD>(this), TIME_PERIODIC);

    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg.wParam != NULL);

                // Convert the ThreadMsg void* data back to a UserData*
                const UserData* userData = static_cast<const UserData*>(msg.wParam);

                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                // Delete dynamic data passed through message queue
                delete userData;
                break;
            }

            case WM_USER_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case WM_EXIT_THREAD:
                timeKillEvent(timerId);
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}

定时器

一个低分辨率的周期性定时器消息被插入到队列中,使用一个次要的私有线程。定时器线程在 Process() 内部创建。

void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

...

定时器线程的唯一职责是每 250ms 插入一条 MSG_TIMER 消息。在这个实现中,没有机制可以防止定时器线程向队列注入多条定时器消息。如果工作线程落后,无法足够快地处理消息队列,就可能发生这种情况。根据工作线程的处理负载以及定时器消息的插入速度,可以采用额外的逻辑来防止队列泛滥。

void WorkerThread::TimerThread()
{
    while (!m_timerExit)
    {
        // Sleep for 250mS then put a MSG_TIMER into the message queue
        std::this_thread::sleep_for(250ms);

        std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));

        // Add timer msg to queue and notify worker thread
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }
}

用法

下面的 main() 函数展示了如何使用 WorkerThread 类。创建了两个工作线程,并向每个线程发送了一条消息。短暂延迟后,两个线程都退出。

// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");

int main(void)
{    
    // Create worker threads
    workerThread1.CreateThread();
    workerThread2.CreateThread();

    // Create message to send to worker thread 1
    std::shared_ptr<UserData> userData1(new UserData());
    userData1->msg = "Hello world";
    userData1->year = 2017;

    // Post the message to worker thread 1
    workerThread1.PostMsg(userData1);

    // Create message to send to worker thread 2
    std::shared_ptr<UserData> userData2(new UserData());
    userData2->msg = "Goodbye world";
    userData2->year = 2017;

    // Post the message to worker thread 2
    workerThread2.PostMsg(userData2);

    // Give time for messages processing on worker threads
    this_thread::sleep_for(1s);

    workerThread1.ExitThread();
    workerThread2.ExitThread();

    return 0;
}

结论

C++ 线程支持库提供了一种平台独立的方式来编写多线程应用程序代码,而无需依赖于特定于操作系统的 API。本文介绍的 WorkerThread 类是一个简陋的事件循环实现,但所有的基础都已具备,可以在此基础上进行扩展。

历史

  • 2017年2月5日
    • 首次发布
  • 2017年2月7日
    • 更新文章,对事件循环进行澄清,并与 Win32 事件循环进行对比。
  • 2020年9月13日
    • 对实现进行了小的现代化更新以简化。新的源代码和文章更新。 
© . All rights reserved.