go并發調度原理學習
go并發調度模型如上圖
M指的是Machine,一個M直接關聯了一個線程。
P指的是Processor,代表了M所需的上下文環境,也是處理用戶級代碼邏輯的處理器。
G指的是Goroutine,其實本質上也是一種輕量級的線程。
?先是 Processor(簡稱 P),其作?類似 CPU 核,?來控制可同時并發執?的任務數量。每個?作線程都必須綁定?個有效 P 才被允許執?任務,否則只能休眠,直到有空閑 P 時被喚醒。P 還為線程提供執?資源,?如對象分配內存、本地任務隊列等。線程獨享所綁定的 P 資源,可在?鎖狀態下執??效操作。
進程內的?切都在以G?式運?,包括運?時相關服務,以及main.main ?口函數。需要指出,G 并?執?體,它僅僅保存并發任務狀態,為任務執?提供所需棧內存空間。G 任務創建后被放置在 P 本地隊列或全局隊列,等待?作線程調度執?。
實際執?體是系統線程(簡稱 M),它和 P 綁定,以調度循環?式不停執? G 并發任務。M 通過修改寄存器,將執?棧指向 G ?帶棧內存,并在此空間內分配堆棧幀,執?任函數。當需要中途切換時,只要將相關寄存器值保存回 G 空間即可維持狀態,任何 M 都可據此恢復執?。線程僅負責執?,不再持有狀態,這是并發任務跨線程調度,實現多路復?的根本所在。
G自己提供內存棧在M上執行
P保存P執行過程中的數據行,當G被暫停時,SP,SC等寄存器信息會保存在G.sched中,當G被喚醒繼續執行時,從之前暫停的位置繼續執行,因為G提供內存棧,并記錄了上次執行到的位置,G數量很多,P相對較少,在垃圾回收的時候方便定位
P中有一個對列保存G的指針,其實就是一個256個元素的數組,通過兩個變量指向對首和對尾,所以這個隊列是會出現滿的情況的,滿了新加的G就只能放到全局隊列中
type g struct { stack stack //棧,兩個能容納任何變量地址的變量 stackguard0 uintptr // offset known to liblink stackguard1 uintptr // offset known to liblink _panic *_panic // innermost panic - offset known to liblink _defer *_defer // innermost defer m *m // current m; offset known to arm liblink sched gobuf //存放g上下文信息,g被停止調度時,會將上線文信息存在這里,喚醒后可繼續調度 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc stktopsp uintptr // expected sp at top of stack, to check in traceback param unsafe.Pointer // passed parameter on wakeup atomicstatus uint32 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus goid int64 //就像線程有id,g也有id waitsince int64 // approx time when the g become blocked waitreason string // if status==Gwaiting schedlink guintptr //指向另一個G,全局G就是通過這個字段連在一起的 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt paniconfault bool // panic (instead of crash) on unexpected fault address preemptscan bool // preempted g does scan for gc gcscandone bool // g has scanned stack; protected by _Gscan bit in status gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove? throwsplit bool // must not split stack raceignore int8 // ignore race detection events sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine sysexitticks int64 // cputicks when syscall has returned (for tracing) traceseq uint64 // trace event sequencer tracelastp puintptr // last P emitted an event for this goroutine lockedm *m sig uint32 writebuf []byte sigcode0 uintptr sigcode1 uintptr sigpc uintptr gopc uintptr // pc of go statement that created this goroutine startpc uintptr // 被執行的函數 racectx uintptr waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order cgoCtxt []uintptr // cgo traceback context labels unsafe.Pointer // profiler labels timer *timer // cached timer for time.Sleep gcAssistBytes int64 }
go func()到底做了什么?
對應函數runtime.newproc
1:從執行當前方法的G所在P的空閑G列表中取一個G,如果沒有就從全局list中取一個,畢竟G還是經常使用,用完的G并不是馬上釋放,而是放回P的空閑列表中反復利用,如果還是沒有空閑的G,就new一個malg(2048),G的棧大小為2K
2:如果有參數會將參數拷貝到G的棧上,將G狀態改成可運行狀態
3:如果P的G隊列沒滿,將G加入隊尾
4:如果P的G隊列滿了,就取出G隊列的前面一半+當前G,共129個G加入全局隊列
加入隊列后,等待被調度
全局隊列G存取
G本身有個字段schedlink指向另一個G,天生就是鏈表的一個節點,全局隊列其實就是兩個指針,一個指向隊首,一個指向隊尾,隊尾的存在就是方便入隊列
入全局隊列:前面說過將P中的一半+1個G(129)加入全局隊列,并不是一個個入隊列,而是將這個129個G的首接入全局隊列的尾,將全局隊列的尾改成這129個G的尾
出全局隊列:當系統開始調度的時候,會從P本地G隊列取一個可用G執行,如果沒有,則從全局隊列中取,最多取128個,返回第一個用于執行,剩余的存入本地G隊列中,畢竟操作本地隊列不用加鎖,操作全局隊列需要加鎖
findrunnable查找可執行的G
1:本地隊列:從M對應的P的G隊列中找(runqget),隊列不為空,返回對列首個元素,對首指針指向下一個元素,當對首和對尾指向同一個元素時表示隊列為空,訪問本地隊列中的G不需要加鎖
2:全局隊列:從全局隊列中找(globrunqget),從全局隊列中取G不是一次取一個,畢竟訪問全局隊列是要加鎖的,所以全局隊列有多少取多少,最多取P隊列容量一半128個,將這些G存入P的G隊列中
3:?絡任務(netpoll)
4:從其他P任務隊列取,拿一半
所有目的就是多核齊心協力以最快的速度完成任務,總不能出現某個P的本地隊列還有多個人,其他P都在睡大覺吧,最后如果還是沒找到一個可用的G,那就大家一起睡大覺,等著被叫醒
type p struct { lock mutex id int32 status uint32 // one of pidle/prunning/... link puintptr schedtick uint32 // incremented on every scheduler call syscalltick uint32 // incremented on every system call sysmontick sysmontick // last tick observed by sysmon m muintptr // back-link to associated m (nil if idle) mcache *mcache //方便小對象的分配,一個p一個,不需要加鎖 racectx uintptr deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) deferpoolbuf [5][32]*_defer // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. goidcache uint64 goidcacheend uint64 // Queue of runnable goroutines. Accessed without lock. runqhead uint32 //隊頭 runqtail uint32 //隊尾 runq [256]guintptr //G循環隊列 runnext guintptr //高優先級的G,會先執行 // Available G's (status == Gdead) gfree *g //空閑G列表 gfreecnt int32 //空閑G數量 sudogcache []*sudog sudogbuf [128]*sudog tracebuf traceBufPtr // traceSweep indicates the sweep events should be traced. // This is used to defer the sweep start event until a span // has actually been swept. traceSweep bool // traceSwept and traceReclaimed track the number of bytes // swept and reclaimed by sweeping in the current sweep loop. traceSwept, traceReclaimed uintptr palloc persistentAlloc // per-P to avoid mutex // Per-P GC state gcAssistTime int64 // Nanoseconds in assistAlloc gcBgMarkWorker guintptr gcMarkWorkerMode gcMarkWorkerMode gcw gcWork runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point pad [sys.CacheLineSize]byte }
永遠不會退出的調度(schedule)
當一個G執行完成后,會繼續調用調度函數schedule,死循環就產生了
// goexit continuation on g0. func goexit0(gp *g) { _g_ := getg() casgstatus(gp, _Grunning, _Gdead) dropg() _g_.m.locked = 0 gfput(_g_.m.p.ptr(), gp) schedule() }
整體執行流程
mstart() => schedule() => findrunnable() => execute() => func() => goexit() => schedule()
M就緒 =>調度 => 查找可調度G => 執行G => 具體方法 => 執行完成 => 繼續調度
入口函數是 _rt0_amd64_linux,需要說明的是,不同平臺的入口函數名稱會有所不同,該方法會調用runtime.rt0_go匯編。
rt0_go 做了大量的初始化工作,runtime.args 讀取命令行參數、runtime.osinit 讀取 CPU 數目,runtime.schedinit初始化Processor數目,最大的Machine數目等等。
除此之外,我們還看到了兩個奇怪的 g0 和 m0 變量。m0 Machine 代表著當前初始化線程,而 g0 代表著初始化線程 m0 的 system stack,似乎還缺一個 p0 ?
實際上所有的 Processor 都會放到 allp 里。runtime.schedinit 會在調用 procresize 時為 m0 分配上 allp[0] 。所以到目前為止,初始化線程運行模式是符合上文提到的 G/P/M 模型的。
大量的初始化工作做完之后,會調用 runtime.newproc 為 mainPC 方法生成一個 Goroutine。 雖然 mainPC 并不是我們平時寫的那個 main 函數,但是它會調用我們寫的 main 函數,所以 main 函數是會以 Goroutine 的形式運行。
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8 LEAQ 8(SP), SI // argv MOVQ 0(SP), DI // argc MOVQ $main(SB), AX JMP AX TEXT main(SB),NOSPLIT,$-8 MOVQ $runtime·rt0_go(SB), AX JMP AX TEXT runtime·rt0_go(SB),NOSPLIT,$0 LEAQ runtime·g0(SB), CX MOVQ CX, g(BX) LEAQ runtime·m0(SB), AX // save m->g0 = g0 MOVQ CX, m_g0(AX) // save m0 to g0->m MOVQ AX, g_m(CX) CALL runtime·args(SB) CALL runtime·osinit(SB) //獲取cpu數量,頁大小 CALL runtime·schedinit(SB) //調度初始化 // create a new goroutine to start program MOVQ $runtime·mainPC(SB), AX // entry,執行runtime.main CALL runtime·newproc(SB) // start this M CALL runtime·mstart(SB) MOVL $0xf1, 0xf1 // crash RET DATA runtime·mainPC+0(SB)/8,$runtime·main(SB) GLOBL runtime·mainPC(SB),RODATA,$8 package runtime // The main goroutine. func main() { // Allow newproc to start new Ms. mainStarted = true gcenable() fn := main_init // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime fn() fn = main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime fn() exit(0) }
參考
https://github.com/golang/go (go源碼)
https://github.com/qyuhen/book (雨痕,內容很棒很全面,已出書)
浙公網安備 33010602011771號