65.9K
CodeProject 正在变化。 阅读更多。
Home

WaitForMultipleObjects API 的 C++ 包装器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.75/5 (21投票s)

2013年12月18日

CPOL

9分钟阅读

viewsIcon

50706

downloadIcon

1174

描述了一个C++类,该类封装了WaitForMultipleObjects API的使用模式,使其易于与C++对象集成。

引言

WaitForMultipleObjects Win32 API是一个强大的构造,可用于编写非常高效且易于维护的多线程应用程序。该API允许您使用单个工作线程来实现由独立执行逻辑单元组成的复杂软件,该线程可以检测并处理多个等待句柄到已通知状态的转换,而不是为每个句柄生成单独的工作线程。然而,将此API与C++对象结合需要一些粘合代码,以便将已通知状态的检测转换为相关的C++对象方法调用。在本文中,我将展示一个基类如何抽象和隐藏此逻辑,并为客户端类提供一个自然的C++外观,以便轻松地将等待句柄与特定的C++对象的特定方法关联起来。

背景 

尽管WaitForMultipleObjects提供了一种有效的机制来编写高效的多线程程序,但涉及API调用和处理其返回值以采取适当操作的相关代码通常在多个模块中重复出现。除了涉及重复代码之外,这种方法不是很灵活,因为向现有列表添加或删除等待句柄可能需要重新排列现有代码。此外,处理API调用和返回值处理的代码与程序的函数逻辑混合在一起,降低了代码的可读性,从而降低了其可维护性。典型的实现通常如下所示:

class MyDaemon {
public:
    MyDaemon()
    {}
    void Start() {
        // initialize resources and spawn the worker thread
    }
    void Stop() {
        // set shutdown event and wait on worker thread
    }
private:
    void HandleEventResource() { // functional logic
    }
    void HandleSemaphoreResource() { // functional logic
    }
    // worker thread 
    unsigned int ThreadProc() {
        bool fMore = true;
        HANDLE aHandles[4] = {0}
        aHandle[0] = m_hShutdown;
        aHandle[1] = m_hEvent;
        aHandle[2] = m_hSemaphore;
        do {
            DWORD dwRet = ::WaitForMultipleObjects(
                    _countof(aHandle), aHandle, FALSE, INFINITE, TRUE);
            switch (dwRet) {
            case WAIT_OBJECT_0:	// m_hShutdown signalled
                fMore = false;
                break;
            case WAIT_OBJECT_0+1: // m_hEvent signalled
                HandleEventResource();
                break;
            case WAIT_OBJECT_0+2: // m_hSemaphore released
                HandleSemaphoreResource();
                break;
            default:
                fMore = false;	// error
                break;
            }
        } while (fMore);
        return 0;
    }
    static unsigned int __stdcall _ThreadProc(void* p) {
        _ASSERTE(p != NULL);
        return reinterpret_cast<WFMOHandler*>(p)->ThreadProc();
    }
private:
    HANDLE m_hShutdown; // thread shutdown event
    HANDLE m_hEvent;
    HANDLE m_hSemaphore;
};

在这里,WaitForMultipleObjects API调用和对其返回值的处理与MyDaemon类的函数逻辑混合在一起,该类构成了应用程序的主逻辑。如果需要监视更多等待句柄,则必须修改工作线程体以容纳附加句柄并处理它们各自的返回值。随着句柄的不断增加,switch-case体语句会越来越大,从而使其更难维护,并增加了因输入拼写错误而导致意外错误的几率。

通常采用的典型解决方案是使用一个表(std::map)来存储要等待的句柄及其对应的处理程序类方法指针。例如:

// in the class declaration
typedef void (MyDaemon ::*PFNHANDLER)();
typedef std::map<HANDLE, PFNHANDLER> HANDLERTABLE;
HANDLERTABLE m_handlers;

// in the thread body
std::vector<HANDLE> handles;
handles.resize(m_handlers.size());
int i=0;
for (auto it=m_handlers.begin(); it!=m_handlers.end(); it++)
	handles[i++] = it->first;
DWORD dwRet = ::WaitForMultipleObjectsEx(
                  handles.size(), &handles[0], FALSE, INFINITE, TRUE);
if (dwRet >= WAIT_OBJECT_0 && dwRet < (WAIT_OBJECT_0+handles.size())) {
	(*this.*m_handlers[handles[dwRet-WAIT_OBJECT_0]](); // invoke the handler
}

尽管这有助于避免冗长的switch-case体和硬编码返回值处理,但它也带来了一些限制。所有处理程序方法都必须具有相同的类型签名,并且处理程序必须是同一类的方法。您可以通过使用纯C++函数指针来解决此问题,但是然后要将其转换为C++对象的调用,则需要在需要此类转换的地方提供一个thunking代码。

在中大型软件中,这种代码模式会在使用API的多个模块中重复出现。

我称之为WaitForMultipleObjects粘合逻辑的正是这种模式(本质上是工作线程体),它被分解为一个可以轻松重用的独立类。不用说,将此代码抽象为一个独立的类带来了模块化的所有经典好处——使应用程序代码更清晰,从而提高了代码的可读性和可维护性,并且对基类代码的任何增强都会立即反映到其所有客户端类中。

Using the Code

WaitForMultipleObjects API调用及相关逻辑封装在WFMOHandler类中,该类全部包含在wfmohandler.h中。没有对应的CPP文件。该类有一个内部工作线程,该线程在WaitForMultipleObjects上阻塞,等待已注册的任何等待句柄被发出信号。当一个被发出信号时,相关的处理程序将被调用。

要使用,请声明您的类从WFMOFHandler派生,并注册您想跟踪其状态的等待句柄,如下面的示例所示。注册通过AddWaitHandle方法完成。此方法接受两个参数——等待句柄及其在句柄被发出信号时要调用的处理程序函数。处理程序函数必须作为函数对象提供。您可以提供显式创建的函数对象,也可以使用STL的std::bind()便利设施即时组合一个。下面的示例显示了这两种用法。

#include "wfmohandler.h"

// A simple class that implements an asynchronous 'recv' UDP socket.
// Socket binds to loopback address!
class AsyncSocket {
    USHORT m_port;
    WSAEVENT m_event;
    SOCKET m_socket;
public:
    AsyncSocket(USHORT port)
        : m_port(port)
        , m_event(::WSACreateEvent())
        , m_socket(::WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0))
    {
        // bind the socket
        struct sockaddr_in sin = {0};
        sin.sin_family = AF_INET;
        sin.sin_port = ::htons(port);
        sin.sin_addr.s_addr = ::inet_addr("127.0.0.1");
        if (m_event != NULL && m_socket != INVALID_SOCKET 
            && ::bind(m_socket, reinterpret_cast<const sockaddr*>(&sin), sizeof(sin)) == 0) {
            // put it in 'async' mode
            if (::WSAEventSelect(m_socket, m_event, FD_READ) == 0)
                return;
        }
        std::cerr << "Error initializing AsyncSocket, error code: " << WSAGetLastError() << std::endl;
        // something went wrong, release resources and raise an exception
        if (m_event != NULL) ::WSACloseEvent(m_event);
        if (m_socket != INVALID_SOCKET) ::closesocket(m_socket);
        throw std::exception("socket creation error");
    }
    ~AsyncSocket()
    {
        ::closesocket(m_socket);
        ::WSACloseEvent(m_event);
    }
    // for direct access to the embedded event handle
    operator HANDLE() { return m_event; }
    // Read all incoming packets in the socket's recv buffer. When all the packets 
    // in the buffer have been read, resets the associated Win32 event preparing it
    // for subsequent signalling when a new packet is copied into the buffer.
    void ReadIncomingPacket()
    {
        std::vector<char> buf(64*1024);
        struct sockaddr_in from = {0};
        int fromlen = sizeof(from);
        int cbRecd = ::recvfrom(m_socket, &buf[0], buf.size(), 
            0, reinterpret_cast<sockaddr*>(&from), &fromlen);
        if (cbRecd > 0) {
            std::cerr << cbRecd << " bytes received on port " << m_port << std::endl;
        } else {
            int rc = ::WSAGetLastError();
            if (rc == WSAEWOULDBLOCK) {
                // no more data, reset the event so that WaitForMult..will block on it
                ::WSAResetEvent(m_event);
            } else {
                // something else went wrong
                std::cerr << "Error receiving data from port " << m_port 
                      << ", error code: " << ::WSAGetLastError() << std::endl;
            }
        }
    }
};

class MyDaemon : public WFMOHandler {
    AsyncSocket m_socket1;
    AsyncSocket m_socket2;
    AsyncSocket m_socket3;
    class AnotherEventHandler { // an explicit functor
        AsyncSocket& m_socket;
    public:
        AnotherEventHandler(AsyncSocket& sock) : m_socket(sock) {}
        void operator()() {
            m_socket.ReadIncomingPacket(); // handle incoming socket data
        }
    };
    AnotherEventHandler m_aeh;
public:
    MyDaemon() 
        : WFMOHandler()
        , m_socket1(5000)
        , m_socket2(6000)
        , m_socket3(7000)
        , m_aeh(m_socket3)
    {
        // setup two handlers on the two AsyncSockets that we created
        WFMOHandler::AddWaitHandle(m_socket1, 
            std::bind(&AsyncSocket::ReadIncomingPacket, &m_socket1));
        WFMOHandler::AddWaitHandle(m_socket2, 
            std::bind(&AsyncSocket::ReadIncomingPacket, &m_socket2));
        WFMOHandler::AddWaitHandle(m_socket3, m_aeh);
    }
    virtual ~MyDaemon()
    {
        Stop();
        // just being graceful, WFMOHandler dtor will cleanup anyways 
        WFMOHandler::RemoveWaitHandle(m_socket2);
        WFMOHandler::RemoveWaitHandle(m_socket1);
    }
};

在上面的示例中,与m_socket1m_socket2关联的Win32事件直接由AsyncSocket方法ReadIncomingPacket处理。此方法使用std::bind()转换为函数对象。但是,m_socket3由显式定义函数对象AnotherEventHandler处理。

RemoveWaitHandle是与AddWaitHandle对应的成员函数,它允许您删除已注册的等待句柄。这两个方法提供了一个真正动态的WaitForMultipleObjects等待循环,可以根据需要注册和注销句柄。当然,考虑到WaitForMultipleObjects可以跟踪的句柄数量有限,对此有一个上限(这将在下面的“要点”部分更详细地讨论)。

WFMOHandler提供了两个附加方法——Start()Stop()——分别启动工作线程并关闭它。Stop()应在程序结束时调用,或者每当WFMOHandler对象需要被反初始化时调用。如果未显式调用Stop(),它将从WFMOHandler析构函数中调用。下面的代码是main()的摘录,显示了如何使用Start()使守护进程运行。

需要强调的是,AddWaitHandleRemoveWaitHandle仅更新用于保存等待句柄及其关联函数对象处理程序的内部数据结构。更新等待数组的实际任务由工作线程执行(Add/Remove例程会向工作线程发出信号)。这对RemoveWaitHandle有重要影响。用户可能会在RemoveWaitHandle调用返回后立即尝试从客户端代码中释放等待句柄。这不可避免地会导致异常,因为工作线程将(与其他句柄一起)阻塞在此句柄上,而您正试图关闭它。因此,为了提供对句柄资源的优雅释放,WFMOHandler通过虚拟函数OnWaitHandleRemoved提供了一个回调,当工作线程实际从其等待数组中移除句柄时,将调用该回调。客户端可以覆盖此方法以关闭句柄并释放任何关联资源。请注意,此方法是从WFMOHandler内部工作线程的上下文中调用的。因此,如果释放句柄还涉及更新客户端类数据成员,则必须小心与系统中其他线程进行同步。

try {
    MyDaemon md;
    md.Start();
    std::cout << "Daemon started, press Ctrl+C to stop." << std::endl;
    ::WaitForSingleObject(__hStopEvent, INFINITE);
} catch (std::exception e) {
    std::cerr << "std::exception: " << e.what() << std::endl;
} catch (...) {
    std::cerr << "Unknown exception" << std::endl;
}

关注点

在幕后,WFMOHandler使用函数和类模板的组合来实现其看似简单的接口。

首先,AddWaitHandle是一个函数模板,它使用传递给它的类型进行实例化,即函数对象的类型。其次,此函数对象类型在AddWaitHandle实现中使用,以从类模板WaitHandlerWFMOHandler的内部类)生成具体的类型。由于WFMOHandler需要统一处理所有WaitHandler类模板的实例化,因此WaitHandler类模板派生自WaitHandlerBaseWaitHandlerBase存储等待句柄并具有一个纯虚函数——invoke()。等待句柄成员用于生成传递给API的等待句柄数组,并在相应句柄被发出信号时调用invoke()。由于invoke()WaitHandler中被重写,而WaitHandler又调用传递给AddWaitHandle的函数对象参数,因此控制权被转移到客户端代码。

使用函数对象而不是具有固定类型签名的函数/方法指针提供了极大的灵活性。对可以绑定为等待句柄处理程序的函数或其参数的类型几乎没有任何限制。这些函数可以来自任何类,而不仅仅是WFMOHandler的子类。上面的示例演示了这一点——套接字事件处理程序设置为AsyncSocket::ReadIncomingPacket

此外,同一类型的多个实例可以被轻松管理,因为每个std::bind()调用都会生成一个新的函数对象。同样,示例代码中m_socket1m_socket2AsyncSocket::ReadIncomingPackets处理,也演示了这一点。最后,using std::bind允许您将其他参数绑定到处理程序方法调用。

注意事项

有一个警告是,在此实现中,传递给WaitForMultipleObjects的数组中同步句柄的顺序是任意的。这是因为在内部,WFMOHandler使用std::map来存储由等待句柄索引的处理程序方法指针,然后枚举这些指针来构建等待数组。因此,顺序实际上取决于句柄的数值。但是,如果需要控制这一点,代码可以进行调整以适应。要么使用不同的STL集合,这样调用AddWaitHandle的顺序就成为等待句柄的自然顺序,要么扩展AddWaitHandle以包含一个指示句柄优先级的附加参数。此句柄优先级可用于控制传递给API的等待数组中句柄的顺序。

另一个警告是,编译此代码需要VC++2010(或更高版本),其中包含支持可变数量参数的std::bind()。使用旧版编译器仍然可行,但您将被限制为只能绑定一个参数的处理程序方法,方法是使用std::bind1ststd::bind2nd。但是,如果您可以链接到boost库,它提供了一个std::bind()的替代品,该替代品支持从接受多个参数的方法组合函数对象,并且可以与VC++2008等旧编译器一起使用。我没有尝试过使用boost版本的std::bind()的代码,但它应该可以工作。

最后,尽管WaitForMultipleObjects可以接受一个包含64个句柄的数组,但在WFMOHandler实现中,您最多只能使用62个句柄。这是因为前两个句柄内部用于两个事件句柄——一个用于发出线程关闭信号,另一个用于重建等待句柄数组。通过采用不同的机制来发出这两个任务的信号,可以将这两个句柄合并成一个事件句柄,但就我而言,支持62个等待句柄已经绰绰有余。

希望您觉得这个有用。我曾在一个项目上工作,其中有4个不同的模块使用了WaitForMultipleObjects,我已成功将其中三个迁移到使用这个基类。负责维护代码的其他程序员们高兴极了,因为这种模式为整体代码带来了很多清晰度。即使他们不理解std::bind()是如何神奇地导致回调到客户端对象的函数中的,他们也可以完成他们的工作,因为应用程序的功能代码与线程/信号机制整齐地分开了。

在另一篇文章中,我将展示如何将其扩展以支持Win32定时器例程,从而消除了使用嵌入的Sleep()调用或连续轮询来执行周期性任务的额外工作线程的需要。

历史

2013年12月10日 - 初始发布

© . All rights reserved.