協程模塊
協程模塊概述
一、概念
可以簡單的認為:協程就是用戶態的線程,但是上下文切換的時機是靠調用方(寫代碼的開發人員)自身去控制的;
對比
首先介紹一下為什么要使用協程。從了解進程,線程,協程之間的區別開始。
- 從定義來看
- 進程是資源分配和擁有的基本單位。進程通過內存映射擁有獨立的代碼和數據空間,若沒有內存映射給進程獨立的空間,則沒有進程的概念了。
- 線程是程序執行的基本單位。線程都處在一個進程空間中,可以相互訪問,沒有限制,所以使用線程進行多任務變成十分便利,所以當一個線程崩潰,其他任何一個線程都不能幸免。每個進程中都有唯一的主線程,且只能有一個,主線程和進程是相互依存的關系,主線程結束進程也會結束。
- 協程是用戶態的輕量級線程,線程內部調度的基本單位。協程在線程上執行。
- 從系統調用來看
- 進程由操作系統進行切換,會在用戶態與內核態之間來回切換。在切換進程時需要切換虛擬內存空間,切換頁表,切換內核棧以及硬件上下文等,開銷非常大。
- 線程由操作系統進行切換,會在用戶態與內核態之間來回切換。在切換線程時需要保存和設置少量寄存器內容,開銷很小。
- 協程由用戶進行切換,并不會陷入內核態。先將寄存器上下文和棧保存,等切換回來的時候再進行恢復,上下文的切換非常快
- 從并發性來看
- 不同進程之間切換實現并發,各自占有CPU實現并行
- 一個進程內部的多個線程并發執行
- 同一時間只能執行一個協程,而其他協程處于休眠狀態,適合對任務進行分時處理
相比于多開一個線程來操作,使用協程的好處:
- 減少了線程的重復高頻創建;
- 盡量避免線程的阻塞;
- 提升代碼的可維護與可理解性(畢竟不需要考慮多線程那一套東西了);
注1:因為協程是在單線程上運行的,并不是并發執行的,是順序執行的,所以不能使用鎖來做協程的同步,這樣會直接導致線程的死鎖。
理解
最簡單的理解,可以將協程當成一種看起來花里胡哨,并且使用起來也花里胡哨的函數。
每個協程在創建時都會指定一個入口函數,這點可以類比線程。協程的本質就是函數和函數運行狀態的組合 。
協程和函數的不同之處是,函數一旦被調用,只能從頭開始執行,直到函數執行結束退出,而協程則可以執行到一半就退出(稱為yield),但此時協程并未真正結束,只是暫時讓出CPU執行權,在后面適當的時機協程可以重新恢復運行(稱為resume),在這段時間里其他的協程可以獲得CPU并運行,所以協程也稱為輕量級線程。
協程能夠半路yield、再重新resume的關鍵是協程存儲了函數在yield時間點的執行狀態,這個狀態稱為協程上下文。協程上下文包含了函數在當前執行狀態下的全部CPU寄存器的值,這些寄存器值記錄了函數棧幀、代碼的執行位置等信息,如果將這些寄存器的值重新設置給CPU,就相當于重新恢復了函數的運行。在Linux系統里這個上下文用ucontext_t結構體來表示,通getcontext()來獲取。
搞清楚協程和線程的區別。協程雖然被稱為輕量級線程,但在單線程內,協程并不能并發執行,只能是一個協程結束或yield后,再執行另一個協程,而線程則是可以真正并發執行的。其實這點也好理解,畢竟協程只是以一種花里胡哨的方式去運行一個函數,不管實現得如何巧妙,也不可能在單線程里做到同時運行兩個函數,否則還要多線程有何用?
因為單線程下協程并不是并發執行,而是順序執行的,所以不要在協程里使用線程級別的鎖來做協程同步,比如pthread_mutex_t。如果一個協程在持有鎖之后讓出執行,那么同線程的其他任何協程一旦嘗試再次持有這個鎖,整個線程就鎖死了,這和單線程環境下,連續兩次對同一個鎖進行加鎖導致的死鎖道理完全一樣。
同樣是單線程環境下,協程的yield和resume一定是同步進行的,一個協程的yield,必然對應另一個協程的resume,因為線程不可能沒有執行主體。并且,協程的yield和resume是完全由應用程序來控制的。與線程不同,線程創建之后,線程的運行和調度也是由操作系統自動完成的,但協程創建后,協程的運行和調度都要由應用程序來完成,就和調用函數一樣,所以協程也被稱為用戶態線程。
協程的特點
-
協程可以主動讓出 CPU 時間片;
-
協程可以恢復 CPU 上下文;當另一個協程繼續執行時,其需要恢復 CPU 上下文環境;
-
協程有個管理者,管理者可以選擇一個協程來運行,其他協程要么阻塞,要么ready,或者died;
-
運行中的協程將占有當前線程的所有計算資源;
-
協程天生有棧屬性,而且是 lock free;
對稱協程與非對稱協程
參考資料:https://zhuanlan.zhihu.com/p/363775637
對于“對稱”這個名詞,闡述的實際是:協程之間的關系;用大白話來說就是:對稱協程就是說協程之間人人平等,沒有誰調用誰一說,大家都是一樣的,而非對稱協程就是協程之間存在明顯的調用關系;
- 對稱協程:任何一個協程都是相互獨立且平等的,調度權可以在任意協程之間轉移;在對稱協程中,子協程可以直接和子協程切換,也就是說每個協程不僅要運行自己的入口函數代碼,還要負責選出下一個合適的協程進行切換,相當于每個協程都要充當調度器的角色,這樣程序設計起來會比較麻煩,并且程序的控制流也會變得復雜和難以管理。
- 非對稱協程:協程出讓調度權的目標只能是它的調用者,即協程之間存在調用和被調用關系。其調度可以借助專門的調度器來負責調度協程,每個協程只需要運行自己的入口函數,然后結束時將運行權交回給調度器,由調度器來選出下一個要執行的協程即可。
二、實現基礎:ucontext_t
ucontext_t介紹
協程模塊基于ucontext_t實現,基本結構如下
- ucontext_t結構體
#include <ucontext.h>
typedef struct ucontext_t {
struct ucontext_t* uc_link;
sigset_t uc_sigmask;
stack_t uc_stack;
mcontext_t uc_mcontext;
...
};
- 類成員解釋:
uc_link:為當前context執行結束之后要執行的下一個context,若uc_link為空,執行完當前context之后退出程序。
uc_sigmask:執行當前上下文過程中需要屏蔽的信號列表,即信號掩碼
uc_stack:為當前context運行的棧信息。 uc_stack.ss_sp:棧指針指向stack uc_stack.ss_sp = stack;
uc_stack.ss_size:棧大小 uc_stack.ss_size = stacksize;
uc_mcontext:保存具體的程序執行上下文,如PC值,堆棧指針以及寄存器值等信息。它的實現依賴于底層,是平臺硬件相關的。此實現不透明。
#include <ucontext.h>
void makecontext(ucontext_t* ucp, void (*func)(), int argc, ...);
int swapcontext(ucontext_t* olducp, ucontext_t* newucp);
int getcontext(ucontext_t* ucp);
int setcontext(const ucontext_t* ucp);
- 類函數族:
makecontext:初始化一個ucontext_t,func參數指明了該context的入口函數,argc為入口參數的個數,每個參數的類型必須是int類型。另外在makecontext前,一般需要顯示的初始化棧信息以及信號掩碼集同時也需要初始化uc_link,以便程序退出上下文后繼續執行。
swapcontext:原子操作,該函數的工作是保存當前上下文并將上下文切換到新的上下文運行。
getcontext:將當前的執行上下文保存在cpu中,以便后續恢復上下文
setcontext:將當前程序切換到新的context,在執行正確的情況下該函數直接切換到新的執行狀態,不會返回。
注2:setcontext執行成功不返回,getcontext執行成功返回0,若執行失敗都返回-1。若uc_link為NULL,執行完新的上下文之后程序結束。
注3:線程寄存器的上下文一般包括以下內容:通用寄存器;程序計數器;棧指針;基址指針;標志寄存器;段寄存器;浮點寄存器與擴展寄存器
實現思路
使用非對稱協程的設計思路,通過主協程創建新協程,主協程由swapIn()讓出執行權執行子協程的任務,子協程可以通過YieldToHold()讓出執行權繼續執行主協程的任務,不能在子協程之間做相互的轉化,這樣會導致回不到main函數的上下文。這里使用了兩個線程局部變量保存當前協程和主協程,切換協程時調用swapcontext,若兩個變量都保存子協程,則無法回到原來的主協程中。
Fiber::GetThis() 獲得主協程
swapIn()
Thread->man_fiber --------> sub_fiber (new(Fiber(cb)))
^
| Fiber::YieldToHold()
|
sub_fiber
三、具體實現
協程狀態
這里在sylar的基礎上進行簡化,對每個協程,只設計了3種狀態,分別是READY,代表就緒態,RUNNING,代表正在運行,TERM,代表運行結束。
與sylar版本的實現相比,去掉了INIT狀態,HOLD狀態,和EXCEPT狀態。
sylar的INIT狀態是協程對象剛創建時的狀態,這個狀態可以直接歸到READY狀態里,sylar的HOLD狀態和READY狀態與協程調度有關,READY狀態的協程會被調度器自動重新調度,而HOLD狀態的協程需要顯式地再次將協程加入調度,這兩個狀態也可以歸到READY狀態里,反正都表示可執行狀態。sylar還給協程設計了一個EXCEPT狀態,表示協程入口函數執行時出現異常的狀態,這個狀態可以不管,具體到協程調度模塊再討論。
去掉這幾個狀態后,協程的狀態模型就簡單得一目了然了,一個協程要么正在運行(RUNNING),要么準備運行(READY),要運行結束(TERM)。
狀態簡化后,唯一的缺陷是無法區分一個READY狀態的協程對象是剛創建,還是已經運行到一半yield了,這在重置協程對象時有影響。重置協程時,如果協程對象只是剛創建但一次都沒運行過,那應該是允許重置的,但如果協程的狀態是運行到一半yield了,那應該不允許重置。雖然可以把INIT狀態加上以區分READY狀態,但既然簡化了狀態,那就簡化到底,讓協程只有在TERM狀態下才允許重置,問題迎刃而解
class Fiber(協程類)
設置靜態變量
Fiber的源碼定義了兩個全局靜態變量,用于生成協程id和統計當前的協程數,對于每個線程,sylar設計了以下兩個線程局部變量用于保存協程上下文信息:
// 用于生成協程id
static std::atomic<uint64_t> s_fiber_id {0};
// 用于統計當前的協程數
static std::atomic<uint64_t> s_fiber_count {0};
?
// 約定協程棧的大小1MB
static ConfigVar<uint32_t>::ptr g_fiber_stack_size =
Config::Lookup<uint32_t>("fiber.stack_size", 1024 * 1024, "fiber stack size");
?
// 當前協程
static thread_local Fiber *t_fiber = nullptr;
// 主協程
static thread_local Fiber::ptr t_threadFiber = nullptr;
t_fiber:指向當前運行的協程,初始化時,指向線程主協程
t_threadFiber:指向線程的主協程,初始化時,指向線程主協程,當子協程resume時,主協程讓出執行權,并保存上下文到t_threadFiber的ucontext_t中,同時激活子協程的ucontext_t的上下文。當子協程yield時,子協程讓出執行權,從t_threadFiber獲得主協程上下文恢復運行。
成員變量
// 協程id
uint64_t m_id = 0;
// 協程運行棧大小
uint32_t m_stacksize = 0;
// 協程狀態
State m_state = INIT;
// 上下文
ucontext_t m_ctx;
// 協程運行棧指針
void* m_stack = nullptr;
// 協程執行方法
std::function<void()> m_cb;
成員函數:
- 構造函數
/*
* @brief 構造函數
* @attention 無參構造函數只用于創建線程的第一個協程,也就是線程主函數對應的協程,
* 這個協程只能由GetThis()方法調用,所以定義成私有方法
*/
Fiber::Fiber(){
SetThis(this);
m_state = RUNNING;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
++s_fiber_count;
m_id = s_fiber_id++; // 協程id從0開始,用完加1
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() main id = " << m_id;
}
/**
* @brief 構造函數,用于創建用戶協程
* @param[] cb 協程入口函數
* @param[] stacksize 棧大小,默認為128k
*/
Fiber::Fiber(std::function<void()> cb, size_t stacksize)
: m_id(s_fiber_id++)
, m_cb(cb) {
++s_fiber_count;
m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
m_stack = StackAllocator::Alloc(m_stacksize);
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() id = " << m_id;
}
/**
* @brief 返回當前線程正在執行的協程
* @details 如果當前線程還未創建協程,則創建線程的第一個協程,
* 且該協程為當前線程的主協程,其他協程都通過這個協程來調度,也就是說,其他協程
* 結束時,都要切回到主協程,由主協程重新選擇新的協程進行resume
* @attention 線程如果要創建協程,那么應該首先執行一下Fiber::GetThis()操作,以初始化主函數協程
*/
Fiber::ptr GetThis(){
if (t_fiber) {
return t_fiber->shared_from_this();
}
Fiber::ptr main_fiber(new Fiber);
SYLAR_ASSERT(t_fiber == main_fiber.get());
t_thread_fiber = main_fiber;
return t_fiber->shared_from_this();
}
- 協程原語:包括resume與yeild
/**
* @brief 將當前協程切到到執行狀態
* @details 當前協程和正在運行的協程進行交換,前者狀態變為RUNNING,后者狀態變為READY
*/
void Fiber::resume() {
SYLAR_ASSERT(m_state != TERM && m_state != RUNNING);
SetThis(this);
m_state = RUNNING;
if (swapcontext(&(t_thread_fiber->m_ctx), &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
/**
* @brief 當前協程讓出執行權
* @details 當前協程與上次resume時退到后臺的協程進行交換,前者狀態變為READY,后者狀態變為RUNNING
*/
void Fiber::yield() {
/// 協程運行完之后會自動yield一次,用于回到主協程,此時狀態已為結束狀態
SYLAR_ASSERT(m_state == RUNNING || m_state == TERM);
SetThis(t_thread_fiber.get());
if (m_state != TERM) {
m_state = READY;
}
if (swapcontext(&m_ctx, &(t_thread_fiber->m_ctx))) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
- 協程入口函數
/**
* @brief 協程入口函數
* @note 這里沒有處理協程函數出現異常的情況
*/
void Fiber::MainFunc() {
Fiber::ptr cur = GetThis(); // GetThis()的shared_from_this()方法讓引用計數加1
SYLAR_ASSERT(cur);
cur->m_cb(); // 這里真正執行協程的入口函數
cur->m_cb = nullptr;
cur->m_state = TERM;
auto raw_ptr = cur.get(); // 手動讓t_fiber的引用計數減1
cur.reset();
raw_ptr->yield(); // 協程結束時自動yield,以回到主協程
}
- 協程重置函數
/**
* 這里為了簡化狀態管理,強制只有TERM狀態的協程才可以重置,但其實剛創建好但沒執行過的協程也應該允許重置的
*/
void Fiber::reset(std::function<void()> cb) {
SYLAR_ASSERT(m_stack);
SYLAR_ASSERT(m_state == TERM);
m_cb = cb;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = READY;
}

浙公網安備 33010602011771號