<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼解析] 深度學(xué)習(xí)流水線并行Gpipe(1)---流水線基本實(shí)現(xiàn)

      [源碼解析] 深度學(xué)習(xí)流水線并行Gpipe(1)---流水線基本實(shí)現(xiàn)

      目錄

      0x00 摘要

      GPipe是一個(gè)基于 Lingvo (Lingvo 是 Google 基于 TensorFlow 二次開發(fā)的重點(diǎn)針對序列模型的框架)開發(fā)的,支持超大規(guī)模模型的神經(jīng)網(wǎng)絡(luò)訓(xùn)練并行庫,本文介紹其基本功能和流水線機(jī)制。

      0x01 概述

      1.1 什么是GPipe

      GPipe是一個(gè)基于 Lingvo (Lingvo 是 Google 基于 TensorFlow 二次開發(fā)的重點(diǎn)針對序列模型的框架 https://github.com/tensorflow/lingvo)開發(fā)的,支持超大規(guī)模模型的神經(jīng)網(wǎng)絡(luò)訓(xùn)練并行庫,其特點(diǎn)如下:

      • GPipe 把一個(gè)L層的網(wǎng)絡(luò),切分成 K個(gè) composite layers。每個(gè)composite layer 運(yùn)行在單獨(dú)的TPU core上。
      • 這 K個(gè) core composite layers只能順序執(zhí)行,但是GPipe 引入了流水并行策略來緩解這個(gè)順序執(zhí)行的性能問題,把 mini-batch細(xì)分為多個(gè)更小的macro-batch,提高并行程度。
      • GPipe 還用recomputation這個(gè)簡單有效的技巧來降低內(nèi)存,進(jìn)一步允許訓(xùn)練更大的模型。

      1.2 挑戰(zhàn)

      深度學(xué)習(xí)框架本質(zhì)上是一個(gè)基于張量(Tensor)之間的計(jì)算(Operator)表達(dá)式所組成的計(jì)算圖(Graph)編譯執(zhí)行引擎,提供了一系列張量的定義、一元操作、二元操作等數(shù)學(xué)原語,并根據(jù)反向傳播算法(Back Propagation)進(jìn)行梯度自動(dòng)求導(dǎo)以及模型更新。在大量數(shù)據(jù)分批次流入計(jì)算圖進(jìn)行模型訓(xùn)練之后,使得模型學(xué)習(xí)到數(shù)據(jù)中的內(nèi)在關(guān)聯(lián)關(guān)系,從而獲得對應(yīng)場景中的“智能”感知與判斷能力。

      DNN訓(xùn)練的目標(biāo)是在盡可能短的時(shí)間內(nèi)獲得一個(gè)高精度的模型。這一目標(biāo)可以通過兩個(gè)指標(biāo)來實(shí)現(xiàn):

      • 統(tǒng)計(jì)效率,達(dá)到預(yù)期準(zhǔn)確度所需的歷元數(shù);
      • 硬件效率,完成單個(gè)歷元所需的時(shí)間。達(dá)到期望精度水平的總訓(xùn)練時(shí)間只是這兩個(gè)指標(biāo)的乘積;

      GPU最主要提供的是兩種資源:計(jì)算資源顯存帶寬資源。所以訓(xùn)練大型模型有兩個(gè)基本挑戰(zhàn):顯存效率計(jì)算效率

      深度學(xué)習(xí)框架性能優(yōu)化的最終目標(biāo)是深度學(xué)習(xí)模型訓(xùn)練最快,從而使得完成訓(xùn)練的時(shí)間最短,節(jié)省模型訓(xùn)練開發(fā)周期和用戶的時(shí)間成本。

      0x02 并行機(jī)制

      業(yè)界采用一些并行機(jī)制達(dá)到優(yōu)化的目的。

      2.1 機(jī)制分類與權(quán)衡

      本節(jié)以下主要參考如下文章:

      Efficient Large-Scale Language Model Training on GPU Clusters

      DeepSpeed: Extreme-scale model training for everyone

      [譯] DeepSpeed:所有人都能用的超大規(guī)模模型訓(xùn)練工具

      PipeDream: Fast and Efficient Pipeline Parallel DNN Training

      在 "Efficient Large-Scale Language Model Training on GPU Clusters" 論文中, NVIDIA 介紹了分布式訓(xùn)練超大規(guī)模模型的三種必須的并行技術(shù):

      • 數(shù)據(jù)并行(Data Parallelism)
      • 模型并行(Tensor Model Parallelism)
      • 流水并行(Pipeline Model Parallelism)

      2.1.1 數(shù)據(jù)并行

      數(shù)據(jù)并行(Data Parallelism)是最常見的方法。其特點(diǎn)如下:

      • 模型在多個(gè)worker機(jī)器上復(fù)制,每個(gè)GPU都維護(hù)模型的完整副本。
      • 輸入數(shù)據(jù)集可以跨多個(gè)gpu進(jìn)行分區(qū)。每批輸入的訓(xùn)練數(shù)據(jù)都在數(shù)據(jù)并行的 worker 之間劃分。每個(gè)worker處理訓(xùn)練數(shù)據(jù)的一個(gè)子集。
      • 使用集合通信原語或參數(shù)服務(wù)器定期與其他GPU同步權(quán)重。
      • 反向傳播后需要通信并規(guī)約梯度,以保證優(yōu)化器在各個(gè) worker 上進(jìn)行相同的更新。即,對單個(gè)worker計(jì)算的權(quán)重更新進(jìn)行聚合,以獲得反映所有輸入更新的最終權(quán)重更新。
      • 每個(gè)聚合傳遞的數(shù)據(jù)量與模型的大小成比例。

      數(shù)據(jù)并行性具有幾個(gè)明顯的優(yōu)勢,包括計(jì)算效率高和實(shí)現(xiàn)起來工作量小,這使得數(shù)據(jù)并行訓(xùn)練在一些流行的具有高計(jì)算通信比的模型上運(yùn)行良好,但有幾個(gè)重要的趨勢威脅著它的有效性:

      • 顯存效率:數(shù)據(jù)并行會(huì)在所有 worker 之間進(jìn)行模型和優(yōu)化器的復(fù)制,因此顯存效率不高。
      • 計(jì)算效率:隨著我們提高并行度,每個(gè) worker 執(zhí)行的計(jì)算量是恒定的。數(shù)據(jù)并行可以在小規(guī)模上實(shí)現(xiàn)近乎線性擴(kuò)展。但是,在 worker 之間規(guī)約梯度的通信開銷跟模型大小成正相關(guān),所以當(dāng)模型很大或通信帶寬很低時(shí),計(jì)算效率會(huì)受限。而GPU計(jì)算能力的快速增長進(jìn)一步將訓(xùn)練的瓶頸轉(zhuǎn)移到跨模型的通信上。另外,參數(shù)同步的頻率影響統(tǒng)計(jì)和硬件效率。
      • 伸縮性:不斷增長的模型大小增加了每次聚合的通信量。事實(shí)上,一些廣泛使用的模型足夠大,使得通信超過了計(jì)算時(shí)間,限制了伸縮性并支配了總的訓(xùn)練時(shí)間。而且,數(shù)據(jù)并行的 batch 大小隨 worker 數(shù)量提高,而我們往往無法在不影響收斂性的情況下一直增加 batch 大小。

      2.1.2 模型并行

      模型并行在傳統(tǒng)上用于訓(xùn)練過程中太大而無法保存在工作者內(nèi)存或緩存中的模型。其特點(diǎn)如下:

      • 模型并行涉及到將模型在worker之間進(jìn)行劃分,以便每個(gè)worker僅對模型參數(shù)的一個(gè)子集進(jìn)行評估和更新。這樣就可以分為層間并行和層內(nèi)模型并行。
      • 層間模型并行會(huì)在多個(gè) worker 之間劃分模型的各個(gè)層。
      • 層內(nèi)模型并行把每層的模型參數(shù)切分到多個(gè)設(shè)備。層內(nèi)模型并行在有的論文里被叫做 "Tensor 級別的模型并行" ,是對某一層(如 Linear/Dense Layer 里的 Variable )的模型 Tensor 切分,從而將大的模型 Tensor 分成多個(gè)相對較小的 Tensor 進(jìn)行并行計(jì)算;
      • 層間值(激活和梯度)往往是需要跨機(jī)器通信的唯一參數(shù)。

      就其本質(zhì)而言,模型并行性的計(jì)算和通信因模型結(jié)構(gòu)而異,因此在實(shí)現(xiàn)上有很大的工作量。

      然而,即使模型并行能夠訓(xùn)練非常大的模型,傳統(tǒng)的模型并行也會(huì)導(dǎo)致計(jì)算資源的嚴(yán)重利用率不足,因?yàn)樗淮沃恢鲃?dòng)使用一個(gè)worker(如果每個(gè)層被分配給一個(gè)worker),或者不能重疊計(jì)算和通信(如果每個(gè)層被分區(qū))。

      • 顯存效率:模型并行DNN訓(xùn)練導(dǎo)致GPU資源的嚴(yán)重利用不足。模型并行通過在模型并行 worker 之間劃分激活顯存,會(huì)根據(jù) worker 數(shù)量成比例地減少顯存使用量。至關(guān)重要的是,這是減少單個(gè)網(wǎng)絡(luò)層的激活顯存的唯一方法。
      • 計(jì)算效率:由于每次前向和反向傳播中都需要額外通信激活值,模型并行的計(jì)算效率很低。模型并行需要高通信帶寬,并且不能很好地?cái)U(kuò)展到通信帶寬受限的節(jié)點(diǎn)。此外,每個(gè)模型并行worker 都會(huì)減少每個(gè)通信階段之間執(zhí)行的計(jì)算量,從而影響計(jì)算效率。模型并行性通常與數(shù)據(jù)并行性結(jié)合使用,以在內(nèi)存和計(jì)算效率之間進(jìn)行權(quán)衡。
      • 開發(fā)效率:跨多個(gè)GPU劃分模型的負(fù)擔(dān)留給了程序員,即使對于最有經(jīng)驗(yàn)的機(jī)器學(xué)習(xí)實(shí)踐者來說,確定如何在工作者中最好地劃分DNN模型也是一項(xiàng)具有挑戰(zhàn)性的任務(wù),這往往會(huì)導(dǎo)致額外的效率低下。最近有些工作探索了如何使用增強(qiáng)學(xué)習(xí)來自動(dòng)確定模型并行性的設(shè)備位置。不幸的是,這樣的在線決策技術(shù)是時(shí)間和資源密集型的;它們也不能無縫地結(jié)合流水線、數(shù)據(jù)并行和模型并行。

      2.1.3 流水線并行

      流水并行(Pipeline Model Parallelism)在有的論文里叫做流水線級別的模型并行,其特點(diǎn)是:

      • 將整個(gè)網(wǎng)絡(luò)分段(stage),不同段在不同的設(shè)備上,前后階段流水分批工作,通過一種“接力”的方式并行。
      • 流水線并行將模型的各層劃分為可以并行處理的階段。當(dāng)一個(gè)階段完成一個(gè) micro-batch 的正向傳遞時(shí),激活內(nèi)存將被通信至流水線的下一個(gè)階段。類似地,當(dāng)下一階段完成反向傳播時(shí),將通過管道反向通信梯度。必須同時(shí)計(jì)算多個(gè) micro-batch 以確保流水線的各個(gè)階段能并行計(jì)算。
      • 流水線并行訓(xùn)練有可能在數(shù)據(jù)并行性困難時(shí)提供較高的DNN訓(xùn)練性能。特別是,工作人員之間的通信可以限制在分配給不同工作人員的相鄰層之間的激活(在前向通道上)和梯度(后向)上。

      但是流水線并行依然有一些問題:

      • 顯存效率:流水線并行減少的顯存與流水線的階段數(shù)成正比,使模型的大小可以隨 worker 的數(shù)量線性擴(kuò)展。但是,流水線并行不會(huì)減少每一層的激活函數(shù)的顯存占用量。此外,每個(gè) worker 必須存儲(chǔ)同時(shí)運(yùn)行的各個(gè) micro-batch 的激活值。這導(dǎo)致流水線第一階段的激活內(nèi)存與單個(gè) mirco batch 的總激活內(nèi)存大致相同。
      • 計(jì)算效率:流水線并行具有最低的通信量,因?yàn)樗耐ㄐ帕恐缓驮诟麟A段邊界的各層的激活值大小成正比。但是,它不能無限擴(kuò)展。像模型并行一樣,增加流水線大小會(huì)減少每個(gè)流水線階段的計(jì)算量,這會(huì)降低計(jì)算與通信的比率。如果要實(shí)現(xiàn)好的計(jì)算效率,流水線并行還要求其每個(gè)階段的計(jì)算負(fù)載完美的均衡。此外,流水線并行性會(huì)在每個(gè) batch 的開始和結(jié)束時(shí)因?yàn)樾枰匦绿畛浠蚺趴樟魉€而產(chǎn)生 bubble overhead。
      • 開發(fā)效率:DNN的雙向性(正向傳遞后反向傳遞相同的層)使得流水線具有挑戰(zhàn)性,更重要的是,一個(gè)簡單的流水線機(jī)制引入了過時(shí)權(quán)重的最新計(jì)算,導(dǎo)致最終模型的精度低于數(shù)據(jù)并行訓(xùn)練。

      2.2 如何使用

      給定一個(gè)特定的神經(jīng)網(wǎng)絡(luò)模型和一批計(jì)算資源,從任務(wù)到設(shè)備之間的映射有多種方式,但不同的映射方案運(yùn)行效率不同。哪種方案最優(yōu)既取決于作業(yè)本身的特性,也取決于底層硬件的拓?fù)洹?/p>

      神經(jīng)網(wǎng)絡(luò)由很多局部計(jì)算搭建組成,一般來說,同一個(gè)神經(jīng)網(wǎng)絡(luò)的不同算子可能適合不同的并行模式。一個(gè)局部計(jì)算是采用數(shù)據(jù)并行,還是模型并行取決于這個(gè)局部任務(wù)的計(jì)算傳輸比。比如:

      • 某些算子(比如卷積) 運(yùn)算參數(shù)量很小,但中間結(jié)果量大,為了較少傳輸量,所以最劃算的方法是數(shù)據(jù)并行。即對數(shù)據(jù)進(jìn)行切分,不同的設(shè)備處理不同的數(shù)據(jù),在設(shè)備之間偶爾進(jìn)行參數(shù)同步;
      • 某些算子,中間計(jì)算結(jié)果相對于參數(shù)量更少,就適合模型并行。
      • 還有一些算子,網(wǎng)絡(luò)參數(shù)量/中間計(jì)算結(jié)果都很大,可能采用流水并行(也就是接力的形式)是最優(yōu)的。
      • 相比于一個(gè)算子只使用單一的并行模式,一個(gè)算子也可以同時(shí)使用多樣的并行模式可能進(jìn)一步地減少傳輸量,譬如在隱藏層比較大的地方,就可能同時(shí)對數(shù)據(jù)矩陣切割以及對模型矩陣切割。

      因此,對于每一個(gè)任務(wù)選擇最優(yōu)的并行模式是一個(gè)非常復(fù)雜的問題,需要具體情況具體分析。

      0x03 Pytorch 手動(dòng)指定并行方式

      目前已有的深度學(xué)習(xí)框架,大多數(shù)提供了對數(shù)據(jù)并行的原生支持,但是對模型并行支持的還不完善。如果用戶想要將模型參數(shù)分配到不同設(shè)備上,往往會(huì)遇到需要人工指定模型切分方式、手工編寫數(shù)據(jù)通信邏輯代碼等問題。

      我們就看看 Pytorch 如何手動(dòng)指定,主要摘錄(翻譯):

      https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

      3.1 基礎(chǔ)知識(shí)

      PyTorch以Tensor為基本單元,更符合算法工程師寫Python腳本的直覺,以面向?qū)ο蟮姆绞竭M(jìn)行模型搭建和訓(xùn)練。對Tensor進(jìn)行賦值、切片,就像numpy一樣易用。

      PyTorch 是單卡視角,一個(gè)設(shè)備上的 Tensor、模型腳本跟另一個(gè)設(shè)備上的 Tensor、模型腳本并無直接關(guān)系,對于每個(gè)設(shè)備上的模型腳本都完全對稱的(Mirror)最簡單的數(shù)據(jù)并行來說,PyTorch 這樣的設(shè)計(jì)沒有什么明顯的缺陷。每個(gè)設(shè)備上的腳本運(yùn)行到相同 batch 的模型更新部分(Optimizer),統(tǒng)一做一次模型同步(AllReduce 操作)就完成了數(shù)據(jù)并行,這就是 PyTorch 的 DDP(DistributedDataParallel)模塊。

      但在分布式情況下想把一個(gè)Tensor切分到不同機(jī)器上,需要手動(dòng)構(gòu)建傳輸過程,相當(dāng)于直接對物理編程,所以對分布式使用的門檻更高。

      3.2 特點(diǎn)

      PyTorch 模型并行將單個(gè)模型拆分到不同的GPU上,而不是在每個(gè)GPU上復(fù)制整個(gè)模型(具體來說,假設(shè)模型 m包含10層。如果使用 DataParallel,則每個(gè)GPU都具有這10層中每個(gè)層的副本,而如果在兩個(gè)GPU上使用模型并行時(shí),每個(gè)GPU可以托管5層)。

      模型并行的高級思想是將模型的不同子網(wǎng)絡(luò)放置在不同的設(shè)備上,并相應(yīng)地實(shí)現(xiàn)該forward方法以跨設(shè)備移動(dòng)中間輸出。由于模型的一部分只在任何單個(gè)設(shè)備上運(yùn)行,因此一組設(shè)備可以共同服務(wù)于一個(gè)更大的模型。

      3.3 基本用法

      讓我們從包含兩個(gè)線性層的玩具模型(toy model)開始。要在兩個(gè)GPU上運(yùn)行此模型,只需將每個(gè)線性層放在不同的GPU上,然后移動(dòng)輸入(input)中間輸出(intermediate outputs)以匹配層設(shè)備(layer devices)

      import torch
      import torch.nn as nn
      import torch.optim as optim
      
      class ToyModel(nn.Module):
        def __init__(self):
          super(ToyModel, self).__init__()
          self.net1 = torch.nn.Linear(10, 10).to('cuda:0')  # 將net1放置在第1個(gè)GPU上
          self.relu = torch.nn.ReLU()
          self.net2 = torch.nn.Linear(10, 5).to('cuda:1')   # 將net2放置在第2個(gè)GPU上
      
        def forward(self, x):
          x = self.relu(self.net1(x.to('cuda:0')))
          return self.net2(x.to('cuda:1'))
      

      請注意對于 ToyModel ,除了五個(gè)用于將線性層(linear layers)和張量(tensors)放置在適當(dāng)?shù)脑O(shè)備上的to(device)調(diào)用之外,以上內(nèi)容與在單個(gè)GPU上實(shí)現(xiàn)該功能非常相似。這是模型中唯一需要更改地方(即to(device) )。 backward()torch.optim 會(huì)自動(dòng)關(guān)注梯度(gradients),就好像模型是一個(gè)GPU一樣。調(diào)用損失函數(shù)時(shí),只需確保標(biāo)簽(label)與輸出(output)在同一設(shè)備(on the same device)上。

      model = ToyModel()
      loss_fn = nn.MSELoss()
      optimizer = optim.SGD(model.paraeters(), lr=0.001)
      
      optimizer.zero_grad()
      outputs = model(torch.randn(20, 10))
      labels = torch.randn(20, 5).to('cuda:1') # ToyMode 的 output 是在 'cuda:1' 上,此處的 label 也應(yīng)該置于 'cuda:1' 上
      loss_fn(outputs,labels).backward()
      optimizer.step()
      

      3.4 將模型并行化應(yīng)用于現(xiàn)有模塊

      只需更改幾行,就可以在多個(gè)GPU上運(yùn)行現(xiàn)有的單GPU模塊。以下代碼顯示了如何分解 torchvision.models.reset50() 為兩個(gè)GPU。思想是從現(xiàn)有 ResNet模塊繼承,并在構(gòu)建過程中將層拆分為兩個(gè)GPU。然后,覆蓋 forward方法來縫合兩個(gè)子網(wǎng),通過相應(yīng)地移動(dòng)中間輸出。

      from torchvision.models.resnet import ResNet, Bottleneck
      
      num_classes = 1000
      
      class ModelParallelResNet50(ResNet):
          def __init__(self, *args, **kwargs):
              super(ModelParallelResNet50, self).__init__(
                  Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)
      
              self.seq1 = nn.Sequential(
                  self.conv1,
                  self.bn1,
                  self.relu,
                  self.maxpool,
      
                  self.layer1,
                  self.layer2
              ).to('cuda:0')  # 放置在第1個(gè)GPU上
      
              self.seq2 = nn.Sequential(
                  self.layer3,
                  self.layer4,
                  self.avgpool,
              ).to('cuda:1')  # 放置在第2個(gè)GPU上
      
              self.fc.to('cuda:1')
      
          def forward(self, x):
              x = self.seq2(self.seq1(x).to('cuda:1'))
              return self.fc(x.view(x.size(0), -1))
      

      對于模型太大而無法放入單個(gè)GPU的情況,上述實(shí)現(xiàn)解決了該問題。但是,你可能已經(jīng)注意到,如果模型合適,它(model parallel)將比在單個(gè)GPU上運(yùn)行要慢。這是因?yàn)樵谌魏螘r(shí)間點(diǎn),兩個(gè)GPU中只有一個(gè)在工作,而另一個(gè)在那兒什么也沒做。在 layer2layer3之間,中間輸出需要從 cuda:0 復(fù)制到 cuda:1,這使得性能進(jìn)一步惡化。

      在整個(gè)執(zhí)行過程中,兩個(gè)GPU中的一個(gè)會(huì)處于空閑狀態(tài)。為了解決這個(gè)問題,有一種選擇是將每個(gè)批次進(jìn)一步劃分為拆分流水線,以便當(dāng)一個(gè)拆分到達(dá)第二子網(wǎng)時(shí),可以將下一個(gè)拆分饋入第一子網(wǎng)。這樣,兩個(gè)連續(xù)的拆分可以在兩個(gè)GPU上同時(shí)運(yùn)行。

      3.5 通過流水線輸入(Pipelining Inputs)加速

      在以下實(shí)驗(yàn)中,我們將每批次 120-image 進(jìn)一步劃分為 20-image 。當(dāng)PyTorch異步啟動(dòng)CUDA操作時(shí),該實(shí)現(xiàn)無需生成多個(gè)線程即可實(shí)現(xiàn)并發(fā)。

      class PipelineParallelResNet50(ModelParallelResNet50):
          def __init__(self, split_size=20, *args, **kwargs):
              super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
              self.split_size = split_size
      
          def forward(self, x):
              splits = iter(x.split(self.split_size, dim=0))
              s_next = next(splits)
              s_prev = self.seq1(s_next).to('cuda:1')
              ret = []
      
              for s_next in splits:
                  # A. s_prev runs on cuda:1
                  s_prev = self.seq2(s_prev)
                  ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
      
                  # B. s_next runs on cuda:0, which can run concurrently with A
                  s_prev = self.seq1(s_next).to('cuda:1')
      
              s_prev = self.seq2(s_prev)
              ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
      
              return torch.cat(ret)
      
      
      setup = "model = PipelineParallelResNet50()"
      pp_run_times = timeit.repeat(
          stmt, setup, number=1, repeat=num_repeat, globals=globals())
      pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
      
      plot([mp_mean, rn_mean, pp_mean],
           [mp_std, rn_std, pp_std],
           ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
           'mp_vs_rn_vs_pp.png')
      

      請注意,設(shè)備到設(shè)備的張量復(fù)制操作在源設(shè)備和目標(biāo)設(shè)備上的當(dāng)前流(current streams)上同步。如果創(chuàng)建多個(gè)流,則必須確保復(fù)制操作正確同步。在完成復(fù)制操作之前寫入源張量或讀取/寫入目標(biāo)張量可能導(dǎo)致不確定的行為。上面的實(shí)現(xiàn)僅在源設(shè)備和目標(biāo)設(shè)備上都使用默認(rèn)流,因此沒有必要強(qiáng)制執(zhí)行其他同步。

      0x04 關(guān)鍵技術(shù)

      因?yàn)槊總€(gè)模型的并行策略候選集合是指數(shù)級的,純手工從中挑出一種合適的并行策略,需要耗費(fèi)算法工程師大量的時(shí)間以及計(jì)算資源,而且算法工程師需要考慮的相關(guān)事宜太多,比如:如何分配內(nèi)存,層之間如何交互,如何減少通信代價(jià),分割的張量不能破壞原有數(shù)學(xué)模型,如何確定確定張量shape,如何確定輸入輸出等等。

      所以自動(dòng)并行技術(shù)(如何從框架層次自動(dòng)解決并行策略選擇問題)成為一個(gè)研究熱點(diǎn)

      自動(dòng)并行通過建立代價(jià)模型來預(yù)測并挑選一個(gè)較優(yōu)的并行策略(暫時(shí)無法保證是最優(yōu)的策略,因?yàn)樘舫鲎顑?yōu)的策略是個(gè)NP-Hard的問題),有希望將算法工程師從并行策略的選擇和配置中解放出來。

      因此,目前分布式模型訓(xùn)練有幾個(gè)必要并行技術(shù):

      • 流水并行,尤其是如何自動(dòng)設(shè)定流水;
      • 梯度累加;
      • 后向重計(jì)算;
      • 1F1B 策略(我們將采用PipeDream分析);

      下面我們結(jié)合 Gpipe代碼看看這些技術(shù)如何使用。

      0x05 基礎(chǔ)知識(shí) & 支撐系統(tǒng)

      5.1 Lingvo框架

      5.1.1 核心組件

      Lingvo 的核心組件如下:

      • Models : 一個(gè)model是一個(gè)抽象collection,包含一個(gè)或者多個(gè)tasks。Model相當(dāng)于對于Task的一層wrapper。對于multi-tasks模型,Model將控制哪些variable將在Task之間共享,以及訓(xùn)練時(shí)每個(gè)task如何采樣。
      • Tasks :一個(gè)task就是關(guān)于一個(gè)完整優(yōu)化問題的描述,比如圖片分類或者語音識(shí)別。包含input generator。
      • Layers :一個(gè)Layer代表一個(gè)包含有可訓(xùn)練參數(shù)的隨機(jī)函數(shù)。一個(gè)Layer可以包含其他的Layer作為孩子。Softmax, LSTM, Attension甚至一個(gè)task都是Layer的例子。
      • Params :此對象包含了模型的超參數(shù)。Layers,Tasks以及models都是通過Params中的specifications來構(gòu)建的。 Params是層級的,一個(gè)對象的params中可以包含其child對象的params。
      • NestedMap :一個(gè)dictionary結(jié)構(gòu),用于傳遞數(shù)據(jù)。代碼中大部分代碼中的Python對象要么是Tensor的實(shí)例,要么就是BaseLayer或者NestedMap的一個(gè)子類。

      5.1.2 模型定義

      在Lingvo中,網(wǎng)絡(luò)是一個(gè)層的嵌套結(jié)構(gòu)。Lingvo中的大多數(shù)類都是[Lingvo/core/base_layer.py] BaseLayer 的子類。

      • Params :用來配置類,定義了配置所需要的keys,這些keys在對象創(chuàng)建時(shí)都應(yīng)該被定義。Params對象還可以包含用于配置子層的Params對象。每個(gè)layer類都會(huì)有一個(gè)params的classmethod,這個(gè)方法將會(huì)創(chuàng)建一個(gè)新的params對象,并且通過定義的keys來配置這個(gè)layer,同時(shí)為這些keys給出一些合理的默認(rèn)值。

        Params對象中屬性包括:

        • cls: tParams對象關(guān)聯(lián)的python類。這可以用來構(gòu)造類的實(shí)例;
        • name: 該層的名稱;
        • dtype: 創(chuàng)建變量時(shí)使用的默認(rèn)數(shù)據(jù)類型。
      • __init__ constructor :所有子層和變量都應(yīng)該在這里創(chuàng)建。

      • CreateVariable :創(chuàng)建變量的方法。每個(gè)Layer負(fù)責(zé)創(chuàng)建和管理它自己的variable。

      • CreateChild :創(chuàng)建子層的方法。

      • FProp : 所有的layers都有一個(gè)FProp() 函數(shù), 實(shí)現(xiàn)該層的前向傳播,在計(jì)算的前向step時(shí)將會(huì)被調(diào)用。 因?yàn)榭梢栽诜植际接?xùn)練時(shí)在不同的設(shè)備上執(zhí)行,出于性能的考慮,Lingvo通過theta參數(shù)訪問variable,而不是通過self.vars或者self.theta。

      • FPropMeta : 返回該層關(guān)于FProp計(jì)算的元數(shù)據(jù)。其中 meta.flops在得到一個(gè)輸入張量時(shí)給出估計(jì)的floating point operations數(shù)目。

      對于模型算法的落地,有兩個(gè)指標(biāo)特別重要:

      • 前向傳播時(shí)所需的計(jì)算力,它反應(yīng)了對硬件如GPU性能要求的高低。
      • 參數(shù)個(gè)數(shù),它反應(yīng)所占內(nèi)存大小。

      接下來,我們需要看看如何計(jì)算模型訓(xùn)練的內(nèi)存大小,以及如何計(jì)算算力(后續(xù)流水線并行需要)。

      5.2 計(jì)算內(nèi)存

      5.2.1 總體分析

      我們主要參考了 ZeRO: Memory Optimization Towards Training A Trillion Parameter Models 這篇論文的思路。

      在模型訓(xùn)練期間,大部分內(nèi)存被以下三種情況之一消耗:

      • i)激活。
      • ii)OGP狀態(tài),即由優(yōu)化器狀態(tài)、參數(shù)梯度和參數(shù)本身組成的張量。
      • iii)臨時(shí)緩沖區(qū)。

      輸入數(shù)據(jù)所占用的顯存其實(shí)并不大,這是因?yàn)槲覀兺捎玫鞯姆绞阶x取數(shù)據(jù),這意味著我們其實(shí)并不是一次性的將所有數(shù)據(jù)讀入顯存,而這保證每次輸入所占用的顯存與整個(gè)網(wǎng)絡(luò)參數(shù)來比是微不足道的。

      我們逐一分析。

      5.2.2 激活函數(shù)

      對于激活函數(shù),有如下特點(diǎn):

      • 激活函數(shù)額外消耗的顯存隨 batch 大小而增加,batch 設(shè)置為1的情況下,訓(xùn)練萬億參數(shù)模型就會(huì)產(chǎn)生超過 1 TB 的激活函數(shù)用的顯存。
      • 業(yè)界已有方案如論文 Training deep nets with sublinear memory cost ,可以以33%的重新計(jì)算開銷為代價(jià),幾乎可以消除激活所需的所有內(nèi)存。這個(gè)技巧叫做 gradient checkpointing, 有時(shí)還叫做 rematerialization, re-forward。

      5.2.3 OGP狀態(tài)

      5.2.3.1 模型自身參數(shù)

      模型自身的參數(shù)指的就是各個(gè)網(wǎng)絡(luò)層的 Weight 和Bias,這部分顯存在模型加載完成之后就會(huì)被占用。另外需要注意到的是,有些層是有參數(shù)的,如CNN, RNN; 而有些層是無參數(shù)的, 如激活層, 池化層等。

      5.2.3.2 優(yōu)化器參數(shù)

      優(yōu)化器參數(shù)指的是模型在優(yōu)化過程即反向傳播中所產(chǎn)生的參數(shù), 這部分參數(shù)主要指的就是 dw, 即梯度,在SGD中, 其大小與參數(shù)一樣, 因此在優(yōu)化期間, 模型參數(shù)所占用的顯存會(huì)翻倍。

      值得注意的是,不同的優(yōu)化器其所需保存的優(yōu)化參數(shù)不同, 對于 Adam, 由于其還需要保存其余參數(shù), 模型的參數(shù)量會(huì)在優(yōu)化區(qū)間翻 4 倍。

      5.2.3.3 樣例

      對于OGP狀態(tài),讓我們以ADAM為例。使用ADAM對具有Ψ個(gè)參數(shù)的模型進(jìn)行混合精度訓(xùn)練。

      • 模型參數(shù):參數(shù)的fp16副本,內(nèi)存要求2Ψ字節(jié)。
      • 梯度 :梯度的fp16副本,內(nèi)存要求2Ψ字節(jié)。
      • 優(yōu)化器狀態(tài):參數(shù)、Momentum和Variance的fp32副本,內(nèi)存需求分別為4Ψ、4Ψ和4Ψ字節(jié)。讓我們用K來表示優(yōu)化器狀態(tài)的內(nèi)存乘數(shù),也就是說,存儲(chǔ)它們所需的額外內(nèi)存是KΨ字節(jié)。

      總的來說,OGP狀態(tài)有2Ψ+2Ψ+KΨ=16Ψ字節(jié)(混合精度ADAM的K=12)的內(nèi)存需求。

      具體如下:

      藍(lán)色是參數(shù),橙色是梯度,綠色是優(yōu)化器狀態(tài)。

      在內(nèi)存消耗公式中,Ψ表示模型大小(參數(shù)個(gè)數(shù)),K表示優(yōu)化器狀態(tài)的內(nèi)存乘數(shù),Nd表示數(shù)據(jù)并行度。在這個(gè)例子中,我們假設(shè)模型大小為Ψ=75億,基于Adam優(yōu)化器的混合精度訓(xùn)練,數(shù)據(jù)并行度為Nd=64(即64個(gè)GPU),K=12。

      對于一個(gè)擁有15億個(gè)參數(shù)的GPT-2這樣的模型,這導(dǎo)致了至少24gb的內(nèi)存需求,遠(yuǎn)遠(yuǎn)高于單獨(dú)保存fp16參數(shù)所需的3gb內(nèi)存。

      5.2.4 臨時(shí)緩沖區(qū)

      臨時(shí)緩沖區(qū)是用于存儲(chǔ)臨時(shí)結(jié)果的緩沖區(qū),例如,對于參數(shù)為15億的GPT-2模型, fp32緩沖區(qū)將需要6GB的內(nèi)存。

      5.3 計(jì)算算力

      5.3.1 背景知識(shí)

      • FLOPS:注意全大寫,是floating point operations per second的縮寫,意指每秒浮點(diǎn)運(yùn)算次數(shù),理解為計(jì)算速度。是一個(gè)衡量硬件性能的指標(biāo)。
      • FLOPs:注意s小寫,是floating point operations的縮寫(s表復(fù)數(shù)),意指浮點(diǎn)運(yùn)算數(shù),理解為計(jì)算量。可以用來衡量算法/模型的復(fù)雜度。

      前向傳播時(shí)所需的計(jì)算力就是由FLOPs體現(xiàn),那么FLOPs該怎么計(jì)算呢?

      我們知道,在一個(gè)模型進(jìn)行前向傳播的時(shí)候,會(huì)進(jìn)行卷積、池化、BatchNorm、Relu、Upsample等操作。這些操作的進(jìn)行都會(huì)有其對應(yīng)的計(jì)算力消耗產(chǎn)生,其中,卷積所對應(yīng)的計(jì)算力消耗是所占比重最高的。所以,我們以卷積操作為例,看看卷積所對應(yīng)的計(jì)算力。

      推導(dǎo)過程:卷積層 wx + b 需要計(jì)算兩部分,首先考慮前半部分 wx 的計(jì)算量:

      令 :

      • k 表示卷積核大小;
      • c 表示輸入 feature map 的數(shù)量;

      則對于輸出 feature map 上的單個(gè) Unit 有:

      k * k * c 次乘法,以及 k * k * c - 1 次加法

      如果輸出 feature map 的分辨率是 H * W ,且輸出 o 個(gè) feature map,則輸出 feature map 包含 Unit的總數(shù)就是 H * W * o。

      因此,該卷積層在計(jì)算 wx 時(shí)有:

      k * k * c * H * W * o 次乘法          --(1)
      (k * k * c - 1) * H * W * o 次加法    --(2)
      

      再考慮偏置項(xiàng) b 包含的計(jì)算量:

      由于 b 只存在加法運(yùn)算,輸出 feature map 上的每個(gè) Unit 做一次偏置項(xiàng)加法。因此,該卷積層在計(jì)算偏置項(xiàng)時(shí)總共包含:

      H * W * o 次加法      --(3)
      

      將該卷積層的 wx 和 b 兩部分的計(jì)算次數(shù)累計(jì)起來就有:

      式(1) 次乘法:

      k * k * c * H * W * o 次乘法
      

      式(2) + 式(3) 次加法:

      (k * k * c - 1) * H * W * o  + H * W * o  = k * k * c * H * W * o
      

      可見,式(2) + 式(3) = 式 (1)

      因此,對于帶偏置項(xiàng)的卷積層,該層的計(jì)算力消耗 為:

      k * k * c * H * W * o

      5.3.2 lingvo中實(shí)現(xiàn)

      在 Lingvo 之中,具體計(jì)算算力,就是通過每個(gè)類的 FPropMeta完成,這些方法都是每個(gè)類根據(jù)自己的特點(diǎn)實(shí)現(xiàn)。我們具體找?guī)讉€(gè)例子看看如何計(jì)算FLOPS。

      Conv2DLayerNoPadding如下計(jì)算:

        @classmethod
        def FPropMeta(cls, p, inputs):
          py_utils.CheckShapes((inputs,))
          b, h, w, c = inputs
          fh, fw, ic, oc = p.filter_shape
          assert ic == c
          sh, sw = p.filter_stride
          if p.padding == 'SAME':
            oh = sympy.ceiling(h / sh)
            ow = sympy.ceiling(w / sw)
          else:
            oh = sympy.ceiling((h - fh + 1) / sh)
            ow = sympy.ceiling((w - fw + 1) / sw)
          flops = b * oh * ow * fh * fw * ic * oc * 2  # mul/add counts as 2 flop.
          outputs = tshape.Shape([b, oh, ow, oc])
          return py_utils.NestedMap(flops=flops, out_shapes=(outputs,))
      

      DropoutLayer 如下計(jì)算:

        @classmethod
        def FPropMeta(cls, p, inputs, *args):
          py_utils.CheckShapes((inputs,))
          flops_per_element = 10  # Approximately 10 flops per element.
          return py_utils.NestedMap(
              flops=inputs.num_elements() * flops_per_element, out_shapes=(inputs,))
      
      

      BatchNormLayer 的 FLOPS 如下計(jì)算。

        @classmethod
        def FPropMeta(cls, p, inputs, padding=None):
          py_utils.CheckShapes((inputs,))
          return py_utils.NestedMap(
              flops=inputs.num_elements() * _BN_FLOPS_PER_ELEMENT,
              out_shapes=(inputs,))
      

      ActivationLayer 如下計(jì)算:

        @classmethod
        def FPropMeta(cls, p, inputs):
          py_utils.CheckShapes((inputs,))
          return py_utils.NestedMap(
              flops=inputs.num_elements() * GetFlops(p.activation),
              out_shapes=(inputs,))
      

      0x06 流水線

      6.1 背景知識(shí)

      6.1.1 問題點(diǎn)

      通信問題

      無論是數(shù)據(jù)并行還是模型并行,都會(huì)在相應(yīng)的機(jī)器之間進(jìn)行全連接的通信,當(dāng)機(jī)器數(shù)量增大時(shí),通信開銷和時(shí)延會(huì)大到難以忍受。

      比如參數(shù)服務(wù)器使用中,三段式流程如下:Pull weight ---> Compute new weight ---> Push new weight

      如果三段式流程串行的進(jìn)行通信和計(jì)算,無論這個(gè)通信是快是慢,這個(gè)時(shí)間開銷都會(huì)導(dǎo)致在分布式環(huán)境下每個(gè)iteration的時(shí)間比單機(jī)版要長(Ethernet的帶寬高低或者latency大小只會(huì)影響這個(gè)時(shí)間的長短,但并不能把這個(gè)時(shí)間降到零)。所以,把通信和計(jì)算重疊(overlap)起來以便 “掩蓋” 通信時(shí)間幾乎是一個(gè)必須的步驟。如何重疊計(jì)算和傳輸從而提高設(shè)備利用率就非常有挑戰(zhàn)。

      無法充分利用

      原則上我們可以通過并行計(jì)算在GPU或者TPU上訓(xùn)練更大的DNN模型。但是由于DNN的順序性,這種方法可能導(dǎo)致在計(jì)算期間只有一個(gè)加速器處于活動(dòng)狀態(tài),不能充分利用設(shè)備的計(jì)算能力

      6.1.2 如何設(shè)計(jì)系統(tǒng)

      回到神經(jīng)網(wǎng)絡(luò)的訓(xùn)練過程上,怎么設(shè)計(jì)系統(tǒng)來重疊計(jì)算和通信?

      在后向傳播之中有兩個(gè)特點(diǎn)可以利用:

      • 首先,神經(jīng)網(wǎng)絡(luò)的計(jì)算是一層接著一層完成的,不管是前向還是后向傳播,算完本層才能算下一層;
      • 另一方面,在后向傳播的過程中,一旦后一層拿到前一層的輸入,這一層的計(jì)算就不再依賴于前一層了。

      因此,根據(jù)這個(gè)特點(diǎn),人們引入了流水線并行。

      數(shù)據(jù)并行與模型并行都是讓設(shè)備執(zhí)行同一個(gè)層次的計(jì)算,流水并行則是把任務(wù)劃分為幾個(gè)有明確先后順序的階段,把不同的階段分給不同的計(jì)算設(shè)備,使得單設(shè)備只負(fù)責(zé)網(wǎng)絡(luò)中部分層的計(jì)算。模型網(wǎng)絡(luò)分布在各個(gè)設(shè)備上是非對稱的,各個(gè)設(shè)備“接力”執(zhí)行網(wǎng)絡(luò)的一部分。每個(gè)階段(stage) 和下一個(gè)階段之間僅有相鄰的某一個(gè) Tensor 數(shù)據(jù)需要傳輸,每臺(tái)機(jī)器的數(shù)據(jù)傳輸量跟總的網(wǎng)絡(luò)大小、機(jī)器總數(shù)、并行規(guī)模無關(guān)。

      在這種多設(shè)備接力完成一個(gè)網(wǎng)絡(luò)計(jì)算的模式下,可以支持更大的模型或者支持更大的 Batch Size。如果通信和計(jì)算重疊(overlap)好,又可以解決了機(jī)器之間的通信開銷的問題,

      總的來說,流水并行在通常大模型訓(xùn)練情況下具有優(yōu)勢。流水并行的數(shù)據(jù)傳輸量少,僅為階段之間需要傳輸?shù)臄?shù)據(jù)量之和,不像數(shù)據(jù)并行與模型并行那樣大,傳輸量與整個(gè)計(jì)算圖都有關(guān),因此對于帶寬較小的機(jī)器,會(huì)趨于使用流水并行。但某些情況下,流水并行與模型并行的結(jié)合則會(huì)優(yōu)于單一的模型并行與流水并行。同時(shí),在數(shù)據(jù)并行與模型并行中也存在計(jì)算時(shí)間掩蓋傳輸時(shí)間的優(yōu)化。

      6.2 Gpipe流水線綜述

      6.2.1 關(guān)鍵要點(diǎn)

      從概念上講,GPipe是一個(gè)分布式機(jī)器學(xué)習(xí)庫,它使用同步隨機(jī)梯度下降和流水線并行的方式進(jìn)行訓(xùn)練,適用于任何由多個(gè)有序的層組成的深度神經(jīng)網(wǎng)絡(luò)(Deep Neural Networks, DNN)。 Gpipe通過跨不同的加速器來分割模型,并自動(dòng)將一小批訓(xùn)練示例劃分成更小的批量。 該模型允許GPipe的加速器進(jìn)行并行操作,最大限度地提高了訓(xùn)練過程中的可擴(kuò)展性。

      GPipe 有幾個(gè)關(guān)鍵要點(diǎn):

      • Network partition(網(wǎng)絡(luò)分片):將一個(gè)N層的網(wǎng)絡(luò)劃分成K個(gè)partition, 每個(gè)partition在單獨(dú)的TPU上執(zhí)行,partition之間需要插入一些網(wǎng)絡(luò)通信操作。

      • Pipeline parallelism(流水線并行):把CPU里的流水線并發(fā)技術(shù)用在了深度學(xué)習(xí)上,主要是把計(jì)算和網(wǎng)絡(luò)通信兩種操作,更好地重排列。即自動(dòng)將mini-batch的訓(xùn)練樣本分成更小的micro-batch,并在pipeline中運(yùn)行,使TPU核心能夠并行操作。

      • Gradient Accumulation(梯度累積) :梯度一直在micro-batch中累積,因此分區(qū)數(shù)量不會(huì)影響模型質(zhì)量。

      • Re-Materialization(重計(jì)算) :Re-Materialization具體是指在前向計(jì)算過程中,GPipe只記錄stage劃分處的輸出,在計(jì)算梯度時(shí),GPipe會(huì)重新執(zhí)行前向計(jì)算邏輯,從而得到各個(gè)算子的前向結(jié)果,然后再計(jì)算梯度結(jié)果。跟OpenAI開源的gradient-checkpointing一樣,只不過GPipe是在TPU上實(shí)現(xiàn)的,OpenAI的只能運(yùn)行在GPU上。

      6.2.2 圖示

      • 下圖左端是原始模型。
      • 右端說明了具有多個(gè)有序?qū)拥纳窠?jīng)網(wǎng)絡(luò)的GPipe模型被劃分到了四個(gè)加速器上。 Fk是kth分區(qū)的復(fù)合正向計(jì)算函數(shù)。 Bk是其相對應(yīng)的反向傳播函數(shù)。 Bk依賴于來自上層的Bk+1和Fk的中間激活函數(shù)。
        • 上面的模型中,我們可以看到網(wǎng)絡(luò)的順序性是如何導(dǎo)致資源利用不足的。
        • 下面的模型則顯示了GPipe方法,在該方法中,輸入的小批量示例被劃分為更小的微批量,這些微批量可以由加速器同時(shí)處理。

      6.2.3 問題

      我們根據(jù)論文思路,提出了幾個(gè)問題,爭取以后按圖索驥。

      • 如何劃分 stage?
        • 將模型劃分為連續(xù)的幾個(gè)stage,每個(gè)stage各自對應(yīng)一個(gè)設(shè)備。這樣就使得模型的大小可以突破單個(gè)設(shè)備內(nèi)存的大小,因?yàn)橐慌_(tái)設(shè)備只需要能夠容納部分模型的參數(shù)和計(jì)算;
        • 因?yàn)閯澐至藄tage,所以整個(gè)系統(tǒng)中,處理最慢的stage會(huì)成為瓶頸。所以應(yīng)該平均分配算力
      • 依據(jù)什么分割做流水?
        • 如何將mini-batch進(jìn)一步劃分成更小的micro-batch,同時(shí)利用pipipline方案,每次處理一個(gè)micro-batch的數(shù)據(jù),得到結(jié)果后,將該micro-batch的結(jié)果發(fā)送給下游設(shè)備,同時(shí)開始處理后一個(gè) micro-batch的數(shù)據(jù),通過這套方案減小設(shè)備中的Bubble(設(shè)備空閑的時(shí)間稱為 Bubble)。
      • 如何做Re-Materialization?

      6.3 依據(jù)算力分割Stage

      神經(jīng)網(wǎng)絡(luò)有一個(gè)特點(diǎn):對不同的輸入,其運(yùn)行時(shí)間相差不大,因此可以預(yù)估其算力,時(shí)間,參數(shù)大小等等。Gpipe就是依據(jù)算力對圖進(jìn)行了分割,從而把不同層分配到不同的設(shè)備上。

      6.3.1 PartitionSequentialLayers

      PartitionSequentialLayers 把一個(gè)包括sequential layers的層分解,目的是讓每一個(gè)partition都大致?lián)碛型瑯拥?flops。最終目的是讓每個(gè) GPU 都擁有盡量同樣的算力

      • 輸入是:一個(gè)layer param 或者 一個(gè) layer param 列表;
      • 輸出是:一個(gè) FeatureExtractionLayer params 列表;

      邏輯是:

      • 如果params只是一個(gè)layer,那么就把這個(gè)layer構(gòu)建成一個(gè)包含sub-layers的列表 subs;
      • 利用 FPropMeta 計(jì)算出來這個(gè) subs 列表的shapes和總flops,賦值給了 histo;
      • 利用 histo 計(jì)算出來一個(gè)層代價(jià)(layer's cost)的歸一化累積直方圖
      • 構(gòu)建一個(gè)parts變量;
        • 該變量是一個(gè)num_partitions大小的數(shù)組;數(shù)組每個(gè)item也是一個(gè)數(shù)組;
        • 依據(jù)直方圖把subs分到parts中的每個(gè)item之中,這樣每個(gè)parts[i]都擁有部分layers,一些算力小的算子被合并到一個(gè) part 之中,目的是讓最終 parts 每個(gè)item的算力盡量相同
      • 把parts轉(zhuǎn)換成一個(gè) FeatureExtractionLayer param 列表;
      def PartitionSequentialLayers(params, num_partitions, *shapes):
        r"""Partition a layer composed of sequential layers.
      
        This routine strives to partition layers so that each partition costs roughly
        the same flops given the input shapes.
      
        Args:
          params: A layer param or a list of layer param.
          num_partitions: The desired number of partitions.
          *shapes: A tuple of tshape.Shape representing input tensors to the first
            layer.
      
        Returns:
          A list of FeatureExtractionLayer params.
        """
      
        # Recursively concatenate SequentialLayer into a list.
        # SequentialLayer 是一個(gè)層,其作用是把若干層按順序連接起來
        def FlattenSeq(p):
          if isinstance(p, list): # 已經(jīng)是列表則返回
            return p
          if p.cls not in [builder_layers.SequentialLayer, FeatureExtractionLayer]:
            return [p.Copy()]
          subs = []
          for _ in range(p.repeat): # 把p包含的所有層都組裝成一個(gè)層列表
            for s in p.sub:
              subs += FlattenSeq(s)
          return subs
      
        # 如果params是一個(gè)layer,那么就依據(jù)這個(gè)layer,構(gòu)建一個(gè)包含sub-layers的新列表subs,如果是列表則直接返回
        subs = FlattenSeq(params)
      
        assert len(shapes) == 1
        tf.logging.info('num_partitions: {} input_shape: {}'.format(
            num_partitions, shapes[0]))
      
        # 利用 FPropMeta 計(jì)算出來這個(gè) subs 列表的shapes和總flops,賦值給了 histo
        # Computes the estimate cost for each sub layer.
        # 假設(shè)有7個(gè)sub-layers,其flops分別是 10,40,30,10,20,50,10
        total, histo, output_shapes = 0, [], []
        for i, s in enumerate(subs):
          s.name = 'cell_%03d' % i
          meta = s.cls.FPropMeta(s, *shapes) # 
          total += meta.flops
          histo.append(total)
          output_shapes.append(meta.out_shapes)
          shapes = meta.out_shapes
        tf.logging.vlog(1, 'len %d histogram = %s', len(subs), histo)
        # 則對應(yīng)的histo 為:[10,50,80,90,110,160, 170],total為170
      
        # 利用 histo 計(jì)算出來一個(gè)層代價(jià)(layer's cost)的歸一化累積直方圖
        # Computes the normalized cumulative histogram of the layer's cost.
        histo_pct = [float(x / total) for x in histo]
        tf.logging.vlog(1, 'cost pct = %s', histo_pct)
        # histo_pct 為 [1/17,5/17,8/17,9/17,11/17,16/17, 1], 
        # 假設(shè) num_partitions = 3
      
        # 構(gòu)建一個(gè)parts變量,該變量是一個(gè)num_partitions大小的數(shù)組,數(shù)組每個(gè)item也是一個(gè)數(shù)組
        # 依據(jù)直方圖把subs分到parts中的每個(gè)item之中,這樣每個(gè)parts[i]都擁有部分layers,目的是讓最終 parts 每個(gè)item的算力盡量相同
        # i-th sub layer is put into partition j, where j is roughly i-th cumulative
        # histogram times num_partitions.
      
        parts = [[] for _ in range(num_partitions)]
        parts_cost = [0] * num_partitions
        pre_hist_cost = 0
        for i, s in enumerate(subs):
          # 從histogram數(shù)組中找出s對應(yīng)cost的index,j也就是s對應(yīng)的partition
          # 對于i,s,則 histo_pct[i] * num_partitions 分別為: [3/17, 15/17, 24/17, 27/17, 33/17, 48/17,3],j分別為[0,0,1,1,1,2,2]
          j = min(int(histo_pct[i] * num_partitions), num_partitions - 1)
          # The boundary at parts[j] where j > 0
          if j > 0 and not parts[j]:
            parts_cost[j - 1] = histo_pct[i - 1] - pre_hist_cost
            pre_hist_cost = histo_pct[i - 1]
          parts[j].append(s) # 把s加入到對應(yīng)的partition
          # 三個(gè)桶內(nèi)容分別為:[1,2],[3,4,5],[6,7]
          # 對應(yīng)每個(gè)桶的flops為: [60,280,330]
          
        # 把parts轉(zhuǎn)換成一個(gè) FeatureExtractionLayer 列表
        parts_cost[num_partitions - 1] = 1.0 - pre_hist_cost
        seqs = []
        for i, pa in enumerate(parts):
          tf.logging.info('Partition %d #subs %d #cost %.3f', i, len(pa),
                               parts_cost[i])
          seqs.append(FeatureExtractionLayer.Params().Set(name='d%d' % i, sub=pa))
        return seqs
      

      6.3.2 FeatureExtractionLayer

      上面代碼中使用了FeatureExtractionLayer,其功能就是返回一個(gè)層的序列。

      FeatureExtractionLayer 從一個(gè)層序列中提取特征,具體特點(diǎn)是:

      • 把一些層連接成一個(gè)序列;
      • 可以 得到&傳遞 激活點(diǎn);
      class FeatureExtractionLayer(base_layer.BaseLayer):
        """A layer that extrac features from a sequence of layers.
      
        FeatureExtractionLayer is a layer which connects a few layers in a sequence.
        It is also capable of fetching and forwarding activation endpoints.
        # TODO(huangyp): Make it a sublayer of builder_layers.SequentialLayer
        """
      
        @classmethod
        def Params(cls):
          p = super().Params()
          p.Define('variable_name_prefix', '',
                   'Prefix for variable names in sub layers')
          p.Define('sub', [], 'A list of layers\' params.')
          p.Define('num_act_inputs', 0, 'Number of activation inputs.')
          p.Define('num_act_outputs', 0, 'Number of activation outputs.')
          p.Define('act_fetch_layers', [],
                   'Names of fetch layers that cached extra activations')
          return p
      
        def __init__(self, params):
          super().__init__(params)
          p = self.params
          assert p.num_act_inputs >= 0
          assert p.num_act_outputs >= 0
          p.act_fetch_layers = p.act_fetch_layers or []
          assert p.num_act_outputs == p.num_act_inputs + len(p.act_fetch_layers)
          self._seq = []
          for sub in p.sub:
            assert sub.name
            sub.name = p.variable_name_prefix + sub.name
            self.CreateChild(sub.name, sub)
            self._seq.append((sub.name, self.children[sub.name])) # 把一些層連接成一個(gè)序列
      
        def FProp(self, theta, *args): # 實(shí)現(xiàn)該層的前向傳播,在計(jì)算的前向step時(shí)將會(huì)被調(diào)用
          p = self.params
          assert len(args) > p.num_act_inputs
          out_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
          extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
          for (name, ch) in self._seq:
            th = theta[name]
            out_args = _ToTuple(out_args)
            out_args = ch.FProp(th, *out_args)
          # Append fetched activations to fprop outputs.
          for fetch_layer in p.act_fetch_layers:
            assert fetch_layer in self.children
            activation = self.children[fetch_layer].activation # 子層激活點(diǎn)
            if isinstance(activation, (tuple, list)):
              activation = activation[0] # 如果是list,得到相應(yīng)激活點(diǎn)
            extra_args += (activation,) # 把激活點(diǎn)添加進(jìn)來
          if extra_args:
            out_args = _ToTuple(out_args) + extra_args # 最終返回所有激活點(diǎn)
          return out_args
      
        @classmethod
        def FPropMeta(cls, p, *args): # 返回該層關(guān)于`FProp`計(jì)算的元數(shù)據(jù)
          assert len(args) > p.num_act_inputs
          seq_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
          extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
          total = 0
          act_fetch_metas = {}
          for sub in p.sub:
            meta = sub.cls.FPropMeta(sub, *seq_args)
            if sub.name in p.act_fetch_layers:
              act_fetch_metas[sub.name] = meta.out_shapes[0]
            total += meta.flops
            seq_args = meta.out_shapes
          for fetch_layer in p.act_fetch_layers:
            extra_args += (act_fetch_metas[fetch_layer],)
          return py_utils.NestedMap(flops=total, out_shapes=seq_args + extra_args)
      

      計(jì)算過程如下圖,里面具體數(shù)值請參見上面幾段代碼之中的舉例:

        +--------------+   +--------------+   +---------------+
        |              |   |              |   |               |
        |  sub-layer 1 |   |  sub-layer 2 |   |  sub-layer n  |
        |              |   |              |   |               |
        +-------+------+   +-------+------+   +--------+------+
                |                  |                   |
                |FPropMeta         |FPropMeta          |FPropMeta
                |                  |                   |
                v                  v                   v
             flops 1            flops 2             flops n
                +                  +                   +
                |                  |                   |
                |                  |                   |
                +--------------------------------------+
                                   |
                                   |
                                   v
                        for i, s in enumerate(subs):
                           total += meta.flops
                           histo.append(total)
                        histo=[10,50,80,90,110,160,170]
                                   +
                                   |
                                   |
                                   v
      Computes the normalized cumulative histogram of the layer's cost
              histo_pct = [float(x / total) for x in histo]
             histo_pct=[1/17,5/17,8/17,9/17,11/17,16/17,1]
                                   +
                                   |
                                   |
                                   +
                 Assign layers to partition based on histogram
                         [1,2],[3,4,5],[6,7]
                                   +
                                   |
                                   |
                                   v
            +----------------------+----------------------------+
            | parts                                             |
            |                                                   |
            | +--------------+  +------------+  +-------------+ |
            | | sub-layer 1  |  |sub-layer 3 |  | sub-layer 6 | |
            | |              |  |            |  |             | |
            | | sub-layer 2  |  |sub-layer 4 |  | sub-layer 7 | |
            | |              |  |            |  |             | |
            | |              |  |sub-layer 5 |  |             | |
            | +--------------+  +------------+  +-------------+ |
            +---------------------------------------------------+
      

      6.4 流水線分配

      6.4.1 基礎(chǔ)類 SeqLayer

      該層的目的是:用 Round-robin 策略把 cell_tpl之中的每個(gè)children cell 部署到 工作設(shè)備之上

      Params 包括:

      • before_tpl :配置在流水線之前運(yùn)行的CNN層;
      • cell_tpl :FeatureExtractionLayer 列表;
      6.4.1.1 初始化

      初始化函數(shù)的邏輯是:

      • 遍歷before_tpl,對于每個(gè)item調(diào)用CreateChild構(gòu)建其子層,把item添加到 _before_layers 之中;
      • 遍歷cell_tpl,對于每個(gè)item調(diào)用CreateChild構(gòu)建其子層,把item添加到 _cells 之中;
        def __init__(self, params):
          super().__init__(params)
          p = self.params
          self._before_layers = []
          self._cells = []
          # 遍歷before_tpl,對于每個(gè)item調(diào)用CreateChild構(gòu)建其子層,把item添加到 _before_layers 之中
          for l in p.before_tpl:
            self.CreateChild(l.name, l)
            self._before_layers.append((l.name, self.children[l.name]))
          # 遍歷cell_tpl,對于每個(gè)item調(diào)用CreateChild構(gòu)建其子層,把item添加到 _cells 之中  
          for l in p.cell_tpl:
            self.CreateChild(l.name, l)
            self._cells.append((l.name, self.children[l.name]))
      
      6.4.1.2 _CreateChildrenVariables

      構(gòu)建變量。邏輯如下:

      • 如果使用 tpu,則
        • 利用 cluster.WorkerDeviceInModelSplit(0) 來構(gòu)建 before_tpl_device,即用集群的第一個(gè)設(shè)備作為 before_tpl_device;
        • 遍歷集群的其他設(shè)備,分配給cell_devices;
      • 遍歷 _before_layers,把其中每個(gè)變量部署在 before_tpl_device;
      • 遍歷 _cells,把其中每個(gè)變量部署在 cell_devices;
        def _CreateChildrenVariables(self):
          p = self.params
      
          num_cells = len(p.cell_tpl)
          before_tpl_device = ''
          cell_devices = [''] * num_cells
          if py_utils.use_tpu(): # 如果使用 tpu
            # 利用 `cluster.WorkerDeviceInModelSplit(0)` 來構(gòu)建 before_tpl_device,即用集群的第一個(gè)設(shè)備作為 before_tpl_device
            cluster = self.cluster
            before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
            # 遍歷集群的其他設(shè)備,分配給cell_devices
            cell_devices = [
                cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
            ]
      
          # 遍歷 _before_layers,把其中每個(gè)變量部署在 before_tpl_device
          for unused_name, l in self._before_layers:
            with tf.device(before_tpl_device):
              l.InstantiateVariables()
      
          # 遍歷 _cells,把其中每個(gè)變量部署在 cell_devices
          for i, (unused_name, l) in enumerate(self._cells):
            with tf.device(cell_devices[i]):
              l.InstantiateVariables()
      
          super()._CreateChildrenVariables()
      
      6.4.1.3 FProp

      前向傳播代碼,具體邏輯如下:

      • 遍歷 _before_layers,對于其中每層調(diào)用其FProp;
      • 遍歷 _cells,對于其中每層,在cluster.WorkerDeviceInModelSplit(i)之上調(diào)用其FProp;
        def FProp(self, theta, *args):
          """Round-robin every children cells in cell_tpl among worker devices.
      
          Args:
            theta: A NestedMap object containing weights' values of this layer and its
              children layers.
            *args: Input args
      
          Returns:
            A list contains one tensor of [batch_size, feature_height, feature_width,
              channel].
          """
          num_layers = len(self.params.cell_tpl)
          cluster = self.cluster
      
          # 遍歷 _before_layers,對于其中每層調(diào)用其FProp	
          for (name, l) in self._before_layers:
            l_theta = theta[name]
            args = _ToTuple(args)
            args = l.FProp(l_theta, *args)
          # 遍歷 _cells,對于其中每層,在`cluster.WorkerDeviceInModelSplit(i)`之上調(diào)用其FProp  
          for i in range(num_layers):
            with tf.device(cluster.WorkerDeviceInModelSplit(i)):
              cell_name, cell = self._cells[i]
              args = _ToTuple(args)
              args = cell.FProp(theta[cell_name], *args)
      
          return args
      
      6.4.1.4 具體實(shí)現(xiàn)

      SeqLayer 全部代碼如下:

      class SeqLayer(base_layer.BaseLayer):
        """Round-robin every children cells in cell_tpl among worker devices."""
      
        @classmethod
        def Params(cls):
          p = super().Params()
          p.Define('before_tpl', [],
                   'Config for the CNN layers that runs before pipelining.')
          p.Define('cell_tpl', [], 'A list of FeatureExtractionLayer layers.')
          return p
      
        def __init__(self, params):
          super().__init__(params)
          p = self.params
          self._before_layers = []
          self._cells = []
          for l in p.before_tpl:
            self.CreateChild(l.name, l)
            self._before_layers.append((l.name, self.children[l.name]))
          for l in p.cell_tpl:
            self.CreateChild(l.name, l)
            self._cells.append((l.name, self.children[l.name]))
      
        def _CreateChildrenVariables(self):
          p = self.params
      
          num_cells = len(p.cell_tpl)
          before_tpl_device = ''
          cell_devices = [''] * num_cells
          if py_utils.use_tpu():
            cluster = self.cluster
            before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
            cell_devices = [
                cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
            ]
      
          for unused_name, l in self._before_layers:
            with tf.device(before_tpl_device):
              l.InstantiateVariables()
      
          for i, (unused_name, l) in enumerate(self._cells):
            with tf.device(cell_devices[i]):
              l.InstantiateVariables()
      
          super()._CreateChildrenVariables()
      
        def FProp(self, theta, *args):
          """Round-robin every children cells in cell_tpl among worker devices.
      
          Args:
            theta: A NestedMap object containing weights' values of this layer and its
              children layers.
            *args: Input args
      
          Returns:
            A list contains one tensor of [batch_size, feature_height, feature_width,
              channel].
          """
          num_layers = len(self.params.cell_tpl)
          cluster = self.cluster
      
          for (name, l) in self._before_layers:
            l_theta = theta[name]
            args = _ToTuple(args)
            args = l.FProp(l_theta, *args)
          for i in range(num_layers):
            with tf.device(cluster.WorkerDeviceInModelSplit(i)):
              cell_name, cell = self._cells[i]
              args = _ToTuple(args)
              args = cell.FProp(theta[cell_name], *args)
      
          return args
      

      6.4.2 具體分配 PipeliningLayer

      PipeliningLayer 是 SeqLayer 的派生類

      • 在流水線最前面是device[0],負(fù)責(zé)處理前置條件。
      • 流水線中間是一系列 device,負(fù)責(zé)處理具體的 micro batches。
      • 流水線最后是 device[-1],負(fù)責(zé)整理形狀,最后輸出一個(gè)最終張量。
      6.4.2.1 得到中間層輸出形狀

      _CalculateOutputShapes 計(jì)算出中間層的output shape。具體邏輯如下:

      • 遍歷 _before_layers,對其中每層調(diào)用其FPropMeta,得到 output shapes,插入 state_shapes 數(shù)組之中;
      • 遍歷 _cells,對其中每層調(diào)用其FPropMeta,得到 output shapes,插入 state_shapes 數(shù)組之中;
        def _CalculateOutputShapes(self, input_shapes):
          """Calcuate the output shape of intermediate layers.
      
          Given the FPropMeta function in each FeatureExtractionLayer, calcuates
          the shapes of outputs of that layer. This is used to recover the shape
          information in StackedRecurrent.
      
          Args:
            input_shapes: NestedMap or tuple of input TensorShapes.
      
          Returns:
            Return a list of K + 1 NestedMaps or lists of tShape where K is
            the number of partitions.
          """
          p = self.params
          shapes = []
      
          # Converts TensorShape to tshape.Shape.
          def _ToTShape(x):
            if x is None:
              return None
            return tshape.Shape(x.as_list())
      
          shapes = py_utils.Transform(_ToTShape, input_shapes)
          shapes = _ToTuple(shapes)
      
          state_shapes = []
          # 遍歷_before_layers,對其中每層調(diào)用其FPropMeta,得到 output shapes,插入 state_shapes 數(shù)組之中
          for (_, cell) in self._before_layers:
            shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
      
          state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)
      
          # 遍歷 _cells,對其中每層調(diào)用其FPropMeta,得到 output shapes,插入 state_shapes 數(shù)組之中
          for (_, cell) in self._cells:
            shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
            state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)
      
          return state_shapes
      
      6.4.2.2 得到數(shù)據(jù)類型

      _get_state_dtype 的作用是得到數(shù)據(jù)類型。

        def _get_state_dtype(self, *args):
          if self.params.state_dtype:
            return self.params.state_dtype
          if self.params.nested_map_fprop:
            inputs = args[0].Filter(lambda x: x is not None)
            return py_utils.Flatten(inputs)[0].dtype
          return args[0].dtype
      
      6.4.2.3 得到輸入形狀

      Gpipe 會(huì)首先將一個(gè)小批量的訓(xùn)練樣本(mini-batch)分割成更小的小批量(micro-batches),然后將每組小批量的執(zhí)行通過管道傳送到單元上。

      _get_input_shapes作用是得到輸入的shapes,具體邏輯如下:

      • 從 args 得到輸入 input_tensors;
      • 遍歷 input_tensors,找出第一個(gè)不為空的張量,獲取這個(gè)張量的 batch size,賦給 mini_batch_size;
      • 從參數(shù)中得到 micro_batch_size,設(shè)置到 micro_batch_size;
      • 如果 micro_batch_size 沒有意義,則:
        • 如果 p.num_micro_batches 大于 mini_batch_size,則 p.num_micro_batches 為 mini_batch_size;
        • 把 micro_batch_size 設(shè)置為 mini_batch_size // p.num_micro_batches;
      • 建立一個(gè) input_shapes 集合,遍歷 input_tensors,對于每個(gè)張量,得到其shapes列表 input_shape,并且設(shè)置 input_shape 的 batch_dim 為 micro_batch_size;
      • 如果設(shè)置了 p.nested_map_fprop,則把 input_shapes 構(gòu)建成一個(gè)遞歸嵌套的結(jié)構(gòu);
      • 返回 input_shapes;
        def _get_input_shapes(self, *args):
          p = self.params
          if p.nested_map_fprop:
            assert len(args) == 1
            assert isinstance(args[0], py_utils.NestedMap)
            input_tensors = py_utils.Flatten(args[0])
          else:
            input_tensors = _ToTuple(args)
          
          # 遍歷 input_tensors,找出第一個(gè)不為空的張量,獲取這個(gè)張量的 batch size,賦給 mini_batch_size
          # Get batch size from the first tensor which is not None.
          mini_batch_size = None
          for input_tensor in input_tensors:
            if input_tensor is not None:
              mini_batch_size = input_tensor.get_shape().as_list()[p.batch_dim]
          assert mini_batch_size is not None
          micro_batch_size = p.micro_batch_size
          
          if not micro_batch_size: # 如果 micro_batch_size 沒有意義
            # 如果 p.num_micro_batches 大于 mini_batch_size,則 p.num_micro_batches 為 mini_batch_size
            if p.num_micro_batches > mini_batch_size:
              p.num_micro_batches = mini_batch_size
            # 把 micro_batch_size 設(shè)置為 mini_batch_size // p.num_micro_batches  
            micro_batch_size = mini_batch_size // p.num_micro_batches
          if mini_batch_size is not None:
            if micro_batch_size * p.num_micro_batches != mini_batch_size:
              raise ValueError('micro_batch_size * num_micro_batches != batch_size.')
      
          # 遍歷 input_tensors,對于每個(gè)張量,得到其shapes列表 input_shape,并且設(shè)置 input_shape 的 batch_dim 為 micro_batch_size
          input_shapes = ()
          for input_tensor in input_tensors:
            if input_tensor is not None:
              input_shape = input_tensor.get_shape().as_list()
              input_shape[p.batch_dim] = micro_batch_size
              input_shapes += (tf.TensorShape(input_shape),)
            else:
              input_shapes += (None,)
      
          # 如果設(shè)置了 p.nested_map_fprop,則把 input_shapes 構(gòu)建成一個(gè)遞歸嵌套的結(jié)構(gòu)    
          if p.nested_map_fprop:
            input_shapes = py_utils.Pack(args[0], input_shapes)
          return input_shapes
      
      6.4.2.4 FProp

      前向傳播函數(shù),用流水線方式在多個(gè)設(shè)備上運(yùn)行多個(gè) children cells。具體邏輯如下:

      • 做一些準(zhǔn)備工作,比如:
        • 得到 children cell個(gè)數(shù);
        • 得到集群;
        • 得到 輸入shapes,dtypes;
        • 利用 輸入shapes 計(jì)算出 輸出shapes;
      • 遍歷處理中間層:
        • 對于具體cell,把cell加入到累積層中,對于每個(gè)cell,得到對應(yīng)的function;
        • 為后續(xù)的 StackedRecurrent 運(yùn)行設(shè)置其初始狀態(tài);
        • 把cell_idx對應(yīng)的設(shè)備加入到devices列表;
      • 為流水線中間(去除頭尾)的各個(gè)設(shè)備設(shè)定一些變量;
      • 在第一個(gè)設(shè)備上執(zhí)行如下操作:
        • 遍歷_before_layers,運(yùn)行每層的FProp,最終得到 previous;
        • 對于 previous 繼續(xù)操作,構(gòu)建出 inputs,即利用_StackAndSplit分割張量;
        • 為流水線后續(xù)設(shè)備設(shè)置其輸入;
      • 在中間設(shè)備上執(zhí)行recurrent.StackedRecurrent操作 ;
      • 在最后一個(gè)設(shè)備上把micro_batches的形狀聚合,最終得到輸出張量:
        • 如果嵌套,則返回最后一個(gè)形狀;
        • 否則遍歷輸出,聚合各個(gè)輸出的形狀;
        def FProp(self, theta, *args):
          """Run multiple cells in different devices in a pipelining manner.
      
          Args:
            theta: A NestedMap object containing weights' values of this layer and its
              children layers.
            *args: Non-keyworded variable length argument list of input tensors.
      
          Returns:
            A list of output tensors
          """
          # TODO(huangyp): handle optional None inputs.
          p = self.params
          if self.do_eval and self.cluster.num_devices_per_split == 1: # 如果設(shè)置了 do_eval 并且集群的 num_devices_per_split 為 1
            outputs = copy.copy(args)
            # 就直接串行執(zhí)行
            for (name, l) in self._before_layers + self._cells:
              outputs = _ToTuple(outputs)
              outputs = l.FProp(theta[name], *outputs)
            return outputs
      
          num_cells = len(p.cell_tpl) # 得到 children cell個(gè)數(shù)
          cluster = self.cluster # 得到集群
      
          # Compute shapes of input and output tensors.
          # 得到 輸入shapes,dtypes
          input_shapes = self._get_input_shapes(*args)
          state_dtype = self._get_state_dtype(*args)
          # 利用 輸入shapes 計(jì)算出 輸出shapes
          state_shapes = self._CalculateOutputShapes(input_shapes)
          tf.logging.info('state_shapes={}'.format(state_shapes))
      
          def GetCellFn(i): # 對于第 i 個(gè)層,返回一個(gè)對應(yīng)的函數(shù),這個(gè)函數(shù)將在 StackedRecurrent 內(nèi)部執(zhí)行
            """Get the ith feature extraction layer."""
      
            def CellFn(theta, state0, inputs):
              """A cell fn is exectued inside of StackedRecurrent."""
              # 沒有深入研究StackedRecurrent,只從此函數(shù)看,作用是利用cell.FProp計(jì)算輸出,并且得到一個(gè)state,其中包括輸出和micro batch tensor
              del state0
      
              def _FPropInputSetShape(name, t_shape): # 給輸入設(shè)置shape
                if t_shape is None:
                  return None
                inputs[name].set_shape(t_shape.ToTensorShape().as_list())
                return inputs[name]
      
              if p.nested_map_fprop:
                # pylint: disable=protected-access
                fprop_inputs = state_shapes[i]._RecursiveMap(_FPropInputSetShape)
                # pylint: enable=protected-access
              else:
                fprop_inputs = []
                for input_idx, input_shape in enumerate(state_shapes[i]):
                  name = 's{}'.format(input_idx)
                  fprop_inputs.append(_FPropInputSetShape(name, input_shape))
      
              with py_utils.RemoveAssertContext(remove=True):
                with CellFnFPropOpReplacementWrapper():
                  tf.logging.info('cell {} input {}'.format(i, fprop_inputs))
                  mb_tensor = inputs[_MICRO_BATCH_STATE_NAME] # 得到輸入的 micro batch tensor
                  SetOverWriteGlobalStep(mb_tensor)
                  _, cell = self._cells[i]
                  fprop_inputs = _ToTuple(fprop_inputs)
                  outputs = cell.FProp(theta, *fprop_inputs) # 計(jì)算輸出
      
              if p.nested_map_fprop:
                assert py_utils.IsCompatible(outputs, state_shapes[i + 1])
                state1 = outputs.Filter(lambda x: x is not None)
              else:
                state1 = py_utils.NestedMap()
                outputs = _ToTuple(outputs)
                assert len(outputs) == len(state_shapes[i + 1])
                for output_idx in range(len(outputs)):
                  if outputs[output_idx] is not None:
                    name = 's{}'.format(output_idx)
                    state1[name] = outputs[output_idx]
              state1[_MICRO_BATCH_STATE_NAME] = mb_tensor
              return state1, py_utils.NestedMap()
      
            return CellFn
      
          cell_fns = []
          accumulator_layers = [] # 為了梯度累積
          thetas = []
          init_states = []
          devices = []
          # 遍歷,把cell_idx對應(yīng)的設(shè)備加入到devices列表
          for cell_idx in range(num_cells): # 遍歷 children cell
            cell_name, cell = self._cells[cell_idx] # 得到具體一個(gè) cell
            accumulator_layers.append(cell) # 把cell加入到累積層中
            cell_fns.append(GetCellFn(cell_idx)) # 對于每個(gè)cell,得到對應(yīng)的function
            thetas.append(theta[cell_name]) # 添加 theta
      
            # 返回一個(gè)帶有形狀t_shape的,類型為state_dtype的張量,并且所有元素都設(shè)為零.
            def _TfZeros(t_shape):
              if t_shape is None:
                return None
              return tf.zeros(t_shape.ToTensorShape().as_list(), dtype=state_dtype)
      
            # 為后續(xù)的 StackedRecurrent 運(yùn)行設(shè)置其初始狀態(tài)
            if p.nested_map_fprop:
              init_state = py_utils.Transform(_TfZeros, state_shapes[cell_idx + 1])
              init_state = init_state.Filter(lambda x: x is not None)
            else:
              init_state = py_utils.NestedMap()
              for output_idx, state in enumerate(state_shapes[cell_idx + 1]):
                state = _TfZeros(state)
                if state is not None:
                  name = 's{}'.format(output_idx)
                  init_state[name] = state
            init_state[_MICRO_BATCH_STATE_NAME] = tf.cast(0, dtype=state_dtype)
            init_states.append(init_state)
      
            # 把cell_idx對應(yīng)的設(shè)備加入到devices列表
            devices.append(cluster.WorkerDeviceInModelSplit(cell_idx))
      
          # 為流水線中間(去除頭尾)的各個(gè)設(shè)備設(shè)定一些變量
          cell_grads = [None] * num_cells
          cell_outs = [lambda x: x] * num_cells
          cell_out_grads = [lambda x: x] * num_cells
      
          # 在第一個(gè)設(shè)備上執(zhí)行如下操作
          with tf.device(devices[0]): 
            previous = _ToTuple(args)
            for (name, l) in self._before_layers: # 遍歷_before_layers,運(yùn)行每層的FProp,最終得到 previous
              previous = l.FProp(theta[name], *previous)
              previous = _ToTuple(previous)
      
            def _StackAndSplit(x): # 把張量分割成
              # Split tensors into microbatches.
              if x is None:
                return None
              # tf.split按照行或者列分割一個(gè)矩陣
              return tf.stack(tf.split(x, p.num_micro_batches, axis=p.batch_dim))
      
            # 對于 previous 繼續(xù)操作,構(gòu)建出 inputs,即利用_StackAndSplit分割張量
            if p.nested_map_fprop: # 嵌套情況,只選取previous[0]做處理
              inputs = py_utils.Transform(_StackAndSplit, previous[0]) #利用_StackAndSplit分割張量
              inputs = inputs.Filter(lambda x: x is not None)
            else: # 非嵌套
              inputs = py_utils.NestedMap()
              for output_idx, output_tensor in enumerate(previous): # 遍歷第一層的輸出
                output_tensor = _StackAndSplit(output_tensor) # 利用_StackAndSplit分割張量
                if output_tensor is not None:
                  name = 's{}'.format(output_idx)
                  inputs[name] = output_tensor
            gs_tensor = py_utils.GetGlobalStep()
            # 為流水線后續(xù)設(shè)備設(shè)置其輸入
            inputs[_MICRO_BATCH_STATE_NAME] = tf.stack([
                tf.cast(gs_tensor * p.num_micro_batches + t, dtype=state_dtype)
                for t in range(p.num_micro_batches)
            ])
            
          # 在中間設(shè)備上執(zhí)行操作    
          tf.logging.info('pipeline input = {}'.format(inputs))
          output_state, _ = recurrent.StackedRecurrent( 
              devices=devices,
              cell_fns=cell_fns,
              cell_grads=cell_grads,
              cell_outs=cell_outs,
              cell_out_grads=cell_out_grads,
              thetas=thetas,
              init_states=init_states,
              inputs=inputs,
              accumulator_layers=accumulator_layers,
              unused_acc_state=True)
      
          # 在最后一個(gè)設(shè)備上執(zhí)行如下操作,最終得到輸出張量
          with tf.device(devices[-1]):
            def _ReshapeRetVal(name, t_shape): # 把micro_batches的形狀聚合,得到最終輸出
              """Restore shape for tensors in microbatches."""
              if t_shape is None:
                return None
              output_tensor = output_state[name]
              if p.batch_dim != 0:
                perm = list(range(1, p.batch_dim + 1)) + [0]
                perm += list(range(p.batch_dim + 1, t_shape.rank + 1))
                output_tensor = tf.transpose(output_tensor, perm=perm)
              output_shape = t_shape.ToTensorShape().as_list()
              output_shape[p.batch_dim] *= p.num_micro_batches
              output_tensor = tf.reshape(output_tensor, output_shape)
              return output_tensor
      
            # Construct the final return values from output_state.
            if p.nested_map_fprop: # 如果嵌套,則返回最后一個(gè)形狀
              # pylint: disable=protected-access
              output_tensors = state_shapes[-1]._RecursiveMap(_ReshapeRetVal) # 聚合形狀
              # pylint: enable=protected-access
            else:
              output_tensors = []
              # 遍歷輸出,聚合各個(gè)輸出的形狀
              for output_idx, state_shape in enumerate(state_shapes[-1]): 
                output_name = 's{}'.format(output_idx)
                output_tensor = _ReshapeRetVal(output_name, state_shape) # 聚合形狀
                output_tensors.append(output_tensor)
              if len(output_tensors) == 1:
                output_tensors = output_tensors[0]
              else:
                output_tensors = tuple(output_tensors)
              
            tf.logging.info('pipeline output = {}'.format(output_tensors))
            return output_tensors
      
      6.4.2.5 類定義

      具體代碼如下:

      class PipeliningLayer(SeqLayer):
        """Pipelining a sequence of layers on multiple devices."""
      
        @classmethod
        def Params(cls):
          p = super().Params()
          p.Define('num_micro_batches', 1, 'Number of micro batches.')
          p.Define('micro_batch_size', None, 'Size of a micro batch.')
          p.Define('batch_dim', 0, 'The batch dimension.')
          p.Define('state_dtype', None, 'Externally specify dtype for states.')
          p.Define(
              'nested_map_fprop', False, 'Whether arguments and returns of '
              'cell fprop functions are nested maps')
          return p
      

      具體FProp函數(shù)邏輯如下圖:

      +--------------------------------------------------------------+
      | FProp             _CalculateOutputShapes                     |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |                        state_shapes                          |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |                for cell_idx in range(num_cells):             |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |       devices.append(WorkerDeviceInModelSplit(cell_idx))     |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |                  with tf.device(devices[0])                  |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |             recurrent.StackedRecurrent(cell_outs)            |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |                 with tf.device(devices[-1])                  |
      |                             +                                |
      |                             |                                |
      |                             |                                |
      |                             v                                |
      |                       output_tensors                         |
      |                                                              |
      +--------------------------------------------------------------+
      

      Device流水線邏輯如下:

                         devices[0]
                             +
                             |
                             |
                             |
                             v
      +----------------------+-------------------------+
      |Pipeline                                        |
      |                         devices[1]             |
      |                             +                  |
      |                             |                  |
      |                             |                  |
      |                             v                  |
      |  cell_grads[1~n]        devices[2]             |
      |                             +                  |
      |  cell_outs[1~n]             |                  |
      |                             |                  |
      |  cell_out_grads[1~n]        v                  |
      |                         devices[3]             |
      |                             +                  |
      |                             |                  |
      |                             |                  |
      |                             v                  |
      |                         devices[4]             |
      |                                                |
      +----------------------+-------------------------+
                             |
                             |
                             |
                             v
                         devices[-1]
      
      6.4.2.6 使用

      源碼中給出的例子是 GPipeBatchMajorTransformerStack,目前看來,繼承PipeliningLayer即可。

      class GPipeBatchMajorTransformerStack(PipeliningLayer):
        """Stacked self- multi-head attention and fully connected layers.
      
        With optional layer normalization applied to the final output.
      
        See 'Attention Is All You Need' https://arxiv.org/abs/1706.03762
        for details. 
      
        Implements a gipe stack for the batch major transformer variant.
        """
      

      GPipeBatchMajorTransformerStack 的 FProp 返回一個(gè)輸出張量的列表,其中下面代碼調(diào)用了PipeliningLayer的功能。

      logits = super().FProp(theta, source_input, source_paddings, target_input,
                             target_paddings, encoder_self_atten_segment_mask,
                             decoder_self_atten_segment_mask,
                             decoder_cross_atten_segment_mask, source_segment_pos,
                             target_segment_pos)
      

      具體代碼如下:

        def FProp(self,
                  theta,
                  source_input,
                  source_paddings,
                  target_input=None,
                  target_paddings=None,
                  source_segment_id=None,
                  target_segment_id=None,
                  labels=None,
                  label_weights=None,
                  source_segment_pos=None,
                  target_segment_pos=None):
      
          p = self.params
          if p.num_decoder_layers > 0:
            assert target_input is not None
            assert target_paddings is not None
            target_time = tf.shape(target_input)[1]
            batch = tf.shape(target_input)[0]
          encoder_self_atten_segment_mask = None
          decoder_self_atten_segment_mask = None
          decoder_cross_atten_segment_mask = None
      
          # Prepare segment masks from segment ids.
          if p.packed_input:
            dtype = py_utils.FPropDtype(p)
            assert source_segment_id is not None, (
                'Need to specify src_segment_id if packed input is supported.')
            assert source_segment_pos is not None, (
                'Need to specify src_segment_pos for packed input and embeddings.')
            encoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
                source_segment_id, source_segment_id, dtype, False)
            if target_segment_id is not None:
              decoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
                  target_segment_id, target_segment_id, dtype, False)
              causal_padding = tf.expand_dims(
                  tf.tile(
                      tf.expand_dims(
                          batch_major_attention.CausalPadding(
                              target_time, dtype=dtype), 0), [batch, 1, 1]), 1)
              decoder_self_atten_segment_mask = tf.math.maximum(
                  causal_padding, decoder_self_atten_segment_mask)
              decoder_cross_atten_segment_mask = batch_major_attention.SegmentMask(
                  target_segment_id, source_segment_id, dtype, False)
      
          # FProp through the gpipe pipeline.
          # 這里調(diào)用了基類的PipeliningLayer,完成流水線操作。
          logits = super().FProp(theta, source_input, source_paddings, target_input,
                                 target_paddings, encoder_self_atten_segment_mask,
                                 decoder_self_atten_segment_mask,
                                 decoder_cross_atten_segment_mask, source_segment_pos,
                                 target_segment_pos)
                  
          label_weights = tf.reshape(label_weights, [-1])
          target_probs = None
          if p.label_smoothing:
            target_probs = self.smoother.FProp(
                theta.smoother, target_paddings, labels, target_ids=None)
            target_probs = tf.reshape(target_probs, [-1, p.softmax_tpl.num_classes])
          reshaped_logits = tf.reshape(logits, [-1, p.softmax_tpl.num_classes])
          tgt_labels = tf.reshape(labels, [-1])
          num_splits = len(p.splits)
          softmax = self.children['cell_{}'.format(num_splits - 1)].softmax
          softmax_theta = theta['cell_{}'.format(num_splits - 1)].softmax
          per_example_xent, _ = softmax.XentLossFromLogits(
              softmax_theta,
              reshaped_logits,
              class_weights=tf.reshape(label_weights, [-1]),
              class_ids=tgt_labels,
              class_probabilities=target_probs)
          xent_shape = tf.shape(logits)[:2]
          per_example_xent = tf.reshape(per_example_xent, xent_shape)
          return per_example_xent, logits
      

      0xFF 參考

      僅此一文讓你掌握OneFlow框架的系統(tǒng)設(shè)計(jì)(上篇)

      DeepSpeed: Extreme-scale model training for everyone

      [譯] DeepSpeed:所有人都能用的超大規(guī)模模型訓(xùn)練工具

      訓(xùn)練GPT-3,為什么原有的深度學(xué)習(xí)框架吃不消?

      GPT-3模型為何難以復(fù)現(xiàn)?這也許是分布式AI框架的最優(yōu)設(shè)計(jì)

      FLOPs與模型推理速度

      深度學(xué)習(xí)中parameters個(gè)數(shù)和FLOPS計(jì)算(以CNN中經(jīng)典的AlexNet網(wǎng)絡(luò)結(jié)構(gòu)為例)

      CNN 模型所需的計(jì)算力flops是什么?怎么計(jì)算?

      CNN中計(jì)算量FLOPs的計(jì)算

      有關(guān)FLOPS的定義與計(jì)算

      論文解讀系列第十三篇:ZeRO——面向萬億級參數(shù)的模型訓(xùn)練方法

      模型并行最佳實(shí)踐(PyTorch)

      Tensorflow: Model parallelism 模型并行計(jì)算

      https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

      pytorch 模型并行 model parallel

      https://arxiv.org/pdf/1802.09941.pdf

      深度學(xué)習(xí)的模型并行是什么原理?

      https://www.microsoft.com/en-us/research/uploads/prod/2019/08/fiddle_pipedream_sosp19.pdf

      論文解讀系列第五篇:微軟斯坦福等PipeDream快速訓(xùn)練大規(guī)模神經(jīng)網(wǎng)絡(luò)

      微軟提出 DNN 并行訓(xùn)練新方法 PipeDream,比傳統(tǒng)方法快四倍

      如何評價(jià)Google 開源的并行加速庫 GPipe?

      論文解讀系列第四篇:谷歌GPipe訓(xùn)練超大規(guī)模神經(jīng)網(wǎng)絡(luò)

      如何減少神經(jīng)網(wǎng)絡(luò)的內(nèi)存?

      訓(xùn)練時(shí)顯存優(yōu)化技術(shù)——OP合并與gradient checkpoint

      顯存不夠時(shí),如何利用GPU訓(xùn)練數(shù)據(jù)

      GPU 顯存不足怎么辦?

      模型訓(xùn)練太慢?顯存不夠?這個(gè)方法讓你的GPU聯(lián)手CPU

      TF-Replicator, GPipe, Mesh-Tensorflow 三個(gè)庫對比

      深度神經(jīng)網(wǎng)絡(luò)訓(xùn)練中的數(shù)據(jù)并行(Data Parallelism)總結(jié)

      【新】Facebook的深度學(xué)習(xí)推薦系統(tǒng)

      https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

      https://github.com/pytorch/tutorials/blob/master/intermediate_source/model_parallel_tutorial.py

      分布式TensorFlow入門教程

      PyTorch中在反向傳播前為什么要手動(dòng)將梯度清零?

      Model.zero_grad() or optimizer.zero_grad()?

      A trick to use bigger batches for training: gradient accumulation

      Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups

      lingvo框架走讀筆記

      分布式訓(xùn)練從入門到放棄

      posted @ 2021-08-23 08:24  羅西的思考  閱讀(8888)  評論(4)    收藏  舉報(bào)
      主站蜘蛛池模板: 国产精品成人aaaaa网站| 久久天天躁狠狠躁夜夜2020老熟妇| 美女视频黄频大全视频| 在线a级毛片无码免费真人| 国产绿帽在线视频看| 久久久久国产精品熟女影院| 国产美女免费永久无遮挡| 国产毛片三区二区一区| 亚洲av永久无码精品天堂久久| 国产不卡的一区二区三区| 国产精品久久一区二区三区| 亚洲欧美人成人让影院| 成人精品网一区二区三区| 国产精品免费中文字幕| 色综合 图片区 小说区| 国产成人精彩在线视频| 国产综合久久亚洲综合| 精品不卡一区二区三区| 国产成人综合欧美精品久久| 98久久人妻少妇激情啪啪| 一二三四免费中文字幕| 四虎精品永久在线视频| 国产精品黄色精品黄色大片| 中国性欧美videofree精品| 亚洲一区二区三区黄色片| 国产在线观看网址不卡一区| 高清自拍亚洲精品二区| 内射老阿姨1区2区3区4区| 国产怡春院无码一区二区| 成人乱码一区二区三区四区| 精品无码国产一区二区三区AV| 亚洲欧洲∨国产一区二区三区| 国产午夜福利一区二区三区| 99久久精品费精品国产一区二区 | 亚洲中文字幕精品一区二区三区| 国产AV福利第一精品| 昔阳县| 亚欧洲乱码视频在线专区| 国内精品一区二区在线观看| 国产精品v欧美精品∨日韩| 国内精品自线在拍|