[megatron代碼閱讀] 2. TP和PP實現
訓練并行實現
TensorParallel
張量并行代碼路徑, 代碼路徑: megatron/core/tensor_parallel
主要包含Linear / VocabEmbedding / cross_entropy 三部分.
Linear
參數初始化
如果是從checkpoint熱啟, perform_initialization需要打開這個配置
1.set_tensor_model_parallel_attributes: 設置weight的三個屬性: is_parallel / partition_dim / stride
2.調用傳入的init_method, 初始化weight. 這里注意要使用同一個隨機種子. 如果是expert網絡, 每個expert要用自己獨立的rng_tracker
3.如果啟用expert_parallel, 設置allreduce屬性為false. 否則為true
列并行
sequence_parallel回憶, 為了節省顯存,拆分了layernorm后的激活存儲. 在進入TP前通過allGather獲取到完整的激活, 經過TP后再通過reduceScatter分離到各張卡.
grad_accumulation_fusion
代碼: https://github.com/NVIDIA/apex/blob/master/csrc/megatron/fused_weight_gradient_dense_cuda.cu
主要作用是在顯存受限,無法一次性更新大batch_size的時候, 通過mini-batch來累加多個小批量的梯度到weight.main_grad, 這里fused意思就是在main_grad上原地更新, 最后在用main_grad來更新大batch里的weight.
LinearWithGradAccumulationAndAsyncCommunication
forward:
- 輸入: 如果開sp, torch.distributed.all_gather_into_tensor input
- Matmul(input, weight) + bias
backward:
- wgrad_deferral_limit: TODO: 沒弄懂用于降低pipeline flush延遲的含義
- 如果開sp, grad_input bp對應的集合通信操作是dist_reduce_scatter_func
- 沒開sp, grad_input bp對應的是all_reduce_func
- 如果開了grad_accumulation_fusion & sp, 需要先all_gather input, 也就是X, 因為求grad_weight的時候需要matmul input的轉置.
- 這里的異步優化指的是先進行 grad_input的異步集合通信, 在此同時計算grad_weight, 算完grad_weight后再等grad_input通信完成, 這樣就能overlap一部分通信耗時.
行并行
forward
- 開sp, input不做處理,因為直接輸入切分好的input, 經過線性層后ouput reduceScatter到對應的節點
- 關sp, input需要先進行ReduceScatter對輸入x做切分, 經過線性層后output allReduce結果.
Backward: 沒有集合通信
PipelineParallel
核心配置參數有兩個:
-
pipeline_model_parallel_size: pp切分數, transformer_layer實際被切分為多少個group -
virtual_pipeline_model_parallel_size: 舉例 tensor_model_parallel_size=1, pipeline_model_parallel_size=4, virtual_pipeline_model_parallel_size=2, 一共有16個transform_layer的情況下, 模型被切分為:GPU 0: [1, 2] [9, 10] GPU 1: [3, 4] [11, 12] GPU 2: [5, 6] [13, 14] GPU 3: [7, 8] [15, 16]一共8個stage, 每個stage有2個layer. PP原理回憶
PP代碼邏輯位置 megatron/core/pipeline_parallel
train_step->get_forward_backward_func->forward_backward_pipelining_with_interleaving
P2P通信
有兩種方式 batch_isend_irecv 與 _p2p_ops, 后者即send和recv獨立作為一個通信操作
batch_isend_irecv: 將send_prev/recv_prev/send_next/recv_next可以異步并發執行
p2p通信步驟:
- 傳輸tensor_shape, int64類型 (類似sequence壓縮通信方式, 先傳長度)
- 對所有的pp group進行遍歷, 如果需要recv_prev / recv_next, 先創建空tensor用于結果存儲 (這里是否能優化)
- 根據是否batch傳輸, 分別進行并行/串行的方式通信.
- 等待通信完成, 進行cuda流同步
1F1B(非交錯式)
缺點: 無法支持 p2p通信耗時的overlap
Warmup
num_warmup_microbatchs = min(microbatch, pp_world_size - pp_rank - 1), 比如device1的warmup就是 4 - 0 - 1 = 3, 前3個microbatch warmup的時候, 整體pipeline處于串行的執行狀態.
步驟: recv_forward->forward_step->send_forward 再到下一層PP, 直到warmup步驟全部走完.
Steady
在穩態狀態下就是1F1B描述的情況. 交替進行fp和bp
以device3剛進入steady狀態為例:
forward_step: warmup執行了microbatch1, steady執行的第一個forward是 batch2send_forward_recv_backward: 向device4發batch2的fp結果, 同時等device4返回batch1的bp結果. 這里是同步通信, 需要等bp執行完成, 這時候并沒有跑到batch3的fp上.backward_step: 執行batch1的bpsend_backward_recv_forward: 把batch1的bp結果發給device2, 同時接受device2的batch3 fp結果, 用來執行下一輪的batch3 fp.
5,6,7,8 圖上描述的狀態和代碼是完全一致的, 但1,2,3,4不完全一致.
Cooldown
和warmup剛好是相反的邏輯.根據warmup microbatchs的個數, 等待bp執行完成.
1F1B with interleaving
虛擬流水線的主要目的是讓microbatch_size更小更多, 從而減少氣泡。方法是讓一個device虛擬成 \(v\) 個device,從計算1個連續的layer段(有 \(x\) 個 layer)變成計算 \(v\) 個不連續的layer段(每段 layer 數量為 \(x\)/\(v\)). 比如之前1F1B時device1負責layer 1~4,device2負責 5~8,在 Interleaved 1F1B下device1負責layer 1~2 和 9~10,device2負責 3~4 和 11~12,這樣可以讓流水線中每個stage更小,因而下個stage的等待時間更短,氣泡更小。需要注意的是,micro_batch_size需要是 pipeline_parallel_size的整數倍。
初始化
- warmup_batch數計算方法, 如下代碼:
total_num_microbatches = num_microbatches * num_model_chunks #模型分塊數(virtual pipeline size) * microbatch
all_warmup_microbatches = False
if forward_only:
num_warmup_microbatches = total_num_microbatches
else:
# 這里*2的原因是 為了充分利用設備資源,會使用雙倍緩沖技術。這意味著每個設備會同時處理兩個microbatches,一個在前向傳播,另一個在后向傳播。因此,熱身階段的microbatches數量需要乘以2,以覆蓋前向和后向傳播。
num_warmup_microbatches = (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2
# microbatch_group_size_per_vp_stage 默認值 = pipeline_parallel_size, 用于
num_warmup_microbatches += (num_model_chunks - 1) * config.microbatch_group_size_per_vp_stage
if num_warmup_microbatches >= total_num_microbatches:
num_warmup_microbatches = total_num_microbatches
all_warmup_microbatches = True
num_microbatches_remaining = total_num_microbatches - num_warmup_microbatches
- 設置schedule_table, 為了方便計算, 將microbatch+chunk重映射成了virtual_microbatch_id
# PP2 N3M5 with VP2 is constructed as below:
# virtual_microbatch_id | 0 1 2 3 4 5 6 7 8 9
# microbatch_id | 0 1 2 0 1 2 3 4 3 4
# model_chunk_id | 0 0 0 1 1 1 0 0 1 1
根據chunk_id, 還能判斷出這個virtual_id是這個device上的第一個chunk還是最后一個chunk
recv_tensor_from_previous_stage: 先判斷當前stage是否為leading stage(forward第一個, backward最后一個), 如果virtual_microbatch_id < (pipeline_parallel_size - 1), 說明當前stage沒有任何前置需要接受的tensor, 否則說明他和之前的最后一個stage連在一起. 以PP=4舉個例子:
# 0 1 2 3 ... 這里的microbatch 0的下一個stage是 device0的microbatch3
# 0 1 2 3 ...
# 0 1 2 3 ...
# 0 1 2 3 ...
warmup
注意配置項: overlap_p2p_comm_warmup_flush: 在打開這個開關后支持overlap warmup和flush階段前向計算和通信, 后面看代碼默認這個開關打開, warmup步驟:
- 根據microbatch id判斷是不是leading_stage, 如果不是的話需要等上一個循環發出的異步接受前向結果的handle.
- 異步通信結果保存在fwd_recv_buffer, 異步發出預取下個循環的recv_forward請求
- 進行該stage的forward_step
- 把output_tensor 異步發出 send_forward, 等上一個循環的send_next_wait_handle完成.
- 把通信完的fwd_recv_buffer 賦值給input_tensor用于下個循環的forward
- 在warmup的最后, 觸發異步等待recv_backward的請求. 方便銜接steady階段
steady
循環num_microbatches_remaining = total_num_microbatches - num_warmup_microbatches次, 步驟:
- 等warmup的recv_prev 異步執行完, 收到forward結果到buffer里
- Forward_step
send_forward_recv_forward: 同時接受previous stage的forward結果, 同時把next stage的forward輸入發出.- Wait recv_next傳回來的grad
- Backward_step
send_backward_recv_backward: 反向往之前的stage發grad- 等上一個batch的backward send_prev發完, 相當于一個buffer切換過程.
整個流程像下面這個流水線示意圖.
cooldown
與warmup剛好完全相反, 只有backward的計算和通信操作.
注意在每個階段完成的時候都回將通信用到的output_tensor重新釋放回顯存池, 用來緩解顯存壓力.
參考:
對VPP的進一步優化: https://zhuanlan.zhihu.com/p/681363624

浙公網安備 33010602011771號