如何使用Mutex確保并發(fā)程序的正確性
1. 簡介
本文的主要內(nèi)容是介紹Go中Mutex并發(fā)原語。包含Mutex的基本使用,使用的注意事項以及一些實踐建議。
2. 基本使用
2.1 基本定義
Mutex是Go語言中的一種同步原語,全稱為Mutual Exclusion,即互斥鎖。它可以在并發(fā)編程中實現(xiàn)對共享資源的互斥訪問,保證同一時刻只有一個協(xié)程可以訪問共享資源。Mutex通常用于控制對臨界區(qū)的訪問,以避免競態(tài)條件的出現(xiàn)。
2.2 使用方式
使用Mutex的基本方法非常簡單,可以通過調(diào)用Mutex的Lock方法來獲取鎖,然后通過Unlock方法釋放鎖,示例代碼如下:
import "sync"
var mutex sync.Mutex
func main() {
mutex.Lock() // 獲取鎖
// 執(zhí)行需要同步的操作
mutex.Unlock() // 釋放鎖
}
2.3 使用例子
2.3.1 未使用mutex同步代碼示例
下面是一個使用goroutine訪問共享資源,但沒有使用Mutex進行同步的代碼示例:
package main
import (
"fmt"
"time"
)
var count int
func main() {
for i := 0; i < 1000; i++ {
go add()
}
time.Sleep(1 * time.Second)
fmt.Println("count:", count)
}
func add() {
count++
}
上述代碼中,我們啟動了1000個goroutine,每個goroutine都調(diào)用add()函數(shù)將count變量的值加1。由于count變量是共享資源,因此在多個goroutine同時訪問的情況下會出現(xiàn)競態(tài)條件。但是由于沒有使用Mutex進行同步,所以會導(dǎo)致count的值無法正確累加,最終輸出的結(jié)果也會出現(xiàn)錯誤。
在這個例子中,由于多個goroutine同時訪問count變量,而不進行同步控制,導(dǎo)致每個goroutine都可能讀取到同樣的count值,進行相同的累加操作。這就會導(dǎo)致最終輸出的count值不是期望的結(jié)果。如果我們使用Mutex進行同步控制,就可以避免這種競態(tài)條件的出現(xiàn)。
2.3.2 使用mutex解決上述問題
下面是使用Mutex進行同步控制,解決上述代碼中競態(tài)條件問題的示例:
package main
import (
"fmt"
"sync"
"time"
)
var (
count int
mutex sync.Mutex
)
func main() {
for i := 0; i < 1000; i++ {
go add()
}
time.Sleep(1 * time.Second)
fmt.Println("count:", count)
}
func add() {
mutex.Lock()
count++
mutex.Unlock()
}
在上述代碼中,我們在全局定義了一個sync.Mutex類型的變量mutex,用于進行同步控制。在add()函數(shù)中,我們首先調(diào)用mutex.Lock()方法獲取mutex的鎖,確保只有一個goroutine可以訪問count變量。然后進行加1操作,最后調(diào)用mutex.Unlock()方法釋放mutex的鎖,使其他goroutine可以繼續(xù)訪問count變量。
通過使用Mutex進行同步控制,我們避免了競態(tài)條件的出現(xiàn),確保了count變量的正確累加。最終輸出的結(jié)果也符合預(yù)期。
3. 使用注意事項
3.1 Lock/Unlock需要成對出現(xiàn)
下面是一個沒有成對出現(xiàn)Lock和Unlock的代碼例子:
package main
import (
"fmt"
"sync"
)
func main() {
var mutex sync.Mutex
go func() {
mutex.Lock()
fmt.Println("goroutine1 locked the mutex")
}()
go func() {
fmt.Println("goroutine2 trying to lock the mutex")
mutex.Lock()
fmt.Println("goroutine2 locked the mutex")
}()
}
在上述代碼中,我們創(chuàng)建了一個sync.Mutex類型的變量mutex,然后在兩個goroutine中使用了這個mutex。
在第一個goroutine中,我們調(diào)用了mutex.Lock()方法獲取mutex的鎖,但是沒有調(diào)用相應(yīng)的Unlock方法。在第二個goroutine中,我們首先打印了一條信息,然后調(diào)用了mutex.Lock()方法嘗試獲取mutex的鎖。由于第一個goroutine沒有釋放mutex的鎖,第二個goroutine就一直阻塞在Lock方法中,一直無法執(zhí)行。
因此,在使用Mutex的過程中,一定要確保每個Lock方法都有對應(yīng)的Unlock方法,確保Mutex的正常使用。
3.2 不能對已使用的Mutex作為參數(shù)進行傳遞
下面舉一個已使用的Mutex作為參數(shù)進行傳遞的代碼的例子:
type Counter struct {
sync.Mutex
Count int
}
func main(){
var c Counter
c.Lock()
defer c.Unlock()
c.Count++
foo(c)
fmt.println("done")
}
func foo(c Counter) {
c.Lock()
defer c.Unlock()
fmt.println("foo done")
}
當(dāng)一個 mutex 被傳遞給一個函數(shù)時,預(yù)期的行為應(yīng)該是該函數(shù)在訪問受 mutex 保護的共享資源時,能夠正確地獲取和釋放 mutex,以避免競態(tài)條件的發(fā)生。
如果我們在Mutex未解鎖的情況下拷貝這個Mutex,就會導(dǎo)致鎖失效的問題。因為Mutex的狀態(tài)信息被拷貝了,拷貝出來的Mutex還是處于鎖定的狀態(tài)。而在函數(shù)中,當(dāng)要訪問臨界區(qū)數(shù)據(jù)時,首先肯定是先調(diào)用Mutex.Lock方法加鎖,而傳入Mutex其實是處于鎖定狀態(tài)的,此時函數(shù)將永遠無法獲取到鎖。
因此,不能將已使用的Mutex直接作為參數(shù)進行傳遞。
3.3 不可重復(fù)調(diào)用Lock/UnLock方法
下面是一個例子,其中對同一個 Mutex 進行了重復(fù)加鎖:
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
mu.Lock()
fmt.Println("First Lock")
// 重復(fù)加鎖
mu.Lock()
fmt.Println("Second Lock")
mu.Unlock()
mu.Unlock()
}
在這個例子中,我們先對 Mutex 進行了一次加鎖,然后在沒有解鎖的情況下,又進行了一次加鎖操作.
這種情況下,程序會出現(xiàn)死鎖,因為第二次加鎖操作已經(jīng)被阻塞,等待第一次加鎖的解鎖操作,而第一次加鎖的解鎖操作也被阻塞,等待第二次加鎖的解鎖操作,導(dǎo)致了互相等待的局面,無法繼續(xù)執(zhí)行下去。
Mutex實際上是通過一個int32類型的標(biāo)志位來實現(xiàn)的。當(dāng)這個標(biāo)志位為0時,表示這個Mutex當(dāng)前沒有被任何goroutine獲取;當(dāng)標(biāo)志位為1時,表示這個Mutex當(dāng)前已經(jīng)被某個goroutine獲取了。
Mutex的Lock方法實際上就是將這個標(biāo)志位從0改為1,表示獲取了鎖;Unlock方法則是將標(biāo)志位從1改為0,表示釋放了鎖。當(dāng)?shù)诙握{(diào)用Lock方法,此時標(biāo)記位為1,代表有一個goroutine持有了這個鎖,此時將會被阻塞,而持有該鎖的其實就是當(dāng)前的goroutine,此時該程序?qū)肋h阻塞下去。
4. 實踐建議
4.1 Mutex鎖不要同時保護兩份不相關(guān)數(shù)據(jù)
下面是一個例子,使用Mutex同時保護兩份不相關(guān)的數(shù)據(jù)
// net/http transport.go
type Transport struct {
lk sync.Mutex
idleConn map[string][]*persistConn
altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper
}
func (t *Transport) CloseIdleConnections() {
t.lk.Lock()
defer t.lk.Unlock()
if t.idleConn == nil {
return
}
for _, conns := range t.idleConn {
for _, pconn := range conns {
pconn.close()
}
}
t.idleConn = nil
}
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
if scheme == "http" || scheme == "https" {
panic("protocol " + scheme + " already registered")
}
t.lk.Lock()
defer t.lk.Unlock()
if t.altProto == nil {
t.altProto = make(map[string]RoundTripper)
}
if _, exists := t.altProto[scheme]; exists {
panic("protocol " + scheme + " already registered")
}
t.altProto[scheme] = rt
}
在這個例子中,idleConn是存儲了空閑的連接,altProto是存儲了協(xié)議的處理器,CloseIdleConnections方法是關(guān)閉所有空閑的連接,RegisterProtocol是用于注冊協(xié)議處理的。
盡管ideConn和altProto這兩部分?jǐn)?shù)據(jù)并沒有任何關(guān)聯(lián),但是卻是使用同一個Mutex來保護的,這樣子當(dāng)調(diào)用RegisterProtocol方法時,便無法調(diào)用CloseIdleConnections方法,這會導(dǎo)致競爭過多,從而影響性能。
因此,為了提高并發(fā)性能,應(yīng)該將 Mutex 的鎖粒度盡量縮小,只保護需要保護的數(shù)據(jù)。
現(xiàn)代版本的 net/http 中已經(jīng)對 Transport 進行了改進,分別使用了不同的 mutex 來保護 idleConn 和 altProto,以提高性能和代碼的可維護性。
type Transport struct {
idleMu sync.Mutex
idleConn map[connectMethodKey][]*persistConn // most recently used at end
altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
}
4.2 Mutex嵌入結(jié)構(gòu)體中位置放置建議
將 Mutex 嵌入到結(jié)構(gòu)體中,如果只需要保護其中一些數(shù)據(jù),可以將 Mutex 放在需要控制的字段上面,然后使用空格將被保護字段和其他字段進行分隔。這樣可以實現(xiàn)更細粒度的鎖定,也能更清晰地表達每個字段需要被互斥保護的意圖,代碼更易于維護和理解。下面舉一些實際的例子:
Server結(jié)構(gòu)體中reqLock是用來保護freeReq字段,respLock用來保護freeResp字段,都是將mutex放在被保護字段的上面
//net/rpc server.go
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
在Transport結(jié)構(gòu)體中,idleMu鎖會保護closeIdle等一系列字段,此時將鎖放在被保護字段的最上面,然后用空格將被idleMu鎖保護的字段和其他字段分隔開來。 實現(xiàn)更細粒度的鎖定,也能更清晰地表達每個字段需要被互斥保護的意圖。
// net/http transport.go
type Transport struct {
idleMu sync.Mutex
closeIdle bool // user has requested to close all idle conns
idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
idleLRU connLRU
reqMu sync.Mutex
reqCanceler map[cancelKey]func(error)
altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
connsPerHostMu sync.Mutex
connsPerHost map[connectMethodKey]int
connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
}
4.3 盡量減小鎖的作用范圍
在一個代碼段里,盡量減小鎖的作用范圍可以提高并發(fā)性能,減少鎖的等待時間,從而減少系統(tǒng)資源的浪費。
鎖的作用范圍越大,那么就有越多的代碼需要等待鎖,這樣就會降低并發(fā)性能。因此,在編寫代碼時,應(yīng)該盡可能減小鎖的作用范圍,只在需要保護的臨界區(qū)內(nèi)加鎖。
如果鎖的作用范圍是整個函數(shù),使用 defer 語句來釋放鎖是一種常見的做法,可以避免忘記手動釋放鎖而導(dǎo)致的死鎖等問題。
func (t *Transport) CloseIdleConnections() {
t.lk.Lock()
defer t.lk.Unlock()
if t.idleConn == nil {
return
}
for _, conns := range t.idleConn {
for _, pconn := range conns {
pconn.close()
}
}
t.idleConn = nil
}
在使用鎖時,注意避免在鎖內(nèi)執(zhí)行長時間運行的代碼或者IO操作,因為這樣會阻塞鎖的使用,導(dǎo)致鎖的等待時間變長。如果確實需要在鎖內(nèi)執(zhí)行長時間運行的代碼或者IO操作,可以考慮將鎖釋放,讓其他代碼先執(zhí)行,等待操作完成后再重新獲取鎖, 比如下面代碼示例
// net/http/httputil persist.go
func (cc *ClientConn) Read(req *http.Request) (resp *http.Response, err error) {
// Retrieve the pipeline ID of this request/response pair
cc.mu.Lock()
id, ok := cc.pipereq[req]
delete(cc.pipereq, req)
if !ok {
cc.mu.Unlock()
return nil, ErrPipeline
}
cc.mu.Unlock()
// xxx 省略掉一些中間邏輯
// 從http連接中讀取http響應(yīng)數(shù)據(jù), 這個IO操作,先解鎖
resp, err = http.ReadResponse(r, req)
// 網(wǎng)絡(luò)IO操作結(jié)束,再繼續(xù)讀取
cc.mu.Lock()
defer cc.mu.Unlock()
if err != nil {
cc.re = err
return resp, err
}
cc.lastbody = resp.Body
cc.nread++
if resp.Close {
cc.re = ErrPersistEOF // don't send any more requests
return resp, cc.re
}
return resp, err
}
5.總結(jié)
在并發(fā)編程中,Mutex是一種常見的同步機制,用來保護共享資源。為了提高并發(fā)性能,我們需要盡可能縮小Mutex的鎖粒度,只保護需要保護的數(shù)據(jù),同時在一個代碼段里,盡量減小鎖的作用范圍。如果鎖的作用范圍是整個函數(shù),可以使用defer來在函數(shù)退出時解鎖。當(dāng)Mutex嵌入到結(jié)構(gòu)體中時,我們可以將Mutex放到要控制的字段上面,并使用空格將字段進行分隔,以便只保護需要保護的數(shù)據(jù)。

浙公網(wǎng)安備 33010602011771號