基于 IOCP 的協程調度器——零基礎深入淺出 C++20 協程
前言
上一篇《基于 epoll 的協程調度器》談到如何基于 epoll 構建一個事件驅動的協程調度器,沒有使用三方庫的原因主要是為了避免引入額外復雜度,不過只演示 Linux 未免對非 Unix 平臺的小伙伴有所不公,為此本文基于 Windows 的完成端口 (IO Completion Port:IOCP) 構建相同能力的 demo。
文章仍然遵守之前的創作原則:
* 選取合適的 demo 是頭等大事
* 以協程為目標,涉及到的新語法會簡單說明,不涉及的不旁征博引
* 若語法的原理非常簡單,也會簡單展開講講,有利于透過現象看本質,用起來更得心應手
上一篇文章里不光引入了基于事件的調度器,還說明了如何開啟多文件并行、await_suspend 與試讀的關系、singalfd 用于完美退出等話題,如果沒有這些內容鋪墊,看本文時會有很多地方難以理解,還沒看過的小伙伴,墻裂建議先看那篇。
工具還是之前介紹過的 Compile Explorer,這里不再用到 C++ Insights ,主要是它不支持 Windows 平臺,其實 Compiler Explorer 也只是編譯,運行的話還是不太行,因為它的環境不支持像文件、網絡之類的異步 IO,需要用戶自行搭建開發環境。
基于完成端口的 IO 多路復用
上文中提到了 Unix 系統中多路復用接口的發展歷程:分別經歷了 select -> poll -> epoll/kqueue,Windows 則通過完成端口一統江山,其實它倆調用方式差不太多:
| epoll | IOCP | |
| 初始化 | epoll_create |
CreateIoCompletionPort
|
| 關聯句柄 | epoll_ctl |
CreateIoCompletionPort
|
| 等待并獲取下一個事件 | epoll_wait |
GetQueuedCompletionStatus
|
| 投遞事件 | n/a (self pipe trick) | PostQueuedCompletionStatus |
| 銷毀 | close | CloseHandle |
而在可等待對象上,IOCP 則豐富的多:
* 文件 I/O 事件??
* 文件系統變更
* 套接字(Socket)事件??
* 命名管道(Named Pipe)事件??
* 設備 I/O 事件??
* 定時器事件(結合 Waitable Timer)??
這方面能與它相提并論的恐怕只有 kqueue 了。有了上面的鋪墊再參考之前 epoll 的實現,直接上 demo 源碼:
#include <coroutine>
#include <unordered_map>
#include <windows.h>
#include <vector>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <memory>
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
};
class IocpScheduler {
private:
HANDLE iocp_handle;
std::unordered_map<HANDLE, std::coroutine_handle<>> io_handles;
public:
IocpScheduler() {
iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (iocp_handle == NULL) {
throw std::runtime_error("CreateIoCompletionPort failed");
}
}
~IocpScheduler() {
CloseHandle(iocp_handle);
}
void register_io(HANDLE file_handle, std::coroutine_handle<> handle) {
if (io_handles.find(file_handle) == io_handles.end()) {
io_handles[file_handle] = handle;
if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {
throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");
}
}
}
void run() {
while (true) {
DWORD bytes_transferred = 0;
ULONG_PTR completion_key = 0;
LPOVERLAPPED overlapped = nullptr;
BOOL success = GetQueuedCompletionStatus(
iocp_handle,
&bytes_transferred,
&completion_key,
&overlapped,
INFINITE);
if (completion_key != 0) {
HANDLE ready_handle = (HANDLE)completion_key;
if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
it->second.resume();
}
}
}
}
};
struct AsyncReadAwaiter {
IocpScheduler& sched;
HANDLE file_handle;
std::unique_ptr<char[]> buffer;
DWORD buffer_size;
OVERLAPPED overlapped;
DWORD bytes_read;
AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size)
: sched(s), file_handle(file), buffer_size(size), bytes_read(0) {
buffer = std::make_unique<char[]>(size);
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
}
bool await_ready() const {
return false;
}
void await_suspend(std::coroutine_handle<> h) {
sched.register_io(file_handle, h);
if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
DWORD error = GetLastError();
if (error != ERROR_IO_PENDING) {
std::stringstream ss;
ss << "ReadFile failed, error " << error;
throw std::runtime_error(ss.str());
}
}
}
std::string await_resume() {
DWORD bytes_transferred = 0;
if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
DWORD error = GetLastError();
std::stringstream ss;
ss << "GetOverlappedResult failed, error " << error;
throw std::runtime_error(ss.str());
}
return std::string(buffer.get(), bytes_transferred);
}
};
Task async_read_file(IocpScheduler& sched, const char* path) {
HANDLE file_handle = CreateFileA(
path,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
std::stringstream ss;
ss << "CreateFile failed, error " << GetLastError();
throw std::runtime_error(ss.str());
}
while (true) {
auto data = co_await AsyncReadAwaiter(sched, file_handle, 4096);
std::cout << "Read " << data.size() << " bytes\n";
if (data.size() == 0) {
break;
}
}
CloseHandle(file_handle);
}
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cout << "Usage: sample file_path" << std::endl;
return 1;
}
IocpScheduler scheduler;
async_read_file(scheduler, argv[1]);
scheduler.run();
return 0;
}
先看編譯:

Compile Explorer 中指定最新的 msvc 編譯器和 C++20 選項可以編譯通過,注意在 Windows 中選項指定的語法與 Unix 大相徑庭,別弄錯了。
一點一點降低版本嘗試,發現能編譯這段代碼的最低版本是 msvc19.29,對應 vs16.11,如果你需要在本地安裝測試環境的話,穩妥起見安裝 msvc19.30、對應 vs17.0 也就是 VS2022 比較好,如果本地只有 VS2019,需要升級到第五個也就是最后一個發行版才可以。

接下來創建一個簡單的控制臺應用包含上面的源文件,需要配置一下 C++ 語言標準:

就可以編譯生成可執行文件了,在同目錄準備一個文本文件 (test.txt) 進行測試:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
...
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 409
PS D:\code\iocp_coroutine\Debug>
居然死循環了。
指定偏移量
同樣的代碼邏輯,Unix 上沒問題 Windows 上卻死循環,主要原因是:前者底層使用的是管道,與 socket 之類相似是一個流 (stream),因此沒有讀寫偏移量的說法,每次從開頭獲取就可以了;后者使用的是文件,如果不指定偏移量,每次都會從位置 0 讀取,有的讀者可能問了,為何不能使用當前文件的讀取位置呢?這是因為 Windows 上的多路復用底層是徹徹底底的異步架構,必需每次為 ReadFile 指定一個偏移量,而不能夠使用當前文件的偏移量。
修復的方法很簡單,為 ReadFile 的 overlapped 參數的 Offset & OffsetHigh 字段指定要讀取數據的偏移量即可:
...
struct AsyncReadAwaiter {
IocpScheduler& sched;
HANDLE file_handle;
std::unique_ptr<char[]> buffer;
DWORD buffer_size;
增加一個引用成員用來記錄當前請求的偏移值,LARGE_INTEGER 可以理解為 uint64 的結構化表達
LARGE_INTEGER &offset;
OVERLAPPED overlapped;
DWORD bytes_read;
AsyncReadAwaiter(IocpScheduler& s, HANDLE file, LARGE_INTEGER &off, DWORD size)
在構造函數中初始化新成員,這個值需要從外部傳入,讀取成功后更新之,以便跨等待對象使用
: sched(s), file_handle(file), buffer_size(size), offset(off), bytes_read(0) {
buffer = std::make_unique<char[]>(size);
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
}
bool await_ready() const {
return false;
}
void await_suspend(std::coroutine_handle<> h) {
sched.register_io(file_handle, h);
每次請求前設置 overlapped 的偏移字段,并增加調試日志輸出以便觀察
overlapped.Offset = offset.LowPart;
overlapped.OffsetHigh = offset.HighPart;
std::cout << "ReadFile from " << offset.QuadPart << std::endl;
if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
DWORD error = GetLastError();
if (error != ERROR_IO_PENDING) {
std::stringstream ss;
ss << "ReadFile failed, error " << error;
throw std::runtime_error(ss.str());
}
}
}
std::string await_resume() {
DWORD bytes_transferred = 0;
if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
DWORD error = GetLastError();
std::stringstream ss;
ss << "GetOverlappedResult failed, error " << error;
throw std::runtime_error(ss.str());
}
讀取成功后,遞增相應的偏移量
offset.QuadPart += bytes_transferred;
return std::string(buffer.get(), bytes_transferred);
}
};
Task async_read_file(IocpScheduler& sched, const char* path) {
HANDLE file_handle = CreateFileA(
path,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
std::stringstream ss;
ss << "CreateFile failed, error " << GetLastError();
throw std::runtime_error(ss.str());
}
在外層循環中保存這個偏移量,以便可以持久化使用,初始值為 0
LARGE_INTEGER offset = { 0 };
while (true) {
auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);
std::cout << "Read " << data.size() << " bytes\n";
if (data.size() == 0) {
break;
}
}
CloseHandle(file_handle);
}
...
再次運行程序,可以輸出讀取的內容了:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
但是額外的,也收到了一個崩潰提示:

處理文件 EOF
記得之前講到協程體整個是包在編譯的 try...catch 代碼塊中的,這里直接崩潰難道是 msvc 的異常處理沒起作用?掛上調試器看看崩潰堆棧:

看起來是命中 promise 對象的 unhandle_exception,這里調用的 terminate 導致崩潰框彈出,而 unhandled_exception 恰恰是編譯器捕獲了 throw 拋出的異常,與直覺剛好相反。經過排查,唯一可能拋出異常的位置是這里:
std::string await_resume() {
DWORD bytes_transferred = 0;
if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
DWORD error = GetLastError();
std::stringstream ss;
ss << "GetOverlappedResult failed, error " << error;
這里加打一行日志
std::cerr << ss.str() << std::endl;
throw std::runtime_error(ss.str());
}
offset.QuadPart += bytes_transferred;
return std::string(buffer.get(), bytes_transferred);
}
新的輸出果然提示這里返回了錯誤:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
GetOverlappedResult failed, error 38
錯誤碼 38 對應的是 ERROR_HANDLE_EOF表示文件已到末尾,相比 epoll 管道不關心數據結尾的問題,IOCP 讀文件還需要額外增加一些處理,另外在拋異常時,msvc 相比 clang 的顯示不太友好,需要在拋出異常前補上 stderr 的打印,修復后的代碼如下:
std::string await_resume() {
DWORD bytes_transferred = 0;
if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
DWORD error = GetLastError();
判斷錯誤類型,如果是文件 EOF,直接返回空數據,上層會進行判斷,從而退出讀取循環
if (error != ERROR_HANDLE_EOF) {
std::stringstream ss;
ss << "GetOverlappedResult failed, error " << error;
std::cerr << ss.str() << std::endl;
throw std::runtime_error(ss.str());
}
else {
return "";
}
}
offset.QuadPart += bytes_transferred;
return std::string(buffer.get(), bytes_transferred);
}
下面是新的輸出:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
Read 0 bytes
不再報錯了。
多文件并行
上面的例子雖然通過多次讀取展示了協程多次喚醒的過程,但沒有展示多個 IO 句柄并發的能力,下面稍加改造,同時讀取多個文件:
Task async_read_file(IocpScheduler& sched, const char* path) {
HANDLE file_handle = CreateFileA(
path,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
std::stringstream ss;
ss << "CreateFile failed, error " << GetLastError();
std::cerr << ss.str() << std::endl;
throw std::runtime_error(ss.str());
}
LARGE_INTEGER offset = { 0 };
while (true) {
auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);
輸出文件句柄以區別從不同文件讀取的數據
std::cout << "Read [" << file_handle << "] " << data.size() << " bytes\n";
if (data.size() == 0) {
break;
}
}
CloseHandle(file_handle);
}
int main(int argc, char* argv[]) {
if (argc < 3) {
std::cout << "Usage: sample file1 file2" << std::endl;
return 1;
}
IocpScheduler scheduler;
async_read_file(scheduler, argv[1]);
多個文件只需要多次調用協程即可,從這里可以感受到協程強大的拓展性
async_read_file(scheduler, argv[2]);
scheduler.run();
return 0;
}
下面是新的輸出:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt test2.txt
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 456 bytes
Read [0000010C] 456 bytes
Read [00000108] 0 bytes
Read [0000010C] 0 bytes
為了便于對比,這里將讀取 buffer 的默認尺寸改為 1024,并去掉了調試日志。可以看出在 IOCP 里兩個文件基本是輪著讀的,公平性還是不錯的。
await_suspend & 試讀
上文中提到,通過在 await_ready 或 await_suspend 中增加一些代碼,就可以支持數據試讀,從而在某些場景下提升數據吞吐能力。下面看看 IOCP 是如何實現的,這里只演示 await_suspend 方式:
bool await_suspend(std::coroutine_handle<> h) {
sched.register_io(file_handle, h);
overlapped.Offset = offset.LowPart;
overlapped.OffsetHigh = offset.HighPart;
//std::cout << "ReadFile from " << offset.QuadPart << std::endl;
if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
DWORD error = GetLastError();
if (error != ERROR_IO_PENDING) {
std::stringstream ss;
ss << "ReadFile failed, error " << error;
std::cerr << ss.str() << std::endl;
throw std::runtime_error(ss.str());
}
}
ReadFile 本身具有試讀能力,當任務可以立即完成時,它將返回 TRUE,bytes_read 參數將返回讀取的數據長度;這里加入判斷,若立即讀取成功,則返回 false 不掛起協程
else {
// if immediately success, not hangup
std::cout << "immediately success, read = " << bytes_read << std::endl;
}
return bytes_read > 0 ? false : true;
}
std::string await_resume() {
DWORD bytes_transferred = 0;
resume 時先判斷是否為試讀場景,是的話直接返回數據就可以了
if (bytes_read > 0) {
bytes_transferred = bytes_read;
}
else {
if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
DWORD error = GetLastError();
if (error != ERROR_HANDLE_EOF) {
std::stringstream ss;
ss << "GetOverlappedResult failed, error " << error;
std::cerr << ss.str() << std::endl;
throw std::runtime_error(ss.str());
}
else {
return "";
}
}
}
offset.QuadPart += bytes_transferred;
return std::string(buffer.get(), bytes_transferred);
}
從這里也可以看出,Windows 對直接成功的支持是比較好的,不必像 Unix 那樣來回倒騰數據,下面是新版本輸出:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 456 bytes
Read [000000FC] 0 bytes
運行了多次沒有試出來,可能 Windows 只對網絡等真正異步的場景才會有立即成功的情況吧。
PostQueuedCompletionStatus & 完美退出
上面的 demo 如果遇到大文件目前只能通過 Ctrl C 強制殺死,畢竟調度器的 run 是個死循環沒法退出,下面對它進行一番改造,看看能否實現完美退出:
IocpScheduler g_scheduler;
由于需要在信號響應函數中通知調度器退出,這里將它做為一個全局變量,工程化一點的話可以改成單例,這里偷個懶
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cout << "Usage: sample file" << std::endl;
return 1;
}
初始化時捕獲 SiGINT 以便響應 Ctrl C
signal(SIGINT, on_user_exit);
async_read_file(g_scheduler, argv[1]);
g_scheduler.run();
return 0;
}
在信號響應函數中調用調度器退出接口實現完美退出
void on_user_exit(int signo) {
g_scheduler.exit(signo);
}
class IocpScheduler {
...
調度器中新增的退出接口,無腦給 IOCP 隊列投遞通知,注意 key 參數給的是 0,以區別于一般的文件讀取事件
void exit(int signo) {
std::cout << "caught signal " << signo << ", prepare to quit!" << std::endl;
PostQueuedCompletionStatus(iocp_handle, 0, (ULONG_PTR)0, NULL);
}
void run() {
while (true) {
DWORD bytes_transferred = 0;
ULONG_PTR completion_key = 0;
LPOVERLAPPED overlapped = nullptr;
BOOL success = GetQueuedCompletionStatus(
iocp_handle,
&bytes_transferred,
&completion_key,
&overlapped,
INFINITE);
收到事件后,優先檢測是否為退出事件,命中的話直接 break while 循環退出 main
if (completion_key == 0) {
std::cout << "IOCP ready to quit" << std::endl;
break;
}
else {
HANDLE ready_handle = (HANDLE)completion_key;
if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
it->second.resume();
}
}
}
}
~IocpScheduler() {
調度器析構中增加協程的銷毀,防止內存、句柄泄漏
for(auto handle : io_handles) {
std::cout << "coroutine destroy" << std::endl;
handle.second.destroy();
}
CloseHandle(iocp_handle);
}
};
下面是新版輸出:
PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 456 bytes
Read [00000110] 0 bytes
caught signal 2, prepare to quit!
IOCP ready to quit
coroutine destroy
用戶按下 Ctrl C 后,可以實現完美退出啦!
結語
本文介紹了一種基于 IOCP 多路復用的協程調度器,除此之外還說明了如何妥善處理文件偏移、文件 EOF、await_suspend 與試讀寫、PostQueuedCompletionStatus 與進程完美退出等,可用于構建工業級強度的代碼。
最后,由于本文中 demo 經歷多次迭代,想要復制最終版進行驗證的小伙伴,可以 follow 這個開源 git 庫獲?。?a target="_blank" rel="noopener nofollow">cpp20coroutine。
下一篇來看下,如何將現有的基于回調的異步庫與 C++20 協程無縫糅合。
參考
[1]. 如果異步完成,ReadFile()是否總是返回FALSE?
[2]. 系統錯誤代碼 (0-499)
本文來自博客園,作者:goodcitizen,轉載請注明原文鏈接:http://www.rzrgm.cn/goodcitizen/p/19052857/iocp_based_multiplexing_coroutine_scheduler
浙公網安備 33010602011771號