為什么Sync.Pool不需要加鎖卻能保證線程安全
1. 簡介
我們在 Sync.Pool: 提高go語言程序性能的關鍵一步 一文中,已經了解了使用sync.Pool來實現對象的復用以減少對象的頻繁創建和銷毀,以及使用sync.Pool的一些常見注意事項。
在這篇文章中,我們將剖析sync.Pool內部實現中,介紹了sync.Pool比較巧妙的內部設計思路以及其實現方式。在這個過程中,也間接介紹了為何不加鎖也能夠實現線程安全。
主要會涉及到Go語言中實現并發的GMP模型以及其基本的調度原理,以及本地緩存的設計,無鎖隊列的使用這幾個部分的內容,綜上這幾個方面的內容實現了不加鎖也能夠保證線程安全。
2. GMP之間的綁定關系
為了之能夠幫助我們后續更好得理解sync.Pool的設計與實現,這里需要對GMP模型進行簡單的介紹。GMP模型是Go語言中的一種協作式調度模型,其中G表示Goroutine,M可以理解為內核線程,P為邏輯處理器,簡單理解其維護了一條Goroutine隊列。
2.1 M和P的關系
在GMP模型中,M和P可以動態綁定,一個M可以在運行時綁定到任意一個P上,而一個P也可以與任意一個M綁定。這種綁定方式是動態的,可以根據實際情況進行靈活調整,從而實現更加高效的協程調度。
盡管M和P可以動態綁定,但在特定時間點,一個M只會對應一個P。這是因為M是操作系統線程,而P是Go語言的邏輯處理器,Go語言的邏輯處理器需要在某個操作系統線程中運行,并且是被該邏輯處理器(P)單獨占用的。
P的數量一般是和CPU核數保持一致,每個P占用一個CPU核心來執行,可以通過runtime.GOMAXPROCS函數來修改。不過在大多數情況下,不需要手動修改,Go語言的調度器會根據實際情況自動進行調整。
2.2 P和G的關系
剛創建的Goroutine會被放入當前線程對應P的本地隊列中等待被執行。如果本地隊列已滿,則會放入全局隊列中,供其他線程的P來搶占執行。
當P空閑時,會嘗試從全局隊列中獲取Goroutine來執行。如果全局隊列中沒有Goroutine,則會從其他處理器的本地運行隊列中"偷取"一些Goroutine來執行。
如果協程執行過程中遇到阻塞操作(比如等待I/O或者鎖),處理器(P)會立即將協程移出本地運行隊列,并執行其他協程,直到被阻塞的協程可以繼續執行為止。被阻塞的協程會被放到相應的等待隊列中等待事件發生后再次被喚醒并加入到運行隊列中,但不一定還是放回原來的處理器(P)的等待隊列中。
從上述過程可以看出,G和P的綁定關系是動態綁定的,在不同的時間點,同一個G可能在不同的P上執行,同時,在不同的時間點,P也會調度執行不同的G。
2.3 總結
每個P在某個時刻只能綁定一個M,而每個G在某個時刻也只存在于某個P的等待隊列中,等待被調度執行。這是GMP模型的基本調度原理,也是Go語言高效調度的核心所在。通過動態綁定和靈活調度,可以充分利用多核處理器的計算能力,從而實現高并發、高效率的協程調度。
通過對GMP模型的基本了解,能夠幫助我們后續更好得理解sync.Pool的設計與實現。
3.Sync.Pool與GMP模型
3.1 sync.Pool性能問題
這里我們回到sync.Pool, 可以簡單使用切片,存儲可復用的對象,在需要時從中取出對象,用完之后再重新放回池子中,實現對象的重復使用。
當多個協程同時從 sync.Pool 中取對象時,會存在并發問題,因此需要實現并發安全。一種簡單的實現方式是加鎖,每個協程在取數據前先加鎖,然后獲取數據,再解鎖,實現串行讀取的效果。但是這種方式在并發比較大的場景下容易導致大量協程進入阻塞狀態,從而進一步降低性能。
因此,為了提高程序的性能,我們需要尋找一種減少并發沖突的方式。有什么方式能夠減少并發沖突呢?
3.2 基于GMP模型的改進
回到GMP模型,從第二節對GMP模型的介紹中,我們知道協程(G)需要在邏輯處理器(P)上執行,而邏輯處理器的數量是有限的,一般與CPU核心數相同。而之前的sync.Pool實現方式是所有P競爭同一份數據,容易導致大量協程進入阻塞狀態,影響程序性能。
那我們這里,是不是能夠將 sync.Pool 分成多個小的存儲池,每個P都用擁有一個小的存儲池呢? 在每個小存儲池中分別使用獨立的鎖進行并發控制。這樣可以避免多個協程同時競爭同一個全局鎖的情況,降低鎖的粒度,從而減少并發沖突。
協程運行時都需要綁定一個邏輯處理器(P),此時每個P都有自己的數據緩存,需要對象時從綁定的P的緩存中獲取,用完后重新放回。這種實現方式減少了協程競爭同一份數據的情況,只有在同一個邏輯處理器上的協程才存在競爭,從而減少并發沖突,提升性能。
3.3 能不能完全不加鎖
在上面的實現中,處于不同的P上的協程都是操作不同的數據,此時并不會出現并發問題。唯一可能出現并發問題的地方,為協程在獲取緩存對象時,邏輯處理器中途調度其他協程來執行,此時才可能導致的并發問題。那這里能不能避免并發呢?
那如果能夠將協程固定到邏輯處理器P上,并且不允許被搶占,也就是該P上永遠都是執行某一個協程,直到成功獲取緩存對象后,才允許邏輯處理器去調度執行其他協程,那么就可以完全避免并發沖突的問題了。
因此,如果我們能夠做到協程在讀取緩沖池中的數據時,能夠完全占用邏輯處理器P,不會被搶占,此時就不會出現并發了,也不需要加鎖了。
幸運的是,runtime包中提供了runtime_procPin調用,可以將當前協程固定到協程所在邏輯處理器P上,并且不允許被搶占,也就是邏輯處理器P一直都被當前協程所獨享。在獲取緩存對象時,我們可以使用runtime_procPin將當前協程固定到邏輯處理器P上,然后從該邏輯處理器P的緩存中獲取對象。這樣做不僅可以避免并發沖突,還可以避免上下文切換和鎖競爭等性能問題。
4. sync.Pool初步實現
下面來看看當前sync.Pool的部分代碼,其原理便是上述所提到的方式。具體來說,每個邏輯處理器P保存一份數據,并利用runtime_procPin來避免同一邏輯處理器P中的協程發生并發沖突。
需要注意的是,下面所展示的代碼只是部分代碼,并不包含完整的實現。但是這些代碼涵蓋了前面所提到的實現方式。同時,為了講述方便,也修改部分實現,后文會說明當前sync.Pool當前真正的實現。
4.1 sync.Pool結構體定義
type Pool struct {
// 指向 poolLocal 結構體切片的地址,長度與cpu核心數保持一致
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// 記錄當前 poolLocal 切片的長度
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() any
}
type poolLocal struct {
// 上文所說的小緩沖池的實現
private []any // Can be used only by the respective P.
}
其中,New 函數用于創建一個新的對象,以便向池中添加對象。當池中沒有可用對象時,會調用該函數。local 是指向 poolLocal 切片的指針,poolLocal 即為上文提到的小緩沖池,每個邏輯處理器都有一個。
4.2 Put方法
func (p *Pool) Put(x any) {
if x == nil {
return
}
// 這里調用pin方法,獲取到poolLocal
l, _ := p.pin()
// 將對象重新放入邏輯處理的小緩沖池當中
l.private = l.private.append(x)
// 這個為解除 Proccssor的固定,Processor能夠調度其他協程去執行
runtime_procUnpin()
}
Put方法是用于將對象重新放入緩沖池,首先調用pin方法獲取到poolLocal,然后將對象放入到poolLocal當中,然后再通過runtime_procUnpin調用,解除對當前P的綁定。
可以發現,其中比較重要的邏輯,是調用pin方法獲取到P對應的poolLocal,下面我們來看pin方法的實現。
func (p *Pool) pin() (*poolLocal, int) {
// 調用runtime_procPin,占用Processor,不會被搶占
pid := runtime_procPin()
// 獲取localSize字段的值
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
// 獲取poolLocal切片
l := p.local // load-consume
// pid為協程編號,如果pid < localSize的值,說明屬于該processor的緩沖池已經創建好了
if uintptr(pid) < s {
// 根據pid獲取對應的緩沖池
return indexLocal(l, pid), pid
}
// 否則走下面邏輯
return p.pinSlow()
}
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
// 直接通過Processor的編號,計算出偏移量獲取到對應的poolLocal
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
在函數開始時,pin方法通過調用runtime_procPin方法,占用當前goroutine所在的P,不允許其他goroutine搶占該P,這里能夠避免處于同一等待隊列中的協程出現并發讀取poolLocal的數據的問題。
同時runtime_procPin方法也會返回當前P的編號,在系統內部是唯一的,從0開始依次遞增的整數。其將能夠作為poolLocal的切片下標,來讀取poolLocal。
接下來,通過原子操作runtime_LoadAcquintptr讀取localSize字段的值,該字段表示當前poolLocal實例切片的長度。如果當前的P編號小于localSize的值,則表示該P的poolLocal實例已經被創建,可以直接獲取該P對應的poolLocal實例并返回。
如果P編號大于等于localSize的值,此時說明該P對應的poolLocal還沒創建,通過調用pinSlow()方法進行初始化。下面繼續來看pinSlow方法的具體實現。
func (p *Pool) pinSlow(pid int) (*poolLocal, int) {
// 獲取processor數量
size := runtime.GOMAXPROCS(0)
// 創建一個新的poolLocal切片
local := make([]poolLocal, size)
// 將切片地址存儲到local字段當中
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
// 將切片數組長度 存儲到 localSize 當中
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
// 根據pid,獲取到對應的poolLocal
return &local[pid], pid
}
首先調用 runtime.GOMAXPROCS(0) 獲取當前程序可用的 processor 數量,并基于這個數量創建了一個新的 poolLocal 切片 local,這里也印證了我們之前所說的,每一個Processor都有一個小緩沖池。
接著,使用原子操作 atomic.StorePointer 將指向 p.local 的指針修改為指向新創建的 local 切片的第一個元素的指針。然后使用 runtime_StoreReluintptr 原子操作將 p.localSize 修改為 size。到此為止,便完成了poolLocal切片的初始化操作。
最后返回當前 processor 對應的 poolLocal 指針和它的編號 pid。由于這里新創建的 local 切片是局部變量,在 pinSlow 函數返回后,它就無法被訪問了。但是,由于我們已經將 p.local 修改為指向這個切片的第一個元素的指針,所以其他 processor在調用 pin 方法時,就能獲取到新創建的 poolLocal。
4.3 Get方法
func (p *Pool) Get() any {
l, pid := p.pin()
var x any
if n := len(l.private); n > 0 {
x = l.private[n-1]
l.private[n-1] = nil // Just to be safe
l.private = l.private[:n-1]
}
runtime_procUnpin()
if x == nil && p.New != nil {
x = p.New()
}
return x
}
首先調用pin()方法,獲得當前協程綁定的緩存池local和協程編號pid。接著從local.private中嘗試取出對象,如果取出來是空,說明緩沖池中沒有對象,此時調用runtime_procUnpin()方法解綁協程與處理器的綁定。
如果沒有從緩沖池中成功獲取對象,并且Pool結構體的New字段非空,則調用New字段所指向的函數創建一個新對象并返回。
4.4 總結
到此為止,sync.Pool已經通過結合GMP模型的特點,給每一個P設置一份緩存數據,當邏輯處理器上的協程需要從sync.Pool獲取可重用對象時,此時將從邏輯處理器P對應的緩存中取出對象,避免了不同邏輯處理器的競爭。
此外,也通過調用runtime_procPin方法,讓協程能夠在某段時間獨占該邏輯處理器,避免了鎖的競爭和不必要的上下文切換的消耗,從而提升性能。
5. sync.Pool實現優化
5.1 問題描述
在sync.Pool的初步實現中,我們讓每一個邏輯處理器P都擁有一個小緩沖池,讓各個邏輯處理器P上的協程從sync.Pool獲取對象時不會競爭,從而提升性能。
現在可能存在的問題,在于每個 Processor 都保存一份緩存數據,那么當某個 Processor 上的 goroutine 需要使用緩存時,可能會發現它所在的 Processor 上的緩存池為空的,而其他 Processor 上的緩存對象卻沒有被利用。這樣就浪費了其他 Processor 上的資源。
回到sync.Pool的設計初衷來看,首先是提升程序性能,減少重復創建和銷毀對象的開銷;其次是減少內存壓力,通過對象復用,從而降低程序GC頻次。從這兩個方面來看,上面sync.Pool的初步實現其實存在一些優化空間的。
這里就陷入了一個兩難的境地,如果多個Processor共享同一個緩沖池,會存在容易導致大量協程進入阻塞狀態,進一步降低性能。每個 Processor 都保存一份緩存數據的話,此時也容易陷入資源浪費的問題。那能怎么辦呢?
5.2 實現優化
很多時候,可能并沒有十全十美的事情,我們往往需要折中。比如上面多個Processor共享同一個緩沖池,會降低性能;而每個 Processor 都保存一份緩存數據的話,容易陷入資源浪費的問題。
這個時候,我們可以折中一下,不采用完全共享的模式,也不采用完全獨占的模式。而采用部分獨有、部分共享的模式。每個 Processor 獨占一部分緩存,可以避免不同 Processor 之間的競爭,提高并發性能。同時,每個 Processor 也可以共享其他 Processor 上的緩存,避免了浪費。相對于完全共享和完全獨立的模式,這種設計方式是不是能夠更好地平衡并發性能和緩存利用效率。
同時,也可以基于部分獨有,部分共享的模式的基礎上,再對其進行優化。對于共享部分的資源,可以使用多個緩沖池來存儲,是將其給了所有的Processor,每個Processor保留一部分共享數據。
當Processor讀取數據時,此時先從自身的私有緩沖中讀取,讀取不到再到自身的共享緩存中讀取,讀取不到才到其他Processor讀取其共享部分。這樣子能夠避免了多個Processor同時競爭一個池導致的性能問題。同時,共享部分也可以被充分利用,避免了資源浪費。
6.Sync.Pool最終實現
6.1 sync.Pool結構體定義
type Pool struct {
noCopy noCopy
// 1. 指向poolLocal切片的指針
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// 2. 對應local切片的長度
localSize uintptr // size of the local array
// 3. 緩存池中沒對象時,調用設置的New函數來創建對象
New func() any
// 部分與此次講述無關內容,未被包含進來
// ....
}
// 每個Processor都會對應一個poolLocal
type poolLocal struct {
// 存儲緩存對象的數據結構
poolLocalInternal
// 用于內存對齊
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
// 存儲每個Processor獨享的對象
private any // Can be used only by the respective P.
// 存儲Processor共享的對象,這個是一個無鎖隊列
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
首先說明poolLocal結構體,可以認為是一個小緩沖池,每個Processor都會有對應的poolLocal對象。poolLocal中對象的存儲通過poolLocalInternal來實現,至于poolLocal中的pad字段只是用于內存對其。
poolLocalInternal其中包含private字段和shared字段,private字段保存了上文所說的Processor獨占的緩存對象,而shared字段,也就是我們上文所說的共享緩沖池組成的一部分,是允許Processor之間相互讀取的。shared字段的類型為poolChain,是一個無鎖隊列,調用pushHead能夠將數據放入共享緩沖池,調用popHead能夠從緩沖池中取出數據,無需加鎖也是并發安全的,這個并非今日的重點,在此簡單描述一下。
Pool結構體中local字段,指向了poolLocal結構體切片的地址,而localSize字段的值,為前面poolLocal切片的長度。
6.2 Get方法
func (p *Pool) Get() any {
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if x == nil && p.New != nil {
x = p.New()
}
return x
}
首先調用pin()方法,獲取當前Processor對應的poolLocal對象和協程編號pid,同時占用該Processor。
開始嘗試獲取對象,首先從poolLocal對象中獲取私有緩存private。如果私有緩存為空,則嘗試從共享緩存shared的頭部彈出一個元素x,并賦值給x。如果共享緩存也為空,則調用getSlow()方法從其他Processor的共享緩存或New方法中獲取元素x。釋放當前Processor的占用。如果元素x不為空,則返回x,否則如果New方法不為空,則調用New方法生成一個新的元素x并返回,否則返回nil。
可以看出來,在Get方法的外層,主要是嘗試從Proessor對應的poolLocal中獲取數據,讀取不到,則調用getSlow方法,嘗試從其他Processor的共享數據中獲取。下面來看getSlow方法的邏輯:
func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
// 獲取poolLocal
l := indexLocal(locals, (pid+i+1)%int(size))
// poolLocal中的shared是一個無鎖隊列,無需加鎖,也能夠保證線程安全
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 與sync.Pool對象回收的相關邏輯先刪除,與此次講述并無太大關系
// ....
return nil
}
getSlow方法實現較為簡單,首先讀取Pool結構體中localSize字段的值,得知當前有多少個poolLocal。然后對所有的poolLocal進行遍歷,嘗試從其他poolLocal的共享緩存中獲取數據,成功獲取則直接返回。
6.3 Put方法
func (p *Pool) Put(x any) {
if x == nil {
return
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}
首先調用pin方法,獲取當前Processor對應的poolLocal,然后將x放到該poolLocal的private字段中,也就是放到當前Processor的私有緩存中。如果private字段不為空,說明已經有對象放到private中了,那么x則會放到poolLocal的shared字段中,通過無鎖隊列的方式加入到共享資源池中。
6.4 總結
到此為止,在sync.Pool原本的實現上,對緩存數據的設計進行了優化,將緩存數據中區分為私有緩存部分和共享部分。此時在一定程度上避免不同 Processor 之間的競爭,提高并發性能。同時,每個 Processor 也可以共享其他 Processor 上的緩存,避免了內存的浪費。
7.總結
這篇文章,我們其實主要介紹了sync.Pool的實現原理。
我們首先基于GMP模型完成sync.Pool的一個實現,基于該實現,引出了部分獨有、部分共享的模式的優化。在這個過程中,也展示了sync.Pool的部分源碼,以便能夠更好得理解sync.Pool的實現。
同時,基于實現的講述,我們也間接得解答了sync.Pool為何不需要加鎖也保證了線程安全的問題。
這次講述sync.Pool的實現過程中,并沒有直接講述sync.Pool源碼的實現,而是一步一步得對現有實現進行優化,將其中比較好的點給描述出來,希望能夠有所幫助。

浙公網安備 33010602011771號