
      PyTorch Distributed Operations: Barrier

      Original document: https://www.yuque.com/lart/ugkv9f/gy7sva

      About the Concept of a Barrier

      The concept of a barrier is described on Wikipedia: a synchronization barrier is a synchronization method in parallel computing. For a group of processes or threads, a barrier in a program means that any thread/process that reaches it must wait until all threads/processes have reached the same point, and only then may execution continue.

      Note that the barrier is not unique to PyTorch; it is a basic concept in parallel computing, and other parallel-computing settings may involve the same concept and operation. This article focuses on how it behaves in PyTorch.
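As a language-agnostic illustration of the concept (deliberately not PyTorch-specific), the same semantics can be sketched with Python's stdlib `threading.Barrier`: no thread gets past the barrier until all parties have reached it.

```python
import threading

N = 4
barrier = threading.Barrier(N)  # released only once N threads have called wait()
events = []
lock = threading.Lock()

def worker(i):
    with lock:
        events.append(("before", i))
    barrier.wait()  # block here until all N threads arrive
    with lock:
        events.append(("after", i))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every "before" event is recorded before any "after" event,
# regardless of thread scheduling.
print(all(tag == "before" for tag, _ in events[:N]))
```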

      torch.distributed.barrier(group=None, async_op=False, device_ids=None)
      
      Synchronizes all processes.
      
      This collective blocks processes until the whole group enters this function (if async_op is False), or until wait() is called on the returned async work handle.
      
      Parameters
      group (ProcessGroup, optional) – The process group to work on. If None, the default process group will be used.
      async_op (bool, optional) – Whether this op should be an async op
      device_ids ([int], optional) – List of device/GPU ids. Valid only for NCCL backend.
      
      Returns
      Async work handle, if async_op is set to True. None, if not async_op or if not part of the group
      

      In multi-GPU training, different GPUs are usually driven by different processes. Sometimes we want to run a task in a single process only, while still holding back the progress of the other processes; this is where barrier comes in.
      A practical scenario is dataset preparation: only rank 0 needs to do the processing, and the other processes have no need to repeat it, yet their subsequent work depends on the prepared data. So we block the other processes while rank 0 works, keeping them waiting, and release them all together once the preparation is done.

      Under this requirement, a typical context-manager-based construct looks like this:

      # https://github.com/ultralytics/yolov5/blob/7d56d451241e94cd9dbe4fcb9bfba0e92c6e0e23/utils/torch_utils.py#L29-L38
      from contextlib import contextmanager

      import torch.distributed as dist


      @contextmanager
      def torch_distributed_zero_first(local_rank: int):
          """
          Decorator to make all processes in distributed training
          wait for each local_master to do something.
          """
          if local_rank not in [-1, 0]:
              # Non-master processes block here until the local master
              # reaches its own barrier after the yielded block.
              dist.barrier(device_ids=[local_rank])
          yield
          if local_rank == 0:
              # The local master arrives last, releasing all waiting processes.
              dist.barrier(device_ids=[0])
      
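To see why this construct works without needing real GPUs, here is a sketch of the same "rank 0 goes first" pattern using stdlib threads, with `threading.Barrier` standing in for `dist.barrier`. The names `zero_first`, `worker`, and `WORLD_SIZE` are illustrative, not from the YOLOv5 code.

```python
import threading
from contextlib import contextmanager

WORLD_SIZE = 3
barrier = threading.Barrier(WORLD_SIZE)  # stand-in for dist.barrier()
events = []
lock = threading.Lock()

@contextmanager
def zero_first(rank):
    if rank != 0:
        barrier.wait()   # non-zero ranks block before doing the work
    yield
    if rank == 0:
        barrier.wait()   # rank 0 arrives last, releasing everyone together

def worker(rank):
    with zero_first(rank):
        if rank == 0:
            with lock:
                events.append("rank0 prepared data")   # e.g. dataset preprocessing
        else:
            with lock:
                events.append(f"rank{rank} sees prepared data")

threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(events[0])  # rank 0's work is always recorded first
```

Note the barrier arithmetic: the two non-zero ranks each call `wait()` once before the body, and rank 0 calls it once after the body, so the three calls match a 3-party barrier exactly, just as every process must call `dist.barrier()` once.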

      Details of barrier's Behavior

      # -*- coding: utf-8 -*-
      
      import os
      import time
      
      import torch.distributed as dist
      import torch.multiprocessing as mp
      
      
      def ddp_test_v0(local_rank, world_size):
          # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
          dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)
      
          print("first before barrier{}\n".format(local_rank))
          if local_rank != 0:
              dist.barrier()
          print("first after barrier{}\n".format(local_rank))
      
          print("inter {}".format(local_rank))
      
          print("second before barrier{}\n".format(local_rank))
          if local_rank == 0:
              dist.barrier()
          print("second after barrier{}\n".format(local_rank))
      
          print("{} exit".format(local_rank))
      
      
      def ddp_test_v1(local_rank, world_size):
          # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
          dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)
      
          if local_rank != 0:
              print("1 before barrier{}\n".format(local_rank))
              start = time.time()
              time.sleep(5)
              dist.barrier()
              print(time.time() - start)
              print("1 after barrier{}\n".format(local_rank))
              dist.barrier()
              print("1 after barrier{}\n".format(local_rank))
          else:
              print("0 before barrier{}\n".format(local_rank))
              start = time.time()
              dist.barrier()
              print(time.time() - start)
              print("0 after barrier{}\n".format(local_rank))
              print("0 after barrier{}\n".format(local_rank))
              dist.barrier()
              print("0 after barrier{}\n".format(local_rank))
      
          print("{} exit".format(local_rank))
      
      
      def main():
          world_size = 2
          os.environ["MASTER_ADDR"] = "127.0.0.1"
          os.environ["MASTER_PORT"] = "29500"
          # Swap in ddp_test_v1 to run the second example.
          mp.spawn(ddp_test_v0, args=(world_size,), nprocs=world_size, join=True)
      
      
      if __name__ == "__main__":
          main()
      

      Two examples are shown here. Beyond what the official documentation of dist.barrier states, they reveal an important property of this method: each process must execute its own matching barrier call the same number of times before the blocked processes return to normal execution.
      Let's look at the first example:

      def ddp_test_v0(local_rank, world_size):
          # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
          dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)
      
          print("first before barrier{}\n".format(local_rank))
          if local_rank != 0:
              dist.barrier()
          print("first after barrier{}\n".format(local_rank))
      
          print("inter {}".format(local_rank))
      
          print("second before barrier{}\n".format(local_rank))
          if local_rank == 0:
              dist.barrier()
          print("second after barrier{}\n".format(local_rank))
      
          print("{} exit".format(local_rank))
      

      Its output is:

      first before barrier1
      first before barrier0
      
      
      first after barrier0
      
      inter 0
      second before barrier0
      
      second after barrier0
      
      0 exit
      first after barrier1
      
      inter 1
      second before barrier1
      
      second after barrier1
      
      1 exit
      
      Process finished with exit code 0
      

      A few details are worth noting:

      • Before the barrier, each GPU process prints its own output independently.
        • Since local_rank=0 runs straight through to the barrier it can see, it prints several lines along the way, while local_rank=1 prints only first before barrier1.
      • After second before barrier0, rank 0 reaches its own barrier, which unblocks the other process and lets it resume. Because of the time taken by the intermediate operations, rank 0 first prints second after barrier0 and exits, after which rank 1 starts printing its own output.

      One point deserves attention here: the barrier calls of different processes actually correspond to each other; all processes must each execute barrier once before anyone is released to proceed.
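The asymmetric pairing in the first example (each barrier sits inside a different if-branch, yet the calls still match up) can be mimicked with stdlib threads; this is a sketch of the same control flow, not PyTorch code:

```python
import threading

barrier = threading.Barrier(2)  # two "ranks", as in the example above
order = []
lock = threading.Lock()

def worker(rank):
    with lock:
        order.append(f"first before {rank}")
    if rank != 0:
        barrier.wait()          # rank 1 blocks here...
    with lock:
        order.append(f"first after {rank}")
    if rank == 0:
        barrier.wait()          # ...until rank 0 reaches its own barrier
    with lock:
        order.append(f"exit {rank}")

threads = [threading.Thread(target=worker, args=(r,)) for r in (0, 1)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# rank 0 always records "first after 0" before rank 1 can record "first after 1"
print(order.index("first after 0") < order.index("first after 1"))
```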
      For the second code snippet:

      def ddp_test_v1(local_rank, world_size):
          # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
          dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)
      
          if local_rank != 0:
              print("1 before barrier{}\n".format(local_rank))
              start = time.time()
              time.sleep(5)
              dist.barrier()
              print(time.time() - start)
              print("1 after barrier{}\n".format(local_rank))
              dist.barrier()
              print("1 after barrier{}\n".format(local_rank))
          else:
              print("0 before barrier{}\n".format(local_rank))
              start = time.time()
              dist.barrier()
              print(time.time() - start)
              print("0 after barrier{}\n".format(local_rank))
              print("0 after barrier{}\n".format(local_rank))
              dist.barrier()
              print("0 after barrier{}\n".format(local_rank))
      
          print("{} exit".format(local_rank))
      

      Its output is:

      1 before barrier1
      0 before barrier0
      
      
      5.002117395401001
      5.002126216888428
      1 after barrier1
      
      
      0 after barrier0
      
      0 after barrier0
      
      0 after barrier0
      
      0 exit
      1 after barrier1
      
      1 exit
      
      Process finished with exit code 0
      

      An important point: the two print(time.time() - start) outputs are essentially identical. No matter how long the earlier delay was, the time measured spans until the last process reaches and executes its barrier. This again shows the mutual constraint between the barriers of different processes. When rank 0 reaches its second barrier, rank 1 resumes running, but this time rank 0 finishes first.
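This "slowest arriver sets the pace" behavior can likewise be reproduced with a stdlib barrier; the 0.5-second delay below is illustrative:

```python
import threading
import time

barrier = threading.Barrier(2)
elapsed = {}

def fast():
    start = time.time()
    barrier.wait()               # arrives immediately, then waits for `slow`
    elapsed["fast"] = time.time() - start

def slow():
    start = time.time()
    time.sleep(0.5)              # simulate extra work before the barrier
    barrier.wait()
    elapsed["slow"] = time.time() - start

threads = [threading.Thread(target=f) for f in (fast, slow)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Both measurements come out around 0.5 s: the fast thread is held
# at the barrier until the slow one arrives, just as rank 0 was held
# for the full 5 seconds in the PyTorch example.
print(abs(elapsed["fast"] - elapsed["slow"]) < 0.1)
```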
      It can also be verified that if one of the two barrier calls in one rank's branch is removed, the other rank will wait forever at its now-unmatched barrier.
      For example:

      
      def ddp_test_v1(local_rank, world_size):
          # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
          dist.init_process_group(backend="nccl", world_size=world_size, rank=local_rank)
      
          if local_rank != 0:
              print("1 before barrier{}\n".format(local_rank))
              start = time.time()
              time.sleep(5)
              dist.barrier()
              print(time.time() - start)
              print("1 after barrier{}\n".format(local_rank))
              # dist.barrier()
              print("1 after barrier{}\n".format(local_rank))
          else:
              print("0 before barrier{}\n".format(local_rank))
              start = time.time()
              time.sleep(3)
              dist.barrier()
              print(time.time() - start)
              print("0 after barrier{}\n".format(local_rank))
              print("0 after barrier{}\n".format(local_rank))
              dist.barrier()
              print("0 after barrier{}\n".format(local_rank))
      
          print("{} exit".format(local_rank))
      

      Output:

      0 before barrier0
      1 before barrier1
      
      
      5.002458572387695
      1 after barrier1
      
      1 after barrier1
      
      1 exit
      5.002473831176758
      0 after barrier0
      
      0 after barrier0
      
      Traceback (most recent call last):
        File "/home/lart/Coding/SODBetterProj/tools/dist_experiment_test.py", line 67, in <module>
          main()
        File "/home/lart/Coding/SODBetterProj/tools/dist_experiment_test.py", line 63, in main
          mp.spawn(ddp_test_v1, args=(world_size,), nprocs=world_size, join=True)
        File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 199, in spawn
          return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
        File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 157, in start_processes
          while not context.join():
        File "/home/lart/miniconda3/envs/pt17/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 75, in join
          ready = multiprocessing.connection.wait(
        File "/home/lart/miniconda3/envs/pt17/lib/python3.8/multiprocessing/connection.py", line 931, in wait
          ready = selector.select(timeout)
        File "/home/lart/miniconda3/envs/pt17/lib/python3.8/selectors.py", line 415, in select
          fd_event_list = self._selector.poll(timeout)
      KeyboardInterrupt
      
      Process finished with exit code 137 (interrupted by signal 9: SIGKILL)
      

      Rank 0 waits indefinitely at its second barrier.
      This characteristic is also noted in this Stack Overflow answer:

      • when a process encounters a barrier it will block
      • the position of the barrier is not important (not all processes have to enter the same if-statement, for instance)
      • a process is blocked by a barrier until all processes have encountered a barrier, upon which the barrier is lifted for all processes

      https://stackoverflow.com/a/59766443

      Key References

      posted @ 2022-01-16 13:05  lart