65.9K
CodeProject 正在变化。 阅读更多。
Home

使用实际示例理解 C++20 协程、可等待对象和 co_await 运算符

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.33/5 (2投票s)

2020年12月3日

CPOL

11分钟阅读

viewsIcon

15888

C++, C++20 协程, 可等待对象, co_await, 跨平台开发和非阻塞套接字通信

引言

C++20 引入了一系列新功能,这些功能将通过使用现代 C++ 改变我们设计和编写应用程序的方式。人们认为协程是 C++20 新功能中最重要的一个。

C++20 协程涉及许多新概念,例如可等待对象、任务、promise 类型、co_awaitco_yieldco_return 等等。然而,在这个早期阶段,网络上还没有太多可用于学习的实际示例。

UDAParts 开发了一个强大而安全的通信框架 SocketPro,该框架采用连续的内联请求/结果批处理和实时流处理能力,通过异步数据传输和并行计算实现最佳的网络效率、开发简洁性、性能、可伸缩性,以及许多伟大甚至独特的功能,可在 GitHub 站点 找到。该框架充分支持现代语言开发功能,如匿名函数、闭包、Lambda 表达式、async/await、future、promise、yield 等。它已经集成了 C++20 协程功能,并提供了许多用于客户端/服务器通信、数据库访问、客户端和服务器之间文件交换以及持久化消息队列的示例。我们相信这些示例一定能帮助您理解 C++20 协程。

这篇短文将涉及除 co_yieldco_return 之外的所有概念。但是,它将主要关注可等待对象、任务和 co_await。此外,本文附带一个真实的演示示例,演示了在客户端和服务器之间进行流式传输请求和响应,以实现最佳的网络效率。提供的示例代码以及结尾列出的其他代码都可以使用 GCC 10.0.1 和 Visual C++ 2017 或更高版本进行编译。

准备工作

首先,请在 GitHub 站点 克隆 SocketPro 包。原始示例代码位于文件 ../socketpro/tutorials/cplusplus/hello_world/client/cw_hw.cpp 中。接下来,导航到 SocketPro 开发索引文件 ../socketpro/doc/dev_guide.htm

强烈建议按顺序阅读以下短文

  1. SocketPro 通信框架基础知识
  2. CUQueue 和不同开发语言之间的兼容性
  3. 开始使用 SocketPro
  4. C/C++ (C++11 或更高版本)

第三篇文章将指导您分发所有必需的组件。具体来说,我们将使用示例服务器应用程序 all_servers 进行接下来的测试。此时,在正确分发这些组件后,运行它并在端口 20901 上侦听,作为远程 SocketPro 服务器。

主要测试代码

下面的代码片段 1 显示了与 C++20 协程任务和可等待对象队列相关的主要单元测试代码。首先,需要引用代码片段顶部所示的 C++20 协程头文件 coroutine

#if __has_include(<coroutine>)
#include <coroutine> //GCC 10.0.1 or later
                     //Visual C++ 2019 16.8.0 preview 3.1 or later
#elif __has_include(<experimental/coroutine>)
#include <experimental/coroutine> //Visual C++ 2017 & 2019 16.8.0 before
#else
static_assert(false, "No co_await support");
#endif

#include <iostream>
#include <deque>
#include "../../uqueue_demo/mystruct.h"
#include "HW.h"

using namespace std;
using namespace SPA;
using namespace SPA::ClientSide;

CMyStruct ms0;
typedef CSocketPool<HelloWorld> CMyPool;

deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    // ......
}

CAwTask MyTest(CMyPool::PHandler& hw) {
    // ......
}

//compile options
//Visual C++ 2017 & 2019 16.8.0 before -- /await
//Visual C++ 2019 16.8.0 preview 3.1 or later -- /std:c++latest
//GCC 10.0.1 or later -- -std=c++20 -fcoroutines -ldl -lstdc++ -pthread
//GCC 11 or clang 14 -- -std=c++20 -ldl -lstdc++ -pthread
int main(int argc, char* argv[]) {
    CMyPool spHw;
    CConnectionContext cc("localhost", 20901, L"MyUserId", L"MyPassword");
    //spHw.SetQueueName("qhw");
    if (!spHw.StartSocketPool(cc, 1)) {//create a pool with one session 
                                       //within one worker thread
        wcout << "No connection to remote hello world server\n";
    }
    else {
        auto hw = spHw.Seek(); //find an async handler from a socket pool
        SetMyStruct(ms0);
        MyTest(hw); //call a C++20 test coroutine
    }

    wcout << L"Press a key to kill the demo ......\n";
    ::getchar();
    return 0;
}
代码片段 1:涉及 C++20 协程任务和可等待对象队列的主要单元测试代码

稍后我们将重点关注 CreateAwaitablesMyTest 这两个函数。第一个函数将返回一个包含 C++20 可等待对象 RWaiter<wstring> 数组的 deque 实例。第二个函数将返回一个 C++20 任务 CAwTask,其功能与 .NET、JavaScript 和 Python 等其他开发语言中的关键字 async 非常相似。稍后我们将讨论它们。

现在,让我们转到 main 函数。在该函数内部,我们首先创建一个套接字池,其中包含一个非阻塞套接字会话,该会话托管在一个工作线程中。请注意,套接字池可以包含任意数量的工作线程。每个工作线程都可以托管任意数量的不同远程服务器的非阻塞套接字会话。在这里,为了代码清晰,我们在演示中使用一个工作线程托管的非阻塞套接字会话。实际上,建议在大多数情况下,套接字池有一个工作线程。

最后,我们调用 MyTest 方法来演示 C++20 coroutineco_wait。您可以编译 main 函数之前的注释掉的编译选项的小代码片段,并将此示例客户端应用程序与上述服务器 all_servers 进行测试。

剖析 MyTest, CAwTask 和 CreateAwaitables

首先,下面的代码片段 2 中的 MyTest 方法是一个 C++20 协程,它始终返回 CAwTask 的实例。在功能上,CAwTask 与 .NET、JavaScript 和 Python 等其他语言中的关键字 async 相似。然而,我们在 C++20 中可以自定义 CAwTask 类及其内部类 promise_type,如顶部所示。在大多数情况下,此类定义将满足您的需求,无需任何修改。有关详细信息,您可以参考 本文。请注意第 4 行和第 10 行的注释。一旦调用协程,就会通过调用 promise_type::get_return_object 方法创建 CAwTask 的实例。最后,当 C++20 协程 MyTest 即将退出时,将调用 promise_type::final_suspend 方法。建议您设置调试断点并逐步执行以获得更好的理解。

struct CAwTask {
    struct promise_type {
        CAwTask get_return_object() {
            return { }; //line 4 -- called once a coroutine is going to be called
        }
        std::suspend_never initial_suspend() {
            return { };
        }
        std::suspend_never final_suspend() {
            return { }; //line 10 -- called when a coroutine is about to exit
        }
        void return_void() {
        }
        void unhandled_exception() {
        }
    };
};

deque<RWaiter<wstring>> CreateAwaitables(CMyPool::PHandler& hw) {
    auto aw0 = hw->wait_send<wstring>(idSayHello, L"John", L"Dole"); //line 20
    auto aw1 = hw->wait_send<wstring>(idSayHello, L"Hillary", L"Clinton");
    auto aw2 = hw->wait_send<wstring>(idSayHello, L"Donald", L"Trump");
    auto aw3 = hw->wait_send<wstring>(idSayHello, L"Joe", L"Biden");
    auto aw4 = hw->wait_send<wstring>(idSayHello, L"Mike", L"Pence"); //line 24
    //auto aw4 = hw->wait_send<wstring>(idSayHello, L"", L"Pence"); //line 25
    return {aw0, aw1, aw2, aw3, aw4}; //line 26
}
CAwTask MyTest(CMyPool::PHandler& hw) {
    try {
        //requests/results streamed with inline batching
        auto qWaiter = CreateAwaitables(hw); //line 31
        BWaiter ws = hw->wait_sendRequest(idSleep, (int)5000); //line 32
        RWaiter<CMyStruct> wms = hw->wait_send<CMyStruct>(idEcho, ms0); //line 33

        //co_await for all results
        while(qWaiter.size()) { //line 36
            wcout << co_await qWaiter.front() << "\n";
            qWaiter.pop_front();
        }
        wcout << "Waiting sleep ......\n";
        CScopeUQueue sb = co_await ws;
        //sleep request returns nothing
        assert(sb->GetSize() == 0);
        CMyStruct ms = co_await wms; //line 44
        wcout << "(ms == ms0): " << ((ms == ms0) ? 1 : 0)
            << "\nAll requests processed\n";
    }
    catch (CServerError& ex) { //line 48
        wcout << ex.ToString() << "\n";
    }
    catch (CSocketError& ex) {
        wcout << ex.ToString() << "\n";
    }
    catch (exception& ex) {
        wcout << "Unexpected error: " << ex.what() << "\n";
    } //line 56
}
代码片段 2:用于演示 C++20 协程任务和可等待对象的 CreateAwaitables 和 MyTest

在这里,我们使用第 31 行的 CreateAwaitables 方法创建五个可等待对象(aw0aw1aw2aw3aw4),如第 20 至 26 行所示。每个可等待对象对应一个从客户端到服务器的请求。我们将它们放入 deque 容器以备后用。此外,我们通过向服务器发送两个请求来获取另外两个可等待对象,用于处理。所有七个请求和预期响应都已流式传输。SocketPro 内部使用其内联批处理算法在客户端和服务器两端对这些请求和响应数据进行批处理。SocketPro 通过设计充分支持请求和响应的流式传输,以实现最佳的网络效率,这极大地提高了应用程序的性能和可伸缩性。我们的研究结果表明,性能改进可能很容易从局域网的 90% 到广域网的 30000%。此功能是 SocketPro 的亮点之一。您很难在其他框架中找到此出色功能。

收集完所有可等待对象后,我们从第 36 行到 44 行开始 co_await 它们。请注意,所有 RWaiter<wstring>BWaiterRWaiter<CMyStruct> 可等待对象都是可复制和可移动的。您可以将它们放入标准库容器,如 vectordequestackmap 等。

在 C++20 协程函数 MyTest 内部,您可以使用 try/catch 进行异常处理,如第 48 行到 56 行所示。错误 CServerError 来自远程 SocketPro 服务器的异常。例如,如果您取消注释第 25 行但注释掉第 24 行,您将收到此类异常。或者,您可以通过为第 32 行的第二个输入提供负值而不是正数 5000 来获得 CServerError 异常。在底层套接字关闭或请求被取消的情况下,SocketPro 客户端适配器还会引发 CSocketError 通信错误。要测试 CSocketError 异常,您可以在此客户端应用程序运行后,粗暴地终止测试示例服务器应用程序 all_servers

揭秘 RWaiter/wait_send 和 BWaiter/wait_sendRequest

下面的代码片段 3 显示了两个可等待类 RWaiterBWaiter 的定义,分别位于第 2 行和第 31 行。第一个是模板类,它继承自基类可等待类 CWaiterBase<R>,而第二个是常规类,它继承自 CWaiterBase<CScopeUQueue>。这里,模板参数 RCScopeUQueue 分别表示预期的返回数据和来自远程 SocketPro 服务器的字节数组。请注意,字节数组将用于稍后反序列化零个、一个或多个不同类型的数据。

template<typename R>
struct RWaiter : public CWaiterBase<R> { //line 2
    RWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
        const unsigned char* pBuffer, unsigned int size)
    : CWaiterBase<R>(reqId) {
        auto& wc = this->m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                try {
                    ar >> wc->m_r; //unpack ar buffer into m_r (R) directly
                } catch (...) { //de-serialization or other errors
                    wc->m_ex = std::current_exception();
                }
                //resume coroutine from a socket pool worker thread
                wc->resume();
            }, this->get_aborted(), this->get_se())) {
            //throw CSocketError exception if socket already closed
            ash->raise(reqId);
        }
    }
};

// ......

template<typename R, typename ... Ts>
RWaiter<R> wait_send(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return RWaiter<R>(this, reqId, sb->GetBuffer(), sb->GetSize());
}

struct BWaiter : public CWaiterBase<CScopeUQueue> { //line 31
    BWaiter(CAsyncServiceHandler* ash, unsigned short reqId,
        const unsigned char* pBuffer, unsigned int size)
    : CWaiterBase<CScopeUQueue>(reqId) {
        auto& wc = m_wc;
        if (!ash->SendRequest(reqId, pBuffer, size, [wc](CAsyncResult & ar) {
                //move server returned buffer from ar into m_r (CScopeUQueue)
                wc->m_r->Swap(ar.UQueue);
                //resume coroutine from a socket pool worker thread
                wc->resume();
            }, get_aborted(), get_se())) {
            //throw CSocketError exception if socket already closed
            ash->raise(reqId);
        }
    }
};

// ......

template<typename ... Ts>
BWaiter wait_sendRequest(unsigned short reqId, const Ts& ... args) {
    CScopeUQueue sb;
    sb->Save(args ...);
    return BWaiter(this, reqId, sb->GetBuffer(), sb->GetSize());
}
代码片段 3:解码 RWaiter/wait_send 和 BWaiter/wait_sendRequest

在查看了两个可等待类的构造函数后,您会发现 SocketPro 总是使用请求 ID reqId、给定长度的字节数组 pBuffer 以及三个回调(Lambda 表达式 [wc](CAsyncResult & ar) {......}get_abortedget_se)从客户端向服务器发送请求。第一个 Lambda 表达式回调用于监视来自服务器的返回结果。在回调中,处理完服务器返回的结果后,必须调用 resume 方法(wc->resume())来恢复协程,如注释所示。第二个回调 get_aborted 用于监视两个事件:请求取消和套接字会话关闭。最后一个回调 get_se 用于跟踪来自远程服务器的异常错误。简而言之,这三个回调涵盖了所有通信错误和可能的返回结果,包括预期的结果和来自服务器的异常错误。我们将在接下来的代码片段 4 中讨论这两个回调 get_abortedget_se

在结束本节之前,值得注意的是,所有三个回调始终在套接字池工作线程中调用。您可以使用两个模板方法 wait_sendwait_sendRequest 将任何类型的请求发送到远程 SocketPro 服务器。它们将立即返回 C++20 可等待对象,而无需等待服务器响应,稍后将通过 co_awaited 来获取任何类型的预期结果或可能的不同类型的异常。

解码模板类 CWaiterBase

下面的代码片段 4 显示了位于文件 ../socketpro/include/aclientw.h 中的 SocketPro C++20 可等待对象的实现。首先,请看第 78 行到 110 行的代码,它通过调用 get_aboredget_se 方法生成两个回调 DDiscardedDServerException。如上一个代码片段 3 所示,它们也被重用于 SocketPro remoting 文件(../socketpro/include/streamingfile.h)、服务器持久化消息队列(../socketpro/include/aqhandler.h)和数据库句柄(../socketpro/include/udb_client.h)中。请注意,生成的两个回调将 *始终* 被套接字池工作线程调用。此外,C++20 协程句柄将 *始终* 在设置异常后在末尾被调用以恢复,如第 84 行和第 106 行所示。

typedef std::coroutine_handle<> CRHandle;

template<typename R>
struct CWaiterBase {

    struct CWaiterContext {

        CWaiterContext(unsigned short reqId)
        : m_done(false), m_reqId(reqId) {
        }

        CWaiterContext(const CWaiterContext& wc) = delete;
        CWaiterContext(CWaiterContext&& wc) = delete;
        CWaiterContext& operator=(const CWaiterContext& wc) = delete;
        CWaiterContext& operator=(CWaiterContext&& wc) = delete;

        bool await_ready() noexcept {
            CSpinAutoLock al(m_cs);
            return m_done;
        }

        //always call this method from pool worker thread
        void resume() noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_done = true;
                if (m_rh) {
                    m_rh.resume(); //line 28
                }
            }
        }

        unsigned short get_id() {
            return m_reqId;
        }

        R m_r;
        std::exception_ptr m_ex;

    private:
        bool await_suspend(CRHandle rh) noexcept {
            CSpinAutoLock al(m_cs);
            if (!m_done) {
                m_rh = rh; //line 44
                return true; //will resume from worker thread
            }
            return false; //resume immediately
        }
        CSpinLock m_cs;
        bool m_done; //protected by m_cs
        CRHandle m_rh; //protected by m_cs
        unsigned short m_reqId;
        friend struct CWaiterBase;
    };

    CWaiterBase(unsigned short reqId)
    : m_wc(new CWaiterContext(reqId)) {
    }

    bool await_suspend(CRHandle rh) noexcept {
        return m_wc->await_suspend(rh);
    }

    //R should support moveable (preferred) or copy constructor
    R&& await_resume() {
        if (m_wc->m_ex) {
            std::rethrow_exception(m_wc->m_ex); //line 67
        }
        return std::move(m_wc->m_r);
    }

    bool await_ready() noexcept {
        return m_wc->await_ready();
    }

protected:

    DServerException get_se() noexcept { //line 78
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* ash, unsigned short reqId, const
            wchar_t* errMsg, const char* errWhere, unsigned int errCode) {
            wc->m_ex = std::make_exception_ptr(
                CServerError(errCode, errMsg, errWhere, reqId));
            wc->resume(); //line 84
        };
    }

    DDiscarded get_aborted() noexcept {
        auto& wc = m_wc;
        return [wc](CAsyncServiceHandler* h, bool canceled) {
            if (canceled) {
                wc->m_ex = std::make_exception_ptr(CSocketError(
                   REQUEST_CANCELED, REQUEST_CANCELED_ERR_MSG, wc->get_id(), false));
            } else {
                CClientSocket* cs = h->GetSocket();
                int ec = cs->GetErrorCode();
                if (ec) {
                    std::string em = cs->GetErrorMsg();
                    wc->m_ex = std::make_exception_ptr(
                  CSocketError(ec,Utilities::ToWide(em).c_str(),wc->get_id(),false));
                } else {
                 wc->m_ex =std::make_exception_ptr(CSocketError(SESSION_CLOSED_AFTER,
                    SESSION_CLOSED_AFTER_ERR_MSG, wc->get_id(), false));
                }
            }
            wc->resume(); //line 106
        };
    }

    std::shared_ptr<CWaiterContext> m_wc; //line 110
};
代码片段 4:SocketPro C++20 基类可等待对象的实现

C++20 可等待类必须实现三个必需的方法:await_readyawait_suspendawait_resume,它们由 co_await 运算符使用。当 co_await 一个可等待对象时,首先调用 await_ready 方法来检查预期的结果是否已可用。如果此方法返回 true,则表示结果确实可用,co_awaiting 将立即调用 await_resume 方法返回结果,而无需创建协程句柄或调用 await_suspend 方法。这种情况在 SocketPro 通信框架中可能经常发生,特别是当流式传输多个请求时,如本示例所示。相反,如果 await_ready 方法在大多数情况下返回 false,则表示结果当前不可用,co_awaiting 将创建一个协程句柄(恢复点)并调用 await_suspend 方法。

此处的 await_suspend 方法返回一个 bool 值。此外,该方法还可以定义为返回 void 或协程句柄。如果在大多数情况下,该方法返回 truevoid,则会记住一个协程句柄(如第 44 行所示),并且 co_awaiting 将返回一个恢复点。预期套接字池工作线程稍后将通过调用协程句柄方法 resume(位于第 28 行)从恢复点恢复。如果该方法返回 falseco_awaiting 将立即恢复,并立即调用 await_resume 方法以获取预期的结果或异常。这种情况也可能发生在 SocketPro 中,尽管几率较低。await_suspend 方法也可以定义为返回自定义的协程句柄,这超出了本文的范围。

最后,在发生以下三种情况之一后,将通过 co_await 运算符调用 await_resume 方法:

  1. await_ready 返回 true
  2. await_suspend 返回 false
  3. 由套接字池工作线程调用协程句柄方法 resume,如第 28 行所示

await_resume 方法内部,如果确实记录了异常,则可能抛出异常,如第 67 行所示。这里,该方法返回一个 rvalue。预期模板参数 R 支持移动构造函数或拷贝构造函数。然而,为了避免内存复制,优先使用移动构造函数。还请注意,如果没有预期结果,await_resume 方法也可以定义为返回 void

其他示例

在理解了以上所有内容之后,您可以通过使用以下示例,借助示例 SocketPro 服务器 all_servers,进一步研究 C++20 协程功能:

  1. 客户端和服务器之间的文件交换:../socketpro/tutorials/cplusplus/remote_file/client/rf_cw.cpp
  2. 服务器持久化队列:../socketpro/tutorials/cplusplus/server_queue/client/sq_cw.cpp
  3. SQLite 数据库:../socketpro/stream_sql/usqlite/test_csqlite/cw_sqlite.cpp

C++20 协程/co_await 的性能优势

C++20 协程是无栈的,不需要复制栈信息数据,这除了它们的异步计算之外,还可以带来显著的性能提升。SocketPro 在 ../socketpro/samples/latency 目录中提供了示例客户端/服务器代码,供您进行性能测试。请注意,SocketPro C++ 适配器充分支持现代语言开发功能,如匿名函数、Lambda 表达式、std::promisestd::futurecoroutineco_await 等。

我们的性能研究表明,与 std::promise/std::future/get 方法相比,C++20 co_await 每次客户端/服务器请求总是可以减少约 4 ~ 6 微秒,无论客户端平台是 Windows 还是 Linux。C++20 co_await 的性能提升是巨大的!

历史

  • 2020 年 12 月 3 日:首次发布
  • 2023 年 2 月 10 日: minor update for adding support to clang 14 or later
© . All rights reserved.