
      [Source Code Analysis] PyTorch Pipeline Parallelism Implementation (2) -- How to Partition the Model


      0x00 Abstract

      In the previous article we covered the basics of PyTorch pipeline parallelism. In this article we look at its automatic balancing mechanism and model partitioning.

      Links to the other articles in this pipeline-parallelism series:

      [Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (1) --- Basic pipeline implementation

      [Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (2) --- Gradient accumulation

      [Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (3) --- Recomputation

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (1) --- The profile phase

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (2) --- Computing partitions

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (3) --- Model conversion

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (4) --- The runtime engine

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (5) --- The communication module

      [Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (6) --- The 1F1B strategy

      [Source Code Analysis] PyTorch Pipeline Parallelism Implementation (1) --- Basics

      The figures in this article come from the papers and the GitHub source code.

      0x01 The Problem

      The first questions pipeline parallelism has to face are:

      • How do we split a large model into several smaller models? What is the splitting algorithm?
      • How do we assign these smaller models to multiple devices? What is the assignment algorithm?
      • How do we make overall performance optimal or near-optimal? What is the measuring criterion?

      For example, how should a large model with 6 layers be split into three smaller models?

      +-----------------------------------------------------------------------------------------+
      |                                                                                         |
      | Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
      |                                                                                         |
      +------------------------------------------+----------------------------------------------+
                                                 |
                                                 |
                                                 | ? ? ? ? ?
                                                 |
                                                 |
                                                 v
      
        +--------------------+         +---------------------+      +--------------------+
        |Device 1            |         |Device 2             |      |Device 3            |
        |                    |         |                     |      |                    |
        |      Layer 1       |    +---------> Layer 4        |      |                    |
        |         +          |    |    |         +           |  +------->   Layer 6      |
        |         |          |    |    |         |           |  |   |                    |
        |         v          |    |    |         |           |  |   |                    |
        |      Layer 2       |    |    |         |           |  |   |                    |
        |         +          |    |    |         v           |  |   |                    |
        |         |          |    |    |      Layer 5 +---------+   |                    |
        |         v          |    |    |                     |      |                    |
        |      Layer 3  +---------+    |                     |      |                    |
        |                    |         |                     |      |                    |
        +--------------------+         +---------------------+      +--------------------+
      

      Next, let's look at how torchgpipe solves these problems.

      0x02 Automatic Balancing

      torchgpipe provides the submodule torchgpipe.balance to compute a partitioning whose goal is to make the resource difference between any two partitions (pairwise) as small as possible. Resource usage is determined by profiling.

      2.1 Automatic Balancing

      How the model is split affects GPU utilization; for instance, a layer with a heavy computational load slows everything downstream, so we need to find the model's optimal balance point. Determining this optimum is hard, however, especially while the user is still designing the model, because the architecture may keep changing over time. In that case, torchgpipe strongly recommends torchgpipe.balance for automatic balancing. It will not give the user the optimal balance, but it is a good-enough one.

      Note that this feature is provided by torchgpipe itself; it does not come from the original GPipe paper by Huang et al.

      torchgpipe provides two balancing tools, both based on per-layer profiling results; users can choose whichever fits their needs.

      • ~torchgpipe.balance.balance_by_time: tracks the running time of each layer.
      • ~torchgpipe.balance.balance_by_size: measures the CUDA memory usage of each layer.

      Typical usage is as follows; the user needs to feed a sample input into the model.

         from torchgpipe import GPipe
         from torchgpipe.balance import balance_by_time
      
         partitions = torch.cuda.device_count()
         sample = torch.rand(128, 3, 224, 224) # the user feeds a sample input into the model
         balance = balance_by_time(partitions, model, sample)
      
         model = GPipe(model, balance, chunks=8)
      

      2.2 Basic Classes and Functions

      2.2.1 Batch

      Batch is a basic class located in torchgpipe/microbatch.py. Its job is to wrap a tensor or a tuple of tensors so that they can be handled uniformly. Batch stores the tensor(s) in its value member. When its call method is invoked, the passed-in function is applied to the tensor(s) held in value.

      For example, the Pipeline.compute method we will discuss later contains the following code, which applies the partition to the tensors inside the batch:

      def compute(batch: Batch = batch,
                  partition: nn.Sequential = partition,
                  skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                  ) -> Batch:
          with use_skip_tracker(skip_tracker):
              return batch.call(partition)

      task = Task(streams[j], compute=compute, finalize=None)


      The Batch class is defined as follows:

      Tensors = Tuple[Tensor, ...]
      TensorOrTensors = Union[Tensor, Tensors]
      Function = Callable[[TensorOrTensors], TensorOrTensors]
      
      class Batch:
          """An abstraction of an atomic tensor or a tuple of tensors. This
          eliminates every boilerplate code to classify an atomic tensor or a tuple
          of tensors.
          ::
      
              x = generate_tensor_or_tensors()
              x = Batch(x)
      
              # in-place update
              x[0] = F.apply(x[0])
              x[:] = F.apply(*x)
      
              # f(x) if x is a tensor.
              # f(*x) if x is a tuple of tensors.
              # y is also a batch.
              y = x.call(f)
      
          """
      
          def __init__(self, value: TensorOrTensors) -> None:
              self.value = value
              self.atomic = torch.is_tensor(value)
      
          @property
          def tensor(self) -> Tensor:
              """Retrieves the underlying tensor."""
              if not self.atomic:
                  raise AttributeError('not atomic batch')
              return cast(Tensor, self.value)
      
          @property
          def tensors(self) -> Tensors:
              """Retrieves the underlying tensors."""
              if self.atomic:
                  raise AttributeError('batch is atomic')
              return cast(Tensors, self.value)
      
          @property
          def tensor_or_tensors(self) -> TensorOrTensors:
              """Retrieves the underlying tensor or tensors regardless of type."""
              return self.value
      
          def call(self, function: Function) -> 'Batch': # this is the key method
              """Calls a function by the underlying tensor or tensors. It also wraps
              the output with :class:`Batch`.
              """
              return Batch(function(self.value)) # e.g. invokes the model's forward
      

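      To make this concrete, here is a minimal usage sketch (my own, not from the torchgpipe source); note that, per the implementation above, call passes the whole value to the function, whether it is a single tensor or a tuple:

      import torch
      from torchgpipe.microbatch import Batch

      b = Batch(torch.ones(2, 3))              # atomic: wraps a single tensor
      y = b.call(lambda t: t * 2)              # the function receives the tensor; the result is wrapped in a new Batch
      print(y.atomic, y.tensor.shape)          # True torch.Size([2, 3])

      b2 = Batch((torch.ones(2), torch.zeros(2)))             # non-atomic: wraps a tuple of tensors
      y2 = b2.call(lambda ts: (ts[0] + ts[1], ts[0] - ts[1]))  # the whole tuple is passed as one argument
      print(y2.atomic, len(y2.tensors))                        # False 2
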
      2.2.2 layerwise_sandbox

      The layerwise_sandbox function copies the model's layers without modifying the original model, which makes profiling easier.

      def layerwise_sandbox(module: nn.Sequential,
                            device: torch.device,
                            ) -> Generator[nn.Module, None, None]:
          """Copies layers for ease to profile. It doesn't modify the given
          module.
          """
          for layer in module:
              layer_copy = copy.deepcopy(layer)
              layer_copy.to(device)
              layer_copy.train()
              yield layer_copy
      

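      A minimal sketch that reuses layerwise_sandbox exactly as defined above (together with its imports): the copies land on the target device in train mode, while the original model is left untouched:

      import torch
      from torch import nn

      model = nn.Sequential(nn.Linear(4, 4), nn.ReLU(), nn.Linear(4, 2))
      model.eval()   # put the original model in eval mode

      for layer_copy in layerwise_sandbox(model, torch.device('cpu')):
          print(type(layer_copy).__name__, layer_copy.training)   # each copy is in train mode: Linear True, ReLU True, Linear True

      print(model.training)   # False -- the original model was not modified
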
      2.2.3 detach

      The detach function detaches tensors from the autograd graph, producing new tensors that are separated from the current computation graph but still point to the original storage. detach can thus be used to cut off backpropagation along certain branches.

      def detach(batch: Batch) -> None:
          """Detaches from autograd graph."""
          for i, x in enumerate(batch):
              batch[i] = x.detach().requires_grad_(x.requires_grad)
      

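      A minimal sketch of this detach-and-restore-requires_grad pattern on plain tensors (not from the torchgpipe source):

      import torch

      x = torch.randn(3, requires_grad=True)
      y = x * 2                                        # y is part of x's autograd graph
      z = y.detach().requires_grad_(y.requires_grad)   # shares storage with y, but the graph is cut here
      z.sum().backward()                               # gradients flow into z only
      print(z.grad is not None, x.grad)                # True None -- backpropagation stopped at the detach point
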
      The use of detach appears frequently in the torchgpipe code. As the comment below explains, it is a workaround for a PyTorch bug.

          # A Python autograd function might fail with this error:
          #
          #   RuntimeError: Returning Variables sharing storage with other Variables
          #   that require grad is not supported in Python functions. Please submit a
          #   feature request if you hit this error.
          #
          # It doesn't look like an essential restriction. But it happens on the
          # current PyTorch version. To avoid it, we should detach the tensor before
          # returning by identity autograd functions, such as Wait, Fork, and Join.
          #
      

      2.3 Balancing by Computation Time

      balance_by_time balances the partitions by elapsed time. Its parameters are:

      • partitions: the number of partitions

      • module: the sequential model to be partitioned

      • sample: a sample input with the given batch size

      Essentially, it calls profile_times to measure per-layer running times on the sample and then computes the partitioning.

      def balance_by_time(partitions: int,
                          module: nn.Sequential,
                          sample: TensorOrTensors,
                          *,
                          timeout: float = 1.0,
                          device: Device = torch.device('cuda'),
                          ) -> List[int]:
          """Naive automatic balancing by elapsed time per layer.
          ::
      
              sample = torch.empty(128, 3, 224, 224)
              balance = balance_by_time(torch.cuda.device_count(), model, sample)
              gpipe = GPipe(model, balance, chunks=8)
      
          Args:
              partitions (int):
                  intended number of partitions
              module (torch.nn.Sequential):
                  sequential module to be partitioned
              sample (torch.Tensor):
                  example input with arbitrary batch size
      
          Keyword Args:
              timeout (float):
                  profiling iterates again if the timeout (in second) is not exceeded
                  (default: ``1.0``)
              device ('cpu' or 'cuda' device):
                  CPU or CUDA device where each layer is profiled (default: the
                  current CUDA device)
      
          Returns:
              A list of number of layers in each partition. Use it for the `balance`
              parameter of :class:`~torchgpipe.GPipe`.
      
          .. note::
              `module` and `sample` must be placed on the same device.
      
          """
          times = profile_times(module, sample, timeout, torch.device(device))
          return balance_cost(times, partitions)
      

      The Batch class here wraps a tensor or a tuple of tensors so that both can be handled through the same methods.

      profile_times measures per-layer running times using the sample. Its logic is:

      • Iterate over the layers of the model; for each layer:
        • Wait for all kernels in all streams on the current device to finish
        • Record the start time
        • Run the forward pass of the layer
        • Collect the tensors that require gradients and, if there are any, run a backward pass
        • Wait for all kernels in all streams on the current device to finish
        • Record the end time
      • Finally, return a list of per-layer running times.

      def profile_times(module: nn.Sequential,
                        sample: TensorOrTensors,
                        timeout: float,
                        device: torch.device,
                        ) -> List[int]:
          """Profiles elapsed times per layer."""
          if any(p.grad is not None for p in module.parameters()):
              raise ValueError('some parameter already has gradient')
      
          _batch = Batch(sample)
          for i, x in enumerate(_batch):
              _batch[i] = x.detach().to(device).requires_grad_(x.requires_grad)
      
          time_bufs: List[List[float]] = [[] for _ in module]
          begun_at = time.time()
      
          while time.time() - begun_at < timeout:
              batch = _batch
      
              # iterate over the layers of the model
              for i, layer in enumerate(layerwise_sandbox(module, device)):
                  detach(batch)

                  if device.type == 'cuda':
                      torch.cuda.synchronize(device) # wait for all kernels in all streams on this device to finish
                  tick = time.time() # record the start time

                  # Forward
                  batch = batch.call(layer) # run the forward pass of this layer

                  # Backward
                  # collect the tensors that require gradients
                  backward_tensors = tuple(y for y in batch if y.requires_grad)
                  # run a backward pass if anything requires gradients
                  if backward_tensors:
                      torch.autograd.backward(backward_tensors, backward_tensors)

                  if device.type == 'cuda':
                      torch.cuda.synchronize(device) # wait for all kernels in all streams on this device to finish
                  tock = time.time() # record the end time
      
                  time_bufs[i].append(tock - tick)
      
          us = 1_000_000
          return [sum(int(t*us) for t in buf) for buf in time_bufs]
      
      

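      To tie this together, here is a minimal usage sketch (profiled on CPU so it runs without a GPU; the toy model and the printed result are illustrative):

      import torch
      from torch import nn
      from torchgpipe.balance import balance_by_time

      model = nn.Sequential(
          nn.Linear(64, 512), nn.Linear(512, 512), nn.Linear(512, 512), nn.Linear(512, 64),
      )
      sample = torch.rand(32, 64)
      # profile each layer repeatedly for up to 0.5 s, then split into 2 partitions
      balance = balance_by_time(2, model, sample, timeout=0.5, device=torch.device('cpu'))
      print(balance)   # e.g. [2, 2] -- the measured times of the two partitions are roughly equal
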
      2.4 Balancing by Memory Size

      balance_by_size balances the partitions by runtime memory usage. Its parameters are:

      • partitions: the number of partitions; judging from the example, this can be taken to be the number of devices.

      • module: the sequential model to be partitioned

      • input: an example mini-batch of the same size as used for training

      Essentially, it calls profile_sizes to measure per-layer runtime memory usage on the input and then computes the partitioning.

      During training, the memory required for the parameters depends on which optimizer is used. An optimizer may keep per-parameter buffers to track optimization statistics internally, such as the momentum buffer in SGD.

      To get a more reliable size-based balance, the user should specify a param_scale appropriate for the optimizer. The default param_scale is 2 rather than 1, because gradient accumulation is necessary for every optimizer. The docstring below also gives some reference values.

      def balance_by_size(partitions: int,
                          module: nn.Sequential,
                          input: TensorOrTensors,
                          *,
                          chunks: int = 1,
                          param_scale: float = 2.0,
                          device: Device = torch.device('cuda'),
                          ) -> List[int]:
          """Naive automatic balancing by CUDA memory usage per layer.
      
          During training, required memory for parameters depends on which optimizer
          is used. Optimizers may use buffers for each parameter to track
          optimization statistics internally, such as momentum buffer in SGD.
      
          To get more reliable size based balance, you should specify `param_scale`
          with regard to your optimizer. The default `param_scale` is 2 instead of 1
          due to gradient accumulation which is necessary for every optimizer.
      
          Follow this guide to choose correct `param_scale` for typical optimizers:
      
          =========  =============  =========================================
          Optimizer  `param_scale`  Internal State
          =========  =============  =========================================
          SGD        2--3           (momentum_buffer)
          Adam       4--5           exp_avg, exp_avg_sq, (max_exp_avg_sq)
          Adadelta   4              square_avg, acc_delta
          Adagrad    3              sum
          RMSprop    3--5           square_avg, (momentum_buffer), (grad_avg)
          =========  =============  =========================================
      
          Here's a simple example with the Adam optimizer::
      
              balance = balance_by_size(
                  torch.cuda.device_count(),
                  model,
      
                  # Same size with mini-batch to train
                  torch.empty(1024, 3, 224, 224),
      
                  # Number of micro-batches to train with GPipe
                  chunks=8,
      
                  # 4 for Adam
                  param_scale=4.0,
              )
      
              gpipe = GPipe(model, balance, chunks=8)
              adam = Adam(gpipe.parameters())
      
          Args:
              partitions (int):
                  intended number of partitions
              module (torch.nn.Sequential):
                  sequential module to be partitioned
              input (torch.Tensor):
                  example mini-batch with the same size to train
      
          Keyword Args:
              chunks (int):
                  number of micro-batches will be used to train (default: ``1``)
              param_scale (float):
                  how many copies of parameters would be allocated for training. It
                  depends on optimizer. See the above guide. (default: ``2.0``)
              device ('cuda' device):
                  CUDA device where each layer is profiled (default: the current CUDA
                  device)
      
          Returns:
              A list of number of layers in each partition. Use it for the `balance`
              parameter of :class:`~torchgpipe.GPipe`.
      
          .. note::
              `module` and `input` must be placed on the same CUDA device.
      
          """
          sizes = profile_sizes(module, input, chunks, param_scale, torch.device(device))
          return balance_cost(sizes, partitions)
      
      

      The logic of profile_sizes is:

      • Iterate over the layers of the model; for each layer:
        • Use torch.cuda.memory_allocated to measure the GPU memory consumed by the forward pass, i.e. the activations. torch.cuda.memory_allocated(device=None) returns the GPU memory currently occupied by tensors on the given device.
        • Use p.storage().size() * p.storage().element_size() to compute the size of the parameters.
          • In PyTorch, a storage is a contiguous block of memory, and a tensor can be regarded as a view mapped onto a storage.
          • element_size() returns the size in bytes of a single element.
        • Add the activation size and the parameter size together (with their normalizing scales) and append the result to the list.
      • Return the list of per-layer memory sizes.

      def profile_sizes(module: nn.Sequential,
                        input: TensorOrTensors,
                        chunks: int,
                        param_scale: float,
                        device: torch.device,
                        ) -> List[int]:
          """Profiles CUDA memory usage per layer."""
          if device.type != 'cuda':
              raise ValueError('size profiler supports only CUDA device')
      
          batch = Batch(input)
          sizes: List[int] = []
      
          latent_scale = batch[0].size(0) / chunks
          for i, x in enumerate(batch):
              batch[i] = x[:1].detach().to(device).requires_grad_(x.requires_grad)
      
          for layer in layerwise_sandbox(module, device):
              detach(batch)
      
              # Detect memory usage at forward.
              # measure the GPU memory consumed by the forward pass, i.e. the activations
              memory_before = torch.cuda.memory_allocated(device)
              batch = batch.call(layer) # run the forward pass of this layer
              memory_after = torch.cuda.memory_allocated(device)
              latent_size = memory_after - memory_before
      
              # Analyze size of parameters.
              # compute the size of the parameters
              param_size = sum(p.storage().size() * p.storage().element_size()
                               for p in layer.parameters())
      
              # add the activation size and the parameter size together and append to the list
              # Combine size of parameters and activations with normalize scales.
              size = latent_size*latent_scale + param_size*param_scale
              sizes.append(int(size))
      
          return sizes # return the list of per-layer memory sizes
      

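      A worked example of the formula used above, with illustrative (hand-picked, not measured) numbers; latent_scale is the micro-batch size, i.e. the mini-batch size divided by chunks, because the forward pass was profiled with a single sample:

      latent_size = 512 * 1024        # activation bytes measured for a single-sample forward pass
      param_size = 4 * 1024 * 1024    # parameter bytes held by the layer
      latent_scale = 1024 / 8         # mini-batch of 1024 samples, chunks=8 -> micro-batch of 128
      param_scale = 4.0               # e.g. Adam, per the table in the docstring above
      size = latent_size * latent_scale + param_size * param_scale
      print(int(size))                # 83886080 bytes, i.e. 80 MiB estimated for this layer
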
      2.5 The Partitioning Algorithm

      Once the per-layer computation times or memory sizes have been obtained, the actual split is performed by the following code.

      times = profile_times(module, sample, timeout, torch.device(device))
      return balance_cost(times, partitions)
      

      balance_cost itself is just a thin wrapper; the actual algorithm is blockpartition.solve.

      def balance_cost(cost: List[int], partitions: int) -> List[int]:
          partitioned = blockpartition.solve(cost, partitions)
          return [len(p) for p in partitioned]
      

      As its comment indicates, blockpartition.solve implements the algorithm from the following paper.

      Implements "Block Partitions of Sequences" by Imre Bárány et al.Paper: https://arxiv.org/pdf/1308.2452.pdf
      

      This is a mathematics paper; its pseudocode (not reproduced here) corresponds almost step for step to the comments in the implementation below.

      Since the paper is a purely mathematical argument, we will not study its inner workings; we only look at how it behaves.

      Recall that the supported model is a sequential model, so whether we measure time or memory, the result is a list. The job of solve is to divide this list into groups that are as even as possible.

      Suppose the model has 6 layers whose running times are as follows, and it needs to be assigned to two devices. How should it be split?

      blockpartition.solve([1, 2, 3, 4, 5, 6], partitions=2) # i.e. layer 1 takes 1 time unit, layer 2 takes 2 units, and so on

      The result is [[1, 2, 3, 4], [5, 6]]: the six layers are split fairly evenly into two partitions by running time.
      

      If we split across three devices:

      solve([1, 2, 3, 4, 5, 6], partitions=3)
      
      The result is [[1, 2, 3], [4, 5], [6]]: the six layers are split fairly evenly into three partitions by running time.
      

      balance_cost then takes the number of layers in each partition, so the final balance is:

      [3,2,1]
      

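      A minimal sketch of what balance_cost does with the result of solve (using the outputs quoted above):

      partitioned = [[1, 2, 3], [4, 5], [6]]    # output of blockpartition.solve(..., partitions=3)
      balance = [len(p) for p in partitioned]   # number of layers in each partition
      print(balance)                            # [3, 2, 1]
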
      The partitioning algorithm itself is shown below; interested readers can study it alongside the paper.

      def solve(sequence: List[int], partitions: int = 1) -> List[List[int]]:
          """Splits a sequence into several partitions to minimize variance for each
          partition.
      
          The result might not be optimal. However, it can be done only in O(kn3),
          where k is the number of partitions and n is the length of the sequence.
      
          """
          if partitions < 1:
              raise ValueError(f'partitions must be a positive integer ({partitions} < 1)')
      
          n = len(sequence)
          if n < partitions:
              raise ValueError(f'sequence is shorter than intended partitions ({n} < {partitions})')
      
          # Normalize the sequence in [0, 1].
          minimum = min(sequence)
          maximum = max(sequence) - minimum
      
          normal_sequence: List[float]
          if maximum == 0:
              normal_sequence = [0 for _ in sequence]
          else:
              normal_sequence = [(x-minimum)/maximum for x in sequence]
      
          splits = [n//partitions * (x+1) for x in range(partitions-1)] + [n]
      
          def block_size(i: int) -> float:
              start = splits[i-1] if i > 0 else 0
              stop = splits[i]
              return sum(normal_sequence[start:stop])
      
          def leaderboard() -> Iterator[Tuple[float, int]]:
              return ((block_size(i), i) for i in range(partitions))
      
          while True:
              """
              (1) Fix p ∈ [k] with M(P) = bp. So Bp is a maximal block of P.
              """
              # max_size: M(P)
              max_size, p = max(leaderboard())
      
              while True:
                  """
                  (2) If M(P) ≤ m(P) + 1, then stop.
                  """
                  # min_size: m(P)
                  min_size, q = min(leaderboard())
      
                  if max_size <= min_size + 1:
                      return [sequence[i:j] for i, j in zip([0]+splits[:-1], splits)]
      
                  """
                  (3) If M(P) > m(P) + 1, then let m(P) = bq for the q ∈ [k] which is
                  closest to p (ties broken arbitrarily). Thus Bq is a minimal block
                  of P. Let Bh be the block next to Bq between Bp and Bq. (Note that
                  Bh is a non-empty block: if it were, then m(P) = 0 and we should
                  have chosen Bh instead of Bq.)
                  """
                  if p < q:
                      """
                      So either p < q and then h = q-1 and we define P* by moving
                      the last element from Bh = Bq-1 to Bq,
                      """
                      h = q - 1
                      splits[h] -= 1
                  else:
                      """
                      or q < p, and then h = q + 1 and P* is obtained by moving the
                      first element of Bh = Bq+1 to Bq.
                      """
                      h = q + 1
                      splits[q] += 1
      
                  """
                  Set P = P*. If p = h, then go to (1), else go to (2).
                  """
                  if p == h:
                      break
      

      0x03 Model Partitioning

      3.1 Invocation

      Now that we have the profiling result, the next step is to split the model's layers. How to do this is shown by the usage example in the comment below: pass balance as an argument to the GPipe constructor.

      '''
      If your model is still under development, its optimal balance would change
      frequently. In this case, we highly recommend 'torchgpipe.balance' for naive
      automatic balancing:
      
        from torchgpipe import GPipe
        from torchgpipe.balance import balance_by_time
      
        partitions = torch.cuda.device_count()
        sample = torch.empty(...)
        balance = balance_by_time(partitions, model, sample)
      
        model = GPipe(model, balance, ...)
      '''
      

      3.2 GPipe Construction

      In GPipe's __init__ we can see that the split_module function is used to perform the split:

          def __init__(self,
                       module: nn.Sequential,
                       balance: Optional[Iterable[int]] = None,
                       *,
                       devices: Optional[Devices] = None,
                       chunks: int = chunks,
                       checkpoint: str = checkpoint,
                       deferred_batch_norm: bool = False,
                       ) -> None:
              super().__init__()
      
              chunks = int(chunks)
              checkpoint = str(checkpoint)
      
              verify_module(module)
              # Verify if the underlying skippable modules satisfy integrity. The
              # integrity can be verified before forward() because it is static.
              verify_skippables(module)
      
              self.chunks = chunks
              self.checkpoint = checkpoint
      
              if deferred_batch_norm:
                  module = DeferredBatchNorm.convert_deferred_batch_norm(module, chunks)
      
              if devices is None:
                  devices = range(torch.cuda.device_count())
              devices = [torch.device(d) for d in devices]
              devices = cast(List[torch.device], devices)
      
              try:
                  # split the model
                  self.partitions, self.balance, self.devices = split_module(module, balance, devices)
              except BalanceError as exc:
                  raise ValueError(recommend_auto_balance(str(exc)))
      
              self._copy_streams: List[List[AbstractStream]] = []
              self._skip_layout = inspect_skip_layout(self.partitions)
      

      So let's look at the split_module function. Its main logic is:

      • Iterate over the layers contained in the model
        • Add each new layer to the layers array
        • When the size of the array equals balance[j], i.e. it has reached the number of layers that device j should hold:
          • Build the accumulated layers into a sequential module, obtaining the variable partition.
          • Use partition.to(device) to place the partition on the corresponding device. As mentioned earlier, ~torchgpipe.GPipe trains with CUDA; users do not need to move modules to GPUs themselves, because ~torchgpipe.GPipe automatically moves each partition to a different device.
          • Append this partition to the partitions array
          • Then move on to the next device
      • Finally, return partitions, balance, devices.

      def split_module(module: nn.Sequential,
                       balance: Iterable[int],
                       devices: List[torch.device],
                       ) -> Tuple[List[nn.Sequential], List[int], List[torch.device]]:
          """Splits a module into multiple partitions.
      
          Returns:
              A tuple of (partitions, balance, devices).
      
              Partitions are represented as a :class:`~torch.nn.ModuleList` whose
              item is a partition. All layers in a partition are placed in the
              same device.
      
          Raises:
              BalanceError:
                  wrong balance
              IndexError:
                  the number of devices is fewer than the number of partitions.
      
          """
          balance = list(balance)
      
          j = 0
          partitions = []
          layers: NamedModules = OrderedDict()
      
          for name, layer in module.named_children(): # iterate over the layers of the model
              layers[name] = layer # add the new layer to the ordered dict

              if len(layers) == balance[j]: # reached the number of layers that device j should hold
                  # Group buffered layers as a partition.
                  partition = nn.Sequential(layers) # combine the accumulated layers into one sequential module

                  device = devices[j]
                  partition.to(device) # place the partition on its device

                  partitions.append(partition) # append the new module to the partition list

                  # Prepare for the next partition.
                  layers.clear()
                  j += 1 # move on to the next device
      
          partitions = cast(List[nn.Sequential], nn.ModuleList(partitions))
          del devices[j:]
      
          return partitions, balance, devices
      

      Combining this with the earlier example, the balance is:

      [3,2,1]
      

      So the first three layers [1, 2, 3] are combined into one module, the middle two layers [4, 5] into another module, and the last layer [6] forms a module by itself.

      The final partition array is:

      [ module([1, 2, 3]),  module([4, 5]),  module([6])]
      

      3.3 Example

      Let's print it out and look at the result. The model contains 6 layers and is divided into 3 partitions, whose sizes are 3, 2 and 1 layers respectively.

      a = nn.Linear(1, 1)
      b = nn.Linear(1, 1)
      c = nn.Linear(1, 1)
      d = nn.Linear(1, 1)
      e = nn.Linear(1, 1)
      f = nn.Linear(1, 1)
      
      balance = [3,2,1] # 3 partitions: the first contains 3 layers, the second 2 layers, the third 1 layer
      model = nn.Sequential(a, b, c, d, e, f)
      print(model)
      model = GPipe(model, balance, devices=['cuda:0', 'cuda:1', 'cuda:2'])
      print(model)
      

      The result is as follows. The original model is split into 3 partitions, each of which is a Sequential.

      Sequential(
        (0): Linear(in_features=1, out_features=1, bias=True)
        (1): Linear(in_features=1, out_features=1, bias=True)
        (2): Linear(in_features=1, out_features=1, bias=True)
        (3): Linear(in_features=1, out_features=1, bias=True)
        (4): Linear(in_features=1, out_features=1, bias=True)
        (5): Linear(in_features=1, out_features=1, bias=True)
      )
      
      GPipe(
        (partitions): ModuleList(
          (0): Sequential(
            (0): Linear(in_features=1, out_features=1, bias=True)
            (1): Linear(in_features=1, out_features=1, bias=True)
            (2): Linear(in_features=1, out_features=1, bias=True)
          )
          (1): Sequential(
            (3): Linear(in_features=1, out_features=1, bias=True)
            (4): Linear(in_features=1, out_features=1, bias=True)
          )
          (2): Sequential(
            (5): Linear(in_features=1, out_features=1, bias=True)
          )
        )
      )
      

      The runtime variables look like this:

      model = {GPipe: 6} 
       balance = {list: 3} [3, 2, 1]
       checkpoint = {str} 'except_last'
       chunks = {int} 1
       devices = {list: 3} 
        0 = {device} cuda:0
        1 = {device} cuda:1
        2 = {device} cuda:2
       partitions = {ModuleList: 3} 
         _modules = 
         '0' = {Sequential: 3} 
              Sequential( 
              (0): Linear(in_features=1, out_features=1, bias=True)  
              (1): Linear(in_features=1, out_features=1, bias=True) 
              (2): Linear(in_features=1, out_features=1, bias=True))   
         '1' = {Sequential: 2} 
              Sequential(  
              (3): Linear(in_features=1, out_features=1, bias=True)  
              (4): Linear(in_features=1, out_features=1, bias=True))
         '2' = {Sequential: 1} 
              Sequential(
              (5): Linear(in_features=1, out_features=1, bias=True))
      

      One thing to note: GPipe's partitions member is of type nn.ModuleList. nn.ModuleList is a container that stores different modules and automatically registers each module's parameters with the network. However, nn.ModuleList does not define a network by itself; it merely stores the modules together, with no notion of ordering among them. The execution order of the network is determined by the forward function, as the small sketch below illustrates.
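
      A minimal sketch of that point (not from the torchgpipe source): the container stores the modules, while forward defines the order:

      import torch
      from torch import nn

      class TwoStage(nn.Module):
          def __init__(self) -> None:
              super().__init__()
              self.partitions = nn.ModuleList([nn.Linear(4, 4), nn.Linear(4, 2)])

          def forward(self, x: torch.Tensor) -> torch.Tensor:
              for partition in self.partitions:   # the execution order is defined here, not by nn.ModuleList
                  x = partition(x)
              return x

      print(TwoStage()(torch.randn(1, 4)).shape)   # torch.Size([1, 2])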

      This raises a question: within a partition, Sequential can run the forward operations in order, but how is the execution order between partitions configured? We will analyze this in a later article.

      3.4 Summary

      To summarize, the overall flow from top to bottom is:

      1. First use balance_by_size or balance_by_time to run the system and obtain the profiling result.
      2. Then use split_module to split the model.
      3. This produces a relatively balanced partitioning.
      4. Assign these partitions to different devices.

      The overall process is shown in the figure below:

      +-----------------------------------------------------------------------------------------+
      |                                                                                         |
      | Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
      |                                                                                         |
      +--------------------------+---------------------------+----------------------------------+
                                 |                           |
                 balance_by_size | 1                       1 |  balance_by_time
                                 |                           |
                                 v                           v
                      [[1, 2, 3], [4, 5], [6]]         [[1, 2, 3, 4], [5, 6]]
                                 +                           +
                                 |                           |
                                 +-----------+      +--------+
                                             |      |
                                             v      v
                                       2  split_module
                                                +
                                                |
                                                |
         3                                      v
       +------------------------------------------------------------------------------------+
       | +--------------------+         +---------------------+      +--------------------+ |
       | |Partition 1         |         |Partition 2          |      |Partition 3         | |
       | |                    |         |                     |      |                    | |
       | |      Layer 1       |    +---------> Layer 4        |      |                    | |
       | |         +          |    |    |         +           |  +------->   Layer 6      | |
       | |         |          |    |    |         |           |  |   |                    | |
       | |         v          |    |    |         |           |  |   |                    | |
       | |      Layer 2       |    |    |         |           |  |   |                    | |
       | |         +          |    |    |         v           |  |   |                    | |
       | |         |          |    |    |      Layer 5 +---------+   |                    | |
       | |         v          |    |    |                     |      |                    | |
       | |      Layer 3  +---------+    |                     |      |                    | |
       | |                    |         |                     |      |                    | |
       | +---------+----------+         +---------+-----------+      +-----------+--------+ |
       |           |                              |                              |          |
       +------------------------------------------------------------------------------------+
                   |                              |                              |
                 4 |                            4 |                            4 |
                   v                              v                              v
         +---------+----------+         +---------+-----------+       +----------+---------+
         |                    |         |                     |       |                    |
         |    Device 1        |         |     Device 2        |       |     Device 3       |
         |                    |         |                     |       |                    |
         +--------------------+         +---------------------+       +--------------------+
      

      This concludes our analysis of the automatic balancing mechanism. In the next article we will look at how the data is split, together with some of the runtime mechanisms.
