<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      MIT6.824 lab1 實驗反思

      MIT6.824 lab1 實驗反思

      0. 實驗文檔

      6.5840 Lab 1: MapReduce

      1. 環境搭建

      實驗文檔中并沒有指出應該在哪個操作系統的環境下進行實驗,但是需要使用到 Go 的插件功能,考慮到插件功能在 Windows 上支持有限,所以應該選擇類 Unix 系統。恰好手里有一臺Linux的物理機,索性就用 VsCode + ssh 的環境進行實驗了。

      1.1 配置 Go SDK

      以下摘自 6.5840 Go:

      Depending on your Linux distribution, you might be able to get an up-to-date version of Go from the package repository, e.g. by running apt install golang. Otherwise, you can manually install a binary from Go's website. First, make sure that you're running a 64-bit kernel (uname -a should mention "x86_64 GNU/Linux"), and then run:

      $ wget -qO- https://go.dev/dl/go1.25.0.linux-amd64.tar.gz | sudo tar xz -C /usr/local
      

      You'll need to make sure /usr/local/go/bin is on your PATH. You can do this by adding export PATH=$PATH:/usr/local/go/bin to your shell's init file ( commonly this is one of .bashrc, .bash_profile or .zshrc)

      本次實驗采用的 Go 版本是 1.25.0 。

      1.2 VsCode 插件安裝

      VsCode這邊大概要安裝一個Go語言的插件,Code Runner 隨意吧

      1.3 VsCode 編碼體驗優化

      執行命令:

      $ go install -v github.com/sqs/goreturns@latest
      
      • 該命令會在你的 $GOPATH/bin 目錄下(或者如果你設置了 $GOBIN,則在 $GOBIN 目錄下)生成一個名為 goreturns 的可執行文件。
      • goreturns 是一個用于格式化 Go 源代碼的工具,特別適用于在保存文件時自動格式化。它在 gofmt 的基礎上增加了額外的功能,比如自動添加或移除導入語句(類似 goimports),并且會針對返回語句(return)進行特殊處理(例如在函數返回多個值且類型復雜時,自動拆分成多行以增強可讀性),因此得名 “goreturns”。

      1.4 VsCode debug 工具配置(重要)

      在項目根目錄下(當然也可以自己選擇一個工作目錄,feel free)新建 .vscode/launch.json,編寫好基本的配置:

      image-20250823100442611

      關鍵配置詳解:

      • configurations.program: 要debug的程序入口文件

      • configurations.cwd: 程序工作目錄

      • configurations.args: 參數,這里是編譯好的插件和輸入文件

      • configurations.bulldFlags: 開啟競態條件檢測

      這里的 debug 工具默認使用 $GOPATH/bin 路徑下的 delve 工具。通常來說,delve 工具支持的 Go SDK 版本一定要和當前 Go SDK 版本是兼容的才行。本機環境是 Go 1.25.0,采用的 delve 版本是 1.25.1、

      在配置好這些以后啟動調試,可能會出現一個匪夷所思的問題:

      image-20250823101450770

      這里顯示無法加載 wc.go 編譯來的插件。

      如果我們使用實驗文檔給出來的編譯命令的確是會出現這個問題。原因是通常來說,編譯器對源碼進行編譯的時候多少會帶點優化手段,比如調整一些指令順序,以達到較好的性能。但這樣對于 debug 肯定是不利的。

      解決辦法就是,禁止編譯器進行優化。所以如果要使用 delve 對代碼進行 debug 的話,需要把編譯插件的命令改成這樣:

      $ go build -race -buildmode=plugin -gcflags="all=-N -l" -o wc.so ../mrapps/wc.go
      

      參數詳解:

      • race:開啟競態條件檢測
      • buildmode=plugin:輸出是共享庫文件
      • gcflags=
        • all:表示這些選項應用于當前模塊的所有包(包括依賴包)
        • N:禁止編譯器優化
        • -l:禁止函數內聯

      做完wg.so插件的一些瑣碎的工作以后,運行 debug,會發現還有一個問題:

      image-20250823103323678

      這是因為 go build 命令不支持通配符的使用,解決方案是直接把文件名寫死在 debug 配置文件里:

      image-20250823103715703

      做好上面的一切以后,就可以開始快樂 debug 了。Have fun~

      2. lab——,啟動——

      2.1 需求描述

      • 簡單復刻 MapReduce 論文中描述的分布式數據處理系統——其中包含一個 master (實驗中采用了 Coordinator 這個術語)和若干個 worker。master 負責調度分配任務,監測 worker 是否可能下線;worker 則負責執行具體的 Map 階段任務和 Reduce 階段任務
      • 主要需求如下:
        1. 協調者(coordinator)
          • 跟蹤所有 Map 和 Reduce 任務的狀態(待分配、進行中、已完成)。
          • 響應工作者的任務請求,分配任務(Map任務或Reduce任務)。
          • 當Map任務全部完成后,才能開始分配Reduce任務。
          • 監控任務超時(10秒),將超時任務重新分配。
          • 當所有任務完成后,協調者退出(通過Done()方法返回true)。
        2. 工作者(worker)
          • 循環向協調者請求任務。
          • 根據任務類型(Map/Reduce)讀取輸入文件,調用對應的Map/Reduce函數。
          • 將Map任務的輸出寫入中間文件(按照nReduce分桶),格式為mr-X-Y,其中X是Map任務編號,Y是Reduce任務編號。
          • 將Reduce任務的輸出寫入最終文件mr-out-X(每個Reduce任務一個輸出文件)。
          • 如果協調者沒有任務可分配(例如所有任務已完成),工作者應退出。
        3. 文件處理
          • Map任務:輸入是多個文件(每個文件對應一個Map任務),輸出是nReduce個中間文件(每個中間文件對應一個Reduce任務桶)。
          • Reduce任務:從多個Map任務產生的中間文件(同一個Reduce桶編號)中讀取數據,執行Reduce函數,輸出到一個最終文件。
          • 使用JSON格式寫入和讀取中間文件(鍵值對)。
          • 最終輸出文件mr-out-X的格式:每行是"%v %v",即鍵和值。
        4. 容錯
          • 工作者可能在執行任務時崩潰,協調者等待10秒后重新分配該任務給其他工作者。
          • 確保最終結果正確,不能因為任務重復執行而出現錯誤(Map和Reduce函數必須是冪等的)。
        5. 并發
          • 多個Map任務和Reduce任務可以并行執行。
          • 協調者需要處理并發的RPC請求,注意鎖的使用。
        6. 測試
          • 通過test-mr.sh測試,包括:單詞計數(wc)、索引生成(indexer)、并行性測試、任務計數、提前退出測試和崩潰測試。

      2.2 詳細實現方案

      2.2.1 任務及其狀態

      定義Map任務和Reduce任務的結構:

      type Task struct {
      	TimeStamp time.Time // time stamp that give the task to any worker
      	Filenames []string  // input files
      	TaskID    int       // task ID
      	TaskType  int       // task type, 0 map task, 1 reduce task
      	Status    int       // task status, 0 init, 1 ready, 2 in process, 3 completed, 4 failed
      }
      

      這里我們要記錄每個任務的開始時間戳、輸入文件列表、任務ID、任務類型、任務狀態。其中任務包含3個狀態,分別是就緒、處理中和已完成:

      const (
      	TASK_READY = iota // ready
      	TASK_IN_PROCESS   // in process
      	TASK_COMPLETED    // completed
      )
      

      其有限狀態機如下:

      image-20250826145237766

      任務創建的時候,應該是賦成 ready 狀態。這個時候的時間戳沒有含義,所以也應該賦零值;在任務分發出去以后,應該進入 in process 狀態,表示任務正在處理中。這期間有3種情況:成功、失敗、超時。如果任務處理成功了,自然標記成 completed 狀態,表示任務完成;如果任務處理失敗,或者是超時,在 Coordinator 這邊都只能感知到 worker 進程有一段時間沒有發送任務完成的通知了,所以就把任務初始化,然后分配給下一個來尋求新任務的 worker。

      2.2.2 Coordinator 實現細節

      Coordinator 結構

      對于 Coordinator來說,需要記錄所有的輸入文件的信息,所有的 Map 任務和 Reduce 任務的信息,同時考慮到這些任務信息都是并發的讀寫操作,所以需要引入互斥鎖對臨界資源進行保護。所以定義 Coordinator 結構如下:

      type Coordinator struct {
      	// Your definitions here.
      	mu      *sync.Mutex // mutex lock, to protect 2 maps below
      	nReduce int
      	tasks   map[int][]*Task // all the tasks, key = 0 -> map tasks, key = 1 -> reduce tasks
      }
      

      這里我選擇了 map 這個數據結構去存放兩種類型的任務,每個 key 對應一個任務列表,列表中的就是尚未完成的任務。

      Coordinator 需要給 Worker 提供的 RPC

      GetTask

      func (c *Coordinator) GetTask(args *GetTaskRequest, response *GetTaskResponse) (err error) {
      	c.mu.Lock()
      	defer c.mu.Unlock()
      
      	mapTaskRem := len(c.tasks[TASK_TYPE_MAP])
      	reduceTaskRem := len(c.tasks[TASK_TYPE_REDUCE])
      
      	if mapTaskRem > 0 {
      		err = c.getMapTask(response)
      		if err != nil {
      			return err
      		}
      		return nil
      	} else if reduceTaskRem > 0 {
      		err = c.getReduceTask(response)
      		if err != nil {
      			return err
      		}
      		return nil
      	}
      	response.TaskType = TASK_DONE
      
      	return nil
      }
      

      根據實驗文檔的要求,要先完成所有 Map 任務,再處理 Reduce 任務。顯然 GetTask 需要實現這一點。大致邏輯是,先加好鎖,然后獲取 Map 任務剩余數量和 Reduce 任務剩余數量。根據剩余任務數量判斷應該獲取 Map 任務還是 Reduce 任務。如果兩個任務都沒有剩余,就可以返回 TASK_DONE 這個任務類型,表示沒有任務可做,這時 Worker 就可以退出了。

      這里的 TASK_DONE 這個類型是必須的。如果不從這里獲取沒有任務執行的信息,而單從 response 有沒有記錄具體任務來判斷是否有任務執行的話,會出現一種情況:當前所有任務都在處理中和沒有任務處理兩種狀態是無法區分的,這個時候程序的行為就不符合預期了。

      ReportTask

      func (c *Coordinator) ReportTask(args *ReportTaskArgs, response *struct{}) error {
      	taskID, taskType := args.TaskId, args.TaskType
      
      	c.mu.Lock()
      	defer c.mu.Unlock()
      
      	for _, task := range c.tasks[taskType] {
      		if task.TaskID != taskID {
      			continue
      		}
      		if time.Since(task.TimeStamp) < TASK_TIMEOUT { // report in time, and mark the task as completed
      			task.Status = TASK_COMPLETED
      			log.Printf("Coordinator.ReportTask: task %d completed.", taskID)
      		} else { // report timeout, and mark the task as failed
      			task.Status = TASK_READY
      			task.TimeStamp = time.Time{}
      			if task.TaskType == TASK_TYPE_REDUCE {
      				task.Filenames = nil
      			}
      			log.Printf("Coordinator.ReportTask: task %d timeout.", taskID)
      		}
      	}
      
      	return nil
      }
      

      當 Worker 執行完某個任務以后,就要調用 ReportTask 向 Coordinator 通告當前任務已經完成。這個時候 Coordinator 需要做的就是判斷通告到達的時間,如果超出了 TASK_TIMEOUT(10s),就要重置任務狀態,期待其他 Worker 重新獲取這個任務并執行;否則就標記為已完成狀態,等待后臺 goroutine 將已完成的任務從任務列表中清除。不過通常來說,這里不會出現 TASK_TIMEOUT,因為 Coordinator 也會在后臺跑一個 goroutine,周期性地檢查并重置超時任務。

      2.2.3 Worker 實現細節

      相對于 Coordinator 來說,Worker 就簡單很多了,它只要循環地獲取任務、執行任務、通告任務完成就好了。

      但是也有一點需要注意,比如當前 Worker 實際上已經執行一段時間了,觸發了 TASK_TIMEOUT。在 Coordinator 端,發現了這個超時任務,并分配給了其他正在請求任務的 Worker。但是當前的 Worker 并不知道 Coordinator 做的這個事情,它一直執行,直到算出了所有的結果——這個時候它要把結果寫入到對應的中間文件中。這里如果處理不當,可能在中間文件中會看到兩份一模一樣的結果,這樣就會影響到最終結果的生成。

      根據實驗文檔的提示,可以使用臨時文件+原子性重命名操作來規避這個問題。當 Worker 正在計算結果的時候,要將結果寫入臨時文件中,當結果計算完之后,就要原子性重命名這個臨時文件,將它重命名成當前階段任務的輸出結果文件。當然如果在重命名之前就發現這個輸出結果文件存在的話,就不必再進行重命名操作了,因為這個時候有其他 Worker 接替當前 Worker 完成了任務。

      func runMapTask(res *GetTaskResponse, mapf func(string, string) []KeyValue) error {
      	intermediate := []KeyValue{}
      
      	filenames := res.Filenames
      	for _, filename := range filenames {
      		file, err := os.Open(filename)
      		if err != nil {
      			return err
      		}
      		defer file.Close()
      
      		content, err := ioutil.ReadAll(file)
      		if err != nil {
      			return err
      		}
      		kva := mapf(filename, string(content))
      		intermediate = append(intermediate, kva...)
      	}
      
      	// write the kvs into intermediate file
      	finalFilenames := make([]string, res.NReduce)
      	outFiles := make([]*os.File, res.NReduce)
      	outEncoders := make([]*json.Encoder, res.NReduce)
      	for i := 0; i < res.NReduce; i++ {
      		filename := fmt.Sprintf("mr-%d-%d", res.TaskId, i)
      		tmpFilename := fmt.Sprintf("tmp-%s", filename)
      		outFiles[i], _ = os.CreateTemp(".", tmpFilename)
      		outEncoders[i] = json.NewEncoder(outFiles[i])
      		finalFilenames[i] = filename
      	}
      
      	for _, kv := range intermediate {
      		reduceID := ihash(kv.Key) % res.NReduce
      		outEncoders[reduceID].Encode(&kv)
      	}
      
      	for i, tmpFile := range outFiles {
      		if err := tmpFile.Close(); err != nil {
      			return fmt.Errorf("close temp file %s failed: %w", tmpFile.Name(), err)
      		}
      		tmpPath := tmpFile.Name()
      		finalPath := finalFilenames[i]
      		if _, err := os.Stat(finalPath); err == nil { // exist, ignore this rename operation
      			continue
      		}
      		if err := os.Rename(tmpPath, finalPath); err != nil {
      			return fmt.Errorf("rename temp file %s to %s failed: %w", tmpPath, finalPath, err)
      		}
      	}
      
      	return nil
      }
      
      

      當這個任務完成以后,要向 Coordinator 通告當前任務 ID,表示這個任務已經完成了,通知 Coordinator 后臺 goroutine 將其刪除。

      2.2.4 驗證

      1. 可以參照實驗文檔,跑一遍這個分布式計算系統,然后通過命令:

        $  cat mr-out-* | LC_ALL=C sort | more
        

        查看計算結果。

      2. 運行 test-mr-many.sh 腳本若干次,如果一直穩定通過,就表示當前實現可以通過實驗。

      image-20250826010853939

      3. 總結

      本實驗旨在實現一個簡單的分布式計算框架,引出分布式系統的知識。

      對于本實驗來說,有以下三個難點:

      • 理清 Coordinator 和 Worker 各自的職責
      • 哪些信息需要進行通告
      • 提供一個簡單的容錯機制保證任務能夠及時處理

      在閱讀過 MapReduce 論文 以后,其實可以發現這個分布式計算框架仍然有很多值得改進的地方。但是對于分布式系統的啟蒙作用,不管是于當時的技術人員,還是于當下初步認識分布式系統的我們,都有著極高的歷史地位。所以,還是對 Google 的技術人員們說一聲:Respect!

      posted @ 2025-08-26 22:18  Led_Zeppelin_死忠粉  閱讀(52)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产AV国片精品有毛| 精品国精品国产自在久国产应用男 | 亚洲午夜伦费影视在线观看| 欧洲一区二区中文字幕| 亚洲码国产精品高潮在线| 最新国产精品拍自在线观看| 亚洲一区在线成人av| 国产私拍福利精品视频| 亚洲 日本 欧洲 欧美 视频| www国产无套内射com| 亚洲欧美在线观看| 国产成人亚洲综合图区| 伊人久久精品久久亚洲一区| 干老熟女干老穴干老女人| 深夜av免费在线观看| 久久久久青草线综合超碰| 国产在线一区二区在线视频| 日本精品一区二区不卡| 京山县| 最新午夜男女福利片视频| 国产无套护士在线观看| 午夜免费福利小电影| 亚洲超碰97无码中文字幕| 国产精品中文字幕二区| 国产成人免费ā片在线观看| 日韩中文字幕V亚洲中文字幕| 平罗县| 精品无码国产一区二区三区AV| 久久精品国产亚洲精品| 爱性久久久久久久久| 国产亚洲精品久久77777| 乱码午夜-极品国产内射| 亚洲欧美高清在线精品一区二区| 日韩一区日韩二区日韩三区| 国产精品久久久一区二区三区| аⅴ天堂中文在线网| 国产精品中文字幕久久| 性欧美老人牲交xxxxx视频| 色道久久综合亚洲精品蜜桃| 久久久久成人片免费观看蜜芽 | 性XXXX视频播放免费直播|