協程并發下數據匯總:除了互斥鎖,還有其他方式嗎?
1. 簡介
本文介紹了在并發編程中數據匯總的問題,并探討了在并發環境下使用互斥鎖和通道兩種方式來保證數據安全性的方法。
首先,通過一個實例,描述了一個并發拉取數據并匯總的案例,并使用互斥鎖來確保線程安全。然后,討論了互斥鎖的一些缺點,引出了通道作為一種替代方案,并介紹了通道的基本使用和特性。接下來,通過實例演示了如何使用通道來實現并發下的數據匯總。
最后,引用了etcd中使用通道實現協程并發下數據匯總的例子,展示了通道在實際項目中的應用。
2. 問題引入
在請求處理過程中,經常需要通過RPC接口拉取數據。有時候,由于數據量較大,單個數據拉取操作可能會導致整個請求的處理時間較長。為了加快處理速度,我們通常考慮同時開啟多個協程并發地拉取數據。一旦多個協程并發拉取數據后,主協程需要匯總這些協程拉取到的數據,然后再返回結果。在這個過程中,往往涉及對共享資源的并發訪問,為了保證線程安全性,通常會使用互斥鎖。下面通過一個簡單的代碼來展示該過程:
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
var (
// 匯總結果
dataList []Data
// 互斥鎖
mutex sync.Mutex
)
func fetchData(page int, wg *sync.WaitGroup) {
// 模擬RPC接口拉取數據的耗時操作
time.Sleep(time.Second)
// 假設從RPC接口獲取到了一批數據
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
// 使用互斥鎖保護共享數據的并發訪問
mutex.Lock()
defer mutext.Unlock()
dataList = append(dataList, data)
wg.Done()
}
func main() {
var wg sync.WaitGroup
// 定義需要拉取的數據頁數
numPages := 10
// 啟動多個協程并發地拉取數據
for i := 1; i <= numPages; i++ {
wg.Add(1)
go fetchData(i, &wg)
}
// 等待所有協程完成
wg.Wait()
// 打印拉取到的數據
fmt.Println("Fetched data:")
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在上述示例中,我們定義了一個共享的dataList切片用于保存拉取到的數據。每個goroutine通過調用fetchData函數來模擬拉取數據的過程,并使用互斥鎖mutex保護dataList的并發訪問。主協程使用sync.WaitGroup等待所有協程完成數據拉取任務,然后打印出拉取到的數據。通過并發地拉取數據,并使用互斥鎖保證線程安全,我們可以顯著提高數據拉取的速度,并且確保數據的正確性和一致性。
回看上述實現,其實是涉及到了多個協程操作同一份數據,有可能導致線程安全的問題,然后這里是通過互斥鎖來保證線程安全的。確實,使用互斥鎖是可以保證線程安全的,但是也是存在一些缺點的,比如競爭和阻塞,兩個協程同時競爭互斥鎖時,只有一個協程能夠獲得鎖,而其他協程則會被阻塞,這個就可能導致性能瓶頸,當然在這個場景下問題不大。其次就是代碼的復雜性提高了,使用互斥鎖需要仔細設計和管理,確保鎖的正確獲取和釋放。這增加了代碼的復雜性和維護成本,如果在代碼中處理鎖的方式不正確,可能會死鎖,導致程序無法繼續執行。
那我們其實就有疑問,在協程并發下數據匯總的場景,是否存在其他方式,不需要通過使用互斥鎖,也能夠保證線程安全呢? 其實還真有,Go語言中的channel非常適用于這種情況。通過使用通道,我們可以實現線程安全的數據共享和同步,而無需顯式地使用互斥鎖。下面我們來了解一下channel。
3. channel的使用
3.1 channel的基本介紹
3.1.1 基本說明
channel在Go語言中是一種特殊的數據結構,用于協程之間的通信和同步。它類似于一個先進先出(FIFO)的隊列,用于數據的傳輸和共享。在并發環境中,可以將數據發送到通道,也可以從通道中接收數據,而這兩個操作都是線程安全的。
使用channel的優勢在于它提供了內置的同步機制,無需顯式地使用互斥鎖來處理并發訪問。
當一個協程向通道發送數據時,如果通道已滿,發送操作會被阻塞,直到有其他協程從通道中接收數據釋放空間。同樣地,當一個協程從通道接收數據時,如果通道為空,接收操作也會被阻塞,直到有其他協程向通道發送數據。
同時,當多個協程同時訪問通道時,Go運行時系統會自動處理協程之間的同步和并發訪問的細節,保證數據的正確性和一致性。從而可以放心地在多個協程中使用通道進行數據的發送和接收操作,而不需要額外的鎖或同步機制來保證線程安全。
因此,使用channel其實是可以避免常見的并發問題,如競態條件和死鎖,簡化了并發編程的復雜性。
3.1.2 基本使用
通過上面對channel的基本介紹,我們已經對channel有了基本的了解,其實可以粗略理解其為一個并發安全的隊列。下面來了解下channel的基本語法,從而能夠開始使用channel。
channel基本操作分為創建channel,發送數據到channel,接收channel中的數據,以及關閉channel。下面對其進行簡單展示:
創建channel,使用make函數創建通道,通道的類型可以根據需要選擇,例如int、string等:
ch := make(chan int)
發送數據到channel:使用<-操作符將數據發送到通道中
ch <- data
接收channel中的數據: 使用<-操作符從通道中接收數據
result := <-ch
關閉channel, 使用close函數關閉通道。關閉通道后,仍然可以從通道接收數據,但無法再向通道發送數據
close(ch)
通過上面channel的四個基本操作,便能夠實現在不同協程間線程安全得傳遞數據。最后通過一個例子,完整得展示channel的基本使用。
package main
import "fmt"
func main() {
ch := make(chan string) // 創建字符串通道
defer close(ch)
go func() {
ch <- "hello, channel!" // 發送數據到通道
}()
result := <-ch // 從通道接收數據
fmt.Println(result)
}
在這個示例中,我們創建了一個字符串通道ch。然后,在一個單獨的協程中,我們向通道發送了字符串"hello, channel!"。最后,主協程從通道中接收數據,并將其打印出來。
通過使用通道,我們可以實現協程之間的數據傳輸和同步,確保數據的安全共享和線程安全性。通道的使用能夠簡化并發編程的復雜性,提供一種高效、可靠的方式來處理并發場景下的數據傳遞。
3.2 使用channel實現匯總數據
下面,我們使用channel來實現并發數據匯總,替換掉之前使用互斥鎖來保證線程安全的實現:
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {
// 模擬 RPC 接口拉取數據的耗時操作
time.Sleep(time.Second)
// 假設從 RPC 接口獲取到了一批數據
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
ch <- data // 將數據發送到通道
wg.Done()
}
func main() {
var wg sync.WaitGroup
// 定義需要拉取的數據頁數
numPages := 10
dataCh := make(chan Data, 10) // 創建用于接收數據的通道
// 啟動多個協程并發地拉取數據
for i := 1; i <= numPages; i++ {
wg.Add(1)
go fetchData(i, dataCh, &wg)
}
go func() {
wg.Wait()
close(dataCh) // 關閉通道,表示數據已經全部發送完成
}()
// 從通道接收數據并匯總
var dataList []Data
for data := range dataCh {
dataList = append(dataList, data)
}
// 打印拉取到的數據
fmt.Println("Fetched data:")
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在修改后的代碼中,我們創建了一個用于接收數據的 dataCh。每個協程通過將數據發送到該channel 來完成數據的匯總。主協程通過從channel接收數據,并將其添加到 dataList 中實現數據的匯總過程。這種方式不需要顯式地加鎖和解鎖,并且避免了互斥鎖帶來的復雜性和性能問題。
通過使用channel,我們能夠以一種更直觀、更安全的方式實現協程之間的數據傳遞和同步。channel在并發編程中起到了關鍵的作用,簡化了并發操作的管理和實現。同時,它提供了內置的同步機制,保證了數據的正確性和一致性,避免了死鎖和競態條件的問題。
3.3 總結
協程間的并發下匯總數據可以歸類為協程間的數據傳遞這個場景。在這個場景中,多個協程并發地拉取數據,然后將數據匯總到一個共享的數據結構中。為了保證數據的正確性和一致性,需要使用某種機制來確保多個協程對共享數據的并發訪問是安全的。
在原始的實現中,使用了互斥鎖來保護共享數據的并發訪問。互斥鎖提供了互斥訪問的機制,確保同一時間只有一個協程可以訪問共享數據,從而避免了數據競爭和不一致性。這種方式在保證線程安全的同時,引入了鎖的開銷和復雜性。
而使用channel來實現協程間的安全數據傳遞可以更簡潔和高效。每個協程可以將拉取到的數據通過channel發送到主協程,主協程通過接收channel中的數據來進行匯總。channel提供了并發安全的數據傳遞機制,協程之間的數據傳輸是同步和有序的。由于channel本身就提供了同步機制,不需要額外的鎖和同步操作,能夠更簡潔地實現協程間的安全數據傳遞。
因此,如果需要在多個協程間實現數據傳遞,而且由此可能帶來線程安全的問題,此時使用channel來實現是相對比較合適的。
4. 開源項目中的使用
假設我們需要對etcd進行性能測試,此時需要模擬大量并發請求,對etcd進行負載測試,并收集每個請求的執行時間、成功/失敗狀態等結果數據。然后主協程需要收集每一個請求的結果數據,并進行統計計算,生成相應的性能報告。基于此,能夠計算出總請求數、請求成功率、平均執行時間、最慢/最快請求等統計信息,以及錯誤分布情況和慢速請求的詳細信息。
從上面的講述來看,其實我們可以大概想象出這個模型,多個協程并發執行,然后獲取每個請求的結果數據。然后主協程需要收集匯總這些數據,基于此來生成性能報告。這個模型其實也就是我們上面所說的協程并發下的數據匯總,因此通過channel來實現協程間的數據傳輸,是非常合適的。
下面我們來看看etcd中對應的實現。etcd中存在一個report對象的實現,能夠接受一系列的請求數據的結果,然后生成性能報告返回回去。結構體定義如下:
type report struct {
results chan Result
stats Stats
}
func (r *report) Results() chan<- Result { return r.results }
// Result describes the timings for an operation.
type Result struct {
Start time.Time
End time.Time
Err error
}
func newReport(precision string) *report {
r := &report{
results: make(chan Result, 16),
}
return r
}
Result結構體為單個測試的結果,而 report 結構體則用于整個測試過程的報告和統計信息。通過使用 results 通道,可以將每個測試的結果發送到 report 結構體中,以便進行統計和生成報告。
當進行性能壓測時,首先通過newReport生成一個report對象,然后啟動多個協程同時進行壓測請求,每一個請求處理完成之后,便會生成一個處理結果,存儲到Result對象當中。然后基于report對象的Results方法獲取到對應的channel,將處理結果傳輸給主協程。
主協程便通過遍歷report對象中的results變量對應的channel,匯總計算所有處理結果,基于此便能夠生成壓測結果和報告。下面來看其具體流程。
首先是創建一個report對象,然后啟動多個協程來處理請求,將結果發送到report對象中的results對應的channel中。
// 這里NewReportSample方法,其實是對上面newReport方法的一個封裝
r := NewReportSample("%f")
// 這里假設只有一個協程,模擬執行一系列的測試,并將測試結果發送到 Report 對象的 results 通道中。
go func() {
start := time.Now()
for i := 0; i < 5; i++ {
// 不真實進行請求,只是簡單獲取執行結果,將測試結果進行傳輸
end := start.Add(time.Second)
r.Results() <- Result{Start: start, End: end}
start = end
}
r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
// 假設所有壓測請求都執行完成了
close(r.Results())
}()
// 主協程 匯總所有的處理結果,然后生成壓測報告
stats := <-r.Stats()
以上代碼中,r 是通過 NewReportSample("%f") 創建的一個 Report 對象。然后,在一個單獨的協程中,執行了一系列的測試,并將測試結果發送到 r.Results() 通道中。
這段代碼的作用是模擬執行一系列的測試,并將測試結果發送到 Report 對象的 results 通道中。通過使用 r.Results() 方法返回的通道,可以將測試結果發送到報告對象中進行統計和處理。
接下來,主協程應該不斷從 r.Results()方法返回的通道中讀取數據,匯總所有的處理結果,從而生成壓測報告。這個方法其實是被封裝在r.Stas()方法中,具體如下:
func (r *report) Stats() <-chan Stats {
// 創建一個channel
donec := make(chan Stats, 1)
// 啟動一個協程來執行
go func() {
defer close(donec)
r.processResults()
s := r.stats.copy()
if r.sps != nil {
s.TimeSeries = r.sps.getTimeSeries()
}
// 執行完成的話,將結果返回
donec <- s
}()
// 返回channel
return donec
}
// Stats方法啟動的協程中,實際運行的任務
func (r *report) processResults() {
st := time.Now()
// 遍歷r.results方法中channel中的數據,然后執行處理流程
for res := range r.results {
r.processResult(&res)
}
// 后續執行一些具體的計算邏輯
}
上述代碼是 report 結構體中的兩個方法,其中 Stats() 方法返回一個只讀的 Stats 通道。這個方法會在一個單獨的協程中執行,并處理 results 通道中的測試結果。事實上就是匯總channel中的數據,然后進行一定的處理,然后返回。
5. 總結
本文通過介紹并發編程中的數據匯總問題,提出了使用互斥鎖和通道來保證線程安全的方法。互斥鎖適用于臨界區保護和共享資源的互斥訪問,但可能存在死鎖和性能瓶頸的問題。相比之下,通道提供了更直觀和安全的協程間通信方式,避免了鎖的問題,并提供了更靈活的并發模式。
基于以上內容的介紹,大概能夠明確下,在數據傳遞和匯總的場景下,使用channel來實現可能是更為合適的,能夠提高代碼的可讀性和并發安全性。希望以上內容對你有所幫助。

浙公網安備 33010602011771號