MIT6.824 lab1 實驗反思
MIT6.824 lab1 實驗反思
0. 實驗文檔
1. 環境搭建
實驗文檔中并沒有指出應該在哪個操作系統的環境下進行實驗,但是需要使用到 Go 的插件功能,考慮到插件功能在 Windows 上支持有限,所以應該選擇類 Unix 系統。恰好手里有一臺Linux的物理機,索性就用 VsCode + ssh 的環境進行實驗了。
1.1 配置 Go SDK
以下摘自 6.5840 Go:
Depending on your Linux distribution, you might be able to get an up-to-date version of Go from the package repository, e.g. by running apt install golang. Otherwise, you can manually install a binary from Go's website. First, make sure that you're running a 64-bit kernel (uname -a should mention "x86_64 GNU/Linux"), and then run:
$ wget -qO- https://go.dev/dl/go1.25.0.linux-amd64.tar.gz | sudo tar xz -C /usr/local
You'll need to make sure /usr/local/go/bin is on your PATH. You can do this by adding export PATH=$PATH:/usr/local/go/bin to your shell's init file ( commonly this is one of .bashrc, .bash_profile or .zshrc)
本次實驗采用的 Go 版本是 1.25.0 。
1.2 VsCode 插件安裝
VsCode這邊大概要安裝一個Go語言的插件,Code Runner 隨意吧
1.3 VsCode 編碼體驗優化
執行命令:
$ go install -v github.com/sqs/goreturns@latest
- 該命令會在你的
$GOPATH/bin目錄下(或者如果你設置了$GOBIN,則在$GOBIN目錄下)生成一個名為goreturns的可執行文件。 goreturns是一個用于格式化 Go 源代碼的工具,特別適用于在保存文件時自動格式化。它在gofmt的基礎上增加了額外的功能,比如自動添加或移除導入語句(類似goimports),并且會針對返回語句(return)進行特殊處理(例如在函數返回多個值且類型復雜時,自動拆分成多行以增強可讀性),因此得名 “goreturns”。
1.4 VsCode debug 工具配置(重要)
在項目根目錄下(當然也可以自己選擇一個工作目錄,feel free)新建 .vscode/launch.json,編寫好基本的配置:

關鍵配置詳解:
-
configurations.program: 要debug的程序入口文件
-
configurations.cwd: 程序工作目錄
-
configurations.args: 參數,這里是編譯好的插件和輸入文件
-
configurations.bulldFlags: 開啟競態條件檢測
這里的 debug 工具默認使用 $GOPATH/bin 路徑下的 delve 工具。通常來說,delve 工具支持的 Go SDK 版本一定要和當前 Go SDK 版本是兼容的才行。本機環境是 Go 1.25.0,采用的 delve 版本是 1.25.1、
在配置好這些以后啟動調試,可能會出現一個匪夷所思的問題:

這里顯示無法加載 wc.go 編譯來的插件。
如果我們使用實驗文檔給出來的編譯命令的確是會出現這個問題。原因是通常來說,編譯器對源碼進行編譯的時候多少會帶點優化手段,比如調整一些指令順序,以達到較好的性能。但這樣對于 debug 肯定是不利的。
解決辦法就是,禁止編譯器進行優化。所以如果要使用 delve 對代碼進行 debug 的話,需要把編譯插件的命令改成這樣:
$ go build -race -buildmode=plugin -gcflags="all=-N -l" -o wc.so ../mrapps/wc.go
參數詳解:
- race:開啟競態條件檢測
- buildmode=plugin:輸出是共享庫文件
- gcflags=
- all:表示這些選項應用于當前模塊的所有包(包括依賴包)
- N:禁止編譯器優化
- -l:禁止函數內聯
做完wg.so插件的一些瑣碎的工作以后,運行 debug,會發現還有一個問題:

這是因為 go build 命令不支持通配符的使用,解決方案是直接把文件名寫死在 debug 配置文件里:

做好上面的一切以后,就可以開始快樂 debug 了。Have fun~
2. lab——,啟動——
2.1 需求描述
- 簡單復刻 MapReduce 論文中描述的分布式數據處理系統——其中包含一個 master (實驗中采用了 Coordinator 這個術語)和若干個 worker。master 負責調度分配任務,監測 worker 是否可能下線;worker 則負責執行具體的 Map 階段任務和 Reduce 階段任務
- 主要需求如下:
- 協調者(coordinator):
- 跟蹤所有 Map 和 Reduce 任務的狀態(待分配、進行中、已完成)。
- 響應工作者的任務請求,分配任務(Map任務或Reduce任務)。
- 當Map任務全部完成后,才能開始分配Reduce任務。
- 監控任務超時(10秒),將超時任務重新分配。
- 當所有任務完成后,協調者退出(通過
Done()方法返回true)。
- 工作者(worker):
- 循環向協調者請求任務。
- 根據任務類型(Map/Reduce)讀取輸入文件,調用對應的Map/Reduce函數。
- 將Map任務的輸出寫入中間文件(按照
nReduce分桶),格式為mr-X-Y,其中X是Map任務編號,Y是Reduce任務編號。 - 將Reduce任務的輸出寫入最終文件
mr-out-X(每個Reduce任務一個輸出文件)。 - 如果協調者沒有任務可分配(例如所有任務已完成),工作者應退出。
- 文件處理:
- Map任務:輸入是多個文件(每個文件對應一個Map任務),輸出是
nReduce個中間文件(每個中間文件對應一個Reduce任務桶)。 - Reduce任務:從多個Map任務產生的中間文件(同一個Reduce桶編號)中讀取數據,執行Reduce函數,輸出到一個最終文件。
- 使用JSON格式寫入和讀取中間文件(鍵值對)。
- 最終輸出文件
mr-out-X的格式:每行是"%v %v",即鍵和值。
- Map任務:輸入是多個文件(每個文件對應一個Map任務),輸出是
- 容錯:
- 工作者可能在執行任務時崩潰,協調者等待10秒后重新分配該任務給其他工作者。
- 確保最終結果正確,不能因為任務重復執行而出現錯誤(Map和Reduce函數必須是冪等的)。
- 并發:
- 多個Map任務和Reduce任務可以并行執行。
- 協調者需要處理并發的RPC請求,注意鎖的使用。
- 測試:
- 通過
test-mr.sh測試,包括:單詞計數(wc)、索引生成(indexer)、并行性測試、任務計數、提前退出測試和崩潰測試。
- 通過
- 協調者(coordinator):
2.2 詳細實現方案
2.2.1 任務及其狀態
定義Map任務和Reduce任務的結構:
type Task struct {
TimeStamp time.Time // time stamp that give the task to any worker
Filenames []string // input files
TaskID int // task ID
TaskType int // task type, 0 map task, 1 reduce task
Status int // task status, 0 init, 1 ready, 2 in process, 3 completed, 4 failed
}
這里我們要記錄每個任務的開始時間戳、輸入文件列表、任務ID、任務類型、任務狀態。其中任務包含3個狀態,分別是就緒、處理中和已完成:
const (
TASK_READY = iota // ready
TASK_IN_PROCESS // in process
TASK_COMPLETED // completed
)
其有限狀態機如下:

任務創建的時候,應該是賦成 ready 狀態。這個時候的時間戳沒有含義,所以也應該賦零值;在任務分發出去以后,應該進入 in process 狀態,表示任務正在處理中。這期間有3種情況:成功、失敗、超時。如果任務處理成功了,自然標記成 completed 狀態,表示任務完成;如果任務處理失敗,或者是超時,在 Coordinator 這邊都只能感知到 worker 進程有一段時間沒有發送任務完成的通知了,所以就把任務初始化,然后分配給下一個來尋求新任務的 worker。
2.2.2 Coordinator 實現細節
Coordinator 結構
對于 Coordinator來說,需要記錄所有的輸入文件的信息,所有的 Map 任務和 Reduce 任務的信息,同時考慮到這些任務信息都是并發的讀寫操作,所以需要引入互斥鎖對臨界資源進行保護。所以定義 Coordinator 結構如下:
type Coordinator struct {
// Your definitions here.
mu *sync.Mutex // mutex lock, to protect 2 maps below
nReduce int
tasks map[int][]*Task // all the tasks, key = 0 -> map tasks, key = 1 -> reduce tasks
}
這里我選擇了 map 這個數據結構去存放兩種類型的任務,每個 key 對應一個任務列表,列表中的就是尚未完成的任務。
Coordinator 需要給 Worker 提供的 RPC
GetTask
func (c *Coordinator) GetTask(args *GetTaskRequest, response *GetTaskResponse) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
mapTaskRem := len(c.tasks[TASK_TYPE_MAP])
reduceTaskRem := len(c.tasks[TASK_TYPE_REDUCE])
if mapTaskRem > 0 {
err = c.getMapTask(response)
if err != nil {
return err
}
return nil
} else if reduceTaskRem > 0 {
err = c.getReduceTask(response)
if err != nil {
return err
}
return nil
}
response.TaskType = TASK_DONE
return nil
}
根據實驗文檔的要求,要先完成所有 Map 任務,再處理 Reduce 任務。顯然 GetTask 需要實現這一點。大致邏輯是,先加好鎖,然后獲取 Map 任務剩余數量和 Reduce 任務剩余數量。根據剩余任務數量判斷應該獲取 Map 任務還是 Reduce 任務。如果兩個任務都沒有剩余,就可以返回 TASK_DONE 這個任務類型,表示沒有任務可做,這時 Worker 就可以退出了。
這里的 TASK_DONE 這個類型是必須的。如果不從這里獲取沒有任務執行的信息,而單從 response 有沒有記錄具體任務來判斷是否有任務執行的話,會出現一種情況:當前所有任務都在處理中和沒有任務處理兩種狀態是無法區分的,這個時候程序的行為就不符合預期了。
ReportTask
func (c *Coordinator) ReportTask(args *ReportTaskArgs, response *struct{}) error {
taskID, taskType := args.TaskId, args.TaskType
c.mu.Lock()
defer c.mu.Unlock()
for _, task := range c.tasks[taskType] {
if task.TaskID != taskID {
continue
}
if time.Since(task.TimeStamp) < TASK_TIMEOUT { // report in time, and mark the task as completed
task.Status = TASK_COMPLETED
log.Printf("Coordinator.ReportTask: task %d completed.", taskID)
} else { // report timeout, and mark the task as failed
task.Status = TASK_READY
task.TimeStamp = time.Time{}
if task.TaskType == TASK_TYPE_REDUCE {
task.Filenames = nil
}
log.Printf("Coordinator.ReportTask: task %d timeout.", taskID)
}
}
return nil
}
當 Worker 執行完某個任務以后,就要調用 ReportTask 向 Coordinator 通告當前任務已經完成。這個時候 Coordinator 需要做的就是判斷通告到達的時間,如果超出了 TASK_TIMEOUT(10s),就要重置任務狀態,期待其他 Worker 重新獲取這個任務并執行;否則就標記為已完成狀態,等待后臺 goroutine 將已完成的任務從任務列表中清除。不過通常來說,這里不會出現 TASK_TIMEOUT,因為 Coordinator 也會在后臺跑一個 goroutine,周期性地檢查并重置超時任務。
2.2.3 Worker 實現細節
相對于 Coordinator 來說,Worker 就簡單很多了,它只要循環地獲取任務、執行任務、通告任務完成就好了。
但是也有一點需要注意,比如當前 Worker 實際上已經執行一段時間了,觸發了 TASK_TIMEOUT。在 Coordinator 端,發現了這個超時任務,并分配給了其他正在請求任務的 Worker。但是當前的 Worker 并不知道 Coordinator 做的這個事情,它一直執行,直到算出了所有的結果——這個時候它要把結果寫入到對應的中間文件中。這里如果處理不當,可能在中間文件中會看到兩份一模一樣的結果,這樣就會影響到最終結果的生成。
根據實驗文檔的提示,可以使用臨時文件+原子性重命名操作來規避這個問題。當 Worker 正在計算結果的時候,要將結果寫入臨時文件中,當結果計算完之后,就要原子性重命名這個臨時文件,將它重命名成當前階段任務的輸出結果文件。當然如果在重命名之前就發現這個輸出結果文件存在的話,就不必再進行重命名操作了,因為這個時候有其他 Worker 接替當前 Worker 完成了任務。
func runMapTask(res *GetTaskResponse, mapf func(string, string) []KeyValue) error {
intermediate := []KeyValue{}
filenames := res.Filenames
for _, filename := range filenames {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
content, err := ioutil.ReadAll(file)
if err != nil {
return err
}
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}
// write the kvs into intermediate file
finalFilenames := make([]string, res.NReduce)
outFiles := make([]*os.File, res.NReduce)
outEncoders := make([]*json.Encoder, res.NReduce)
for i := 0; i < res.NReduce; i++ {
filename := fmt.Sprintf("mr-%d-%d", res.TaskId, i)
tmpFilename := fmt.Sprintf("tmp-%s", filename)
outFiles[i], _ = os.CreateTemp(".", tmpFilename)
outEncoders[i] = json.NewEncoder(outFiles[i])
finalFilenames[i] = filename
}
for _, kv := range intermediate {
reduceID := ihash(kv.Key) % res.NReduce
outEncoders[reduceID].Encode(&kv)
}
for i, tmpFile := range outFiles {
if err := tmpFile.Close(); err != nil {
return fmt.Errorf("close temp file %s failed: %w", tmpFile.Name(), err)
}
tmpPath := tmpFile.Name()
finalPath := finalFilenames[i]
if _, err := os.Stat(finalPath); err == nil { // exist, ignore this rename operation
continue
}
if err := os.Rename(tmpPath, finalPath); err != nil {
return fmt.Errorf("rename temp file %s to %s failed: %w", tmpPath, finalPath, err)
}
}
return nil
}
當這個任務完成以后,要向 Coordinator 通告當前任務 ID,表示這個任務已經完成了,通知 Coordinator 后臺 goroutine 將其刪除。
2.2.4 驗證
-
可以參照實驗文檔,跑一遍這個分布式計算系統,然后通過命令:
$ cat mr-out-* | LC_ALL=C sort | more查看計算結果。
-
運行
test-mr-many.sh腳本若干次,如果一直穩定通過,就表示當前實現可以通過實驗。

3. 總結
本實驗旨在實現一個簡單的分布式計算框架,引出分布式系統的知識。
對于本實驗來說,有以下三個難點:
- 理清 Coordinator 和 Worker 各自的職責
- 哪些信息需要進行通告
- 提供一個簡單的容錯機制保證任務能夠及時處理
在閱讀過 MapReduce 論文 以后,其實可以發現這個分布式計算框架仍然有很多值得改進的地方。但是對于分布式系統的啟蒙作用,不管是于當時的技術人員,還是于當下初步認識分布式系統的我們,都有著極高的歷史地位。所以,還是對 Google 的技術人員們說一聲:Respect!

浙公網安備 33010602011771號