[PyTorch] DDP源碼閱讀
[PyTorch] DDP源碼閱讀
-
PyTorch的DistributedDataParallel (DDP) 允許多臺機器,多臺GPU之間的數據并行。本文簡單講解DDP的流程,并從代碼層面理解DDP如何訪問底層的通信框架。
-
DDP使用單機多進程來控制多個GPU。模型需要能放入單個GPU中。
DDP的用法
-
首先,創建多個進程,使用
torch.distributed.launch或者torch.multiprocessing.spawn -
然后,為每個進程分配GPU,可以使用
CUDA_VISIBLE_DEVICES或者torch.cuda.set_device(i) -
在每個進程,都需要初始化進程組
dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
- 然后,將模型包裝成DDP
model = DistributedDataParallel(model, device_ids=[i], output_device=i)
- 最后,每個進程獨立運行模型
DDP概覽
參考PyTorch Distributed Overview — PyTorch Tutorials 2.6.0+cu124 documentation
- DDP依賴于PyTorch distributed communication layer (C10D) 的
ProcessGroup進行通信。 - 初始化:
- 將
state_dict()從rank0進程廣播到所有進程,保證所有進程的初始狀態相同 - 每個進程創建一個
Reducer,負責反向傳播階段的梯度同步 - 為了效率,每個
Reducer將參數分為多個桶。一旦一個桶內的所有參數都完成了反向傳播,就開始這個桶的梯度同步。 - 為了檢測每個參數是否完成了反向傳播,為每個參數都注冊一個
autograd hook。
- 將
- 前向:不需要進行進程間同步
- 反向:
backward()函數位于loss的Tensor上,這不在DDP的控制范圍。因此,DDP借助autograd hook來得知哪些參數已經完成了反向傳播。一旦一個桶內的所有參數都完成了反向傳播,Reducer就會在所有進程之間進行異步的allreduce。若所有桶都完成了反向傳播,Reducer會阻塞等待allreduce結束。 - 優化:每個進程都只優化本地的模型。因為所有進程的模型都進行過梯度同步,因此他們的優化結果也相同。
- 其他:
- 一個額外的選項是
find_unused_parameters。如果模型的反向傳播不會更新所有參數,則那些不更新的參數不會觸發autograd hook,則Reducer可能會永遠的等待這些參數。在這種情況下,用戶需要設置find_unused_parameters=True。此時,模型會在前向傳播時尋找所有未用到的參數,并標記這些參數是“已完成”的,Reducer不會等待這些參數。注意這個過程需要額外的搜索所有參數,會導致些許時間開銷。
- 一個額外的選項是
代碼解讀
DDP的主要代碼位于torch/nn/parallel/distributed.py,其中可能會用到一些其他的通信接口,但最終所有的通信都會調用ProcessGroup。
- 在
init_process_group中,會根據backend字符串,決定使用哪個process group(如ProcessGroupNCCL)。
這里只截取部分關鍵代碼
class DistributedDataParallel(Module, Joinable):
def __init__(...):
# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
# _build_params_for_reducer大致內容:
# 找到self.module.named_modules()中,所有的module.named_parameters()
# 且param.requires_grad=True,且不屬于self.parameters_to_ignore
# 確認所有進程上的模型大小和狀態相同
# All collectives during initialization are gated by this flag.
if init_sync:
# 確認模型大小
# Verify model equivalence.
_verify_param_shape_across_processes(self.process_group, parameters)
# 同步模型參數
# Sync params and buffers. Ensures all DDP models start off at the same value.
_sync_module_states(
module=self.module,
process_group=self.process_group,
broadcast_bucket_size=self.broadcast_bucket_size,
src=0,
params_and_buffers_to_ignore=self.parameters_to_ignore,
broadcast_buffers=self.broadcast_buffers,
)
# _sync_module_states大致內容:
# 獲取所有module.named_parameters()與module.named_buffers()
# 然后調用torch/csrc/cuda/comm.cpp提供的broadcast_coalesced接口
# 在內部調用nccl::broadcast
# 創建reducer
# Builds reducer.
self._ddp_init_helper(
parameters,
expect_sparse_gradient,
param_to_name_mapping,
static_graph,
)
# _ddp_init_help內容如下:
"""
DDP init helper function to manage parameters, grad hooks, logging, and SyncBatchNorm.
Initialization helper function that does the following:
(1) bucketing the parameters for reductions
(2) resetting the bucketing states
(3) registering the grad hooks
(4) Logging construction-time DDP logging data
(5) passing a handle of DDP to SyncBatchNorm Layer
"""
# 內部調用torch/csrc/distributed/c10d/reducer.cpp創建Reducer
# 獲取每個參數的grad函數
# auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);
# 添加autograd_hook
# grad_accumulator->add_post_hook(..., this->autograd_hook(...), ...)
# 保存將grad函數保存到grad_accumulators_
以上就是初始化部分,接下來再看一看在反向傳播中,autograd_hook是如何運作的
-
autograd_hook的主要內容是mark_variable_ready- 若當前桶已經全部完成,則調用
mark_bucket_ready mark_bucket_ready內部進行all_reduce_bucket- 調用
ProcessGroup的allreduce()- 具體地,
ProcessGroupNCCL有自己的allreduce_impl,執行nccl:all_reduce
- 具體地,
- 若當前桶已經全部完成,則調用
-
mark_variable_ready還會檢查是否所有桶都已準備好,若所有的桶都已經準備好了,則在所有參數的梯度更新完成后,調用this->finalize_backward(),其內部等待所有的桶的任務完成
另外,ProcessGroup是異步提交通訊任務的。那怎樣得知一個任務是否完成呢?
-
在
ProcessGroupNCCL創建任務時,會將其加入未完成隊列,并記錄它的事件work->ncclEndEvent_->record(ncclStream) -
在
ProcessGroupNCCL初始化時,還會創建一個ncclCommWatchdog,每隔一段時間,就檢測未完成任務隊列中的任務是否完成。 -
檢測任務完成是通過
work->ncclEndEvent_->query()判斷的,其內部最終調用cudaEventQuery()來判斷任務的事件是否完成
| 歡迎來原網站坐坐! >原文鏈接<

浙公網安備 33010602011771號