<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      基于 epoll 的協程調度器——零基礎深入淺出 C++20 協程

      前言

      上一篇《沒有調度器的協程不是好協程》談到協程如何自動運行,然而那個例子里的調度器還是不太自然,考查一下真實場景,掛起的協程一般是在等待異步事件的完成,如果異步事件沒完成就輪到自己執行,它其實還是無法繼續,相當于一次無效喚醒。所以這一篇準備引入異步事件,看看在真實的場景下,調度器是如何運作的。

      文章仍然遵守之前的創作原則:

      * 選取合適的 demo 是頭等大事

      * 以協程為目標,涉及到的新語法會簡單說明,不涉及的不旁征博引

      * 若語法的原理非常簡單,也會簡單展開講講,有利于透過現象看本質,用起來更得心應手

      上一篇文章里不光引入了初級的調度器,還說明了 final_suspend 與協程自清理的關系、協程句柄通過類型擦除來屏蔽用戶定義承諾對象的差異、以及 lambda 表達式的本質是仿函數等,如果沒有這些內容鋪墊,看本文時會有很多地方難以理解,還沒看過的小伙伴,墻裂建議先看那篇。

      工具還是之前介紹過的 C++ Insights ,這里不再用到 Compile Explorer,主要是它的運行環境不支持像文件、網絡之類的異步 IO,為此需要用戶自行搭建開發環境。

      基于 epoll 的 IO 多路復用

      本文演示的異步 IO 以文件操作為主,相比網絡操作它具有代碼量少、易于測試的優點。為了簡化復雜度,這里沒有接入任何三方庫,而是直接調用操作系統 raw API,閱讀本文需要具有 IO 多路復用 (multiplexing) 的知識基礎,例如 Linux 的 epoll 或 Windows 的 IOCP。

      在單線程時代,想要處理多個 IO 事件也不是不行,只要將異步 IO 句柄交給 select / poll / epoll / kqueue 等待即可,當任一 IO 事件到達時,控制權將從阻塞等待中返回,并告知用戶哪個句柄上有何種事件發生,從而方便用戶直接處理那個句柄上的 IO 事件,并且預期將不會被阻塞。這種模型因為檢測完成后,還需要用戶動作一下,也稱為 Reactor 模型;相對的,還有 Proactor 模型,主要是基于 Windows IOCP,當事件完成時,相應的讀、寫動作已由系統完成,不再需要用戶動作,故有此區別,關于這一點,后面在介紹基于 IOCP 的調度器時詳述。

      類 Unix 系統上的 IO 多路分離器比較多,早期的 select 就能監控 IO 句柄的讀、寫、異常三個事件集,并且帶超時能力;后面發展的 poll 消除了 select 對句柄數量的限制;Linux 上誕生的 epoll 解決了 select & poll 在句柄數量增長時效能線性下降的問題,主要優化了句柄集合在用戶態與內核態的來回復制、返回時遍歷句柄集等性能開銷;kqueue 則是 BSD 系統上的 epoll 平替,兩者都支持水平觸發與邊緣觸發兩種模式。

      水平觸發意味著只要句柄上有事件,分離器就會一直通知,上述四個默認都是水平觸發,適合少量離散數據的場景;邊緣觸發意味著一次通知中如果不將對應的事件處理完,下次不會再通知,除非有新的事件產生,epoll / kqueue 可選邊緣觸發,適合大數據量的場景,可以有效緩解高頻通知導致的數據傳輸低效問題。

      惡補了 IO 多路復用機制相關的知識后,考慮到我們是在 Linux 上進行測試,這里選取了 epoll 作為分離器。需要注意的是 epoll 不能直接處理普通文件讀寫,需要借助 fifo 文件,后面我們會看到這一點,話不多說直接上 demo:

      #include <coroutine>
      #include <unordered_map>
      #include <sys/epoll.h>
      #include <unistd.h>
      #include <fcntl.h>
      #include <vector>
      #include <stdexcept>
      #include <iostream>
      #include <sstream>
      
      #define MAX_EVENTS 10
      
      struct Task {
          struct promise_type {
              Task get_return_object() { return {}; }
              std::suspend_never initial_suspend() { return {}; }
              std::suspend_never final_suspend() noexcept { return {}; }
              void return_void() {}
              void unhandled_exception() { std::terminate(); }
          };
      };
      
      class EpollScheduler {
      private:
          int epoll_fd;
          std::unordered_map<int, std::coroutine_handle<>> io_handles;
      public:
          EpollScheduler() {
              epoll_fd = epoll_create(MAX_EVENTS);
              if (epoll_fd == -1) {
                  std::stringstream ss;
                  ss << "epoll_create failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
          }
      
          ~EpollScheduler() {
              close(epoll_fd);
          }
      
          void register_io(int fd, std::coroutine_handle<> handle) {
              if (io_handles.find(fd) == io_handles.end()) {
                  io_handles[fd] = handle;
      
                  epoll_event event{};
                  event.events = EPOLLIN | EPOLLET; 
                  event.data.fd = fd;
                  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
                      std::stringstream ss;
                      ss << "epoll_ctl failed, error " << errno; 
                      throw std::runtime_error(ss.str());
                  }
              }
          }
      
          void run() {
              while (true) {
                  epoll_event events[MAX_EVENTS] = { 0 };
                  int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
                  for (int i = 0; i < n; ++i) {
                      int ready_fd = events[i].data.fd;
                      if (auto it = io_handles.find(ready_fd); it != io_handles.end()) {
                          it->second.resume(); 
                      }
                  }
              }
          }
      };
      
      struct AsyncReadAwaiter {
          EpollScheduler& sched;
          int fd;
          std::string buffer; 
      
          AsyncReadAwaiter(EpollScheduler& s, int file_fd, size_t buf_size) 
              : sched(s), fd(file_fd), buffer(buf_size, '\0') {}
      
          bool await_ready() const { 
              return false;
          }
      
          void await_suspend(std::coroutine_handle<> h) {
              sched.register_io(fd, h); 
          }
      
          std::string await_resume() {
              ssize_t n = read(fd, buffer.data(), buffer.size());
              if (n == -1) {
                  std::stringstream ss;
                  ss << "read failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              buffer.resize(n);
              return std::move(buffer);
          }
      };
      
      Task async_read_file(EpollScheduler& sched, const char* path) {
          int fd = open(path, O_RDONLY | O_NONBLOCK);
          if (fd == -1) {
              std::stringstream ss;
              ss << "open failed, error " << errno; 
              throw std::runtime_error(ss.str());
          }
      
          while (true) {
              auto data = co_await AsyncReadAwaiter(sched, fd, 4096);
              std::cout << "Read " << data.size() << " bytes\n";
              // if (data.size() == 0)
              //     break; 
          }
          close(fd);
      }
      
      int main(int argc, char* argv[]) {
          if (argc < 2) { 
              std::cout << "Usage: sample pipe" << std::endl; 
              return 1; 
          }
      
          EpollScheduler scheduler;
          async_read_file(scheduler, argv[1]);
          scheduler.run();
          return 0;
      }

      先來看編譯,公司的開發環境中安裝的 gcc 最高版本為 12.1:

      $ /opt/compiler/gcc-12/bin/g++ --version
      /opt/compiler/gcc-12/bin/g++ (GCC) 12.1.0
      Copyright (C) 2022 Free Software Foundation, Inc.
      This is free software; see the source for copying conditions.  There is NO
      warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

      經 Compile Explorer 驗證,可用:

      一點點降低版本嘗試,發現能編譯這段代碼的最低 gcc 版本是 11.1,如果你需要在本地安裝 gcc 的話,大于等于這個版本就行。

      包裝一個簡單的 Makefile:

      all: sample
      
      sample : sample.cpp
      	/opt/compiler/gcc-12/bin/g++ -std=c++20 -o $@ $^
      	mkfifo communication.pipe
      
      clean:
      	rm sample communication.pipe

      mkfifo 用于管道文件 (communication.pipe) 的創建。啟動 sample 程序后可以在管道另一側用腳本寫一些數據進去:

      for ((i=1;i<500;++i)); do  echo hello > communication.pipe; done

      寫入 500 個 hello 字符串,接收端的 sample 輸出如下:

      $ ./sample communication.pipe
      Read 6 bytes
      Read 60 bytes
      Read 6 bytes
      Read 54 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 12 bytes
      Read 0 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      Read 6 bytes
      ...

      demo 唯一的參數是 pipe 文件路徑。如果使用普通文件做同樣的測試:

      $ ./sample sample.cpp
      terminate called after throwing an instance of 'std::runtime_error'
        what():  epoll_ctl failed, error 1
      Aborted (core dumped)

      果然報錯了,這就是開頭所說 epoll 不支持普通文件的特性:對于普通文件,Linux 認為永遠可讀可寫,沒必要通過 epoll 進行等待,所以 epoll_ctl 直接返回 EPERM 了。

      這個順便演示了 C++20 編譯器會對協程體代碼進行 try...catch 的邏輯,任何未捕獲的異常終將調用用戶承諾對象的 unhandled_exception 接口,這里調了 terminate 來終止進程,關于這一點,請參考《協程本質是函數加狀態機》。

       

      代碼比較長,下面分段看下:

      #include <coroutine>
      #include <unordered_map>
      #include <sys/epoll.h>
      #include <unistd.h>
      #include <fcntl.h>
      #include <vector>
      #include <stdexcept>
      #include <iostream>
      #include <sstream>
      
      #define MAX_EVENTS 10

      返回對象定義,相比之前經典的定義,承諾對象的 final_suspend 未中斷協程、返回對象沒有析構時銷毀協程句柄的動作,意味著協程是個啟動后“不管”的類型

      struct Task {
          struct promise_type {
              Task get_return_object() { return {}; }
              std::suspend_never initial_suspend() { return {}; }
              std::suspend_never final_suspend() noexcept { return {}; }
              void return_void() {}
              void unhandled_exception() { std::terminate(); }
          };
      };

      跳到 main,果然沒有接收協程體 async_read_file 的返回對象,它返回的臨時對象將自動析構,不影響協程體正常運轉

      int main(int argc, char* argv[]) {
          if (argc < 2) { 
              std::cout << "Usage: sample pipe" << std::endl; 
              return 1; 
          }
      
          EpollScheduler scheduler;
          async_read_file(scheduler, argv[1]);
          scheduler.run();
          return 0;
      }

      回到調度器,構造與析構負責 epoll 句柄的生命周期管理,聯系 main 中 scheduler 的定義,它會貫穿整個進程生命期

      class EpollScheduler {
      private:
          int epoll_fd;
          std::unordered_map<int, std::coroutine_handle<>> io_handles;
      public:
          EpollScheduler() {
              epoll_fd = epoll_create(MAX_EVENTS);
              if (epoll_fd == -1) {
                  std::stringstream ss;
                  ss << "epoll_create failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
          }
      
          ~EpollScheduler() {
              close(epoll_fd);
          }

      調度器提供協程注冊接口。與之前相比這里不再使用簡單的先進先出隊列,而是將文件句柄與協程句柄通過 map 關聯起來,方便后面根據事件句柄喚醒協程

          void register_io(int fd, std::coroutine_handle<> handle) {
              if (io_handles.find(fd) == io_handles.end()) {

      select 或 poll 需要每次檢測前都準備句柄集,epoll 則不同,句柄只需注冊一次,后續就能一直監聽該句柄上的事件,重復注冊還會導致 epoll_ctl 返回失敗,因此這里有判重邏輯

                  io_handles[fd] = handle;

      只注冊讀事件 (EPOLLIN),并且使用邊緣觸發模式 (EPOLLET) 

                  epoll_event event{};
                  event.events = EPOLLIN | EPOLLET; 
                  event.data.fd = fd;
                  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
                      std::stringstream ss;
                      ss << "epoll_ctl failed, error " << errno; 
                      throw std::runtime_error(ss.str());
                  }
              }
          }

      調度器提供的運行接口,循環 wait IO 事件,有讀事件才喚醒對應的協程

          void run() {
              while (true) {
                  epoll_event events[MAX_EVENTS] = { 0 };
                  int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
                  for (int i = 0; i < n; ++i) {
                      int ready_fd = events[i].data.fd;
                      if (auto it = io_handles.find(ready_fd); it != io_handles.end()) {
                          it->second.resume(); 
                      }
                  }
              }
          }
      };

      協程體內部打開文件句柄準備進行異步讀取 (O_NONBLOCK),每次通過等待對象讀取數據并展示在控制臺,與之前 co_await 純粹用于掛起協程等待相比,這里可以通過它返回數據

      Task async_read_file(EpollScheduler& sched, const char* path) {
          int fd = open(path, O_RDONLY | O_NONBLOCK);
          if (fd == -1) {
              std::stringstream ss;
              ss << "open failed, error " << errno; 
              throw std::runtime_error(ss.str());
          }
      
          while (true) {
              auto data = co_await AsyncReadAwaiter(sched, fd, 4096);
              std::cout << "Read " << data.size() << " bytes\n";
              // if (data.size() == 0)
              //     break; 
          }
          close(fd);
      }

      等待對象是本次的核心:await_ready 返回 false 掛起協程;await_suspend 在協程掛起前注冊協程句柄到調度器;await_resume 在協程恢復后讀取數據,并返回給 co_await 調用者

      struct AsyncReadAwaiter {
          EpollScheduler& sched;
          int fd;
          std::string buffer; 
      
          AsyncReadAwaiter(EpollScheduler& s, int file_fd, size_t buf_size) 
              : sched(s), fd(file_fd), buffer(buf_size, '\0') {}
      
          bool await_ready() const { 
              return false;
          }
      
          void await_suspend(std::coroutine_handle<> h) {
              sched.register_io(fd, h); 
          }
      
          std::string await_resume() {
              ssize_t n = read(fd, buffer.data(), buffer.size());
              if (n == -1) {
                  std::stringstream ss;
                  ss << "read failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              buffer.resize(n);
              return std::move(buffer);
          }
      };

      老規矩,下面有請 C++ Insights 上場,看看編譯器底層做的工作與之前相比有何差異,內容比較多,只撿關鍵的看下:

      struct __async_read_fileFrame
      {
        void (*resume_fn)(__async_read_fileFrame *);
        void (*destroy_fn)(__async_read_fileFrame *);
        std::__coroutine_traits_impl<Task>::promise_type __promise;
        int __suspend_index;
        bool __initial_await_suspend_called;

      協程狀態與之前別無二致,注意除了參數外,局部變量如 ss、data 也都放進來了,因此在編寫協程體時需要格外注意,能放在內部調用的變量,不要直接放在協程體

        EpollScheduler & sched;
        const char * path;
        int fd;
        std::basic_stringstream<char> ss;
        std::basic_string<char, std::char_traits<char>, std::allocator<char> > data;
        std::suspend_never __suspend_100_6;
        AsyncReadAwaiter __suspend_109_30;
        std::basic_string<char, std::char_traits<char>, std::allocator<char> > __suspend_109_30_res;
        std::suspend_never __suspend_100_6_1;
      };

      真正的協程體邏輯被挪到協程 resume 中了

      /* This function invoked by coroutine_handle<>::resume() */
      void __async_read_fileResume(__async_read_fileFrame * __f)
      {
        try 
        {

      開頭就是 duff device,這個之前已經見識過了

          /* Create a switch to get to the correct resume point */
          switch(__f->__suspend_index) {
            case 0: break;
            case 1: goto __resume_async_read_file_1;
            case 2: goto __resume_async_read_file_2;
            case 3: goto __resume_async_read_file_3;
          }

      initial_suspend 返回 suspend_never 直接跳過不掛起

          /* co_await insights.cpp:100 */
          __f->__suspend_100_6 = __f->__promise.initial_suspend();
          if(!__f->__suspend_100_6.await_ready()) {
            __f->__suspend_100_6.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
            __f->__suspend_index = 1;
            __f->__initial_await_suspend_called = true;
            return;
          } 
          
          __resume_async_read_file_1:
          __f->__suspend_100_6.await_resume();

      打開文件,失敗直接拋異常

          __f->fd = open(__f->path, 0 | 2048);
          if(__f->fd == -1) {
            __f->ss = std::basic_stringstream<char>();
            std::operator<<(__f->ss, "open failed, error ").operator<<((*__errno_location()));
            throw std::runtime_error(std::runtime_error(__f->ss.str()));
          } 

      循環讀文件,AsyncWaitReader 的 await_ready 返回 false 掛起協程,掛起前調用 await_suspend 注冊協程到調度器

          while(true) {
            
            /* co_await insights.cpp:109 */
            __f->__suspend_109_30 = AsyncReadAwaiter(__f->sched, __f->fd, 4096);
            if(!__f->__suspend_109_30.await_ready()) {
              __f->__suspend_109_30.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
              __f->__suspend_index = 2;
              return;
            } 

      文件句柄上有可讀數據時,調度器恢復協程運行,AsyncWaitReader 的 await_resume 讀取數據并記錄在 data 中

            __resume_async_read_file_2:
            __f->__suspend_109_30_res = __f->__suspend_109_30.await_resume();
            __f->data = __f->__suspend_109_30_res;
            std::operator<<(std::operator<<(std::cout, "Read ").operator<<(__f->data.size()), " bytes\n");
          }

      結束循環前關閉句柄 (目前是個死循環走不到這里),協程終止前調用承諾對象的 return_void,有未捕獲異常時調用承諾對象的 unhandled_exception

          close(__f->fd);
          /* co_return insights.cpp:100 */
          __f->__promise.return_void()/* implicit */;
          goto __final_suspend;
        } catch(...) {
          if(!__f->__initial_await_suspend_called) {
            throw ;
          } 
          
          __f->__promise.unhandled_exception();
        }

      final_suspend 返回 suspend_never 直接跳過不掛起,調用 destroy 自動銷毀協程狀態釋放內存

        __final_suspend:
        
        /* co_await insights.cpp:100 */
        __f->__suspend_100_6_1 = __f->__promise.final_suspend();
        if(!__f->__suspend_100_6_1.await_ready()) {
          __f->__suspend_100_6_1.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
          __f->__suspend_index = 3;
          return;
        } 
        
        __resume_async_read_file_3:
        __f->destroy_fn(__f);
      }

      有上一篇的鋪墊,看起來沒什么尿點,甚至有點老三樣。唯一有新意的地方是 co_await 也能通過 await_resume 獲取返回數據,這與 co_yield & co_return 有異曲同工之妙,體現出 C++20 協程靈活的一面。

      多文件并行

      上面的例子雖然通過多次讀取展示了協程多次喚醒的過程,但沒有展示多個 IO 句柄并發的能力,下面稍加改造,同時讀取多個 fifo:

      Task async_read_file(EpollScheduler& sched, const char* path) {
      ...
          while (true) {
              auto data = co_await AsyncReadAwaiter(sched, fd, 4096);
              std::cout << "Read [" << data.size() << "] " << data;
              if (data.size() == 0)
                  std::cout << std::endl; 
          }
      ...
      }
      
      int main(int argc, char* argv[]) {
          if (argc < 3) { 
              std::cout << "Usage: sample pipe1 pipe2" << std::endl; 
              return 1; 
          }
      
          EpollScheduler scheduler;
          async_read_file(scheduler, argv[1]);
          async_read_file(scheduler, argv[2]);
          scheduler.run();
          return 0;
      }

      主要的改動是:

      * 協程體展示數據內容,便于區分是從哪個 fifo 讀到了數據

      * demo 接收兩個 pipe 路徑,分別調用兩個協程進行處理

      對應的,修改寫數據的腳本:

      $ for ((i=1;i<500;++i)); do if [ $((i%2)) -eq 0 ]; then echo hello > communication.pipe; else echo world > communication2.pipe; fi; done

      交替在兩個 pipe 上寫入 hello 與 world,下面是程序輸出:

      $ ./sample communication.pipe communication2.pipe
      Read [6] world
      Read [18] world
      world
      world
      Read [24] hello
      hello
      hello
      hello
      Read [0]
      Read [6] world
      Read [12] hello
      hello
      Read [12] world
      world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [0]
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [0]
      ...

      讀取看起來并不是嚴格的交替執行,這與 pipe 可讀時積累的數據量有關,如果讀取前發送端已經累計發送了多次,就會出現上面的情況。不論怎樣,這里實現了用協程并行讀取文件的能力,并且不需要對跨協程的公共變量做任何并發防護 (如調度器內部 map),且每個文件的讀取邏輯清晰易懂,這可能就是協程的魅力吧。讀取 N 個文件的場景 (N>2),都可以參考上面的進行拓展,此處就不再贅述了。

      最后補充一張調用順序圖:

      為了便于繪制,調度器的 register_io & run 分開畫了。另外非首次讀取時,沒有 7-8 這條路徑,取而代之的是 run 內部的事件循環。

      await_suspend & 試讀

      眾所周知讀寫異步 IO 句柄 (O_NONBLOCK) 時不會被阻塞,當系統能滿足用戶請求時,會讀取盡可能多的數據返回;當沒有可用數據時,系統立即返回一個錯誤,一般是 EAGAINEWOULDBLOCK (Windows),此時再進入 epoll 等待也不遲,當數據比較頻繁時能節約相當可觀的 epoll 等待與喚醒,從而提高吞吐性能。

      回到 demo,試讀的結果決定是否掛起協程,因此最佳的判斷位置是在 await_ready,下面是改造后的代碼:

      struct AsyncReadAwaiter {
          EpollScheduler& sched;
          int fd;
          int len; 
          std::string buffer; 
      
          AsyncReadAwaiter(EpollScheduler& s, int file_fd, size_t buf_size) 
              : sched(s), fd(file_fd), len(0), buffer(buf_size, '\0') { }
          
          bool await_ready()  { 
              len = 0; 
              ssize_t n = read(fd, buffer.data(), buffer.size());
              if (n > 0) { 
                  len = n; 
                  return true; 
              } else if (n == -1 && errno != EAGAIN) {
                  std::stringstream ss;
                  ss << "pre read failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              return false;
          }
      
          void await_suspend(std::coroutine_handle<> h) {
              sched.register_io(fd, h); 
          }
      
          std::string await_resume() {
              ssize_t n = read(fd, buffer.data() + len, buffer.size() - len);
              if (n == -1) {
                  if (len > 0) { 
                      buffer.resize(len); 
                      return std::move(buffer);
                  }
      
                  if (errno != EAGAIN) {
                      std::stringstream ss;
                      ss << "read failed, error " << errno; 
                      throw std::runtime_error(ss.str());
                  }
      
                  n = 0; 
              }
      
              buffer.resize(n + len);
              if (len > 0) {
                  std::cout << "pre-read " << len << ", read " << n << std::endl; 
              }
              return std::move(buffer);
          }
      };

      內容不長,不過也分段解讀下:

      struct AsyncReadAwaiter {
          EpollScheduler& sched;
          int fd;

      增加 len 字段記錄試讀的結果長度

          int len; 
          std::string buffer; 
      
          AsyncReadAwaiter(EpollScheduler& s, int file_fd, size_t buf_size) 
              : sched(s), fd(file_fd), len(0), buffer(buf_size, '\0') { }
      
          bool await_ready()  { 
              len = 0; 

      增加一次讀取,若成功記錄讀取的長度,返回 true 繼續協程

              ssize_t n = read(fd, buffer.data(), buffer.size());
              if (n > 0) { 
                  len = n; 
                  return true; 

      非 EAGAIN 錯誤直接拋異常

              } else if (n == -1 && errno != EAGAIN) {
                  std::stringstream ss;
                  ss << "pre read failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }

      EAGAIN 無數據,返回 false 掛起協程等待

              return false;
          }

      掛起前調用 register_io 注冊協程句柄

          void await_suspend(std::coroutine_handle<> h) {
              sched.register_io(fd, h); 
          }

      在正式讀取時跳過試讀的長度,避免數據覆蓋

          std::string await_resume() {
              ssize_t n = read(fd, buffer.data() + len, buffer.size() - len);
              if (n == -1) {

      若讀取失敗且有試讀數據,直接返回試讀數據

                  if (len > 0) { 
                      buffer.resize(len); 
                      return std::move(buffer);
                  }

      若非 EAGAIN 錯誤直接拋出異常,否則重置 n 的長度為 0,防止將 -1 加和到最終長度

                  if (errno != EAGAIN) {
                      std::stringstream ss;
                      ss << "read failed, error " << errno; 
                      throw std::runtime_error(ss.str());
                  }
      
                  n = 0; 
              }

      若成功,將結果與試讀結果合并后返回給用戶

              buffer.resize(n + len);
              if (len > 0) {
                  std::cout << "pre-read " << len << ", read " << n << std::endl; 
              }
              return std::move(buffer);
          }
      };

      主要的改動已經在代碼中解讀了,下面是程序運行效果:

      $ ./sample communication.pipe communication2.pipe
      Read [6] world
      pre-read 30, read 0
      Read [30] world
      world
      world
      world
      world
      pre-read 6, read 0
      Read [6] world
      Read [0]
      Read [42] hello
      hello
      hello
      hello
      hello
      hello
      hello
      Read [6] world
      Read [24] hello
      hello
      hello
      hello
      pre-read 6, read 0
      Read [6] hello
      Read [24] world
      world
      world
      world
      Read [0]
      Read [6] world
      Read [0]
      Read [12] hello
      hello
      Read [12] world
      world
      Read [0]
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      Read [6] world
      Read [6] hello
      ...

      新增的 pre-read 日志就是試讀成功的場景,看起來發生次數并不多,可能是數據量比較小的緣故。一般在試讀成功后,正式讀取時就沒有數據了。

      總流程變為兩條路徑:

      * 返回 true:await_ready -> await_resume

      * 返回 false:await_ready -> await_suspend -> 掛起等待 -> await_resume

      注意為了能在 await_ready 中修改成員 len 的內容,將接口 const 修飾符去掉了,編譯器似乎對這些細節沒有要求,只要函數主體簽名能對得上就 ok。

       

      一些細心的讀者可能注意到了,std::string::resize() 會在擴張字符串尺寸時,將當前 size 到新 size 之間的內容重置為 '\0',一般不適用于搭配 read 讀取數據使用,之前的例子可以這樣做,是基于以下幾個事實:

      * AsyncReadAwaiter 構造函數中將其初始化為最大尺寸: buffer(buf_size, '\0')

      * 讀取成功后調用 resize 屬于尺寸縮小,因此不存在數據重置的問題

      * 第二次讀取時會重新構造一個 AsyncReadAwaiter 臨時對象,舊的會隨著作用域的結束自動析構,從而保證了 buffer 每次都始化為最大長度

      新例子中 1、3 點保持了延續,第 2 點也得到了妥善的處理:

      * 試讀時只記錄讀取長度 len,不進行 resize 操作

      * 正式讀取時

      ??* 若失敗,有試讀內容時,直接 resize(len) 并返回

      ??* 若成功,resize(n+len) 并返回

      換句話說,最終總能保證從最大尺寸縮小到目標尺寸,而不是分別 resize(len)resize (len+n),從而避免 size 增長和內容重置。

      經過多輪測試,終于復現了一次試讀與正式讀取都有內容的場景:

      $ ./sample communication.pipe communication2.pipe
      Read [10] world-war
      pre-read 20, read 10
      Read [30] world-war
      world-war
      world-war
      Read [0]
      Read [24] hello
      hello
      hello
      hello
      Read [10] world-war
      Read [6] hello
      Read [0]
      Read [10] world-war
      Read [30] hello
      hello
      hello
      hello
      hello
      Read [40] world-war
      world-war
      world-war
      world-war
      pre-read 10, read 0
      Read [10] world-war
      Read [0]
      Read [0]
      Read [6] hello
      pre-read 6, read 0
      Read [6] hello
      Read [10] world-war
      Read [0]
      ...

      為了避免 hello 與 world 同長度掩蓋問題,這里修改了寫入 communication2.pipe 的內容為 world-war,這樣在讀取 hello 后再讀取 world-war,size 增長了而內容沒有被截斷,可以證明之前的結論 1、3;在第一次 pre-read 過程中,先讀取 20,后讀取 10,總長度 30,size 也增長了,最終輸出的內容沒截斷,證明了結論 2。

      可以看到,使用 read 搭配 std::string::resize() 處理數據是非常麻煩的,不建議在真實的環境中使用,這里主要是出于便于演示的目的。

       

      行文至此,本節的主角還沒有登場:其實 await_suspend 這個接口也可以返回 bool 值,true 表示掛起,false 表示繼續,與 await_ready 剛好相反,下面改它試試:

          bool await_ready() const { 
              return false;
          }
      
          bool await_suspend(std::coroutine_handle<> h) {
              len = 0; 
              ssize_t n = read(fd, buffer.data(), buffer.size());
              if (n > 0) { 
                  len = n; 
                  return false; 
              } else if (n == 0 || (n == -1 && errno == EAGAIN)) {
                  sched.register_io(fd, h); 
                  return true; 
              } else {
                  std::stringstream ss;
                  ss << "pre read failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
          }

      修改局限于上面兩個接口中,主要是將試讀從 await_ready 移到了 await_suspend 中,其它沒有變化;新的組織形式,讓 await_ready 顯得不那么臃腫了,看起來更協調和具有可讀性,更推薦這種形式。

      通過 C++ Insights 看下新 await_suspend 的編譯器中間結果:

          while(true) {
            
            /* co_await insights.cpp:129 */
            __f->__suspend_129_30 = AsyncReadAwaiter(__f->sched, __f->fd, 4096);
            if(!__f->__suspend_129_30.await_ready()) {
              if(__f->__suspend_129_30.await_suspend(std::coroutine_handle<Task::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>())) {
                __f->__suspend_index = 2;
                return;
              } 
              
            } 
            
            __resume_async_read_file_2:
            __f->__suspend_129_30_res = __f->__suspend_129_30.await_resume();
            __f->data = __f->__suspend_129_30_res;
            std::operator<<(std::operator<<(std::operator<<(std::cout, "Read [").operator<<(__f->data.size()), "] "), __f->data);
            if(__f->data.size() == 0) {
              std::cout.operator<<(std::endl);
            } 
            
          }

      它被放置到了 if 條件中,總的流程變為:

      * 返回 true:await_ready -> await_suspend -> 掛起等待 -> await_resume

      * 返回 false:await_ready -> await_suspend -> await_resume

      可以期望當 await_suspend 返回 false 時,后續的 await_resume 會被立即調用。

      signalfd & 完美退出

      上面的 demo 目前只能通過 Ctrl C 強制殺死,畢竟調度器的 run 是個死循環沒法退出。用來做做演示沒問題,但是要用來開發項目就不行了,本著做出工業級強度代碼的使命感,下面對它進行一番改造,看看能否實現完美退出。

      核心思路是檢測用戶按下 Ctrl C 讓 epoll_wait 感知并退出 run 循環,按下 Ctrl C 簡單,等價于處理 SIGINT 信號,但讓 epoll 感知比較難,查了下 deepseek 給了三種方案:

      * 通過 signalfd 將信號轉化為 IO 事件,交給 epoll 統一處理

      * 建立一個進程內的 pipe 通道,注冊到 epoll,在檢測到 SIGINT 事件時寫入一字節以喚醒 epoll_wait 并退出

      * 信號處理器設置一個標志位,使用 epoll_wait 的超時功能,定時檢測該標志位

      方案 III 有延遲,首先排除;方案 II 就是傳說中的 self-pipe trick,比較通用但不夠高效;方案 I 最直接,也比較適合 Linux,就它了:

      #include <signal.h>
      #include <sys/signalfd.h>
      
      class EpollScheduler {
      private:
          int epoll_fd;
          int signal_fd; 
          std::unordered_map<int, std::coroutine_handle<>> io_handles;
      public:
          EpollScheduler(int signum) {
              epoll_fd = epoll_create(MAX_EVENTS);
              if (epoll_fd == -1) {
                  std::stringstream ss;
                  ss << "epoll_create failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              sigset_t mask;
              sigemptyset(&mask);
              sigaddset(&mask, signum);
              sigprocmask(SIG_BLOCK, &mask, NULL);
              signal_fd = signalfd(-1, &mask, SFD_NONBLOCK);
              if (signal_fd == -1) { 
                  std::stringstream ss;
                  ss << "signalfd failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              struct epoll_event ev;
              ev.events = EPOLLIN;
              ev.data.fd = signal_fd;
              if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev) == -1) {
                  std::stringstream ss;
                  ss << "epoll_ctl failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              std::cout << "register signal " << signum << " as fd " << signal_fd << std::endl; 
          }
      
          ~EpollScheduler() {
              for(auto handle : io_handles) {
                  std::cout << "coroutine destroy" << std::endl; 
                  handle.second.destroy(); 
              }
              close(signal_fd); 
              close(epoll_fd);
          }
      
      ...
      
          void run() {
              while (true) {
                  epoll_event events[MAX_EVENTS] = { 0 };
                  int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
                  for (int i = 0; i < n; ++i) {
                      int ready_fd = events[i].data.fd;
                      if (ready_fd == signal_fd) {
                          struct signalfd_siginfo fdsi = { 0 };
                          read(signal_fd, &fdsi, sizeof(fdsi));
                          std::cout << "signal " << fdsi.ssi_signo << " detected, exit..." << std::endl; 
                          return; 
                      }
      
                      if (auto it = io_handles.find(ready_fd); it != io_handles.end()) {
                          it->second.resume(); 
                      }
                  }
              }
          }
      };

      改動主要集中在 EpollScheduler 類的構造、析構與 run 方法。內容不長,分段解讀一下:

      class EpollScheduler {
      private:
          int epoll_fd;

      增加成員記錄信號對應的句柄,方便后續在 epoll_wait 返回時做對比

          int signal_fd; 
          std::unordered_map<int, std::coroutine_handle<>> io_handles;
      public:

      構造函數接收一個信號作為監聽對象,main 中會傳遞 SIGINT 或 SIGQUIT

          EpollScheduler(int signum) {
              epoll_fd = epoll_create(MAX_EVENTS);
              if (epoll_fd == -1) {
                  std::stringstream ss;
                  ss << "epoll_create failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }

      構建信號對應的異步文件句柄

              sigset_t mask;
              sigemptyset(&mask);
              sigaddset(&mask, signum);

      下面這句是關鍵,如果不屏蔽默認的信號處理方式,默認的信號處理器會讓進程退出,epoll 就沒機會啦

              sigprocmask(SIG_BLOCK, &mask, NULL);
              signal_fd = signalfd(-1, &mask, SFD_NONBLOCK);
              if (signal_fd == -1) { 
                  std::stringstream ss;
                  ss << "signalfd failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }

      將信號句柄注冊到 epoll,成功時打印一條日志,失敗時拋異常

              struct epoll_event ev;
              ev.events = EPOLLIN;
              ev.data.fd = signal_fd;
              if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev) == -1) {
                  std::stringstream ss;
                  ss << "epoll_ctl failed, error " << errno; 
                  throw std::runtime_error(ss.str());
              }
      
              std::cout << "register signal " << signum << " as fd " << signal_fd << std::endl; 
          }

      析構除了增加信號句柄的關閉,還增加了掛起協程的銷毀,如果調度器的生命周期與進程不一致時 (多次初始化與銷毀調度器),這就比較關鍵了,可以防止協程泄漏

          ~EpollScheduler() {
              for(auto handle : io_handles) {
                  std::cout << "coroutine destroy" << std::endl; 
                  handle.second.destroy(); 
              }
              close(signal_fd); 
              close(epoll_fd);
          }
      
      ...
          
          void run() {
              while (true) {
                  epoll_event events[MAX_EVENTS] = { 0 };
                  int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
                  for (int i = 0; i < n; ++i) {
                      int ready_fd = events[i].data.fd;

      epoll_wait 返回時,優先處理信號句柄上的事件

                      if (ready_fd == signal_fd) {
                          struct signalfd_siginfo fdsi = { 0 };
                          read(signal_fd, &fdsi, sizeof(fdsi));
                          std::cout << "signal " << fdsi.ssi_signo << " detected, exit..." << std::endl; 
                          return; 
                      }

      之后才是普通 IO 事件及協程的恢復

                      if (auto it = io_handles.find(ready_fd); it != io_handles.end()) {
                          it->second.resume(); 
                      }
                  }
              }
          }
      };

       下面是程序運行效果:

      $ ./sample communication.pipe communication2.pipe
      register signal 2 as fd 4
      Read [10] world-war
      pre-read 30, read 0
      Read [30] world-war
      world-war
      world-war
      pre-read 10, read 0
      Read [10] world-war
      Read [0]
      ...
      Read [6] hello
      Read [10] world-war
      Read [6] hello
      Read [10] world-war
      Read [0]
      Read [6] hello
      Read [10] world-war
      ^Csignal 2 detected, exit...
      coroutine destroy
      coroutine destroy

      內容比較長,中間忽略了一部分;開始的 register 日志顯示新增的信號句柄值為 4;最后的 ^C 是用戶按下了 Ctrl C,demo 能正常檢測到信號值為 2 并退出事件循環,析構中還銷毀了兩個掛起的協程,符合預期。

      其實不光事件循環存在完美退出的問題,單個 IO 句柄也存在同樣的問題:正常的管道不可能一直讀下去。當 writer 關閉管道或連接斷開時,應該檢測此種場景并加以處理,例如將 fd 從 epoll 中移除,從而讓 IO 句柄也能完美退出。但不幸的是,目前選取的 fifo 文件,在 O_NONBLOCK 模式下,似乎無法感知對端關閉這種操作,傳統的 read 返回 0 并不能代表這種情況,像上面的輸出中,在正常的傳輸過程中,就會出現多次 read 返回 0 的情況,顯然并不是對端關閉管道所致 (也不是 read 返回 EAGAIN 的問題,這個我加日志確認過了)。不過對于 socket,還是可以通過 read 返回 0 來檢測連接斷開的場景,這個就當作課外題就交給感興趣的讀者吧 ~

      結語

      本文介紹了一種基于真實 IO 事件驅動的協程調度器,通過特定的等待對象,實現協程在沒有異步事件時掛起等待、異步事件到達時恢復運行的邏輯,更加貼近實際應用場景。除此之外,還說明了 await_suspend 與試讀寫、signalfd 與進程完美退出的關系等,可用于構建工業級強度的代碼。

      最后,由于本文中 demo 經歷多次迭代,想要復制最終版的代碼進行驗證的小伙伴,可以 follow 這個開源 git 庫獲取:cpp20coroutine

      本文的 demo 是基于 Linux epoll 的,下一篇來看看怎么用 Windows 的 IOCP 實現類似的能力。

      參考

      [1]. epoll_ctl : Operation not permitted error - c program

      [2]. std::string::resize() 對緩沖區一些用處

      [3]. select/poll/epoll對比分析

      [4]. Netty - 五種 I/O 多路復用機制 select、poll、epoll、kqueue、iocp(windows) 對比

      [5]. 水平觸發和邊緣觸發

      posted @ 2025-08-18 09:57  goodcitizen  閱讀(884)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 超碰伊人久久大香线蕉综合| 日本国产精品第一页久久| 亚洲国产成人va在线观看天堂| 亚洲AV永久无码一区| 黑人av无码一区| 亚洲精品国产一区二区三| 久久精品国产精品亚洲蜜月| 风韵丰满熟妇啪啪区老熟熟女| 亚洲一区二区日韩综合久久| 中文字幕日韩区二区三区| 亚洲综合成人av在线| 国产精品久久国产精麻豆99网站| 国产在线观看免费观看| 熟女人妻视频| 久久国产精品精品国产色| 少妇愉情理伦片丰满丰满午夜| 亚洲一区二区三区四区| 国产线播放免费人成视频播放| 中文字幕在线精品人妻| 国产熟女丝袜av一二区| 亚洲精品自拍在线视频| 亚洲a∨无码无在线观看| 成人自拍短视频午夜福利| 91孕妇精品一区二区三区| 国产熟睡乱子伦视频在线播放| 四虎影视一区二区精品| 69天堂人成无码免费视频| 永吉县| 老熟女重囗味hdxx69| 久久亚洲人成网站| 国产亚洲精品综合一区二区| 国产精品免费观在线| 国产午夜精品福利视频| 久久国产综合色免费观看| 无码人妻丰满熟妇区96| 久久综合色一综合色88欧美| 97亚洲色欲色欲综合网| 国产一区二区日韩在线| 国产亚洲av夜间福利香蕉149| 日韩熟女乱综合一区二区| 狠狠人妻久久久久久综合九色|