[源碼解析] PyTorch 流水線并行實現 (4)--前向計算
[源碼解析] PyTorch 流水線并行實現 (4)--前向計算
0x00 摘要
前幾篇文章我們介紹了 PyTorch 流水線并行的基本知識,自動平衡機制和切分數據,本文我們結合論文內容來看看如何保證前向計算執行順序。
流水線并行其他文章鏈接如下:
[源碼解析] 深度學習流水線并行Gpipe(1)---流水線基本實現
[源碼解析] 深度學習流水線并行GPipe (2) ----- 梯度累積
[源碼解析] 深度學習流水線并行 GPipe(3) ----重計算
[源碼解析] 深度學習流水線并行之PipeDream(1)--- Profile階段
[源碼解析] 深度學習流水線并行 PipeDream(2)--- 計算分區
[源碼解析] 深度學習流水線并行 PipeDream(3)--- 轉換模型
[源碼解析] 深度學習流水線并行 PipeDream(4)--- 運行時引擎
[源碼解析] 深度學習流水線并行 PipeDream(5)--- 通信模塊
[源碼解析] 深度學習流水線并行 PipeDream(6)--- 1F1B策略
[源碼解析] PyTorch 流水線并行實現 (1)--基礎知識
[源碼解析] PyTorch 流水線并行實現 (2)--如何劃分模型
[源碼解析] PyTorch 流水線并行實現 (3)--切分數據和運行時系統
本文圖片來自論文和github源碼。
0x01 論文
之前我們提到過,因為 GPipe 是基于 TensorFlow 的庫(當然了,這是Google的產品嘛),所以kakaobrain的一些工程師就用PyTorch 來實現了 GPipe,并且開源出來,這就是 torchgpipe,其地址為:https://github.com/kakaobrain/torchgpipe,用戶可以通過 pip install torchgpipe 進行安裝使用。
該作者團隊還發表了一篇論文,具體如下:https://arxiv.org/pdf/2004.09910.pdf。
接下來我們就圍繞這篇論文進行分析,本文不會全部翻譯這篇論文,而是選擇與實現密切相關的部分進行翻譯分析。
1.1 引論
并行訓練的一個障礙是:訓練神經網絡的常用優化技術本質上是順序的。這些算法反復執行如下操作:對于給定的小批次(mini-batch)數據,計算其針對損失函數的梯度,并且使用這些梯度來更新模型參數。
1.1.1 數據并行
在有大量計算資源的情況下,數據并行將小批量(mini-batch)劃分為微批量(micro-batch)并將每個微批量的計算委托給可用設備,以此來加速整體優化過程。通過仔細的超參數調整,數據并行可以有效地將訓練時間減少到一定規模的小批量所需的訓練時間,這可能取決于模型、優化算法和數據。
數據并行訓練的問題則是,每個設備擁有自己的模型網絡版本來執行子任務,并且在每次參數更新后必須同步模型網絡參數。當有許多參數需要同步時,這可能會導致沉重的通信負載。
但是,當模型太大以至于即使將單個機器無法容納模型,也無法計算梯度時,數據并行性不適用。
1.1.2 模型并行
模型并行性是一種訓練龐大模型的方法,它將模型分成若干部分,并將它們放在不同的設備上。每個設備只計算模型的一小部分,并且只更新該部分中的參數。然而,模型并行性受到其"無法充分利用"行為的影響。因為大多數神經網絡由一系列的層組成,持有模型后期部分的設備必須等待直到持有模型早期部分的設備計算結束。
一種可能的解決方案是使用梯度檢查點,它只存儲激活值的子集,并在需要時重新計算丟棄的激活值,從而節省內存。顯然,這需要對模型的某些部分進行兩次計算,并增加總體訓練時間。
在后續部分,我們將討論如何將前向和后向過程分解為子任務(在某些假設下),描述微批次管道并行的設備分配策略,并演示每個設備所需的執行順序。也會討論在PyTorch中實現管道并行最佳時間線的復雜之處,并解釋torchgpipe如何解決這些問題。
此外,我們放松了模型是按順序組合的假設,并提供了一種使用長跳躍連接表示模型的方法,以便在不放棄效率的情況下仍然應用管道并行性。
1.2 模型定義
假定我們有一個神經網絡,其由一系列子網絡構成。我們假定這些子網絡是 \(f^1,...,f^n\),其參數分別是 \(\theta^1,...,\theta^n\),則整個網絡是:

參數是 \(\theta = (\theta^1,...,\theta^n)\),為了清楚起見,我們稱 \(f^j\) 表示 f 的第 j 個分區,并假設分區的參數是相互不相交的。
在訓練網絡時,基于梯度的方法(如隨機梯度下降法)需要在給定小批量訓練數據 x 和相應損失之后,計算網絡的輸出結果f(x)。以及損失相對于網絡參數 \(\theta\) 的梯度g。這兩個階段分別稱為向前傳播和向后傳播。
既然 f 由其 L 層 子模塊 (\(f^L, f^{L-1},...f^1\)) 順序組成,那么前向傳播\(f(x)\) 可以通過如下方式計算:讓 \(x^0=x\)(就是輸入x),然后順序應用每一個 partition,即 \(x^j = f^j (x^{j-1})\),這里 $ j = 1, ..., L$。就是 \(f(x)\) 可以表示為 :
再進一步,令 x 由 m 個更小的批次 \(x_1,...,x_m\) 組成,這些更小的批次叫做微批次(micro-batches)。則\(f(x)\) 的計算可以進一步分割為小的 tasks \(F_{i,j}\),這里 \(x^0_i = x_i\),所以得到定義:
這里 $ i = 1,..,m$ 和 $ j = 1,...,n$,假定 f 不參與任何 intra-batch 的計算。
用同樣的方式,后向傳播也被分割為 task,\(B_{i,j}\),這里 \(dx^n_j\) 是損失對于 \(x_j^n\) 的梯度。

因此

是通過分區 \(f^j\) 來計算后向傳播(也叫vector-Jacobian product)的函數。
最終,我們依靠把 \(g_i^j\) 通過 i 來求和來得到損失針對 \(\theta ^ j\) 的梯度。
需要注意的是在tasks之間有數據依賴。比如 \(F_{i,j}\) 需要 \(x_i^{j-1}\),而\(x_i^{j-1}\) 只有在 \(F_{i,j-1}\) 計算完成之后才有效,因此,\(F_{i,j-1}\) 必須在 \(F_{i,j}\) 開始之前結束。同理, \(B_{i,j}\) 必須在 \(B_{i,j+1}\) 之前結束。
下圖就是一個依賴圖,這里 $ m = 4, n = 3$。即,模型被分成3個子網絡,小批次被分割成 4個微批次。

前面三個 F 是三個子網絡的前向傳播,后面三個 B 是三個子網絡的后向傳播。
下面表示第一個微批次,順序完成三個子網的前向傳播和后向傳播。

給定 task 的集合 \({F_{i,j}}\) 和 \({B_{i,j}}\) ,和一個可以并行工作的設備池,不同的并行化策略有自己分配任務給設備的規則。
一旦解決依賴關系,每個設備就會計算一個或多個分配的任務。在上面的設置中,tasks 的所有依賴項都具有相同微批次索引 i。因此,通過將具有不同微批量索引的任務分配給不同的設備,可以有效地并行化任務,這就是數據并行。
1.3 GPipe計算圖
管道并行的策略是根據分區索引 j 分配任務,以便第 j 個分區完全位于第 j 個設備中。除此之外,還強制要求 \(F_{i,j}\) 必須在 \(F_{i+1,j}\) 之前完成,和 \(B{i,j}\) 必須在執行\(B{i-1,j}\)之前完成。
除了微批量流水線之外,GPipe還通過對每個\(B{i,j}\) 使用梯度檢查點進一步降低了內存需求。因為第 \(jth\) 個設備每次只執行 \(B{i,j}\) ,所以當計算\(B{i,j}\) 時候,只需要拿到 \(F{i,j}\) 的激活圖。
因為恰恰在執行 \(B{i,j}\) 之前計算前向傳播\(F{i,j}\),所以我們內存消耗減少了m倍。此外,當設備等待\(B{i,j}\) 時,可以進行重新計算,這些信息如下圖所示:

其中虛線箭頭表示因為引入了微批次順序而帶來的獨立任務之間的執行順序。顏色表示不同的設備。
我們注意到最后一個微批次的重新計算,即 \(F^{'}_{m,j}\) ,這里 \(j=1,...,n\)是不必要的。
這是因為在第j臺設備上,前向傳遞中的最后一個任務是 \(F{m,j}\) 、 因此,在前向傳遞中放棄中間激活,并在后向傳遞開始時重新計算它們,不會減少內存,只會減慢管道速度。因此,圖中省略了 \(F^{'}_{m,j}\) 。
1.4 設備執行順序(Devicewise Execution Order)
總之,在流水線并行性(帶有檢查點)中,每個設備都被分配了一組具有指定順序的任務。一旦滿足跨設備依賴關系,每個設備將逐個執行給定的任務。然而,這個圖片中缺少一個組件——設備之間的數據傳輸。為了便于說明,設備 j 必須遵循的完整執行如圖所示順序。而且為了強調,數據傳輸操作被明確表示為“接收”和“發送”。

為方便起見,庫提供了子模塊 torchgpipe.balance 來計算得到分區,目的是讓兩兩分區(pairwise)之間的資源差別盡量小。資源占用情況是通過分析(profile)來計算。具體是使用了 [2] Imre B′ar′any and Victor S Grinberg. Block partitions of sequences. Israel Journal of Mathematics, 206(1):155–164, 之中的算法。
1.5 PyTorch 實現難點
我們最關心的是效率。為了使管道并行性按預期工作,必須以正確的順序將任務分配給每個設備。在Pytorch中實現這一點有幾個復雜之處。
-
首先,由于PyTorch的define by run風格及其eager execution的執行行為(與in construct-and-run 框架相反),核函數(kernel)被動態地發布到每個設備。
- 因此,必須仔細設計主機代碼(host code),這樣不僅可以在每個設備中以正確的順序發布綁定到設備的任務,而且還可以避免由于Python解釋器未能提前請求而延遲在設備上(與CPU異步)執行任務。
- 當某些任務是CPU密集型任務或涉及大量廉價kernel調用時,可能會發生這種延遲。作為一種解決方案,torchgpipe引入了確定性時鐘周期(deterministic clock-cycle),它給出了任務的總體順序。
-
其次,后向傳播的計算圖是在前向傳播過程中動態構造的。換句話說,“它避免了“正向圖”的具體化,只記錄微分計算所需的內容”。因為PyTorch既不記錄正向計算圖,也不維護一個梯度磁帶(gradient tape),PyTorch的自動微分(autograd)引擎僅對計算圖進行反向傳播。這意味著自動加載引擎可能不會完全按照與正向過程相反的執行順序運行,除非由圖的結構強制執行。為了解決這個問題,torchgpipe開發了一對名為“fork”和“join”的基本函數,在后向計算圖中動態創建顯式依賴關系。
-
第三,如果不小心管理,多個設備之間的通信可能導致雙向同步。這會導致利用率不足,因為即使在副本和隊列中的下一個任務之間沒有顯式依賴關系時,發送方也可能等待與接收方同步,反之亦然。torchgpipe通過使用非默認CUDA流避免了這個問題,這樣副本就不會阻止計算,除非計算必須等待數據。
-
最后,torchgpipe試圖放寬微批處理流水線并行性的限制(模型必須是順序的)。
- 盡管原則上任何神經網絡都可以以順序形式編寫,但這需要提前知道整個計算圖,而PyTorch中則不是這樣。特別是,如果有一個張量從設備 \(j^{'}\)中的一層跳到設備 \(j>j^{'}+1\)中的另一層,則該張量將被復制到中間的所有設備,因為torchgpipe無法提前知道它。為了避免這個問題,我們設計了一個接口來表示跳過了哪些中間張量以及哪些層使用了它們。
1.6 總結
我們總結一下目前核心難度,從而引入下面的工作。
- 原始流水線狀態如下:
- 管道并行的策略是根據分區索引 j 分配任務,以便第 j 個分區完全位于第 j 個設備中。
- 持有模型后期部分的設備必須等待,直到持有模型早期部分的設備計算結束。

-
目標流水線狀態如下:
![]()
-
目前問題:
- 如果分成若干個微批次,則需要強制要求 \(F_{i,j}\) 必須在 \(F_{i+1,j}\) 之前完成,以及 \(B{i,j}\) 必須在執行\(B{i-1,j}\) 之前完成。
- 后向傳播的計算圖是在前向傳播過程中動態構造的。PyTorch既不記錄正向計算圖,也不維護一個梯度磁帶(gradient tape),PyTorch的自動微分(autograd)引擎僅對計算圖進行反向傳播。這意味著自動加載引擎可能不會完全按照與正向過程相反的執行順序運行,除非由圖的結構強制執行。

-
目前難點:
- 如何在每個設備中以正確的順序發布那些綁定到設備的任務,以避免由于Python解釋器未能提前請求而延遲在設備上(與CPU異步)執行任務。
- 如何建立這些小批次之間的跨設備依賴關系。
-
實現方案:
- 如何保證正確執行順序?torchgpipe引入了確定性時鐘周期(deterministic clock-cycle),它給出了任務的總體順序。
- 如何保證計算圖中的動態顯式依賴關系?針對clock_cycles產生的每一個運行計劃:
- 利用 fence 函數調用“fork”和“join”,以此在向后計算圖中動態創建顯式后向傳播依賴關系。
- 利用 compute(schedule, skip_trackers, in_queues, out_queues) 進行計算。
本文就首先看看前向計算中,如何保證正確執行順序。
0x02 執行順序
下面我們看看確定性時鐘周期算法(Forward Dependency: Deterministic Clock-cycle)。這個排序就是專門在前向傳播中使用,前向傳播按照這個算法來進行逐一計算。
一般來說,前向傳播計算是按照模型結構來完成的,但是因為流水線并行是特殊的,模型已經被分割開了,所以 torch-gpipe 需要自己提供一個前向傳播執行序列以執行各個微批次。
2.1 論文內容
任務的總順序由前向傳播中的主機代碼決定。每個設備通過CPU分配的順序隱式地理解任務之間的依賴關系。理想情況下,如果可以無代價的將任務分配給設備,只要設備內的順序正確,CPU就可以按任何順序將任務分配給設備。然而,這種假設不夠現實,因為在GPU上啟動核函數對CPU來說不是免費的,GPU之間的內存傳輸可能需要同步,或者任務是CPU密集型的。因此,為了最小化來自CPU的延遲,我們通過"某節點到\(F_{1,1}\)的距離"對所有任務進行排序。

我們把這種方案命名為確定性時鐘周期(deterministic clock-cycle)算法。在該算法中,CPU在計數器 \(k=1\)到\(k=m+n-1\) 的時鐘周期內執行。在第k個時鐘周期內,對于 $ i +j-1 = k$ 這些index:
- 首先執行 task \(F{i,j}\) 所需數據的所有復制(copy)核函數。
- 然后將用于執行任務的計算核函數注冊到相應的設備(由于同一時鐘周期中的任務是獨立的,因此可以安全地進行多線程處理)。
2.2 解析

我們結合論文的圖片看看,即:
- clock 1 時候,運行圖上的 \(F_{1,1}\)
- clock 2 時候,運行圖上的 \(F_{2,1},F_{1,2}\)。就是向右運行一格到\(F_{1,2}\),同時第二個微批次進入訓練,即運行\(F_{2,1}\) 。
- clock 3 時候,運行圖上的 \(F_{3,1},F_{2,2},F_{1,3}\)。就是 \(F_{1,2}\)向右運行一格到\(F_{1,3}\), \(F_{2,1}\) 向右運行一格到 \(F_{2,3}\) ,同時第三個微批次進入訓練流程,即運行 \(F_{3,1}\) 。
- clock 4 時候,運行圖上的 \(F_{4,1},F_{3,2},F_{2,3}\)。就是 \(F_{2,2}\)向右運行一格到\(F_{2,3}\), \(F_{3,1}\) 向右運行一格到 \(F_{3,2}\) ,同時第四個微批次進入訓練流程,即運行 \(F_{4,1}\) 。
- 依次類推.....
對應到圖上,我們可以看到,
-
\(F_{2,1},F_{1,2}\) 到 \(F_{1,1}\) 的步進距離是1,走一步可到。
-
\(F_{3,1},F_{2,2},F_{1,3}\) 到 \(F_{1,1}\) 的步進距離是2,分別走兩步可到。
這個邏輯從下圖可以清晰看到。所以,這個clock的算法就是 利用任務到\(F_{1,1}\)的距離對所有任務進行排序。這個很像把一塊石頭投入水中,泛起的水波紋一樣,從落水點一層一層的從近處向遠處傳播。
這里顏色表示不同的設備。

2.3 代碼
我們再來看看代碼。首先是生成時鐘周期,這里:
- min(1+k, n) 就是在 k 時鐘時候,可以啟動的最大device數目(partition)。
- max(1+k-m, 0) 就是在 k 時鐘時候,可以啟動的最小微batch(micro-batch)。
所以最終返回的序列就是k 時鐘時候,可以啟動的(index of micro-batch,index of partition)序列。
def clock_cycles(m: int, n: int) -> Iterable[List[Tuple[int, int]]]:
"""Generates schedules for each clock cycle."""
# m: number of micro-batches
# n: number of partitions
# i: index of micro-batch
# j: index of partition
# k: clock number
#
# k (i,j) (i,j) (i,j)
# - ----- ----- -----
# 0 (0,0)
# 1 (1,0) (0,1)
# 2 (2,0) (1,1) (0,2)
# 3 (2,1) (1,2)
# 4 (2,2)
# 我們解析一下,這里 k 就是時鐘數,從1開始,最多時鐘序號就是 m+n-1。
# min(1+k, n) 就是在 k 時鐘時候,可以啟動的最大device數目
# max(1+k-m, 0) 就是在 k 時鐘時候,可以啟動的最小batch
for k in range(m+n-1):
yield [(k-j, j) for j in range(max(1+k-m, 0), min(1+k, n))]
設定 m = 4, n =3,solve(4,3) 的輸出是:
[(0, 0)]
[(1, 0), (0, 1)]
[(2, 0), (1, 1), (0, 2)]
[(3, 0), (2, 1), (1, 2)]
[(3, 1), (2, 2)]
[(3, 2)]
因為論文有一個示例圖,而這個圖和注釋&代碼不完全一致,為了更好的說明,我們就按照圖上來,因為圖片是從 \(F_{1,1}\)開始,所以我們把注釋修正以下:
# 0 (0,0) ----> clock 1 運行圖上的 (1,1)
# 1 (1,0) (0,1) ----> clock 2 運行圖上的 (2,1) (1,2)
# 2 (2,0) (1,1) (0,2) ----> clock 3 運行圖上的 (3,1) (2,2) (1,3)
# 3 (2,1) (1,2) ----> clock 4 運行圖上的 (3,2) (2,3)
# 4 (2,2) ----> clock 5 運行圖上的 (3,3)
我們把 solve代碼修改下,為了打印正確的index,這樣大家就可以更好的把代碼和圖片對應起來了。
m=4 # m: number of micro-batches
n=3 # n: number of partitions
for k in range(m + n - 1):
print( [(k - j + 1 , j +1 ) for j in range(max(1 + k - m, 0), min(1 + k, n))] )
打印是:
[(1, 1)] # 第 1 輪訓練計劃 & 數據
[(2, 1), (1, 2)] # 第 2 輪訓練計劃 & 數據
[(3, 1), (2, 2), (1, 3)] # 第 3 輪訓練計劃 & 數據
[(4, 1), (3, 2), (2, 3)] # 第 4 輪訓練計劃 & 數據
[(4, 2), (3, 3)] # 第 5 輪訓練計劃 & 數據
[(4, 3)] # 第 6 訓練計劃 & 數據
我們把流水線的圖再祭出來看看。

我們把上面的輸出按照流水線的圖繪制一下作為比對。
可以看到,前 4 個時鐘周期內,分別有 4 個 micro-batch 進入了 cuda:0,分別是(1,1) (2,1) (3,1) (4,1) 。然后按照 clock_cycles 算法給出的順序,每次迭代(時鐘周期)內執行不同的schedule,經過了 6 個時鐘周期之后,完成了第一輪 forward 操作。這就形成了流水線。
流水線優勢在于,如果 number of micro-batches 配置的合適,那么可以在每個時鐘周期內,最大程度的讓所有設備都運行起來。與之對比,原生流水線每一時間只能讓一個設備互活躍。
+ + + + + + +
| | | | | | |
| | | | | | |
cuda:0 | (1,1) | (2,1) | (3,1) | (4,1) | | |
| | | | | | |
| | | | | | |
| | | | | | |
| | | | | | |
cuda:1 | | (1,2) | (2,2) | (3,2) | (4,2) | |
| | | | | | |
| | | | | | |
| | | | | | |
| | | | | | |
cuda:2 | | | (1,3) | (2,3) | (3,3) | (4,3) |
| | | | | | |
| | | | | | |
| | | | | | |
| clock 1 | clock 2 | clock 3 | clock 4 | clock 5 | clock 6 |
+ + + + + + +
+------------------------------------------------------------------------------> Time
具體數據batch的走向是:
+ + + + + + +
| | | | | | |
cuda:0 | (1,1) | (2,1) | (3,1) | (4,1) | | |
| + | + | + | + | | |
| | | | | | | | | | |
| | | | | | | +----------+ | |
| | | | | +-----------+ | | | |
| | | +------------+ | | | | | |
| | | | | | | | | | |
| +------------+ | | | | | | | |
| | | | | | | | | | |
| | | | v | v | v | |
| | v | | | | |
cuda:1 | | (1,2) | (2,2) | (3,2) | (4,2) | |
| | + | + | + | + | |
| | | | | | | | | | |
| | | | | | | | +-------------+ |
| | | | | | +----------+ | | |
| | | | +------------+ | | | | |
| | +-----------+ | | | | | | |
| | | | | v | v | v |
| | | v | | | |
cuda:2 | | | (1,3) | (2,3) | (3,3) | (4,3) |
| | | | | | |
| | | | | | |
| | | | | | |
| clock 1 | clock 2 | clock 3 | clock 4 | clock 5 | clock 6 |
+ + + + + + +
+-----------------------------------------------------------------------------------> Time
2.4 使用
在 Pipeline 類之中,我們可以看到,就是按照時鐘周期來啟動計算,這樣在前向傳播之中,就按照這個序列,像水波紋一樣擴散。
def run(self) -> None:
"""Runs pipeline parallelism.
It modifies the given batches in place.
"""
batches = self.batches
partitions = self.partitions
devices = self.devices
skip_layout = self.skip_layout
m = len(batches)
n = len(partitions)
skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]
with spawn_workers(devices) as (in_queues, out_queues):
for schedule in clock_cycles(m, n): # 這里使用,給出了執行序列計劃,后續按照這個來執行
self.fence(schedule, skip_trackers) # 構建后向傳播依賴關系
self.compute(schedule, skip_trackers, in_queues, out_queues) # 進行計算
至此,前向傳播過程分析完畢,下一篇我們分析依賴關系。
0xFF 參考
https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior
NVIDIA解決方案架構師深度解析大規模參數語言模型Megatron-BERT
Accelerating Wide & Deep Recommender Inference on GPUs
HugeCTR: High-Performance Click-Through Rate Estimation Training
https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548
https://github.com/NVIDIA/apex/
https://github.com/justheuristic/prefetch_generator
https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html
https://pytorch.org/docs/stable/autograd.html
https://pytorch.org/docs/notes/cuda.html
https://zhuanlan.zhihu.com/p/61765561

浙公網安備 33010602011771號