沒有調度器的協程不是好協程——零基礎深入淺出 C++20 協程
前言
上一篇《協程本質是函數加狀態機》談到 C++20 協程的本質,是編譯器基于 duff device 的精巧封裝,經過一番乾坤大挪移,協程體內容被掉包只保留協程初始化代碼,實際運行代碼被包裹在編譯器自動生成的 resume 函數中,這一點通過 C++ Insights 在線工具觀察的一清二楚。
然而上一篇舉的數列生成器例子中,協程的運行還是需要用戶通過 while 循環來驅動,顯得不夠貼近實際,因此這一篇引入協程調度器,看看 C++20 協程是如何自動運行的,文章仍然遵守之前的創作原則:
* 選取合適的 demo 是頭等大事
* 以協程為目標,涉及到的新語法會簡單說明,不涉及的不旁征博引,很多新語法都是有了某種需求才創建的,理解這種需求本身比硬學語法規則更為重要
* 若語法的原理非常簡單,也會簡單展開講講,有利于透過現象看本質,用起來更得心應手
上一篇文章里不光探討了協程的本質,還說明了一系列 C++20 協程概念:
* 協程體
* 協程狀態
* 承諾對象
* 返回對象
* 協程句柄
及它們之間的關系:

并簡單說明了接入 C++20 協程時用戶需要實現的類型、接口、及其含義。如果沒有這些內容鋪墊,看本文時會有很多地方將會難以理解,還沒看過的小伙伴,墻裂建議先看那篇。
工具還是之前介紹過的 C++ Insights 和 Compile Explorer,也在上一篇中介紹過了,這里不再贅述。
協程調度器
話不多說,直接上 demo:
#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
class SingleThreadScheduler {
public:
void schedule(std::function<void()> task) {
tasks.push(std::move(task));
}
void run() {
while (!tasks.empty()) {
auto task = tasks.front();
tasks.pop();
task();
}
}
private:
std::queue<std::function<void()>> tasks;
};
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~AsyncTask() { if (handle) handle.destroy(); }
};
struct ScheduleAwaiter {
SingleThreadScheduler* scheduler;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
void await_resume() {}
};
AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
std::cout << "Task " << id << " started on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " resumed on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " finish on thread: "
<< std::this_thread::get_id() << std::endl;
}
int main() {
SingleThreadScheduler scheduler;
auto task1 = demo_coroutine(scheduler, 1);
auto task2 = demo_coroutine(scheduler, 2);
auto task3 = demo_coroutine(scheduler, 3);
std::cout << "init done" << std::endl;
scheduler.run();
}
這個例子演示了擁有三個協程任務的單線程協程調度器,有如下輸出:
Task 1 started on thread: 128258074408768
Task 2 started on thread: 128258074408768
Task 3 started on thread: 128258074408768
init done
Task 1 resumed on thread: 128258074408768
Task 2 resumed on thread: 128258074408768
Task 3 resumed on thread: 128258074408768
Task 1 finish on thread: 128258074408768
Task 2 finish on thread: 128258074408768
Task 3 finish on thread: 128258074408768
用戶只需要調用SingleThreadScheduler::run 方法,就可以源源不斷的驅動注冊在其上的協程運行了!
demo 比較長,下面分段看下。
#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
調度器類型,schedule 方法注冊協程,run 會阻塞當前線程、不停的運行其上的協程,協程 resume 方法被包裹在 std::function 中,放置在先進先出的隊列里,保證執行的先后順序
class SingleThreadScheduler {
public:
void schedule(std::function<void()> task) {
tasks.push(std::move(task));
}
void run() {
while (!tasks.empty()) {
auto task = tasks.front();
tasks.pop();
task();
}
}
private:
std::queue<std::function<void()>> tasks;
};
協程返回對象的定義,與之前大體一樣,包含了承諾對象與協程句柄,承諾對象主要的變化是:1) initial_suspend 不再掛起協程; 2) 增加了 return_void 接口; 3) 減少了 yield_value 接口;
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~AsyncTask() { if (handle) handle.destroy(); }
};
專用的等待對象,主要實現了 await_suspend 方法以便在協程掛起時、向調度器注冊協程 resume 方法。增加這個等待對象一來可以掛起協程,二來方便獲取協程句柄及其 resume 方法
struct ScheduleAwaiter {
SingleThreadScheduler* scheduler;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
void await_resume() {}
};
協程體,接收調度器、返回返回對象,內部 co_await 等待兩次異步事件,會產生兩次中斷,每次中斷前將 resume 注冊到調度器,以便之后喚醒時繼續執行,直到協程結束
AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
std::cout << "Task " << id << " started on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " resumed on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " finish on thread: "
<< std::this_thread::get_id() << std::endl;
}
程序入口,初始化調度器與三個協程任務,最后 run 搞定一切
int main() {
SingleThreadScheduler scheduler;
auto task1 = demo_coroutine(scheduler, 1);
auto task2 = demo_coroutine(scheduler, 2);
auto task3 = demo_coroutine(scheduler, 3);
std::cout << "init done" << std::endl;
scheduler.run();
}
這里完善一條規則:
* 若協程體中有明確的 co_yield,則承諾對象必需實現 yield_value 接口;
* 若協程體中有明確的 co_return xxx,則承諾對象必需實現 return_value 接口;
* 若協程體中有明確的 co_return 或沒有任何 co_return,則承諾對象至少需要實現 return_void 接口。
相比之前的例子,沒有顯式的 co_yield 和 co_return,這里承諾對象只需要實現 return_void 即可,規范上說沒實現的話可能導致未定義行為,實測 clang 去掉沒引發崩潰,不過最好還是帶上。
老規矩,下面有請 C++ Insights 上場,看看編譯器底層做的工作與之前相比有何差異:
查看代碼
/*************************************************************************************
* NOTE: The coroutine transformation you've enabled is a hand coded transformation! *
* Most of it is _not_ present in the AST. What you see is an approximation. *
*************************************************************************************/
#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
class SingleThreadScheduler
{
public:
inline void schedule(std::function<void ()> task)
{
this->tasks.push(std::move(task));
}
inline void run()
{
while(!this->tasks.empty()) {
std::function<void ()> task = std::function<void ()>(this->tasks.front());
this->tasks.pop();
task.operator()();
}
}
private:
std::queue<std::function<void ()>, std::deque<std::function<void ()>, std::allocator<std::function<void ()> > > > tasks;
public:
// inline SingleThreadScheduler() noexcept(false) = default;
// inline ~SingleThreadScheduler() noexcept = default;
};
struct AsyncTask
{
struct promise_type
{
inline AsyncTask get_return_object()
{
return AsyncTask(AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this)));
}
inline std::suspend_never initial_suspend()
{
return {};
}
inline std::suspend_always final_suspend() noexcept
{
return {};
}
inline void return_void()
{
}
inline void unhandled_exception()
{
std::terminate();
}
// inline constexpr promise_type() noexcept = default;
};
std::coroutine_handle<promise_type> handle;
inline explicit AsyncTask(std::coroutine_handle<promise_type> h)
: handle{std::coroutine_handle<promise_type>(h)}
{
}
inline ~AsyncTask() noexcept
{
if(this->handle.operator bool()) {
this->handle.destroy();
}
}
};
struct ScheduleAwaiter
{
SingleThreadScheduler * scheduler;
inline bool await_ready() const
{
return false;
}
inline void await_suspend(std::coroutine_handle<void> h)
{
class __lambda_47_29
{
public:
inline /*constexpr */ void operator()() const
{
h.resume();
}
private:
std::coroutine_handle<void> h;
public:
// inline /*constexpr */ __lambda_47_29(const __lambda_47_29 &) noexcept = default;
// inline /*constexpr */ __lambda_47_29(__lambda_47_29 &&) noexcept = default;
__lambda_47_29(const std::coroutine_handle<void> & _h)
: h{_h}
{}
};
this->scheduler->schedule(std::function<void ()>(__lambda_47_29{h}));
}
inline void await_resume()
{
}
};
struct __demo_coroutineFrame
{
void (*resume_fn)(__demo_coroutineFrame *);
void (*destroy_fn)(__demo_coroutineFrame *);
std::__coroutine_traits_impl<AsyncTask>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
SingleThreadScheduler & scheduler;
int id;
std::suspend_never __suspend_52_11;
ScheduleAwaiter __suspend_56_14;
ScheduleAwaiter __suspend_61_14;
std::suspend_always __suspend_52_11_1;
};
AsyncTask demo_coroutine(SingleThreadScheduler & scheduler, int id)
{
/* Allocate the frame including the promise */
/* Note: The actual parameter new is __builtin_coro_size */
__demo_coroutineFrame * __f = reinterpret_cast<__demo_coroutineFrame *>(operator new(sizeof(__demo_coroutineFrame)));
__f->__suspend_index = 0;
__f->__initial_await_suspend_called = false;
__f->scheduler = std::forward<SingleThreadScheduler &>(scheduler);
__f->id = std::forward<int>(id);
/* Construct the promise. */
new (&__f->__promise)std::__coroutine_traits_impl<AsyncTask>::promise_type{};
/* Forward declare the resume and destroy function. */
void __demo_coroutineResume(__demo_coroutineFrame * __f);
void __demo_coroutineDestroy(__demo_coroutineFrame * __f);
/* Assign the resume and destroy function pointers. */
__f->resume_fn = &__demo_coroutineResume;
__f->destroy_fn = &__demo_coroutineDestroy;
/* Call the made up function with the coroutine body for initial suspend.
This function will be called subsequently by coroutine_handle<>::resume()
which calls __builtin_coro_resume(__handle_) */
__demo_coroutineResume(__f);
return __f->__promise.get_return_object();
}
/* This function invoked by coroutine_handle<>::resume() */
void __demo_coroutineResume(__demo_coroutineFrame * __f)
{
try
{
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_demo_coroutine_1;
case 2: goto __resume_demo_coroutine_2;
case 3: goto __resume_demo_coroutine_3;
case 4: goto __resume_demo_coroutine_4;
}
/* co_await insights.cpp:52 */
__f->__suspend_52_11 = __f->__promise.initial_suspend();
if(!__f->__suspend_52_11.await_ready()) {
__f->__suspend_52_11.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
__resume_demo_coroutine_1:
__f->__suspend_52_11.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " started on thread: "), std::this_thread::get_id()).operator<<(std::endl);
/* co_await insights.cpp:56 */
__f->__suspend_56_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_56_14.await_ready()) {
__f->__suspend_56_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 2;
return;
}
__resume_demo_coroutine_2:
__f->__suspend_56_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " resumed on thread: "), std::this_thread::get_id()).operator<<(std::endl);
/* co_await insights.cpp:61 */
__f->__suspend_61_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_61_14.await_ready()) {
__f->__suspend_61_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 3;
return;
}
__resume_demo_coroutine_3:
__f->__suspend_61_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " finish on thread: "), std::this_thread::get_id()).operator<<(std::endl);
/* co_return insights.cpp:52 */
__f->__promise.return_void()/* implicit */;
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
__final_suspend:
/* co_await insights.cpp:52 */
__f->__suspend_52_11_1 = __f->__promise.final_suspend();
if(!__f->__suspend_52_11_1.await_ready()) {
__f->__suspend_52_11_1.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 4;
return;
}
__resume_demo_coroutine_4:
__f->destroy_fn(__f);
}
/* This function invoked by coroutine_handle<>::destroy() */
void __demo_coroutineDestroy(__demo_coroutineFrame * __f)
{
/* destroy all variables with dtors */
__f->~__demo_coroutineFrame();
/* Deallocating the coroutine frame */
/* Note: The actual argument to delete is __builtin_coro_frame with the promise as parameter */
operator delete(static_cast<void *>(__f), sizeof(__demo_coroutineFrame));
}
int main()
{
SingleThreadScheduler scheduler = SingleThreadScheduler();
AsyncTask task1 = demo_coroutine(scheduler, 1);
AsyncTask task2 = demo_coroutine(scheduler, 2);
AsyncTask task3 = demo_coroutine(scheduler, 3);
std::operator<<(std::cout, "init done").operator<<(std::endl);
scheduler.run();
return 0;
}
內容比較多,只撿關鍵的看下:
struct __demo_coroutineFrame
{
void (*resume_fn)(__demo_coroutineFrame *);
void (*destroy_fn)(__demo_coroutineFrame *);
std::__coroutine_traits_impl<AsyncTask>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
SingleThreadScheduler & scheduler;
int id;
std::suspend_never __suspend_52_11; // initial_suspend
ScheduleAwaiter __suspend_56_14; // 第一個 co_await
ScheduleAwaiter __suspend_61_14; // 第二個 co_await
std::suspend_always __suspend_52_11_1; // final_suspend
};
協程狀態基本結構與之前一致,除了返回類型、參數、棧變量外,等待對象的數量與類型也發生了變更,看起來編譯器根據返回值類型推導直接得到了成員類型 (std::suspend_never、SchedulerAwaiter、suspend_always等)。
下面進入協程的 resume 方法看看,它是整個協程的核心:
/* This function invoked by coroutine_handle<>::resume() */
void __demo_coroutineResume(__demo_coroutineFrame * __f)
{
try
{
熟悉的 duff device 上場
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_demo_coroutine_1;
case 2: goto __resume_demo_coroutine_2;
case 3: goto __resume_demo_coroutine_3;
case 4: goto __resume_demo_coroutine_4;
}
promise_type::initial_suspend 返回 suspend_never 導致這里不掛起,協程直接略過這個條件繼續運行,這也是 main 中 init done 輸出位于 Task N start on thread 輸出之后的原因,在構建并返回返回對象前就會向下執行到第一個 co_await
/* co_await insights.cpp:52 */
__f->__suspend_52_11 = __f->__promise.initial_suspend();
if(!__f->__suspend_52_11.await_ready()) {
__f->__suspend_52_11.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
__resume_demo_coroutine_1:
__f->__suspend_52_11.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " started on thread: "), std::this_thread::get_id()).operator<<(std::endl);
第一個 co_await,ScheduleAwaiter 會掛起協程,掛起前調用的 ScheduleAwaiter::await_suspend 將 resume 添加到調度器隊列,以便下次喚醒
/* co_await insights.cpp:56 */
__f->__suspend_56_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_56_14.await_ready()) {
__f->__suspend_56_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 2;
return;
}
再次被調度器調度到時,根據狀態值與 switch-case 直接跳轉到這里執行。由于調度器內部使用先進先出隊列,因此三個協程任務是嚴格按順序執行的
__resume_demo_coroutine_2:
__f->__suspend_56_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " resumed on thread: "), std::this_thread::get_id()).operator<<(std::endl);
第二個 co_await,如法炮制
/* co_await insights.cpp:61 */
__f->__suspend_61_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_61_14.await_ready()) {
__f->__suspend_61_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 3;
return;
}
__resume_demo_coroutine_3:
__f->__suspend_61_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " finish on thread: "), std::this_thread::get_id()).operator<<(std::endl);
協程退出前,沒有 co_yield 或 co_return xxx 顯示調用,則默認調用 co_return 無參版本,對應的就是 return_void 啦;如果有未捕獲的異常,promise_type::unhandle_exception 將會被調用進而退出整個進程
/* co_return insights.cpp:52 */
__f->__promise.return_void()/* implicit */;
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
協程繼續運行,promise_type::final_suspend 返回 suspend_always 會導致協程掛起,配合返回對象的析構函數可以銷毀協程
__final_suspend:
/* co_await insights.cpp:52 */
__f->__suspend_52_11_1 = __f->__promise.final_suspend();
if(!__f->__suspend_52_11_1.await_ready()) {
__f->__suspend_52_11_1.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 4;
return;
}
就不會走到這里協程體的自動銷毀邏輯啰
__resume_demo_coroutine_4:
__f->destroy_fn(__f);
}
有上一篇文章的鋪墊,看起來沒什么尿點,下面來一張圖總覽下:

為了便于理解只畫了一個協程任務的執行順序,跟著箭頭方向和標號就能梳理清楚啦。
final_suspend 與協程自清理
上面例子中,每個協程的返回對象需要保存在臨時變量 task1/2/3 中,不然在調度器運行時會因協程狀態銷毀而崩潰:
int main() {
SingleThreadScheduler scheduler;
demo_coroutine(scheduler, 1);
demo_coroutine(scheduler, 2);
demo_coroutine(scheduler, 3);
std::cout << "init done" << std::endl;
scheduler.run();
}
輸出:
Task 1 started on thread: 124850410948416
Task 2 started on thread: 124850410948416
Task 3 started on thread: 124850410948416
init done
Program terminated with signal: SIGSEGV
這主要是因為返回對象的析構有銷毀協程狀態的動作:
~AsyncTask() { if (handle) handle.destroy(); }
當不使用變量保持返回對象的生命周期時,臨時對象走不到 SingleTaskScheduler::run 就被析構了,后面再引用時就會崩潰。
參考 C++ Insights 的輸出,__demo_corotineResume 尾部有協程的自銷毀邏輯,能否利用這個破解協程狀態與返回對象的耦合關系呢?答案是肯定的。借助于 promise_type::final_suspend就能實現,下面是改進后的代碼:
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
~promise_type() { std::cout << "promise_type destroy" << std::endl; }
};
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~AsyncTask() { /*if (handle) handle.destroy();*/ }
};
主要有三點:
* promise_type::final_suspend 返回 std::suspend_never
* AsyncTask 析構不再調用 handle.destroy()
* promise_type 增加析構輸出日志,以確認協程狀態被正確回收
main 中保持不接收返回對象,新的輸出:
Task 1 started on thread: 133157948458816
Task 2 started on thread: 133157948458816
Task 3 started on thread: 133157948458816
init done
Task 1 resumed on thread: 133157948458816
Task 2 resumed on thread: 133157948458816
Task 3 resumed on thread: 133157948458816
Task 1 finish on thread: 133157948458816
promise destroy
Task 2 finish on thread: 133157948458816
promise destroy
Task 3 finish on thread: 133157948458816
promise destroy
程序是可以正常退出的,原理簡單說明如下:
* AsyncTask 析構不再調用 handle.destroy()后,返回對象臨時變量析構時不銷毀底層的協程狀態
* promise_type::final_suspend 返回 std::suspend_never 后,協程在最后一次 resume 時會一直運行到末尾,此時調用 __demo_coroutineDestroy 銷毀協程狀態及其成員承諾對象
借用上次寫的關系圖,稍做修改看下整個過程:

出于清晰起見,返回對象的銷毀使用數字標號,協程狀態的銷毀使用字母標號,表示他們是獨立不相關的。圖中,由于AsyncTask析構destroy協程的路線被中斷了,且final_suspend不掛起協程,這里就走了協程自清理的邏輯,你看明白了嗎?
coroutine_handle<> 與類型擦除
程序邏輯梳理完了,回頭來看個語法,注意等待對象的一個接口定義:
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
這里參數是協程句柄,但奇怪的是模板參數空空如也,按理說不應該是 coroutine_handle<AsyncTask::promise_type>么?這涉及到一個 C++20 的新語法特性:類型擦除。
其實類型擦除算不上什么新鮮事,早在 C 語言中就有通過 void* 擦除類型的能力;后面 C++ 面向對象的虛函數也是如此,只關心接口不關心類型;不過他們都有這樣那樣的不足:C 語言的 void* 具有類型不安全的問題;面向對象虛函數又帶來了指針跳轉的性能損失、以及無法對三方庫進行處理的問題。C++20 基于模板的類型擦除技術,既能忽略具體類型將關注點集中在通用操作層面,又能避免上述不足。
首先解釋 std::coroutine_handle<> 類型,它實際上是 std::coroutine_handle<void> 的簡寫,后者是 std::coroutine_handle<T> 模板的一個特化。從上一篇的協程關系圖可知,協程句柄底層持有的是一個協程狀態的指針,std::coroutine_handle<void>封裝了與底層指針直接相關的接口,包括:
* 構造、拷貝構造、賦值構造:接收一個協程狀態指針,用于初始化內部指針
* address:返回協程狀態指針
* from_address:接收一個協程狀態指針,構建一個 std::coroutine_handle<void>并返回
* resume:委托給編譯器內置的 __builtin_coro_resume
* done:委托給編譯器內置的 __builtin_coro_done
* destroy:委托給編譯器內置的 __builtin_coro_destroy
* operator bool:判斷底層指針是否為空
* operator ():調用 resume
像 resume、done、destroy 這些方法,都是委托給編譯器內置接口來實現的,普通用戶看不到也不用關心,這一點有點兒類似 void* 指針,本質上是個黑盒,因此直到這一步,協程中的類型擦除還和 void* 沒有本質區別。
接著解釋具體的 std::coroutine_handle<T> 類型,T 一般是 promise_type,不過不同的返回對象的這個 traits 類型也不同,目前我們已經見識過了 Generator::promise_type 和 AsyncTask::promise_type,每個用戶協程都有自己獨特的 promise_type,不勝枚舉。它主要實現了三個額外的接口:
* promise:獲取協程狀態中的承諾對象
* from_promise:接收一個承諾對象,定位到包含它的協程狀態地址,再基于此構造一個 coroutine_handle<void> 對象并返回
* operator coroutine_handle<>():將自身顯示轉換為 coroutine_handle<void> 類型,就是基于底層指針直接構建一個 void 特化并返回,有點類似 from_address
這三個接口各有用處,之前的例子已經見識了前兩個的用法:
int value() { return handle.promise().current_value; }
回顧上一篇文章中 co_yield 生成數列值時,數值是保存在承諾對象中的,外部想要獲取的話就是通過返回對象 -> 協程句柄 -> 承諾對象拿到的,這里用到了協程句柄的 promise 接口。
struct promise_type {
int current_value;
auto get_return_object() { return Generator{this}; }
...
}
...
Generator(promise_type* p) : handle(std::coroutine_handle<promise_type>::from_promise(*p)) {}
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
...
};
...
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
...
};
兩個例子中的返回對象都是使用 from_promise 來構建協程句柄的,不同之處是前者構造函數傳遞的是 promise 對象,在內部通過 from_promise 生成協程句柄;后者是在 get_return_object 中直接生成協程句柄,再傳遞給構造函數。這個接口的存因也好理解,因為用戶不知道有協程狀態的存在,只能用承諾對象去構造。
第三個接口的調用點用戶看不到,是編譯器在底層自己做的:
__f->__suspend_52_11 = __f->__promise.initial_suspend();
if(!__f->__suspend_52_11.await_ready()) {
__f->__suspend_52_11.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
這段經典的 co_await 翻譯過來的 C++ 代碼中,await_suspend 的參數大有講究,這一長串代碼可分兩部分解讀:
* std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)):根據協程狀態調用 from_address 生成具象的協程句柄 coroutine_handle<AsyncTask::promise_type>
* operator std::coroutine_handle<void>():上面返回的臨時對象上調用 operator coroutine_handle<void> 強轉為通用的協程句柄
這樣一來等待對象的 await_suspend 就可以不關心具象的、與用戶承諾對象相關的協程句柄,因為它只依賴通用協程句柄的接口,反而大大拓寬了等待對象的使用范圍,基本能用于任何用戶定義的協程體中,你看明白了嗎?
網上有一些文章,說具象的 coroutine_handle<AsyncTask::promise_type> 是派生于特化的 coroutine_handle<void>,這根本是無稽之談,去看看標準庫實現就能知道。雖然派生是類型擦除的一種途徑,C++20 卻沒有采用這種方式,主要是為了避免面向對象繼承和虛函數帶來的性能負擔,目前這種 operator 強轉的方式,只是將底層的指針轉移到新對象,性能開銷非常小。
有心的讀者可能問了,這里編譯器為何不直接生成下面的代碼:
__f->__suspend_52_11.await_suspend(std::coroutine_handle<void>::from_address(static_cast<void *>(__f)));
反正最后參數是 coroutine_handle<>,我也覺得這樣更簡潔,而且強轉后原來具象的協程句柄也沒用了會自動銷毀,至于編譯器為什么不這樣搞,搞不清楚。
最后來欣賞下 coroutine_handle<T> 的 from_promise & promise 的實現:
static coroutine_handle from_promise(_Promise& __p)
{
coroutine_handle __self;
__self._M_fr_ptr = __builtin_coro_promise((char*) &__p, __alignof(_Promise), true);
return __self;
}
_Promise& promise() const
{
void* __t = __builtin_coro_promise (_M_fr_ptr, __alignof(_Promise), false);
return *static_cast<_Promise*>(__t);
}
多認識了一個內置函數 __builtin_coro_promise,它的作用是根據承諾對象尋找協程狀態地址,或相反 (由最后的 bool 參數控制),內部估計就是 offsetof 指針加減吧。
關于類型擦除,這是一個宏大的概念,跳出 C++20 協程的范疇考慮的話,還有很多其它方式,比如下面這個例子:
#include <iostream>
template <typename T>
class Shape
{
public:
void Draw()
{
static_cast<T*>(this)->DrawImpl();
}
void DrawImpl() { std::cout << "Draw Shape\n"; }
};
class Circle :public Shape<Circle>
{
public:
void DrawImpl() { std::cout << "Draw Circle\n"; }
};
class Rect :public Shape<Rect>
{
public:
void DrawImpl() { std::cout << "Draw Rect\n"; }
};
class Triangle :public Shape<Triangle> {};
int main()
{
Circle a;
a.Draw(); //Draw Circle
Rect b;
b.Draw(); //Draw Rect
Triangle c;
c.Draw(); //Draw Shape
}
這種技術稱為奇異遞歸模板模式 (CRTP, Curiously Recurring Template Pattern),首先定義一個模板基類 (Shape),它包含通用的對外接口 (Draw) 和對內實現 (DrawImpl) 兩套接口,其中對外接口是委托給使用模板參數類型強制轉換后的 this 的對內實現的接口,它們都是普通函數而非虛函數,因此沒有虛函數表;接著基于模板基類進行派生 (Circle/Rect/Triangle),而基類的模板參數恰好就是派生類自己,它會重寫基類模板的對內實現接口,這樣基類的對外接口其實最終調用的就是派生類的重寫版本 (case a & b),如果派生類沒有重寫接口,則基類默認的實現會被調用 (case c);由于接收派生類模板參數的模板類在編譯期完成實例化,故無需借助虛函數就可以直接調用派生類的普通函數,這也稱為編譯期靜態多態。這種手法的優點是減少了運行期虛函數開銷,缺點是模板會拉長編譯時間并增大代碼體積。
回到 C++20 協程的場景,由于協程句柄是期望用戶直接通過 coroutine_handle<T> 的形式使用,并不定義任何新類并派生于 coroutine_handle<void>,所以上面的方式并不適合。
最后,還有其它類型擦除技術,例如借助于 C++17 的 std::variant,關于這方面就不展開了,感興趣的讀者可以參考文末附錄。
lambda 本質是仿函數
這里插一個彩蛋,和 C++20 協程無關,不過正好看到了,就一起來分析下。例子中有一段 lambda 表達式:
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
它捕獲一個協程句柄 h,沒有參數,函數體直接調用 resume 接口。看對應的 C++ Insights 解析結果:
inline void await_suspend(std::coroutine_handle<void> h)
{
class __lambda_47_29
{
public:
inline /*constexpr */ void operator()() const
{
h.resume();
}
private:
std::coroutine_handle<void> h;
public:
// inline /*constexpr */ __lambda_47_29(const __lambda_47_29 &) noexcept = default;
// inline /*constexpr */ __lambda_47_29(__lambda_47_29 &&) noexcept = default;
__lambda_47_29(const std::coroutine_handle<void> & _h)
: h{_h}
{}
};
this->scheduler->schedule(std::function<void ()>(__lambda_47_29{h}));
}
編譯器將它翻譯成了一個內置的仿函數類 __lambda_47_29,捕獲列表轉化為 private 成員變量,由構造函數初始化;調用參數將轉化為成員operator() 的參數,這里沒有;返回值轉化為成員operator() 的返回值,這里為 void。最后在調用點生成仿函數的臨時對象、并將捕獲列表作為參數傳入 __lambda_47_29{h},由于 schedule 需要一個 std::function 類型,所以這里進行了顯示轉換。
看懂了這個戲法,再看 lambda 表達式的按引用捕獲、按移動捕獲、全部捕獲、全部按引用捕獲等,是不是就清晰多了? 其實就是一個推導成員變量類型的問題,按引用捕獲的,成員變量也被聲明為一個引用,那么它的生命周期管理就值得注意,需要保證 lambda 動作時相關的對象仍存在,避免發生懸空引用的問題。
不得不夸 C++ Insights 真是個好東西~
結語
本文接續前一篇,進一步深化了 C++20 協程例子,通過使用調度器使協程的運行更符合實際使用場景。期間還分析了幾個語法特性:final_suspend 與協程的自清理、協程句柄使用類型擦除來簡化接口使用,lambda 表達式的本質是仿函數。不過這個例子還是只具有演示性質,畢竟在真實的等待異步事件場景中,協程是否繼續是要要由異步事件是否完成來決定,而不是像目前這樣“排排坐”執行。所以下一篇,將引入真正的異步網絡、磁盤事件,看看 C++20 協程是如何包裝它們的。
參考
[1]. 淺析C++的幾種類型擦除實現
[3]. C++協程的靈魂擺渡者?coroutine_handle 使用詳解和高級特性剖析
[4]. gcc/libstdc++-v3/include/std/coroutine
[5]. 初探 C++20 Coroutine
本文來自博客園,作者:goodcitizen,轉載請注明原文鏈接:http://www.rzrgm.cn/goodcitizen/p/18933425/coroutines_without_schedulers_are_not_good_coroutines
網上 C++20 協程的例子為什么難懂?要么太復雜沒有進行簡化,要么太簡單脫離了實際使用場景,特別是后者,雖然每一行代碼都能看懂,但如何在實際場景中使用就一頭霧水了。今天來看一個帶簡單調度器的協程例子,看看協程是怎么自己運行起來的吧~
浙公網安備 33010602011771號