Golang基礎(chǔ)筆記十五之sync
本文首發(fā)于公眾號(hào):Hunter后端
這一篇筆記介紹 Golang 中的 sync 模塊。
sync 包主要提供了基礎(chǔ)的同步原語(yǔ),比如互斥鎖,讀寫(xiě)鎖,等待組等,用于解決并發(fā)編程中的線程安全問(wèn)題,以下是本篇筆記目錄:
- WaitGroup-等待組
- sync.Mutex-互斥鎖
- sync.RWMutex-讀寫(xiě)鎖
- sync.Once-一次性執(zhí)行
- sync.Pool-對(duì)象池
- sync.Cond-條件變量
- sync.Map
1、WaitGroup-等待組
前面在第十篇我們介紹 goroutine 和 channel 的時(shí)候,在使用 goroutine 的時(shí)候介紹有一段代碼如下:
package main
import (
"fmt"
"time"
)
func PrintGoroutineInfo() {
fmt.Println("msg from goroutine")
}
func main() {
go PrintGoroutineInfo()
time.Sleep(1 * time.Millisecond)
fmt.Println("msg from main")
}
在這里,我們開(kāi)啟了一個(gè)協(xié)程調(diào)用 PrintGoroutineInfo() 函數(shù),然后使用 time.Sleep() 來(lái)等待它調(diào)用結(jié)束。
然而在開(kāi)發(fā)中,我們不能確定這個(gè)函數(shù)多久才能調(diào)用完畢,也無(wú)法使用準(zhǔn)確的 sleep 時(shí)間來(lái)等待,那么這里就可以使用到 sync 模塊的 WaitGroup 函數(shù)來(lái)等待一個(gè)或多個(gè) goroutine 執(zhí)行完畢。
下面是使用示例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func SleepRandSeconds(wg *sync.WaitGroup) {
defer wg.Done()
sleepSeconds := rand.Intn(3)
fmt.Printf("sleep %d seconds\n", sleepSeconds)
time.Sleep(time.Duration(sleepSeconds) * time.Second)
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go SleepRandSeconds(&wg)
go SleepRandSeconds(&wg)
wg.Wait()
fmt.Println("函數(shù)執(zhí)行完畢")
}
在這里,我們通過(guò) var wg sync.WaitGroup 定義了一個(gè)等待組,并通過(guò) wg.Add(2) 表示添加了需要等待的并發(fā)數(shù),在并發(fā)中我們將 &wg 傳入并通過(guò) wg.Done() 減少需要等待的并發(fā)數(shù)。
在 wg.Done() 函數(shù)內(nèi)部,使用 wg.Add(-1) 減少需要等待的并發(fā)數(shù),在 main 函數(shù)中,使用 wg.Wait() 進(jìn)入阻塞狀態(tài),當(dāng)?shù)却牟l(fā)都完成后,此函數(shù)就會(huì)返回,完成等待并接著往后執(zhí)行。
2、sync.Mutex-互斥鎖
1. 數(shù)據(jù)競(jìng)態(tài)與互斥鎖
當(dāng)多個(gè) goroutine 并發(fā)訪問(wèn)同一個(gè)共享資源,且至少有一個(gè)訪問(wèn)是寫(xiě)操作時(shí),就會(huì)發(fā)生數(shù)據(jù)競(jìng)態(tài),造成的結(jié)果就是程序每次運(yùn)行的結(jié)果表現(xiàn)可能會(huì)不一致。
比如下面的示例:
var balance int
func AddFunc() {
balance += 1
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(5 * time.Second)
fmt.Println("balance is: ", balance)
}
多次執(zhí)行上面的代碼,最終輸出的 balance 的值可能都不一致。
如果一個(gè)變量在多個(gè) goroutine 同時(shí)訪問(wèn)時(shí),不會(huì)出現(xiàn)比如數(shù)據(jù)不一致或程序崩潰的情況,那么我們就稱其是并發(fā)安全的。
我們可以使用 go run -race main.go 的方式來(lái)檢測(cè)數(shù)據(jù)競(jìng)態(tài),執(zhí)行檢測(cè)后,會(huì)輸出數(shù)據(jù)競(jìng)態(tài)的一些信息,比如發(fā)生在代碼的多少行,一共發(fā)生了多少次數(shù)據(jù)競(jìng)態(tài):
==================
WARNING: DATA RACE
Read at 0x000003910df8 by goroutine 7:
main.AddFunc()
/../main.go:13 +0x24
Previous write at 0x000003910df8 by goroutine 6:
main.AddFunc()
/../main.go:13 +0x3c
Goroutine 7 (running) created at:
main.main()
/../main.go:18 +0x32
Goroutine 6 (finished) created at:
main.main()
/../main.go:18 +0x32
==================
balance is: 98
Found 3 data race(s)
exit status 66
而要避免這種數(shù)據(jù)競(jìng)態(tài)的發(fā)生,我們可以限制在同一時(shí)間只能有一個(gè) goroutine 訪問(wèn)同一個(gè)變量,這種方法稱為互斥機(jī)制。
我們可以通過(guò)緩沖通道和 sync.Mutex 來(lái)實(shí)現(xiàn)這種互斥鎖的操作。
2. 緩沖通道實(shí)現(xiàn)互斥鎖
我們可以通過(guò)容量為 1 的緩沖通道來(lái)實(shí)現(xiàn)互斥鎖的操作,保證同一時(shí)間只有一個(gè) goroutine 訪問(wèn)同一個(gè)變量,下面是修改后的代碼:
var sema = make(chan struct{}, 1)
var balance int
func AddFunc() {
sema <- struct{}{}
balance += 1
<-sema
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(3 * time.Second)
fmt.Println("balance is: ", balance)
}
在上面這段代碼里,我們定義了 sema 這個(gè)容量為 1 的通道,在每個(gè) AddFunc() 并發(fā)中,對(duì)變量 balance 執(zhí)行讀寫(xiě)操作前,我們先往通道里寫(xiě)入了一條數(shù)據(jù),這樣其他并發(fā)在執(zhí)行該函數(shù)時(shí),由于也會(huì)先往通道里寫(xiě)入數(shù)據(jù),而這個(gè)時(shí)候通道已經(jīng)滿了,所以會(huì)處于堵塞狀態(tài),這就相當(dāng)于獲取鎖。
直到 balance 寫(xiě)操作完成,從通道里讀取數(shù)據(jù),通道為空,相當(dāng)于釋放鎖,這個(gè)時(shí)候其他并發(fā)才可以往通道里寫(xiě)入數(shù)據(jù)重新拿到鎖。
這樣我們通過(guò)往通道里寫(xiě)入和讀取數(shù)據(jù)保證了同一時(shí)間只有一個(gè) goroutine 在對(duì) balance 進(jìn)行寫(xiě)操作,從而實(shí)現(xiàn)互斥鎖的操作。
3. sync.Mutex
在 sync 包中,sync.Mutex 直接為我們實(shí)現(xiàn)了互斥鎖的操作,它的操作如下:
var mutex sync.Mutex // 互斥鎖的定義
mutex.Lock() // 獲取鎖
mutex.Unlock() // 釋放鎖
那么使用 sync.Mutex 實(shí)現(xiàn)上面的邏輯,代碼如下:
var mutex sync.Mutex
var balance int
func AddFunc() {
mutex.Lock()
defer mutex.Unlock()
balance += 1
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(3 * time.Second)
fmt.Println("balance is: ", balance)
}
3、sync.RWMutex-讀寫(xiě)鎖
在上面介紹的 sync.Mutex 互斥鎖中,限制了同一時(shí)間只能有一個(gè) goroutine 訪問(wèn)某個(gè)變量,包括讀和寫(xiě),但這種情況并非是最理想的,比如在讀多寫(xiě)少的場(chǎng)景下。
那么 Golang 里的 sync.RWMutex 為我們提供了讀寫(xiě)鎖的操作,它會(huì)允許多個(gè)讀操作的并發(fā),而寫(xiě)操作會(huì)阻塞所有讀和寫(xiě)。
讀寫(xiě)鎖的基本規(guī)則如下:
- 當(dāng)一個(gè) goroutine 獲取了讀鎖后,其他 goroutine 仍然可以獲取讀鎖,但不能獲取寫(xiě)鎖。
- 當(dāng)一個(gè) goroutine 獲取了寫(xiě)鎖后,其他 goroutine 無(wú)論是讀鎖還是寫(xiě)鎖都不能獲取,必須等待該寫(xiě)鎖釋放。
- 當(dāng)有寫(xiě)操作在等待時(shí),避免寫(xiě)操作長(zhǎng)期饑餓,會(huì)優(yōu)先處理寫(xiě)鎖請(qǐng)求。
下面是讀寫(xiě)鎖的用法:
var rwMu sync.RWMutex
rwMu.RLock() // 獲取讀鎖
rwMu.RUnlock() // 釋放讀鎖
rwMu.Lock() // 獲取寫(xiě)鎖
rwMu.Unlock() // 釋放寫(xiě)鎖
下面是讀寫(xiě)鎖在函數(shù)中的用法示例:
func ReadBalance() {
rwMu.RLock()
fmt.Println("get read lock")
defer rwMu.RUnlock()
fmt.Println("balance: ", balance)
}
func WriteBalance() {
rwMu.Lock()
fmt.Println("get write lock")
defer rwMu.Unlock()
balance += 1
}
4、sync.Once-一次性執(zhí)行
sync.Once 可用于確保函數(shù)只被執(zhí)行一次,常用于初始化操作,且可以用于延遲加載。
提供的方法是 Do(f func()),參數(shù)內(nèi)容是一個(gè)需要被執(zhí)行的函數(shù) f,這個(gè)方法實(shí)現(xiàn)的功能是只有在第一次被調(diào)用的時(shí)候會(huì)執(zhí)行 f 函數(shù)進(jìn)行初始化。
下面是該方法的使用示例:
import (
"fmt"
"sync"
)
type Config struct {
// 配置信息
}
var (
instance *Config
once sync.Once
)
func LoadConfig() {
fmt.Println("初始化配置...")
instance = &Config{}
// 加載配置的邏輯
}
func GetConfig() *Config {
once.Do(LoadConfig)
return instance
}
func main() {
c1 := GetConfig()
c2 := GetConfig()
fmt.Println(c1 == c2) // 輸出: true(同一個(gè)實(shí)例)
}
在這里,雖然 GetConfig() 函數(shù)執(zhí)行了兩遍,但是其內(nèi)部的調(diào)用的 LoadConfig 函數(shù)卻只執(zhí)行了一次,因?yàn)?sync.Once 會(huì)在內(nèi)部記錄該函數(shù)是否已經(jīng)初始化。
sync.Once 是個(gè)結(jié)構(gòu)體,其結(jié)構(gòu)如下:
type Once struct {
done atomic.Uint32
m Mutex
}
其中,done 字段用于記錄需要執(zhí)行的函數(shù) f 是否已經(jīng)被執(zhí)行,其對(duì)應(yīng)的 Do() 方法內(nèi)部會(huì)先根據(jù) done 字段判斷,如果已經(jīng)被執(zhí)行過(guò)則直接返回,而如果沒(méi)有則會(huì)先執(zhí)行一次。
而 m 字段表示的互斥鎖則用于在 Do() 方法內(nèi)部調(diào)用的 doSlow() 中使用,用于確保并發(fā)情況下目標(biāo)函數(shù)只被執(zhí)行一次,在 f 函數(shù)執(zhí)行結(jié)束后,done 參數(shù)會(huì)被置為 1,表示該函數(shù)已經(jīng)被執(zhí)行,這樣再次調(diào)用 Do() 方法時(shí),判斷 done 字段的值為 1 則不會(huì)再執(zhí)行此函數(shù)。
5、sync.Pool-對(duì)象池
sync.Pool,對(duì)象池,我們可以將一些生命周期短且創(chuàng)建成本高的對(duì)象存在其中,從而避免頻繁的創(chuàng)建和銷毀對(duì)象,以減少內(nèi)存分配和垃圾回收壓力。
簡(jiǎn)單地說(shuō)就是復(fù)用對(duì)象。
1. 基礎(chǔ)用法
下面以復(fù)用一個(gè)字節(jié)緩沖區(qū)為例介紹一下對(duì)象池的基礎(chǔ)用法。
1) 創(chuàng)建對(duì)象池
創(chuàng)建對(duì)象池的操作如下:
var bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
可以看到,這里對(duì) sync.Pool 的 New 字段賦值了一個(gè)函數(shù),返回的是一個(gè)字節(jié)緩沖區(qū)。
2) 從池中獲取對(duì)象
從對(duì)象池中獲取該對(duì)象的操作使用 Get() 操作:
buf := bufferPool.Get().(*bytes.Buffer)
3) 將對(duì)象放回池中
對(duì)該字節(jié)緩沖區(qū)使用完畢后可以將該對(duì)象再放回池中:
bufferPool.Put(buf)
2. 使用示例
import (
"bytes"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("create bytes buffer")
return &bytes.Buffer{}
},
}
func LogMessage(msg string) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()
buf.WriteString(msg)
fmt.Println(buf.String())
}
func main() {
LogMessage("hello world")
LogMessage("hello world")
LogMessage("hello world")
}
在上面的代碼中,我們先定義了 bufferPool,然后在 LogMessage() 函數(shù)中,先使用 Get() 獲取該字節(jié)緩沖對(duì)象,因?yàn)檫@里返回的數(shù)據(jù)是接口類型,所以這里將其轉(zhuǎn)為了對(duì)應(yīng)的類型,然后使用 buf.Reset() 重置了之前的記錄后寫(xiě)入新的數(shù)據(jù),最后使用的 defer 操作將此對(duì)象又放回了對(duì)象池。
6、sync.Cond-條件變量
sync.Cond 用于等待特定條件發(fā)生后再繼續(xù)執(zhí)行,可用于生產(chǎn)者-消費(fèi)者的模式。
創(chuàng)建一個(gè)條件變量,參數(shù)只有一個(gè),那就是鎖,下面代碼里用的是互斥鎖:
cond = sync.NewCond(&sync.Mutex{})
返回的 cond 對(duì)外暴露的字段 L 就是我們輸入的鎖。
下面用一個(gè)生產(chǎn)者的代碼示例來(lái)介紹 cond 的幾個(gè)相關(guān)函數(shù)。
在這里定義了 queue 作為隊(duì)列,其中擁有需要處理的數(shù)據(jù),queueMaxSize 字段為限制的最大隊(duì)列長(zhǎng)度。
var (
queue []int
queueMaxSize int = 5
cond = sync.NewCond(&sync.Mutex{})
)
func Producer() {
for {
cond.L.Lock()
for len(queue) == queueMaxSize {
fmt.Println("produce queue max size, wait")
cond.Wait()
}
queue = append(queue, 1)
fmt.Println("produce queue")
cond.Signal() // 通知消費(fèi)者
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
在定義的 Producer() 函數(shù)中,有一個(gè)死循環(huán),內(nèi)部先使用 cond.L.Lock() 獲取鎖,然后判斷生產(chǎn)的數(shù)據(jù)是否有消費(fèi)者消費(fèi),如果隊(duì)列滿了的話,則進(jìn)入等待。
1. cond.Wait()
在上面的代碼中,我們使用 cond.Wait() 進(jìn)入了等待狀態(tài)。
在 Wait() 函數(shù)內(nèi)部,先對(duì)前面的鎖進(jìn)行釋放操作,然后進(jìn)入阻塞狀態(tài),直到其他 gouroutine 通過(guò) Signal() 函數(shù)喚醒后重新獲取鎖。
2. cond.Signal()
前面往隊(duì)列里添加數(shù)據(jù)后,通過(guò) cond.Signal() 函數(shù)通知消費(fèi)者,消費(fèi)者在另一個(gè)函數(shù)中就可以被喚醒,然后進(jìn)行處理,同時(shí)這個(gè)函數(shù)后面將鎖釋放 cond.L.Unlock()。
3. cond.Broadcast()
前面的 Signal() 函數(shù)是喚醒一個(gè)等待的 goroutine,cond.Broadcast() 函數(shù)則可以喚醒所有等待的 goroutine。
下面提供一下生產(chǎn)者-消費(fèi)者的全部處理代碼:
package main
import (
"fmt"
"sync"
"time"
)
var (
queue []int
queueMaxSize int = 5
cond = sync.NewCond(&sync.Mutex{})
)
func Producer() {
for {
cond.L.Lock()
for len(queue) == queueMaxSize {
fmt.Println("produce queue max size, wait")
cond.Wait()
}
queue = append(queue, 1)
fmt.Println("produce queue")
cond.Signal() // 通知消費(fèi)者
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
func Consumer() {
for {
cond.L.Lock()
for len(queue) == 0 {
fmt.Println("wait for produce")
cond.Wait() // 等待并釋放鎖
}
fmt.Println("consume queue")
item := queue[0]
queue = queue[1:]
cond.Signal() // 通知生產(chǎn)者
cond.L.Unlock()
ProcessItem(item)
}
}
func ProcessItem(i int) {
fmt.Println("process i: ", i)
}
func main() {
go Producer()
go Consumer()
time.Sleep(1 * time.Second)
}
7、sync.Map
sync 模塊提供了 sync.Map 用來(lái)存儲(chǔ)鍵值對(duì),但是和之前介紹的 map 不一樣的是,sync.Map 是并發(fā)安全的,而且無(wú)需初始化,并且在操作方法上與原來(lái)的 map 不一樣。
1. 并發(fā)安全
原生的 map 是非并發(fā)安全的,如果多個(gè) goroutine 對(duì)其進(jìn)行同時(shí)讀寫(xiě)會(huì)觸發(fā)錯(cuò)誤,比如下面的操作:
import (
"fmt"
"time"
)
var originMap = make(map[string]int)
func UpdateMapKey() {
originMap["a"] += 1
}
func GetMapKey() {
a := originMap["a"]
fmt.Println(a)
}
func main() {
originMap["a"] = 0
for range 100 {
go UpdateMapKey()
go GetMapKey()
}
time.Sleep(1 * time.Second)
fmt.Println("originMap: ", originMap)
}
但是 sync.Map 是并發(fā)安全的,內(nèi)部會(huì)通過(guò)互斥鎖的操作允許多個(gè) goroutine 安全地讀寫(xiě),下面是使用 sync.Map 對(duì)上面邏輯的改寫(xiě),后面我們會(huì)具體介紹其操作方法:
import (
"fmt"
"sync"
"time"
)
var originMap sync.Map
func UpdateMapKey() {
for {
oldValue, loaded := originMap.Load("a")
if !loaded {
if _, ok := originMap.LoadOrStore("a", 1); ok {
return
}
} else {
newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {
return
}
}
}
}
func GetMapKey() {
a, _ := originMap.Load("a")
fmt.Println(a)
}
func main() {
originMap.Store("a", 0)
for range 100 {
go UpdateMapKey()
go GetMapKey()
}
time.Sleep(1 * time.Second)
a, _ := originMap.Load("a")
fmt.Println("originMap: ", a)
}
2. 初始化
原生的 map 進(jìn)行初始化,有下面兩種操作方法:
var originMap = make(map[string]int)
var originMap = map[string]int{}
sync.Map 可以直接聲明使用:
var m sync.Map
m.Store("a", 0)
3. 操作方法
這里先介紹 sync.Map 增刪改查的基礎(chǔ)操作:
1) 增
增加一個(gè) key 的操作如下:
originMap.Store("a", 1)
2) 刪
刪除一個(gè) key 的操作如下:
originMap.Delete("a", 1)
3) 改
修改操作還是可以用 Store() 方法,而且可以修改為不同數(shù)據(jù)類型:
m.Store("a", "123")
4) 查
查詢操作可以使用 Load() 方法,返回對(duì)應(yīng)的 value 值以及是否存在:
m.Store("a", 1)
v, ok := m.Load("a")
if ok {
fmt.Printf("exist value:%v\n", v.(int))
} else {
fmt.Printf("key not exists")
}
5) 遍歷
遍歷操作如下:
m.Store("a", 1)
m.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
4. 原子性條件操作
上面的這些方法可以實(shí)現(xiàn)基礎(chǔ)的增刪改查操作,但是如果我們有一個(gè)需求,比如前面的獲取一個(gè) key 的 value,然后在原值的基礎(chǔ)上 +1 再存入,大概邏輯如下:
v, ok := m.Load(key)
v = v.(int)
v +=1
m.Store(key, v)
但是在這個(gè)操作中,如果有其他 goroutine 已經(jīng)修改了 v 的值,那么我們這里的操作就相當(dāng)于污染了源數(shù)據(jù),而為了避免這個(gè)可能,我們可以使用一些原子性條件操作,以實(shí)現(xiàn)并發(fā)操作。
1) CompareAndSwap()
CompareAndSwap() 是一個(gè)更新操作,傳入 3 個(gè)參數(shù),key,oldValue 和 newValue,僅當(dāng) key 的結(jié)果為 oldValue 的時(shí)候,將結(jié)果更新為 newValue,使用示例如下:
key := "a"
m.Store(key, 1)
swapped := m.CompareAndSwap(key, 1, 2)
fmt.Printf("當(dāng) value 為 1 的時(shí)候,將 value 從 1 修改為 2, 是否更新結(jié)果 %v\n", swapped)
swapped = m.CompareAndSwap(key, 1, 3)
fmt.Printf("當(dāng) value 為 1 的時(shí)候,將 value 從 1 修改為 3, 是否更新結(jié)果 %v\n", swapped)
所以在上面我們要對(duì)結(jié)果進(jìn)行 +1 的代碼操作為:
newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {
return
}
2) CompareAndDelete()
CompareAndDelete 是一個(gè)原子性的刪除操作,接受兩個(gè)參數(shù),key 和 oldValue,僅當(dāng) key 的值為 oldValue 時(shí)刪除該 key,返回結(jié)果為是否刪除:
key := "a"
m.Store(key, 1)
deleted := m.CompareAndDelete(key, 1)
if deleted {
fmt.Printf("當(dāng) key 的 value 為 %v 時(shí),刪除\n", 1)
} else {
fmt.Printf(" key 的 value 不為 %v 時(shí),不執(zhí)行刪除\n", 1)
}
3) LoadAndDelete()
LoadAndDelete 表示是否加載某個(gè) key 的值并刪除該 key,無(wú)論該 key 是否存在,參數(shù)為 key,返回值為 value 和是否存在該 key:
key := "a"
m.Store(key, 1)
value, loaded := m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
value, loaded = m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
4) LoadOrStore()
LoadOrStore 方法為不存在 key 則存入,存在的話則返回該值:
key := "a"
value, loaded := m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
value, loaded = m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)

浙公網(wǎng)安備 33010602011771號(hào)