<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼解析] PyTorch 流水線并行實現 (3)--切分數據和運行時系統

      [源碼解析] PyTorch 流水線并行實現 (3)--切分數據和運行時系統

      0x00 摘要

      前幾篇文章我們介紹了 PyTorch 流水線并行的基本知識和自動平衡機制,本文我們介紹如何切分數據和運行時系統。

      流水線并行其他文章鏈接如下:

      [源碼解析] 深度學習流水線并行Gpipe(1)---流水線基本實現

      [源碼解析] 深度學習流水線并行GPipe (2) ----- 梯度累積

      [源碼解析] 深度學習流水線并行 GPipe(3) ----重計算

      [源碼解析] 深度學習流水線并行之PipeDream(1)--- Profile階段

      [源碼解析] 深度學習流水線并行 PipeDream(2)--- 計算分區

      [源碼解析] 深度學習流水線并行 PipeDream(3)--- 轉換模型

      [源碼解析] 深度學習流水線并行 PipeDream(4)--- 運行時引擎

      [源碼解析] 深度學習流水線并行 PipeDream(5)--- 通信模塊

      [源碼解析] 深度學習流水線并行 PipeDream(6)--- 1F1B策略

      [源碼解析] PyTorch 流水線并行實現 (1)--基礎知識

      [源碼解析] PyTorch 流水線并行實現 (2)--如何劃分模型

      最后得出運行時系統如下:

      0x01 分割小批次

      我們首先看看如何把一個 mini-batch 分割為多個 micro-batches。

      1.1 使用

      從下面示例代碼可以看出來,具體使用scatter方法進行了分割。

      # Divide a mini-batch into micro-batches.
      batches = microbatch.scatter(input, self.chunks)
      
      # Run pipeline parallelism.
      pipeline = Pipeline(batches,
                                  self.partitions,
                                  self.devices,
                                  copy_streams,
                                  self._skip_layout,
                                  checkpoint_stop)
      
      pipeline.run()
      
      # Merge the micro-batches into one mini-batch.
      output = microbatch.gather(batches)
      return output
      

      1.2 PyTorch 基礎

      我們先看看 PyTorch 的一些基礎代碼。

      1.2.1 chunk

      chunk方法可以對張量分塊,返回一個張量列表,其參數是:

      • ensor :要分割的張量。
      • chunks : 分割的塊數
      • dim :沿著哪個軸分塊

      具體舉例如下:

      import numpy as np
      import torch
      
      data = torch.from_numpy(np.random.rand(3, 5))
      print(str(data))
      
      for i, data_i in enumerate(data.chunk(3, 0)): # 沿0軸分為3塊
          print(str(data_i))
          
      輸出
      tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410],
              [0.7889, 0.4480, 0.7607, 0.7903, 0.4118],
              [0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
      
      tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410]], dtype=torch.float64)
      tensor([[0.7889, 0.4480, 0.7607, 0.7903, 0.4118]], dtype=torch.float64)
      tensor([[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)
      
      

      1.2.2 cat

      cat 的用法則是把張量拼接在一起,或者把一個張量列表拼接起來。

      Z = torch.cat( (X,Y),0 )  # 按維數0拼接,就是豎著拼
      Z = torch.cat( (X,Y),1 )  # 按維數1拼接,就是橫著拼
      

      我們用示例看看:

      X = torch.ones(2, 5)
      Y = torch.ones(4, 5)
      Z = torch.cat((X, Y), 0)
      print(Z)
      

      結果是:

      tensor([[1., 1., 1., 1., 1.],
              [1., 1., 1., 1., 1.],
              [1., 1., 1., 1., 1.],
              [1., 1., 1., 1., 1.],
              [1., 1., 1., 1., 1.],
              [1., 1., 1., 1., 1.]])
      

      1.3 分割 & 聚合

      具體回到分割批次,我們來看看Scatter 代碼。

      def scatter(input: TensorOrTensors, chunks: int) -> List[Batch]:
          """Splits an input mini-batch into multiple micro-batches."""
          inputs: Iterable[TensorOrTensors]
      
          if isinstance(input, Tensor):
              inputs = input.chunk(chunks) # 如果是張量,則直接分割
          else:
              rotated: List[Tensors] = []
      
              for tensor in input: # 如果是張量數組,則遍歷
                  tensors = tensor.chunk(chunks) # 對于每一個張量進行分割
                  rotated.append(cast(Tensors, tensors)) # 分割結果映射為 Tuple list
      
              inputs = zip(*rotated) # 把 list 之中的Tuple 分別聚合
      
          return [Batch(x) for x in inputs] # 映射成 Batch 列表返回
      

      gather 方法則是把scatter的結果重新聚集起來,就是一個逆向操作。

      def gather(outputs: List[Batch]) -> TensorOrTensors:
          """Concatenates output micro-batches into a mini-batch."""
          output: TensorOrTensors
      
          if outputs[0].atomic:
              tensors = tuple(b.tensor for b in outputs)
              output = torch.cat(tensors)
          else:
              rotated = [b.tensors for b in outputs]
              output_buf = []
      
              for tensors in zip(*rotated):
                  output_buf.append(torch.cat(tensors))
      
              output = tuple(output_buf)
      
          return output
      

      1.4 剖析

      我們看看如何使用,下面代碼是把ab這個張量列表打散,分割成兩個塊。

      def test_scatter_tuple():
          ab = (torch.ones(2, 1), torch.zeros(4, 2), torch.zeros(6, 3))
      
          a, b = scatter(ab, chunks=2)
      
          assert a.tensors[0].size() == (1, 1)
          assert b.tensors[0].size() == (1, 1)
          assert a.tensors[1].size() == (2, 2)
          assert b.tensors[1].size() == (2, 2)
          assert a.tensors[2].size() == (3, 3)
          assert b.tensors[2].size() == (3, 3)
      

      我們畫個圖來看看。

          +-------------------------------------------------------------+
          | ab                                                          |
          |                                                             |
          |    +-----------+         +---------+        +----------+    |
          |    |           |         |         |        |  0 0 0   |    |
          |    |           |         |   0 0   |        |  0 0 0   |    |
          |    |     1     |         |   0 0   |        |  0 0 0   |    |
          |    |     1     |         |   0 0   |        |  0 0 0   |    |
          |    |           |         |   0 0   |        |  0 0 0   |    |
          |    |           |         |         |        |  0 0 0   |    |
          |    +-----------+         +---------+        +----------+    |
          |                                                             |
          +-------------------------------+-----------------------------+
                                          |
                                          |
                                          |
                               a, b = scatter(ab, chunks=2)
                                          |
                                          |
                                          |
                                          |
                                          |
                                          v
      
      
      +------------------------------+         +-----------------------------+
      | a                            |         |b                            |
      |  +---+  +-----+  +--------+  |         |  +---+  +-----+ +--------+  |
      |  | 1 |  | 0 0 |  | 0 0 0  |  |         |  | 1 |  | 0 0 | | 0 0 0  |  |
      |  +---+  | 0 0 |  | 0 0 0  |  |         |  +---+  | 0 0 | | 0 0 0  |  |
      |         +-----+  | 0 0 0  |  |         |         +-----+ | 0 0 0  |  |
      |                  +--------+  |         |                 +--------+  |
      +------------------------------+         +-----------------------------+
      

      使用下面的示例代碼也可以看到如何聚合。

      def test_gather_tensors():
          a = torch.zeros(1, 1)
          b = torch.zeros(1, 1)
          ab = gather([Batch(a), Batch(b)])
      
          assert ab.size() == (2, 1)
      
      
      def test_gather_tuples():
          a = (torch.zeros(1, 1), torch.zeros(2, 2))
          b = (torch.zeros(1, 1), torch.zeros(2, 2))
          ab = gather([Batch(a), Batch(b)])
      
          assert isinstance(ab, tuple)
          assert ab[0].size() == (2, 1)
          assert ab[1].size() == (4, 2)
      

      0x02 運行

      我們接下來看看運行時的一些基礎設施,具體包括 Stream,Task,Worker。

      2.1 Stream

      Stream 類是用來封裝 CUDA stream 和 CPU stream。代碼位于:torchgpipe/stream.py。

      CUDA流表示一個GPU操作隊列,即某個設備綁定的,按照順序執的核(kernel)序列。我們可以把一個流看作是GPU之上的一個任務。用戶向流的隊列上添加一系列操作,GPU會按照添加到流中的先后順序而依次執行這一系列操作。在同一個流之中,所有操作是串行序列化,因此這些操作永遠不會并行。因此,要想并行,兩個操作必須位于不同的 stream 中。不同流中的核函數可以交錯,甚至可能重疊。

      class CPUStreamType:
          pass
      
      # The placeholder on place of streams for the CPU device instead of CUDA.
      CPUStream = CPUStreamType()
      
      # It represents both CUDA streams and the CPU stream.
      AbstractStream = Union[torch.cuda.Stream, CPUStreamType]
      

      本文用到的相關操作為 use_stream。

      torch.cuda.stream(stream) 的作用是選擇給定流的上下文管理器。

      @contextmanager
      def use_stream(stream: AbstractStream) -> Generator[None, None, None]:
          """:func:`torch.cuda.stream` for either CPU or CUDA stream."""
          if not is_cuda(stream):
              yield
              return
      
          with torch.cuda.stream(as_cuda(stream)):
              yield
              
      def is_cuda(stream: AbstractStream) -> bool:
          """Returns ``True`` if the given stream is a valid CUDA stream."""
          return stream is not CPUStream
      
      
      def as_cuda(stream: AbstractStream) -> torch.cuda.Stream:
          """Casts the given stream as :class:`torch.cuda.Stream`."""
          return cast(torch.cuda.Stream, stream)        
      

      2.2 Task

      Task 表示如何在一個分區上計算微批次數據(micro-batch)。它由兩部分組成:

      • compute應在工作線程中并發執行。
      • finalize應在工作線程完成后執行。

      可以理解為一個業務處理邏輯。如果有安卓經驗的同學,可以理解為類似于 業務Message。其實 Android message也叫task,其封裝了本任務攜帶的信息和處理該任務的handler。

      這里的 Task 也是類似的,在構建Task 時候,就傳入了 compute 方法和finalize方法,舉例如下:

      task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
      

      或者如下:

      def compute(batch: Batch = batch,
                  partition: nn.Sequential = partition,
                  skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                  ) -> Batch:
          with use_skip_tracker(skip_tracker):
              return batch.call(partition)
      
      task = Task(streams[j], compute=compute, finalize=None)
      

      具體Task定義如下,Task是綁定在 Stream 之上,即可以運行在任何device之上,這就用到了上一節的內容。

      class Task:
          """A task represents how to compute a micro-batch on a partition.
      
          It consists of two parts: :meth:`compute` and :meth:`finalize`.
          :meth:`compute` should be executed in worker threads concurrently.
          :meth:`finalize` should be executed after when worker threads complete to
          execute :meth:`compute`.
      
          :meth:`compute` might be boosted by worker threads. Because it produces
          several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
          are not serialized through GIL. So more than one CUDA API call can be
          produced at the same time.
          """
      
          def __init__(self,
                       stream: AbstractStream,
                       *,
                       compute: Callable[[], Batch],
                       finalize: Optional[Callable[[Batch], None]],
                       ) -> None:
              self.stream = stream
              self._compute = compute
              self._finalize = finalize
      
          def compute(self) -> Batch:
              with use_stream(self.stream): # 綁定在stream之上
                  return self._compute() # 調用傳入的業務代碼
      
          def finalize(self, batch: Batch) -> None:
              if self._finalize is None:
                  return
              with use_stream(self.stream): # 綁定在stream之上
                  self._finalize(batch)  # 調用傳入的業務代碼
      

      2.3 Worker

      worker是用來運行task的,每個 device 有一個 worker 來負責執行這個 device 上的 task。如果有安卓經驗的同學,可以理解為是 Looper。

      需要注意,worker只是一個函數,如果運行,還需要一個線程作為寄托。這就是后續 spawn_workers 的工作。

      def worker(in_queue: InQueue,
                 out_queue: OutQueue,
                 device: torch.device,
                 grad_mode: bool,
                 ) -> None:
          """The main loop of a worker thread."""
          torch.set_grad_enabled(grad_mode)
      
          with use_device(device):
              while True:
                  task = in_queue.get() # 從輸入隊列中獲取task
      
                  if task is None:
                      break
      
                  try:
                      batch = task.compute() # 計算task
                  except Exception:
                      exc_info = cast(ExcInfo, sys.exc_info())
                      out_queue.put((False, exc_info))
                      continue
      
                  out_queue.put((True, (task, batch))) # 把task和計算結果放到輸出隊列
      
          done = (False, None)
          out_queue.put(done)
      

      2.4 生成 worker

      這里使用了 @contextmanager 注解,這是實現了上下文管理協議的對象,主要用于保存和恢復各種全局狀態,關閉文件等,并為try...except...finally提供了一個方便使用的封裝。

      spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函數是 worker。

      spawn_workers 不止生成了若干 workers,也生成了一對消息隊列 (in_queues, out_queues) ,這個 (in_queues, out_queues) 在Pipeline 生命周期之內全程都存在,具體來說是:

      • spawn_workers 內部會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是串行來執行業務操作。
      in_queue, out_queue = workers[device]
      
      • 這些 queues 被添加到 (in_queues, out_queues) 之中。
      in_queues.append(in_queue)
      out_queues.append(out_queue)
      
      • 之后就是使用 (in_queues, out_queues) 作為各個task 之間傳遞信息的上下文。

      • in_queues 里面的順序就是 device 的順序,也就是partition的順序。out_queues 亦然。

      具體代碼如下:

      @contextmanager
      def spawn_workers(devices: List[torch.device],
                        ) -> Generator[Tuple[List[InQueue], List[OutQueue]], None, None]:
          """Spawns worker threads. A worker thread is bound to a device."""
          in_queues: List[InQueue] = []
          out_queues: List[OutQueue] = []
      
          # Spawn workers.
          workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}
      
          def normalize_device(device: torch.device) -> torch.device:
              if device.type == 'cuda' and device.index is None:
                  return torch.device('cuda', index=torch.cuda.current_device())
      
              if device.type == 'cpu' and device.index is not None:
                  return torch.device('cpu')
      
              return device
      
          for device in devices:
              device = normalize_device(device) # 得到使用的設備
      
              try:
                  in_queue, out_queue = workers[device] # 臨時放置queue
              except KeyError: # 如果 device 還沒有生成對應的queues,則生成
                  in_queue = Queue() # 生成新的queue
                  out_queue = Queue()
                  
                  # 取出queue
                  workers[device] = (in_queue, out_queue) # 賦值給workers
      
                  t = Thread(
                      target=worker, # Thread的執行程序是 worker 函數
                      args=(in_queue, out_queue, device, torch.is_grad_enabled()),
                      daemon=True,
                  )
                  t.start() # 啟動工作線程
      
              in_queues.append(in_queue) # 插入queue
              out_queues.append(out_queue) # 插入queue
      
          try:
              yield (in_queues, out_queues) # 返回給調用者
          finally:
              # Close workers.
              for in_queue in set(in_queues):
                  in_queue.put(None)
      
              # Join running workers.
              running = set(out_queues)
              while running:
                  out_queue = running.pop()
                  ok, payload = out_queue.get()
      
                  done = (False, None)
                  if (ok, payload) == done:
                      continue
      
                  running.add(out_queue)
      

      2.5 使用

      2.5.1 何時生成worker

      使用例子位于 torchgpipe/pipeline.py,在 Pipeline 類之中的 run 函數中會生成workers。我們可以看到,對于 Pipeline 來說,有意義的就是 (in_queues, out_queues)。

          def run(self) -> None:
              """Runs pipeline parallelism.
      
              It modifies the given batches in place.
      
              """
              batches = self.batches
              partitions = self.partitions
              devices = self.devices
              skip_layout = self.skip_layout
      
              m = len(batches)
              n = len(partitions)
      
              skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]
      
              with spawn_workers(devices) as (in_queues, out_queues): # 生成 workers,并且得到隊列
                  for schedule in clock_cycles(m, n): # 這里是按照算法有次序的運行多個fence, compute
                      self.fence(schedule, skip_trackers)
                      # 把隊列傳遞進去
                      self.compute(schedule, skip_trackers, in_queues, out_queues)
      

      2.5.2 剖析

      Torchgpipe 使用了 Python 的 Queue 數據結構。

      Queue 類實現了一個基本的先進先出(FIFO)容器。

      A multi-producer, multi-consumer queue.
      

      其主要方法是:

      • Queue.get([block, [timeout]]) 讀隊列,從隊列尾部移除元素,timeout為等待時間,如果隊列滿,則阻塞。
      • Queue.put(item, [block, [timeout]]) 寫隊列,將元素添加到序列尾端,timeout為等待時間,如果隊列空,則阻塞。

      我個人更習慣于把 (in_queues, out_queues) 理解為類似 Linux 的 管道(Pipe)。

      Linux 管道是一種最基本的IPC機制,作用于有血緣關系的進程之間,完成數據傳遞,具體特性如下:

      • 管道是由核函數管理的一個FIFO文件,其實是一個緩沖區,相當于我們放入內存中的一個管道,兩個進程分別處于管道兩端,通過這個管道來傳遞信息。
      • 管道的一端連接一個進程的輸出。這個進程會向管道中放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另一端的進程取出信息。
      • 管道的另一端連接另一個進程的輸入,這個進程取出被放入管道的信息。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另一端的進程放入信息。

      具體回到 TorchPipe,我們提前看看論文的內容:

      對于這種細粒度的順序控制,torchgpipe把checkpointing 使用兩個單獨的autograd函數Checkpoint和Recompute來實現。在任務 \(F^{'}_{i,j}\) 的執行時間之內,生成一對具有共享內存的Checkpoint和Recompute。該共享內存在向后傳播中被使用,用于將通過執行Recompute生成的本地計算圖傳輸到Checkpoint來進行反向傳播。

      于是,這里就有很多并行處理的需求,于是我們可以看到 Pipeline 類的 compute 方法(省略部分代碼)中有向 in_queues 之中放入 Task,從 out_queues 之中去除 Task 的執行結果。

          def compute(self,
                      schedule: List[Tuple[int, int]],
                      skip_trackers: List[SkipTrackerThroughPotals],
                      in_queues: List[InQueue],
                      out_queues: List[OutQueue],
                      ) -> None:
      
      
              # With checkpointing, the autograd graph looks like this diagram:
              # ┌─────┸──────┐
              # │    Copy    │
              # └─────┰──────┘   (fence)
              # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
              #       ┃          (compute)
              # ┌─────┸──────┐
              # │    Wait    │ [1] Synchronize the current stream with the copy stream.
              # └─────┰──────┘
              # ┌─────┸──────┐
              # │ Checkpoint │ [2] Compute a partition within checkpointing.
              # └─────┰──────┘
              # ┌─────┸──────┐
              # │    Wait    │ [3] Synchronize the copy stream with the current stream.
              # └─────┰──────┘
              #       ┠ ─ ─ ─ ┐
              #       ┃ ┌─────┴─────┐
              #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
              #       ┃ └─────┬─────┘
              #       ┠ ─ ─ ─ ┘
              #       ┃
              # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
              # ┌─────┸──────┐   (fence)
              # │    Copy    │
              # └─────┰──────┘
              for i, j in schedule: # 并行執行
                  batch = batches[i]
                  partition = partitions[j]
      
                  # Synchronize with the copied input. ([1] in the diagram)
      
                  # Determine whether checkpointing or not.
                  if checkpoint:
                      def function(input: TensorOrTensors,
                                   partition: nn.Sequential = partition,
                                   skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                                   ) -> TensorOrTensors:
                          with use_skip_tracker(skip_tracker):
                              return partition(input)
      
                      chk = Checkpointing(function, batch)
                      # 生成一個Task
                      task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
                      del function, chk
      
                  else:
                      def compute(batch: Batch = batch,
                                  partition: nn.Sequential = partition,
                                  skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                                  ) -> Batch:
                          with use_skip_tracker(skip_tracker):
                              return batch.call(partition)
      								# 生成一個Task
                      task = Task(streams[j], compute=compute, finalize=None)
                      del compute   
                      
                  # Compute tasks in parallel. ([2] in the diagram)
                  in_queues[j].put(task) # 給第j個partition放入一個新的task。因為 i, j 已經在clock算法中設定了,所以前向傳播就是按照這個來走的。
      
              for i, j in schedule:
                  ok, payload = out_queues[j].get() # 取出第j個partition的運行結果
                  # .......
                  
              # 省略后續代碼
      

      2.6 總結

      我們總結梳理一下大致業務邏輯(后文還會細化):

      1. 系統調用 spawn_workers 來生成若干 workers。
      2. spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函數是 worker。spawn_workers 內部也會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是串行來執行業務操作。
      3. 這些 queues 被添加到 (in_queues, out_queues) 之中。然后把 (in_queues, out_queues) 返回給 Pipeline 主線程。之后就是使用 (in_queues, out_queues) 作為各個task 之間傳遞信息的上下文。
      4. Pipeline 主線程得到 (in_queues, out_queues) 之后,如果要通過 compute 方法運行一個Task,就找到其device對應的in_queue,把Task插進去。
      5. Worker Thread 阻塞在 in_queue 之上,如果發現有內容,就讀取 Task,運行Task。
      6. Worker Thread 把運行結果插入到 out_queue之中。
      7. Pipeline 的 compute 方法會取出 out_queue 之中的運行結果,進行后續處理。

      如下圖所示:

                                 +-------------------------------------------------------------------------+
                                 |                                1                                        |
                                 |     +--------------------------------------------------------------+    |
                                 |     |               3   (in_queues, out_queues)                    |    |
                                 |     v                                                              |    v
      +--------------------------------+---------+                                             +------+----+-----------------------------------------------------------------------+
      | Pipeline                 |               |                                             | spawn_workers                                                                     |
      |                          |               |                                             |                                                                                   |
      |                          |               |                                             | +-------------------------------------+                                           |
      |                          |               |                                             | | workers                             |                                           |
      |                          |               |                                             | |                                     |     t = Thread(                           |
      |                          +               |                                             | |                                     |       target=worker,                      |
      |                 spawn_workers(devices)   |                                             | |  device 1 : in_queue 1, out_queue 1 |       args=(in_queue, out_queue, device), |
      |                                          |                                             | |                                     |       daemon=True,                        |
      |                                          |                                             | |  device 2 : in_queue 2, out_queue 2 |     )                                     |
      | +--------------------------------------+ |                                             | |                                     |     t.start()                             |
      | | compute                              | |                                             | |  device 3 : in_queue 3, out_queue 3 |          +                                |
      | |                                      | |                                             | |                                     |          |                                |
      | |                                      | |    4                                        | |                                     |          |                                |
      | |  in_queues[j].put(task)  +-----------------------+                                   | +-------------------------------------+          |                                |
      | |                                      | |         |                                   +-----------------------------------------------------------------------------------+
      | |                                      | |         |                                                                                      | 2
      | |  ok, payload = out_queues[j].get()<--------+     |         +---------------------+                                                      |
      | |                                      | |   |     |         | in_queues           |                                                      v
      | +--------------------------------------+ |   |     |         |                     |
      |                                          |   |     +------------> in_queue 1 +--------+          +---------------------------------------------------------------------+
      +------------------------------------------+   |               |    in_queue 2       |  |          | Thread                                                              |
                                                     |               |    in_queue 3       |  |          |                                                                     |
                                                     |               |                     |  | 5        |    +------------------------------------------------------------+   |
                                                     | 7             +---------------------+  |          |    | Worker                                                     |   |
                                                     |               +---------------------+  |          |    |                                                            |   |
                                                     |               | out_queues          |  |          |    |        device 1      task = in_queue.get()                 |   |
                                                     |               |                     |  |  task    |    |                                                            |   |
                                                     +------------------+ out_queue 1 <--+ |  +----------------------> in_queue 1    batch = task.compute()                |   |
                                            (True, (task,,batch))    |    out_queue 2    | |             |    |                                                            |   |
                                                                     |    out_queue 3    +---------------------------+ out_queue 1   out_queue.put((True, (task, batch)))  |   |
                                                                     |                     |      6      |    |                                                            |   |
                                                                     +---------------------+             |    +------------------------------------------------------------+   |
                                                                                                         +---------------------------------------------------------------------+
      
      

      手機如下:

      至此,我們分析了如何切分數據和一些運行時機制,下一篇我們結合論文看看具體實現。

      0xFF 參考

      Markdown公式用法大全

      markdown中公式編輯教程

      https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

      CUDA學習:基礎知識小結

      CUDA隨筆之Stream的使用

      NVIDIA解決方案架構師深度解析大規模參數語言模型Megatron-BERT

      Accelerating Wide & Deep Recommender Inference on GPUs

      HugeCTR: High-Performance Click-Through Rate Estimation Training

      https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

      https://github.com/NVIDIA/apex/

      https://github.com/justheuristic/prefetch_generator

      https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

      https://pytorch.org/docs/stable/autograd.html

      https://pytorch.org/docs/notes/cuda.html

      https://zhuanlan.zhihu.com/p/61765561

      https://pytorch.apachen.org/docs/1.7/64.html

      https://zhidx.com/p/217999.html

      posted @ 2021-09-26 20:38  羅西的思考  閱讀(1628)  評論(2)    收藏  舉報
      主站蜘蛛池模板: 亚洲欧美日韩精品久久亚洲区色播| 免费无码中文字幕A级毛片| 麻豆精品在线| 色老99久久精品偷偷鲁| 亚洲精品欧美综合二区| 麻豆亚州无矿码专区视频| 亚洲最大国产成人综合网站| 好男人社区影视在线WWW| 欧美牲交a欧美在线| 中文无码乱人伦中文视频在线| 国产亚洲欧洲AⅤ综合一区| 国产精品一区二区不卡91| 久久AV中文综合一区二区| 亚洲乱码精品久久久久..| 成人啪精品视频网站午夜| 亚洲日韩av无码| 伊人久久大香线蕉AV网禁呦| 久久精品蜜芽亚洲国产av| 黑人巨大AV在线播放无码| 亚洲一区av在线观看| 亚洲人成人无码网WWW电影首页 | 99国产精品欧美一区二区三区| 精品av无码国产一区二区| 亚洲激情视频一区二区三区| 公喝错春药让我高潮| 丁香婷婷色综合激情五月| 亚洲sm另类一区二区三区| 高清国产一区二区无遮挡| 尹人香蕉久久99天天拍| 拉孜县| 久久午夜无码鲁丝片直播午夜精品| 精品无码人妻一区二区三区| 久章草在线毛片视频播放 | 粗壮挺进邻居人妻无码| 中文字幕色偷偷人妻久久| 无码人妻黑人中文字幕| av天堂久久精品影音先锋| 运城市| 国产午夜福利在线视频| 少妇人妻偷人精品免费| 国产精品一区二区久久不卡|