Golang 高性能 Websocket 庫(kù) gws 使用與設(shè)計(jì)(一)
前言
大家好這里是,白澤,這期分析一下 golang 開(kāi)源高性能 websocket 庫(kù) gws。
視頻講解請(qǐng)關(guān)注??B站:白澤talk

介紹
- gws:https://github.com/lxzan/gws |GitHub ?? 1.2k,高性能的 websocket 庫(kù),代碼雙語(yǔ)注釋?zhuān)m合有開(kāi)發(fā)經(jīng)驗(yàn)的同學(xué)進(jìn)階學(xué)習(xí)。
- gws 的兩個(gè)特性
-
High IOPS Low Latency(高I/O,低延遲)
-
Low Memory Usage(低內(nèi)存占用)
可以從下圖看到: payload 越高,性能相比其他 websocket 庫(kù)越是優(yōu)越,如何做到?

gws chatroom 架構(gòu)圖
這是 gws 的官方聊天室 demo 的架構(gòu)圖,繪制在這里幫助各位理解什么是全雙工的通信模式。

WebSocket 與 HTTP 一樣是應(yīng)用層的協(xié)議,只需要 TCP 完成三次握手之后,Golang 的 net/http 庫(kù)提供了 Hijack() 方法,將 TCP 套接字(活躍的一個(gè)會(huì)話(huà)),從 HTTP 劫持,此后 tcp 的連接將由 WebSocket 管理,脫離了 HTTP 協(xié)議的范疇。
而只要獲取了 TCP 的套接字,何時(shí)發(fā)送和接受數(shù)據(jù),都是由應(yīng)用層決定的,傳輸層的 TCP 套接字只是被編排的對(duì)象(單工/雙工),自然可以實(shí)現(xiàn)服務(wù)端主動(dòng)發(fā)送數(shù)據(jù)。
緩沖池
為什么 payload 越高,性能相比其他 websocket 庫(kù)越是優(yōu)越?
原因:gws 中的讀寫(xiě)操作,全部使用了緩沖池。

binaryPool = internal.NewBufferPool(128, 256*1024) // 緩沖池
讀緩沖:每次讀取是一次系統(tǒng)調(diào)用,因此可以讀取一段數(shù)據(jù),且用一個(gè) offset 定位消費(fèi)的位置,減少讀取次數(shù)。
寫(xiě)緩沖:每次寫(xiě)入是一次系統(tǒng)調(diào)用,因此可以多次寫(xiě)入 buffer,統(tǒng)一 flush。
緩沖池:為不同大小的 buffer 提供了緩沖池,大段 buffer 的創(chuàng)建次數(shù)減少,減少 GC 壓力 & 創(chuàng)建對(duì)象和銷(xiāo)毀對(duì)象時(shí)間。
// NewBufferPool Creating a memory pool
// Left, right indicate the interval range of the memory pool, they will be transformed into pow(2,n)。
// Below left, Get method will return at least left bytes; above right, Put method will not reclaim the buffer.
func NewBufferPool(left, right uint32) *BufferPool {
var begin, end = int(binaryCeil(left)), int(binaryCeil(right))
var p = &BufferPool{
begin: begin,
end: end,
shards: map[int]*sync.Pool{},
}
for i := begin; i <= end; i *= 2 {
capacity := i
p.shards[i] = &sync.Pool{
New: func() any { return bytes.NewBuffer(make([]byte, 0, capacity)) },
}
}
return p
}
使用循環(huán)從 begin 到 end,每次容量翻倍(乘以2),為每個(gè)容量創(chuàng)建一個(gè) sync.Pool 實(shí)例。sync.Pool 是Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)中的一個(gè)類(lèi)型,用于存儲(chǔ)和回收臨時(shí)對(duì)象。
使用緩沖池中的 buffer 從 conn(網(wǎng)絡(luò)連接)中讀取和寫(xiě)入數(shù)據(jù)時(shí),通常會(huì)執(zhí)行以下步驟:
- 從緩沖池獲取緩沖區(qū):使用
Get方法從緩沖池中獲取一個(gè)buffer。 - 讀取數(shù)據(jù):如果需要從
conn讀取數(shù)據(jù),可以將buffer用作讀取操作的目的地。 - 處理數(shù)據(jù):根據(jù)需要處理讀取到的數(shù)據(jù)。
- 寫(xiě)入數(shù)據(jù):如果需要寫(xiě)入數(shù)據(jù),可以將數(shù)據(jù)寫(xiě)入從緩沖池獲取的
buffer,然后從buffer寫(xiě)入conn。 - 釋放緩沖區(qū):使用完畢后,將
buffer放回緩沖池,以便重用。
設(shè)計(jì)一個(gè) WebScket 庫(kù)
編寫(xiě)WebSocket庫(kù)時(shí),有幾個(gè)關(guān)鍵點(diǎn)會(huì)影響其性能,尤其是在高并發(fā)場(chǎng)景下。
下面針對(duì)這些場(chǎng)景,部分給出一些 demo 寫(xiě)法(偽代碼),可以從中提煉一些通用的項(xiàng)目設(shè)計(jì)方法:
- 事件驅(qū)動(dòng)模型: 使用非阻塞的事件驅(qū)動(dòng)架構(gòu)可以提高性能,因?yàn)樗试SWebSocket庫(kù)在單個(gè)線(xiàn)程內(nèi)處理多個(gè)連接,而不會(huì)因等待I/O操作而阻塞。
package main
import (
"fmt"
"time"
)
func main() {
eventChan := make(chan string)
readyChan := make(chan bool)
// 模擬WebSocket連接
go func() {
time.Sleep(2 * time.Second)
eventChan <- "connected"
readyChan <- true
}()
// 事件處理循環(huán)
for {
select {
case event := <-eventChan:
fmt.Println("Event received:", event)
case <-readyChan:
fmt.Println("WebSocket is ready to use")
return
}
}
}
-
并發(fā)處理: 庫(kù)如何處理并發(fā)連接和消息是影響性能的重要因素。使用goroutines或線(xiàn)程池可以提高并發(fā)處理能力。
-
消息壓縮: 支持消息壓縮(如
permessage-deflate擴(kuò)展)可以減少傳輸數(shù)據(jù)量,但同時(shí)也會(huì)增加CPU的使用率,需要找到合適的平衡點(diǎn)。 -
內(nèi)存管理: 優(yōu)化內(nèi)存使用,比如通過(guò)減少內(nèi)存分配和重用緩沖區(qū),可以提高性能并減少垃圾回收的壓力。
var buffer = make([]byte, 0, 1024)
func readMessage(conn *websocket.Conn) {
_, buffer, err := conn.ReadMessage()
if err != nil {
// 處理錯(cuò)誤
}
// 使用buffer中的數(shù)據(jù)
}
- 連接池管理: 有效的連接池管理可以減少連接建立和關(guān)閉的開(kāi)銷(xiāo),特別是在長(zhǎng)連接和頻繁通信的場(chǎng)景下。
type WebSocketPool struct {
pool map[*websocket.Conn]struct{}
}
func (p *WebSocketPool) Add(conn *websocket.Conn) {
p.pool[conn] = struct{}{}
}
func (p *WebSocketPool) Remove(conn *websocket.Conn) {
delete(p.pool, conn)
}
func (p *WebSocketPool) Broadcast(message []byte) {
for conn := range p.pool {
conn.WriteMessage(websocket.TextMessage, message)
}
}
- 鎖和同步機(jī)制: 在多線(xiàn)程或goroutine環(huán)境中,合理的鎖和同步機(jī)制是必要的,以避免競(jìng)態(tài)條件和死鎖,但過(guò)多的鎖競(jìng)爭(zhēng)會(huì)降低性能。
import "sync"
var pool = &WebSocketPool{
pool: make(map[*websocket.Conn]struct{}),
}
var mu sync.Mutex
func broadcast(message []byte) {
mu.Lock()
defer mu.Unlock()
for conn := range pool.pool {
conn.WriteMessage(websocket.TextMessage, message)
}
}
- I/O模型: 使用非阻塞I/O或異步I/O模型可以提高性能,因?yàn)樗鼈冊(cè)试S在等待網(wǎng)絡(luò)數(shù)據(jù)時(shí)執(zhí)行其他任務(wù)。
func handleConnection(conn *websocket.Conn) {
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
return // 處理錯(cuò)誤
}
// 處理接收到的消息
}
}()
}
- 協(xié)議實(shí)現(xiàn): 精確且高效的WebSocket協(xié)議實(shí)現(xiàn),包括幀的處理、掩碼的添加和去除、以及控制幀的管理,都是影響性能的因素。
func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) {
if opcode == OpcodeText && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) {
return nil, internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding)
}
var n = payload.Len()
if n > c.config.WriteMaxPayloadSize {
return nil, internal.CloseMessageTooLarge
}
var buf = binaryPool.Get(n + frameHeaderSize)
buf.Write(framePadding[0:])
if c.pd.Enabled && opcode.isDataFrame() && n >= c.pd.Threshold {
return c.compressData(buf, opcode, payload, isBroadcast)
}
var header = frameHeader{}
headerLength, maskBytes := header.GenerateHeader(c.isServer, true, false, opcode, n)
_, _ = payload.WriteTo(buf)
var contents = buf.Bytes()
if !c.isServer {
internal.MaskXOR(contents[frameHeaderSize:], maskBytes)
}
var m = frameHeaderSize - headerLength
copy(contents[m:], header[:headerLength])
buf.Next(m)
return buf, nil
}
-
錯(cuò)誤處理和恢復(fù): 健壯的錯(cuò)誤處理和異常恢復(fù)機(jī)制可以防止個(gè)別連接的問(wèn)題影響整個(gè)服務(wù)的性能。
-
測(cè)試和基準(zhǔn): 通過(guò)廣泛的測(cè)試和基準(zhǔn)測(cè)試來(lái)識(shí)別性能瓶頸,并根據(jù)測(cè)試結(jié)果進(jìn)行優(yōu)化。
GitHub ?? 1.2k,高性能的 websocket 庫(kù),代碼雙語(yǔ)注釋?zhuān)m合有開(kāi)發(fā)經(jīng)驗(yàn)的同學(xué)進(jìn)階學(xué)習(xí)。
浙公網(wǎng)安備 33010602011771號(hào)