GO 語言中 chan 的理解
GO 語言中 chan 的理解
chan 的底層實現(xiàn)是怎么樣的?
chan 是 Go 語言中的一個關(guān)鍵字,用于實現(xiàn)并發(fā)通信。chan 可以用于在不同的 goroutine 之間傳遞數(shù)據(jù),實現(xiàn)數(shù)據(jù)的同步和異步傳輸。
在底層實現(xiàn)上,chan 是通過一個結(jié)構(gòu)體來表示的,這個結(jié)構(gòu)體包含了一個指向數(shù)據(jù)的指針和兩個指向信道的指針。其中,一個指針用于發(fā)送數(shù)據(jù),另一個指針用于接收數(shù)據(jù)。
下面是 chan 的底層實現(xiàn)代碼:
type hchan struct {
qcount uint // 當前隊列中的元素數(shù)量
dataqsiz uint // 隊列的容量
buf unsafe.Pointer // 指向隊列的指針
elemsize uint16 // 元素的大小
closed uint32 // 是否關(guān)閉
elemtype *_type // 元素的類型
sendx uint // 發(fā)送的位置
recvx uint // 接收的位置
recvq waitq // 接收等待隊列
sendq waitq // 發(fā)送等待隊列
lock mutex // 鎖
}
chan 的發(fā)送和接收操作的底現(xiàn)
當我們向 chan 發(fā)送數(shù)據(jù)時,會先檢查 chan 是否已經(jīng)關(guān)閉。如果 chan 已經(jīng)關(guān)閉,那么發(fā)送操作會直接返回一個 panic。否則,會將數(shù)據(jù)復制到隊列中,并更新發(fā)送位置。
下面是 chan 發(fā)送操作的底層實現(xiàn)代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 {
panic("send on closed channel")
}
// 計算發(fā)送位置
i := c.sendx
// 計算隊列中的元素數(shù)量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果隊列已滿,需要擴容
grow(c)
}
// 更新發(fā)送位置
c.sendx++
// 將數(shù)據(jù)復制到隊列中
qput(c, i, ep)
return true
}
當我們從 chan 接收數(shù)據(jù)時,也會先檢查 chan 是否已經(jīng)關(guān)閉。如果 chan 已經(jīng)關(guān)閉并且隊列中沒有數(shù)據(jù),那么接收操作會直接返回一個零值。否則,會從隊列中取出數(shù)據(jù),并更新接收位置。
下面是 chan 接收操作的底層實現(xiàn)代碼:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 && c.qcount == 0 {
return false, false
}
// 計算接收位置
i := c.recvx
// 如果隊列中沒有數(shù)據(jù),需要阻塞等待
for c.qcount <= 0 {
if !block {
return false, false
}
gopark(chanparkcommit, unsafe.Pointer(c), "chan receive", traceEvGoBlockRecv, 1)
}
// 從隊列中取出數(shù)據(jù)
qget(c, i, ep)
// 更新接收位置
c.recvx++
// 更新隊列中的元素數(shù)量
c.qcount--
return true, true
}
chan 是如何實現(xiàn)多個 gorouting 并發(fā)安全訪問的?
如上 hchan 結(jié)構(gòu)中的 recvq 和 sendq 分別表示接收等待隊列和發(fā)送等待隊列,它們的定義如下:
type waitq struct {
first *sudog // 等待隊列的第一個元素
last *sudog // 等待隊列的最后一個元素
}
sudog 表示等待隊列中的一個元素,它的定義如下:
type sudog struct {
// 等待的 goroutine
g *g
// 是否是 select 操作
isSelect bool
// 等待隊列中的下一個元素
next *sudog
// 等待隊列中的上一個元素
prev *sudog
// 等待的元素
elem unsafe.Pointer
// 獲取鎖的時間
acquiretime int64
// 保留字段
release2 uint32
// 等待的 ticket
ticket uint32
// 父 sudog
parent *sudog
// 等待鏈表
waitlink *sudog
// 等待鏈表的尾部
waittail *sudog
// 關(guān)聯(lián)的 chan
c *hchan
// 喚醒時間
releasetime int64
}
當 chan 的隊列已滿或為空時,當前 goroutine 會被加入到發(fā)送等待隊列或接收等待隊列中,并釋放鎖。當另一個 goroutine 從 chan 中取出數(shù)據(jù)或向 chan 發(fā)送數(shù)據(jù)時,它會重新獲取鎖,并從等待隊列中取出一個 goroutine,將其喚醒。這樣,多個 goroutine 就可以通過等待隊列來實現(xiàn)并發(fā)訪問 chan。
sudog 是 Go 中非常重要的數(shù)據(jù)結(jié)構(gòu),因為 g 與同步對象關(guān)系是多對多的。
一個 g 可以出現(xiàn)在許多等待隊列上,因此一個 g 可能有很多sudog:在 select 操作中,一個 goroutine 可以等待多個 chan 中的任意一個就緒, sudog 中的 isSelect 字段被用來標記它是否是 select 操作。當一個 chan 就緒時,它會喚醒對應的 sudog,并將其從等待隊列中移除。如果一個 sudog 是 select 操作,它會在喚醒后返回一個特殊的值,表示哪個 chan 就緒了
多個 g 可能正在等待同一個同步對象,因此一個對象可能有許多 sudog:chan 在不同的 gorouting 中傳遞等待
完整的發(fā)送和接受方法實現(xiàn)如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 計算發(fā)送位置
i := c.sendx
// 計算隊列中的元素數(shù)量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果隊列已滿,需要將當前 goroutine 加入到發(fā)送等待隊列中
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false
}
// 創(chuàng)建一個 sudog,表示當前 goroutine 等待發(fā)送
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到發(fā)送等待隊列中
c.sendq.enqueue(sg)
// 釋放鎖,并將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanSend, traceEvGoBlockSend, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 從發(fā)送等待隊列中取出 sudog
sg = c.sendq.dequeue()
if sg == nil {
throw("chan send inconsistency")
}
// 將數(shù)據(jù)復制到隊列中
qput(c, i, ep)
}
// 更新發(fā)送位置
c.sendx++
// 釋放鎖
unlock(&c.lock)
return true
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 計算接收位置
i := c.recvx
// 如果隊列中沒有數(shù)據(jù),需要將當前 goroutine 加入到接收等待隊列中
if c.qcount <= 0 {
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false, false
}
// 創(chuàng)建一個 sudog,表示當前 goroutine 等待接收
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到接收等待隊列中
c.recvq.enqueue(sg)
// 釋放鎖,并將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanReceive, traceEvGoBlockRecv, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經(jīng)關(guān)閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 從接收等待隊列中取出 sudog
sg = c.recvq.dequeue()
if sg == nil {
throw("chan receive inconsistency")
}
// 從隊列中取出數(shù)據(jù)
qget(c, i, ep)
} else {
// 從隊列中取出數(shù)據(jù)
qget(c, i, ep)
}
// 更新接收位置
c.recvx++
// 更新隊列中的元素數(shù)量
c.qcount--
// 釋放鎖
unlock(&c.lock)
return true, true
}

浙公網(wǎng)安備 33010602011771號