MPK(Mirage Persistent Kernel)源碼筆記(5)--- 執行引擎
MPK(Mirage Persistent Kernel)源碼筆記(5)--- 執行引擎
0x00 概述
MPK 包含內置 GPU 運行時系統,可在單個 GPU 巨型內核內完整執行任務圖。這使得系統能在推理過程中無需額外內核啟動的情況下,實現任務執行與調度的細粒度控制,以實現高吞吐量與低延遲。
這座超級工廠能全自動運轉,核心在于MPK設計了一套跑在GPU上的運行時系統。這套系統的精髓,在persistent_kernel.py(前端接口)和persistent_kernel.cuh(后端實現)里體現得淋漓盡致。
由于所有的調度和任務切換都發生在單一內核上下文內,任務間的開銷極低,通常僅需 1-2 微秒,從而能夠高效地執行多層、多 GPU 的 LLM 工作負載。
0x01 SM不同角色
為了實現任務執行與調度的細粒度控制,MPK 在啟動時將 GPU 上所有流式多處理器(SM)靜態分區為兩種角色:即工作單元(Worker)和調度器(Scheduler)。工作 SM 與調度 SM 的數量在內核啟動時固定配置,且總和等于物理 SM 總數,從而徹底避免動態上下文切換開銷。
下圖展示了 MPK 的執行時間線,其中每個矩形代表一個在工作單元上運行的任務;每個圓圈代表一個事件。當一個任務完成時,它會遞增其對應觸發事件的計數器。當事件計數器達到預設閾值時,該事件被視為已激活,并被加入Scheduler的事件隊列。隨后,Scheduler會啟動所有依賴于該事件的下游任務。
這種設計實現了細粒度的軟件流水線化,并允許計算與通信之間重疊,比如
- 矩陣乘法(Matmul)任務可以與來自不同層的注意力任務并行執行。
- 一旦有部分 matmul 結果可用,即可開始 Allreduce 通信。

1.1 Scheduler SM
調度決策由 MPK 的分布式Scheduler處理,每個Scheduler運行于單個線程束(warp)上。由于每個流式多處理器(SM)可以容納多個線程束,因此單 SM 最多可并發運行 4 個Scheduler。每個Scheduler維護激活事件隊列,并持續執行以下操作:
- 事件出隊:移除依賴已滿足的激活事件(即所有前置任務均已完成)。
- 任務啟動:調度依賴該激活事件的任務集。
這種分布式調度機制在實現跨 SM 可擴展執行的同時,最小化協同開銷。
這些SM不負責計算,它們是GPU內部的“調度系統”。它們監控著一系列“事件”(Event)。一個事件代表一個或多個前置任務已經完成。當Scheduler監測到某個事件被觸發(例如,一個矩陣乘任務完成了),它就會查詢預先編譯好的任務圖,找到所有依賴這個事件的后續任務,然后把這些新任務的ID(TaskId)投遞到工人們的任務隊列里。
在persistent_kernel.cuh里,else分支就是Scheduler的邏輯。它們也是在一個while(true)循環里,不斷檢查sched_queues,處理激活的事件,并分派新任務。
1.2 Worker SM
這些SM是純粹的執行單元,負責干具體的計算活,比如矩陣乘、向量加法等。每個工人SM都有一個自己的任務隊列(Task Queue),它們的工作就是不斷地從隊列里取任務、執行、再取下一個。這在persistent_kernel.cuh的源碼里體現得很清楚,if (blockIdx.x < config.num_workers)這個分支里的while(true)循環,就是工人SM執行邏輯的直接實現。
每個工作單元獨占一個流式多處理器(SM),并維護專屬任務隊列。其執行遵循以下高效簡潔的循環流程:
- 獲取任務:從隊列中提取下一待執行任務。
- 執行計算:運行任務(如矩陣乘法 / 注意力機制 / GPU 間數據傳輸)。
- 事件觸發:任務完成后通知觸發事件。
- 循環執行:重復上述過程。
該機制既保障了工作單元的持續滿載運行,又實現了跨層和跨操作的異步任務執行。
0x02 推理引擎
persistent_kernel.cuh 是運行引擎的入口文件。
2.1 初始化
init_persistent_kernel是初始化函數,負責初始化運行時環境、各種數據結構和分配內存。具體如下:
- 參數解析和配置設置
- 接受來自調用方的參數,包括元數據張量、性能分析緩沖區、GPU rank、工作線程數、調度器數等。
- 設置全局運行時配置(global_runtime_config)的各種參數,如工作線程數、調度器數、序列長度限制等。
- 初始化NVSHMEM(如果啟用)
- 初始化NVSHMEM(NVIDIA SHared Memory)環境,用于多GPU間的通信。
- 獲取當前GPU ID和總GPU數量。
- 調用_init_persistent_kernel函數,該函數用于初始化所有任務和事件描述。
- 內存分配和初始化。比如
- 為所有任務(任務描述/TaskDesc)、所有事件(EventDesc)分配內存并復制數據。
- 為工作隊列(worker_queue)和調度器隊列(sched_queue )分配內存并復制數據。
- 為事件計數器(EventCounter)分配內存并復制數據。
- 為第一個任務分配內存并復制數據。
- 啟動內核
- 調用init_kernel進行內核初始化。
- 如果使用NVSHMEM,則添加全局屏障確保所有初始化完成。
流程圖如下

代碼如下
// 外部C語言接口:Mirage持久化內核(MPK)的初始化函數
// 負責配置內核運行時參數、初始化分布式通信(如NVSHMEM)、分配GPU內存并加載任務/事件數據
extern "C" void init_persistent_kernel(std::vector<void *> meta_tensors,
void *profiler_buffer,
int my_rank,
int num_workers,
int num_local_schedulers,
int num_remote_schedulers,
int max_seq_length,
long long eos_token_id) {
// 斷言:確保元數據張量數量為3(對應step、tokens、new_token_nums三個核心張量)
assert(meta_tensors.size() == 3);
// 將元數據張量指針轉換為對應類型,賦值給全局運行時配置
global_runtime_config.step = static_cast<int *>(meta_tensors[0]); // 步驟標記張量(記錄當前生成步驟)
global_runtime_config.tokens = static_cast<long long *>(meta_tensors[1]); // 生成的token序列張量
global_runtime_config.new_token_nums = static_cast<int *>(meta_tensors[2]);// 新增token數量張量
// 配置工作單元、調度器數量參數
global_runtime_config.num_workers = num_workers; // 工作單元(worker)總數
global_runtime_config.num_local_schedulers = num_local_schedulers; // 本地調度器數量
global_runtime_config.num_remote_schedulers = num_remote_schedulers; // 遠程調度器數量(分布式場景)
// 配置序列生成相關參數
global_runtime_config.max_seq_length = max_seq_length; // 最大序列長度(token生成上限)
global_runtime_config.eos_token_id = eos_token_id; // 結束符token的ID
// 配置性能分析緩沖區(用于存儲性能數據)
global_runtime_config.profiler_buffer = profiler_buffer;
// 計算總調度器數量(本地調度器 + 遠程調度器)
int num_schedulers = num_local_schedulers + num_remote_schedulers;
// 初始化分布式通信庫NVSHMEM(僅當啟用USE_NVSHMEM宏時執行)
cudaSetDevice(my_rank); // 設置當前使用的GPU設備(對應分布式場景中的GPU編號)
#ifdef USE_NVSHMEM
MPI_Comm mpi_comm = MPI_COMM_WORLD; // 初始化MPI通信域(NVSHMEM依賴MPI實現分布式通信)
nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER; // 初始化NVSHMEM屬性結構體
attr.mpi_comm = &mpi_comm; // 將MPI通信域綁定到NVSHMEM屬性
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr); // 基于MPI通信域初始化NVSHMEM
nvshmem_barrier_all(); // 所有GPU進程同步,確保NVSHMEM初始化完成
// 獲取NVSHMEM相關進程信息
int mype = nvshmem_my_pe(); // 當前GPU在NVSHMEM中的進程編號
int npes = nvshmem_n_pes(); // NVSHMEM中的總進程數(即GPU總數)
int mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE); // 當前進程在節點內的編號
// 打印進程信息(用于調試和日志記錄)
printf("mype(%d) npes(%d) mype_node(%d)\n", mype, npes, mype_node);
#else
// 未啟用NVSHMEM時,默認單GPU場景配置
int mype = 0; // 進程編號默認為0
int npes = 1; // 總進程數默認為1
#endif
// 配置隊列長度參數(每個工作單元、調度器的隊列容量)
global_runtime_config.per_worker_queue_len = 1024; // 每個工作單元隊列的最大任務數
global_runtime_config.per_sched_queue_len = 1024; // 每個調度器隊列的最大事件數
// 配置GPU相關參數
global_runtime_config.num_gpus = npes; // GPU總數(對應NVSHMEM的總進程數)
global_runtime_config.my_gpu_id = mype; // 當前GPU的編號
global_runtime_config.num_graphs = 1; // 內核圖數量(默認1個)
global_runtime_config.split_worker_scheduler = true; // 啟用工作單元與調度器分離的架構
// 聲明任務、事件相關數據結構(用于存儲任務描述、事件描述、初始任務列表)
std::vector<TaskDesc> all_tasks; // 所有任務的描述信息列表
std::vector<EventDesc> all_events; // 所有事件的描述信息列表(用于管理任務依賴)
std::vector<TaskId> first_tasks; // 初始任務ID列表(內核啟動時首先執行的任務)
// 調用內部初始化函數,填充任務、事件、初始任務數據
_init_persistent_kernel(all_tasks, all_events, first_tasks, npes, mype);
// 初始化工作單元隊列的"最后就緒任務ID"數組(GPU端內存分配)
// 每個工作單元維護兩個隊列(本地隊列 + 遠程隊列),因此數組長度為2 * 工作單元數
global_runtime_config.worker_queue_last_ready_task_id =
gpu_malloc<unsigned long long int>((num_workers * 2) *
sizeof(unsigned long long int));
// 在主機端初始化該數組(初始值均為0,表示暫無就緒任務)
std::vector<unsigned long long int> host_worker_queue_last_task_id;
for (int i = 0; i < 2 * num_workers; i++) {
host_worker_queue_last_task_id.push_back(0);
}
// 將主機端數組數據拷貝到GPU端內存
cudaMemcpy(global_runtime_config.worker_queue_last_ready_task_id,
host_worker_queue_last_task_id.data(),
(num_workers * 2) * sizeof(unsigned long long int),
cudaMemcpyHostToDevice);
// 初始化調度器隊列的"最后就緒事件ID"和"下一個空閑事件ID"數組(GPU端內存分配)
// 額外增加1個隊列用于全局調度器,因此數組長度為 總調度器數 + 1
global_runtime_config.sched_queue_last_ready_event_id =
gpu_malloc<unsigned long long int>((num_schedulers + 1) *
sizeof(unsigned long long int));
global_runtime_config.sched_queue_next_free_event_id =
gpu_malloc<unsigned long long int>((num_schedulers + 1) *
sizeof(unsigned long long int));
// 在主機端初始化這兩個數組(初始值均為0)
std::vector<unsigned long long int> host_sched_queue_last_event_id;
for (int i = 0; i < (num_schedulers + 1); i++) {
host_sched_queue_last_event_id.push_back(0);
}
// 將主機端數組數據拷貝到GPU端內存(兩個數組初始值相同)
cudaMemcpy(global_runtime_config.sched_queue_last_ready_event_id,
host_sched_queue_last_event_id.data(),
(num_schedulers + 1) * sizeof(unsigned long long int),
cudaMemcpyHostToDevice);
cudaMemcpy(global_runtime_config.sched_queue_next_free_event_id,
host_sched_queue_last_event_id.data(),
(num_schedulers + 1) * sizeof(unsigned long long int),
cudaMemcpyHostToDevice);
// 初始化所有事件的計數器(GPU端內存分配)
global_runtime_config.all_event_counters =
gpu_malloc<EventCounter>(all_events.size() * sizeof(EventCounter)); // 事件觸發計數器
global_runtime_config.all_event_num_triggers =
gpu_malloc<int>(all_events.size() * sizeof(int)); // 事件所需觸發次數
// 在主機端初始化"事件所需觸發次數"數組(從事件描述中讀取對應值)
std::vector<int> host_all_event_counters;
for (size_t i = 0; i < all_events.size(); i++) {
host_all_event_counters.push_back(all_events.at(i).num_triggers);
}
// 將主機端數據拷貝到GPU端內存
cudaMemcpy(global_runtime_config.all_event_num_triggers,
host_all_event_counters.data(),
all_events.size() * sizeof(int),
cudaMemcpyHostToDevice);
// 將事件觸發計數器初始化為0(所有事件初始未觸發)
cudaMemset(global_runtime_config.all_event_counters,
0,
all_events.size() * sizeof(EventCounter));
// 初始化所有任務數據(將主機端任務描述拷貝到GPU端)
global_runtime_config.all_tasks =
gpu_malloc<TaskDesc>(all_tasks.size() * sizeof(TaskDesc));
cudaMemcpy(global_runtime_config.all_tasks,
all_tasks.data(),
all_tasks.size() * sizeof(TaskDesc),
cudaMemcpyHostToDevice);
// 初始化所有事件數據(將主機端事件描述拷貝到GPU端)
global_runtime_config.all_events =
gpu_malloc<EventDesc>(all_events.size() * sizeof(EventDesc));
cudaMemcpy(global_runtime_config.all_events,
all_events.data(),
all_events.size() * sizeof(EventDesc),
cudaMemcpyHostToDevice);
// 初始化工作單元隊列(GPU端內存分配)
{
std::vector<TaskId *> host_worker_queues; // 主機端存儲每個工作單元隊列的GPU內存指針
// 為每個工作單元的兩個隊列分配GPU內存
for (int i = 0; i < (num_workers * 2); i++) {
TaskId *worker_queue = gpu_malloc<TaskId>(
global_runtime_config.per_worker_queue_len * sizeof(TaskId));
host_worker_queues.push_back(worker_queue);
}
// 分配GPU內存存儲所有工作單元隊列的指針
global_runtime_config.worker_queues =
gpu_malloc<TaskId *>((num_workers * 2) * sizeof(TaskId *));
// 將隊列指針從主機端拷貝到GPU端
cudaMemcpy(global_runtime_config.worker_queues,
host_worker_queues.data(),
(num_workers * 2) * sizeof(TaskId *),
cudaMemcpyHostToDevice);
}
// 初始化調度器隊列(GPU端內存分配)
{
std::vector<EventId *> host_sched_queues; // 主機端存儲每個調度器隊列的GPU內存指針
// 為每個調度器隊列(含全局調度器隊列)分配GPU內存
for (int i = 0; i < (num_schedulers + 1); i++) {
EventId *sched_queue = gpu_malloc<EventId>(
global_runtime_config.per_sched_queue_len * sizeof(EventId));
host_sched_queues.push_back(sched_queue);
}
// 分配GPU內存存儲所有調度器隊列的指針
global_runtime_config.sched_queues =
gpu_malloc<EventId *>((num_schedulers + 1) * sizeof(EventId *));
// 將隊列指針從主機端拷貝到GPU端
cudaMemcpy(global_runtime_config.sched_queues,
host_sched_queues.data(),
(num_schedulers + 1) * sizeof(EventId *),
cudaMemcpyHostToDevice);
}
// 初始化初始任務數據(將主機端初始任務ID拷貝到GPU端)
{
global_runtime_config.first_tasks =
gpu_malloc<TaskId>(first_tasks.size() * sizeof(TaskId));
cudaMemcpy(global_runtime_config.first_tasks,
first_tasks.data(),
first_tasks.size() * sizeof(TaskId),
cudaMemcpyHostToDevice);
}
// 啟動初始化內核(GPU端執行):配置網格和線程塊維度(1個block,每個block含128個thread)
init_kernel<<<dim3(1, 1, 1), dim3(128, 1, 1)>>>(global_runtime_config);
cudaDeviceSynchronize(); // 等待GPU初始化內核執行完成
#ifdef USE_NVSHMEM
nvshmem_barrier_all(); // 分布式場景下,所有GPU進程同步,確保初始化全部完成
#endif
}
2.2 啟動內核
launch_persistent_kernel是啟動 CUDA 內核的入口函數,具體功能如下:
- 獲取設備信息。
- 獲取當前GPU設備編號
- 查詢設備的SM數量,用于確定grid大小。
- 根據配置選擇內核啟動模式
- 模式一,分離式內核(split_worker_scheduler = true),這是默認模式,分別啟動兩個獨立的內核。
- 線程塊大小:128個線程;網格大小:num_local_schedulers;執行scheduler_kernel函數。
- 線程塊大小:128個線程;網格大小:num_workers,每個線程塊對應一個worker;執行worker_kernel函數。
- 為所有內核設置最大動態共享內存大小為 MAX_SHARE_MEMORY_SIZE。
- 使用cudaDeviceSynchronize等待內核啟動完成。
- 模式二,單一持久化內核(split_worker_scheduler = false)
- 網格大小:sm_count(使用所有的sm);線程塊大小:128個線程;調用persistent_kernel函數。
- 如果使用NVSHMEM,則使用nvshmemx_collective_launch啟動,否則使用標準CUDA內核啟動。
- 為所有內核設置最大動態共享內存大小為 MAX_SHARE_MEMORY_SIZE。
- 使用cudaDeviceSynchronize等待內核啟動完成。
- 模式一,分離式內核(split_worker_scheduler = true),這是默認模式,分別啟動兩個獨立的內核。
流程圖如下:

代碼如下。
// 外部C語言接口:Mirage持久化內核(MPK)的啟動函數
// 根據運行時配置的架構模式(工作單元與調度器分離/一體化),啟動對應的GPU內核
extern "C" void launch_persistent_kernel() {
int device;
// 獲取當前正在使用的GPU設備編號
cudaGetDevice(&device);
int sm_count;
// 獲取當前GPU設備的流式多處理器(SM)數量(用于一體化內核的網格維度配置)
cudaDeviceGetAttribute(&sm_count, cudaDevAttrMultiProcessorCount, device);
// 判斷是否啟用"工作單元與調度器分離"的架構模式
if (global_runtime_config.split_worker_scheduler) {
// 打印日志:標識當前啟動的是分離模式(工作單元內核 + 調度器內核)
printf("worker kernel & scheduler kernel\n");
// 配置并啟動工作單元內核與調度器內核
// 1. 設置工作單元內核的最大動態共享內存大小(使用預定義的最大共享內存常量)
cudaFuncSetAttribute(worker_kernel,
cudaFuncAttributeMaxDynamicSharedMemorySize,
MAX_SHARE_MEMORY_SIZE);
// 2. 設置調度器內核的最大動態共享內存大小
cudaFuncSetAttribute(scheduler_kernel,
cudaFuncAttributeMaxDynamicSharedMemorySize,
MAX_SHARE_MEMORY_SIZE);
// 創建兩個獨立的CUDA流:分別用于工作單元內核和調度器內核的異步執行
cudaStream_t worker_stream, scheduler_stream;
cudaStreamCreate(&worker_stream); // 工作單元內核專屬流
cudaStreamCreate(&scheduler_stream); // 調度器內核專屬流
// 注:分離模式不支持NVSHMEM分布式通信
// 原因:nvshmemx_collective_launch會串行啟動內核,阻礙工作單元與調度器內核的交互
// 啟動工作單元內核
worker_kernel<<<
dim3(global_runtime_config.num_workers, 1, 1), // 網格維度:工作單元數量 × 1 × 1(每個工作單元對應一個block)
dim3(128, 1, 1), // 線程塊維度:128個thread × 1 × 1
MAX_SHARE_MEMORY_SIZE /* 動態共享內存大小 */,
worker_stream /* 綁定到工作單元專屬流 */
>>>(global_runtime_config); // 傳入全局運行時配置作為內核參數
// 啟動調度器內核
scheduler_kernel<<<
dim3(global_runtime_config.num_local_schedulers, 1, 1), // 網格維度:本地調度器數量 × 1 × 1
dim3(32, 1, 1), // 線程塊維度:32個thread × 1 × 1
0 /* 調度器內核無需動態共享內存,設為0 */,
scheduler_stream /* 綁定到調度器專屬流 */
>>>(global_runtime_config); // 傳入全局運行時配置作為內核參數
// 等待GPU上所有內核執行完成,并檢查執行錯誤
cudaError_t err = cudaDeviceSynchronize();
if (err != cudaSuccess) {
// 若執行出錯,打印錯誤信息(包含具體錯誤描述)
printf("CUDA kernel launch error: %s\n", cudaGetErrorString(err));
}
// 銷毀創建的CUDA流,釋放資源
cudaStreamDestroy(worker_stream);
cudaStreamDestroy(scheduler_stream);
// 打印日志:標識持久化內核啟動流程完成
printf("Finished Launch Persistent Kernel\n");
} else {
// 打印日志:標識當前啟動的是一體化模式(單個持久化內核)
printf("a single persistent kernel\n");
// 配置并啟動一體化持久化內核
// 設置一體化內核的最大動態共享內存大小
cudaFuncSetAttribute(persistent_kernel,
cudaFuncAttributeMaxDynamicSharedMemorySize,
MAX_SHARE_MEMORY_SIZE);
#ifdef USE_NVSHMEM
// 若啟用NVSHMEM分布式通信,使用NVSHMEM的集合式啟動接口(適配多GPU協同)
void *args[] = {&global_runtime_config}; // 封裝內核參數(全局運行時配置)
nvshmemx_collective_launch(
(void const *)persistent_kernel, // 待啟動的一體化內核函數
dim3(sm_count, 1, 1), // 網格維度:GPU的SM數量 × 1 × 1(每個SM對應一個block)
dim3(128, 1, 1), // 線程塊維度:128個thread × 1 × 1
args, // 內核參數數組
MAX_SHARE_MEMORY_SIZE /* 動態共享內存大小 */,
0 /* 不綁定特定流,使用默認流 */
);
#else
// 未啟用NVSHMEM時,直接啟動一體化內核(單GPU場景)
persistent_kernel<<<
dim3(sm_count, 1, 1), // 網格維度:SM數量 × 1 × 1
dim3(128, 1, 1), // 線程塊維度:128個thread × 1 × 1
MAX_SHARE_MEMORY_SIZE /* 動態共享內存大小 */
>>>(global_runtime_config); // 傳入全局運行時配置作為內核參數
#endif
// 等待GPU內核執行完成,并檢查執行錯誤
cudaError_t err = cudaDeviceSynchronize();
if (err != cudaSuccess) {
// 若執行出錯,打印錯誤信息
printf("CUDA kernel launch error: %s\n", cudaGetErrorString(err));
}
// 打印日志:標識持久化內核啟動流程完成
printf("Finished Launch Persistent Kernel\n");
}
}
persistent_kernel、worker_kernel、scheduler_kernel的函數如下:
__global__ void persistent_kernel(RuntimeConfig config) {
persistent_checker(config);
if (blockIdx.x < config.num_workers) {
execute_worker(config);
} else {
execute_scheduler(config, -(4 * config.num_workers));
}
}
__global__ void worker_kernel(RuntimeConfig config) {
worker_checker(config);
execute_worker(config);
}
__global__ void scheduler_kernel(RuntimeConfig config) {
scheduler_checker(config);
execute_scheduler(config, 0);
}
2.3 Scheduler 實現
2.3.1 功能
execute_scheduler是 CUDA 內核中負責任務調度的核心函數,主要功能如下:
-
初始化和設置調度器
- 每個線程塊處理4個調度器(int warp_thread_id = threadIdx.x % 32 / warp_id < 4)
- 依據調度器ID確定負責的工作節點范圍。
- 區分本地調度器(處理本地任務)和遠程調度器(處理跨GPU任務)的不同處理邏輯。這允許系統依據實際硬件環境和應用需求靈活調整調度器的數量和分工。
-
事件隊列處理循環
函數進入一個無限循環,持續處理事件隊列中的事件。
- 輪詢調度隊列,獲取待處理的事件。
- 使用原子操作確保隊列同步。
- 支持多個調度隊列(本地和全局)。
-
事件分類處理。
依據事件類型進行不同處理。
- 終止事件
- 向所有工作節點發送終止任務。
- 結束調度器運行。
- 任務圖結束事件 EVENT_END_OF_TASK_GRAPH。
- 調用prepare_next_batch檢查是否繼續下一批處理。
- 如果需要終止,則調用 terminate_schedulers
- 否則為下一次迭代啟動新的任務圖。
- 依賴任務事件 EVENT_LAUNCH_DEPENDENT_TASKS
- 增加迭代次數。
- 將任務分割給多個本地調度器,只有本地調度器才可以處理這類事件。
- 按照工作節點數量進行任務分組。
- 按照輪詢方式將任務分配給工作節點。分配算法如下:
- 計算每輪需要的任務數量:e.last_task_id - e.first_task_id + config.num_workers - 1) / config.num_workers。
- 對每個工作節點進行迭代分配:
- 位置索引 = e.first_task_id + i * config.num_workers + j
- 如果索引在有效范圍內,則分配給對應的工作節點。
- 使用next_worker變量實現輪詢機制。
- 大規模任務事件 EVENT_LAUNCH_MASSIVE_TASKS
- 將任務分割給多個本地調度器,只有本地調度器才可以處理這類事件。
- 按照順序將任務分發給對應的不同的工作節點,具體是使用get_first_last_ids來為每個調度器分配任務子集。
- 通過將大量任務均勻分配給所有本地調度器來實現負載均衡。每個調度器只處理分配給它的任務范圍。
- 普通事件任務 EVENT_LAUNCH_TASKS
- 直接將任務分配給工作節點
- 使用輪詢方式確保負載均衡
- 終止事件
2.3.2 流程
流程圖如下。

2.3.3 要點
需要注意的地方如下:
-
任務分發機制
- 使用輪詢方式將任務分發給工作節點。
- 維護每個工作節點的下一個可用任務位置。
- 使用原子操作確保線程安全的任務隊列更新。
-
同步和可見性保證
- 使用 relaxed 和 accuire / release 語義確保內存操作的正確順序。
- 通過原子操作維護隊列狀態。
- 支持NVSHMEM的跨GPU通信
-
任務間的依賴關系
-
通過任務描述符中的 dependent_event字段建立任務間的依賴關系。
-
在執行任務前檢查依賴是否滿足。
-
使用計數機制確保依賴任務按照正確順序執行。
-
任務索引計算:size_t position_index = e.first_task_id + i * config.num_workers + j; 通過這種方式,系統可以將連續的任務按順序分配給不同的工作單元。
-
處理依賴任務時,會
- 迭代編號更新。增加iteration_num,表示進入新的迭代
- 輪詢分配:依賴任務通過輪詢方式分配給工作隊列,確保任務在worker之間均勻分布。
- 本地調度器專用:依賴任務只能由本地調度器處理,遠程調度器不處理這種任務。
- 在execute_worker 中,系統會檢查任務的依賴關系。當一個任務完成時,會觸發相應事件,具備依賴關系的任務會放入廣播隊列中處理。
-
-
每個線程塊處理4個調度器
-
int warp_thread_id = threadIdx.x % 32; --- 128 / 32 = 4,每個warp對應一個調度器實例。每個調度器由足夠的線程資源(32個),這樣可以:充分利用硬件資源,將調度器均勻分布在多個線程塊中,避免單點瓶頸;避免過多調度器并行導致資源競爭;簡化調度器的同步和通信。
-
在 persistent_checker 中,有
assert(num_schedulers % 4 == 0); assert(gridDim.x == config.num_workers + num_schedulers / 4);這說明,如果有 N 個調度器,就需要 N/4個線程塊來處理,每個線程塊處理4個調度器。假設12個調度器,則需要3個線程塊來處理所有調度器。線程塊0處理調度器03,線程塊1處理調度器47。
-
只有warp中第一個線程(warp_thread_id == 0)實際執行調度器策略,其它線程處于空閑狀態(可能未來會擴展或者執行輔助任務)。
-
-
遠程調度器:
- 使用場景
- 多GPU環境中的跨GPU通信任務。當系統配置多個GPU時,需要使用遠程調度器處理GPU間的通信任務。尤其是處理ALLREDUCE等操作。
- NVSHEMEM相關任務。比如處理TASK_NVSHMEM_COPY任務時,需要遠程調度器。這些任務需要在不同GPU之間復制數據。
- 負責
- 協調不同GPU之間的任務執行順序
- 處理跨GPU的數據傳輸
- 確保全局唯一性。
- 使用場景
通過get_first_last_ids將工作負載劃分給不同的調度器。
__device__ __forceinline__ void
get_first_last_ids(unsigned long long int num_elements,
unsigned long long int num_workers,
unsigned long long int my_id,
unsigned long long int *my_first_element,
unsigned long long int *my_last_element) {
unsigned long long int num_elements_per_worker = num_elements / num_workers;
unsigned long long int reminder = num_elements % num_workers;
if (my_id < reminder) {
*my_first_element = (num_elements_per_worker + 1) * my_id;
*my_last_element = *my_first_element + num_elements_per_worker + 1;
} else {
*my_first_element = num_elements_per_worker * my_id + reminder;
*my_last_element = *my_first_element + num_elements_per_worker;
}
}
2.3.4 代碼
// 設備端函數:執行調度器邏輯(每個線程塊僅有一個warp參與,需注意同步限制)
// 功能:從調度器隊列讀取事件,解析事件類型并生成對應的任務,分配至工作單元隊列
__device__ void execute_scheduler(RuntimeConfig config, int offset) {
// 計算總調度器數量(本地調度器 + 遠程調度器)
int num_schedulers =
config.num_local_schedulers + config.num_remote_schedulers;
// 計算當前線程在warp內的ID(0-31,因每個線程塊僅一個warp,簡化同步邏輯)
int warp_thread_id = threadIdx.x % 32;
// 以下邏輯禁止使用__syncthreads(),避免跨warp同步導致的效率損失
// 僅warp內ID為0的線程執行調度核心邏輯(單線程負責調度決策,減少資源競爭)
if (warp_thread_id == 0) {
// 計算當前調度器的全局ID(偏移量+塊索引,區分不同調度器實例)
int sched_id = blockIdx.x + offset;
// 初始化調度器隊列相關參數
int num_sched_queues = 1; // 調度器需處理的隊列數量(初始為1)
size_t iteration_num = 0; // 當前迭代次數(用于任務ID生成)
EventId *sched_queues[2]; // 調度器需監聽的事件隊列指針(最多2個)
int sched_queue_ids[2]; // 隊列對應的全局ID
// 綁定當前調度器的專屬隊列
sched_queues[0] = config.sched_queues[sched_id];
sched_queue_ids[0] = sched_id;
// 用于記錄當前調度器管理的工作單元ID范圍
unsigned long long int my_first_worker, my_last_worker;
// 區分本地調度器與遠程調度器的邏輯
if (sched_id < config.num_local_schedulers) {
// 本地調度器額外處理全局隊列的事件(多隊列監聽)
sched_queues[num_sched_queues] = config.sched_queues[num_schedulers];
sched_queue_ids[num_sched_queues] = num_schedulers;
num_sched_queues++; // 隊列數量增至2
// 計算本地調度器管理的工作單元ID范圍(從總工作單元中分配)
get_first_last_ids(config.num_workers,
config.num_local_schedulers,
sched_id,
&my_first_worker,
&my_last_worker);
} else {
// 遠程調度器管理的工作單元ID范圍(偏移總工作單元數,避免與本地沖突)
get_first_last_ids(config.num_workers,
config.num_remote_schedulers,
sched_id - config.num_local_schedulers, // 遠程調度器的本地索引
&my_first_worker,
&my_last_worker);
// 遠程工作單元ID從總工作單元數開始編號
my_first_worker += config.num_workers;
my_last_worker += config.num_workers;
}
// 調試日志:打印調度器ID及管理的工作單元范圍(僅MPK_ENABLE_VERBOSE啟用時)
#ifdef MPK_ENABLE_VERBOSE
printf("[SCHD] sched_id(%d) first_worker(%llu) last_worker(%llu)\n",
sched_id,
my_first_worker,
my_last_worker);
#endif
// 初始化隊列位置跟蹤變量:當前處理位置與最新事件位置(支持2個隊列)
size_t cur_event_pos[2], last_event_pos[2];
for (int i = 0; i < 2; i++) {
cur_event_pos[i] = 0; // 當前已處理的事件索引
last_event_pos[i] = 0; // 隊列中最新的事件索引
}
// 記錄每個工作單元隊列的下一個空閑位置(避免原子操作開銷,本地緩存)
size_t worker_queue_next_free_task_pos[MAX_WORKER_PER_SCHEDULER];
for (int i = 0; i < MAX_WORKER_PER_SCHEDULER; i++) {
worker_queue_next_free_task_pos[i] = 0;
}
// 特殊初始化:ID為0的調度器首個工作單元的隊列起始位置設為1
if (sched_id == 0) {
worker_queue_next_free_task_pos[0] = 1;
}
// 任務分配的工作單元迭代變量(輪詢分配時使用)
int next_worker = my_first_worker;
// 當前監聽的隊列索引(初始為0)
int queue_idx = 0;
// 調度主循環(持續運行,直至收到終止事件)
while (true) {
// 等待隊列中有新事件:循環檢查當前隊列是否有未處理事件
while (cur_event_pos[queue_idx] == last_event_pos[queue_idx]) {
// 使用acquire語義讀取最新的事件位置(確保數據可見性)
last_event_pos[queue_idx] = ld_acquire_gpu_u64(
&config
.sched_queue_last_ready_event_id[sched_queue_ids[queue_idx]]);
// 若當前隊列有新事件,退出等待;否則切換到下一個隊列
if (cur_event_pos[queue_idx] < last_event_pos[queue_idx]) {
break;
} else {
queue_idx = (queue_idx == num_sched_queues - 1) ? 0 : queue_idx + 1;
}
// 短暫休眠(10納秒),避免空循環占用過多資源
__nanosleep(10);
}
// 斷言:確保調度器隊列未溢出(當前位置+隊列長度需大于最新事件位置)
assert(cur_event_pos[queue_idx] + config.per_sched_queue_len >
last_event_pos[queue_idx]);
// 讀取當前待處理的事件ID(使用relaxed語義,平衡性能與可見性)
EventId event_id = ld_relaxed_gpu_u64(
&sched_queues[queue_idx]
[cur_event_pos[queue_idx] % config.per_sched_queue_len]);
// 獲取事件描述信息
EventDesc e = config.all_events[event_id];
// 檢查是否為終止事件(退出調度循環)
if (is_termination_event(event_id, e)) {
// 若為本地調度器,向其管理的所有工作單元發送終止任務(ID=0)
if (sched_id < config.num_local_schedulers) {
for (int i = my_first_worker; i < my_last_worker; i++) {
// 獲取工作單元隊列的下一個空閑位置
size_t last_task_id =
worker_queue_next_free_task_pos[i - my_first_worker]++;
// 寫入終止任務ID(0)
st_relaxed_gpu_u64(
&config.worker_queues[i][last_task_id %
config.per_worker_queue_len],
0);
// 使用release語義更新工作單元隊列的最新就緒任務ID(確保任務可見)
atom_add_release_gpu_u64(&config.worker_queue_last_ready_task_id[i],
1);
}
}
// 退出調度器
return;
}
// 處理"任務圖結束"事件(EVENT_END_OF_TASK_GRAPH)
if (e.event_type == EVENT_END_OF_TASK_GRAPH) {
#ifdef MPK_ENABLE_VERBOSE
printf("[SCHD] END_OF_TASK_GRAPH\n");
#endif
// 檢查是否需要準備下一批次任務
if (!prepare_next_batch(config)) {
// 無需繼續,終止所有調度器
terminate_schedulers(config);
} else {
// 為下一次迭代啟動"任務圖開始"任務(ID=1)
size_t last_task_id =
worker_queue_next_free_task_pos[next_worker - my_first_worker]++;
// 計算任務ID(迭代次數+任務索引)并寫入工作單元隊列
st_relaxed_gpu_u64(
&config.worker_queues[next_worker]
[last_task_id % config.per_worker_queue_len],
compute_task_id(iteration_num + 1, 1 /*begin_task_graph*/));
// 更新工作單元隊列的最新就緒任務ID
atom_add_release_gpu_u64(
&config.worker_queue_last_ready_task_id[next_worker], 1);
#ifdef MPK_ENABLE_VERBOSE
// 打印調試信息:GPU ID、調度器ID、迭代次數、任務索引、工作單元ID等
printf("[%d][SCHD]EVENT_END_OF_TASK_GRAPH schd_id(%d) "
"iter_num(%llu) task_idx(1) "
"worker_id(%d) "
"worker_last_ready_pos(%llu)\n",
config.my_gpu_id,
sched_id,
iteration_num + 1,
next_worker,
last_task_id + 1);
#endif
// 輪詢切換到下一個工作單元
next_worker = (next_worker == my_last_worker - 1) ? my_first_worker
: next_worker + 1;
}
}
// 處理"啟動依賴任務"事件(EVENT_LAUNCH_DEPENDENT_TASKS)
else if (e.event_type == EVENT_LAUNCH_DEPENDENT_TASKS) {
// 迭代次數遞增(標識新一批任務)
iteration_num = iteration_num + 1;
// 斷言:該事件僅由本地調度器處理
assert(sched_id < config.num_local_schedulers);
// 按工作單元數量拆分任務范圍,輪詢分配給管理的工作單元
for (size_t i = 0;
i < (e.last_task_id - e.first_task_id + config.num_workers - 1) /
config.num_workers;
i++) {
for (size_t j = my_first_worker; j < my_last_worker; j++) {
// 計算當前任務在全局范圍內的索引
size_t position_index =
e.first_task_id + i * config.num_workers + j;
// 僅處理范圍內的任務
if (position_index < e.last_task_id) {
// 獲取工作單元隊列的下一個空閑位置
size_t last_task_id =
worker_queue_next_free_task_pos[next_worker -
my_first_worker]++;
// 計算任務ID并寫入工作單元隊列
st_relaxed_gpu_u64(
&config
.worker_queues[next_worker][last_task_id %
config.per_worker_queue_len],
compute_task_id(iteration_num, position_index));
// 更新工作單元隊列的最新就緒任務ID
atom_add_release_gpu_u64(
&config.worker_queue_last_ready_task_id[next_worker], 1);
// 輪詢切換到下一個工作單元
next_worker = (next_worker == my_last_worker - 1)
? my_first_worker
: next_worker + 1;
}
}
}
}
// 處理其他類型事件(如EVENT_LAUNCH_MASSIVE_TASKS)
else {
// 初始化當前調度器需處理的任務范圍
TaskId my_first_task = e.first_task_id, my_last_task = e.last_task_id;
// 若為"啟動大規模任務"事件,按本地調度器數量拆分任務范圍
if (e.event_type == EVENT_LAUNCH_MASSIVE_TASKS) {
assert(sched_id < config.num_local_schedulers); // 僅本地調度器處理
// 計算當前調度器負責的子任務范圍
get_first_last_ids(e.last_task_id - e.first_task_id,
config.num_local_schedulers,
sched_id,
&my_first_task,
&my_last_task);
// 映射到全局任務ID范圍
my_first_task += e.first_task_id;
my_last_task += e.first_task_id;
}
// 遍歷任務范圍,將任務分配給工作單元(輪詢策略)
for (size_t i = my_first_task; i < my_last_task; i++) {
// 獲取工作單元隊列的下一個空閑位置
size_t last_task_id =
worker_queue_next_free_task_pos[next_worker - my_first_worker]++;
// 計算任務ID并寫入工作單元隊列
st_relaxed_gpu_u64(
&config.worker_queues[next_worker]
[last_task_id % config.per_worker_queue_len],
compute_task_id(iteration_num, i));
// 更新工作單元隊列的最新就緒任務ID
atom_add_release_gpu_u64(
&config.worker_queue_last_ready_task_id[next_worker], 1);
// 輪詢切換到下一個工作單元
next_worker = (next_worker == my_last_worker - 1) ? my_first_worker
: next_worker + 1;
}
}
// 移動到下一個事件
cur_event_pos[queue_idx] += 1;
}
}
}
2.4 Worker實現
execute_worker 是 Mirage 持久化內核(MPK)中工作單元的核心設備端函數,運行于 GPU 線程塊上,負責任務的獲取、依賴檢查、執行及后續事件觸發,是 MPK 內核中實際承載計算任務的核心組件。其流程邏輯可分為五大核心階段,形成 “任務獲取 — 數據準備 — 依賴等待 — 任務執行 — 事件觸發” 的完整閉環:
2.4.1 功能
每個block對應一個worker。execute_worker函數負責從任務隊列中獲取任務并執行,主要功能如下:
-
初始化和設置。
函數首先初始化線程塊內的共享內存(存儲任務 ID 和任務描述),并根據當前工作單元 ID 綁定對應的本地任務隊列;在多 GPU 場景下,額外綁定遠程隊列以接收跨 GPU 任務。同時完成性能分析器的初始化(若啟用),為后續任務執行監控做好準備。此階段的核心目標是建立工作單元與任務隊列的關聯,明確任務來源。
-
任務獲取循環。
任務獲取。會等待任務隊列中有可用任務;使用原子操作確保線程安全;從共享內存中加載任務描述。
僅線程塊內第 0 個線程執行任務獲取邏輯:通過輪詢監聽本地 / 遠程隊列,使用 acquire 語義讀取隊列的最新就緒任務位置,若隊列無新任務則短暫休眠并切換隊列;若有新任務則讀取任務 ID,并通過共享內存同步給線程塊內其他線程。隨后,線程塊內線程并行將任務描述從全局內存拷貝到共享內存(提升訪問效率),并通過線程塊同步確保數據拷貝完成。
函數主體是無限循環,持續從任務隊列中獲取任務。其中,線程0(threadIdx.x == 0)負責協調,其它線程協助數據加載。
while(true) { // 獲取下一個任務。 // 執行任務 // 觸發事件 } -
任務依賴檢查階段
任務依賴檢查。檢查任務是否有依賴事件,如果有依賴,則等待依賴事件完成足夠的觸發次數;使用事件計步器進行同步。
僅第 0 個線程檢查當前任務是否存在依賴事件:若存在有效依賴事件,計算該任務所需的事件觸發次數,循環等待直至事件觸發次數滿足需求(使用 acquire 語義讀取事件計數器,確保數據可見性)。此階段通過精準的依賴管理,保證任務執行的順序正確性,避免因數據未就緒導致的計算錯誤。
-
任務執行
根據任務類型執行差異化邏輯:
- 終止任務TASK_TERMINATE:直接退出工作單元循環,結束執行;
- 任務圖開始任務TASK_BEGIN_TASK_GRAPH:無實際計算邏輯,僅作為流程標記;
- NVSHMEM 拷貝任務TASK_NVSHMEM_COPY:調用分布式通信接口完成跨 GPU 數據傳輸,并觸發遠程事件信號;
- 歸約任務TASK_REDUCE:調用專用歸約內核,處理多 GPU 場景下的二維歸約計算;
- 其他任務:通過通用任務執行函數_execute_task(task_desc, config);
-
事件觸發與隊列更新階段
任務執行完成后,僅第 0 個線程觸發對應的事件:
- 本地事件:原子累加事件計數器,若觸發次數達到當前迭代需求,將事件加入調度器隊列(大規模任務事件加入全局隊列,其他事件隨機分配給本地調度器),并通過 CAS 操作更新調度器隊列的就緒狀態;
- 遠程事件:依賴 NVSHMEM 拷貝任務在數據傳輸時自動觸發信號,此處僅打印調試日志。
2.4.2 流程
流程圖如下:

2.4.3 代碼
execute_worker
execute_worker的代碼如下。
// 設備端函數:執行工作單元邏輯(Worker)
// 功能:從工作單元隊列讀取任務,處理任務依賴,執行具體任務邏輯,并觸發后續事件
__device__ void execute_worker(RuntimeConfig config) {
// 共享內存變量:存儲當前待執行的任務ID和任務描述(線程塊內線程共享)
__shared__ TaskId cur_task_id;
__shared__ TaskDesc task_desc;
// 性能分析相關初始化(僅MPK_ENABLE_PROFILING宏啟用時執行)
#ifdef MPK_ENABLE_PROFILING
PROFILER_CLOSURE_PARAMS_DECL; // 聲明性能分析閉包參數
// 初始化性能分析器:傳入緩沖區指針、GPU ID等參數,僅線程塊內第0個線程執行初始化
PROFILER_INIT(static_cast<uint64_t *>(config.profiler_buffer),
0,
1,
(threadIdx.x % 128 == 0));
#endif
// 當前工作單元的ID(由線程塊索引blockIdx.x標識,每個工作單元對應一個線程塊)
int worker_id = blockIdx.x;
// 共享內存變量:存儲工作單元隊列指針及對應的隊列ID(支持本地和遠程隊列)
__shared__ TaskId *worker_queues[2];
__shared__ int worker_queue_ids[2];
// 獲取當前工作單元的本地隊列指針
TaskId *local_worker_queue = config.worker_queues[worker_id];
worker_queues[0] = local_worker_queue;
worker_queue_ids[0] = worker_id;
int num_worker_queues = 1; // 工作單元隊列數量(初始為1個本地隊列)
// 多GPU場景下,額外綁定遠程隊列(用于接收其他GPU的任務)
if (config.num_gpus > 1) {
TaskId *remote_worker_queue =
config.worker_queues[worker_id + config.num_workers];
worker_queues[num_worker_queues] = remote_worker_queue;
worker_queue_ids[num_worker_queues] = worker_id + config.num_workers;
num_worker_queues++; // 隊列數量增至2(本地+遠程)
}
// 記錄每個隊列當前處理的任務位置(線程私有變量)
size_t cur_task_pos[2];
// 共享內存變量:記錄每個隊列最新的任務位置(線程塊內同步)
__shared__ size_t last_task_pos[2];
// 初始化當前任務位置為0
for (int i = 0; i < 2; i++) {
cur_task_pos[i] = 0;
}
// 僅線程塊內第0個線程初始化最新任務位置為0
if (threadIdx.x == 0) {
for (int i = 0; i < 2; i++) {
last_task_pos[i] = 0;
}
}
// 當前監聽的隊列索引(初始為0)
int queue_idx = 0;
#ifdef MPK_ENABLE_PROFILING
size_t task_counter = 0; // 任務計數器(用于性能分析事件標記)
#endif
// 工作單元主循環(持續讀取并執行任務,直至收到終止任務)
while (true) {
// 階段1:從任務隊列獲取下一個任務(僅第0個線程執行,避免多線程競爭)
if (threadIdx.x == 0) {
// 等待隊列中有新任務:循環檢查當前隊列是否有未處理任務
while (cur_task_pos[queue_idx] == last_task_pos[queue_idx]) {
// 使用acquire語義讀取隊列最新就緒任務位置(確保數據可見性)
last_task_pos[queue_idx] = ld_acquire_gpu_u64(
&config
.worker_queue_last_ready_task_id[worker_queue_ids[queue_idx]]);
// 若當前隊列有新任務則退出等待,否則切換到下一個隊列
if (cur_task_pos[queue_idx] < last_task_pos[queue_idx]) {
break;
} else {
queue_idx = (queue_idx == num_worker_queues - 1) ? 0 : queue_idx + 1;
}
// 短暫休眠(10納秒),避免空循環占用過多GPU資源
__nanosleep(10);
}
// 斷言:確保工作單元隊列未溢出
assert(cur_task_pos[queue_idx] + config.per_worker_queue_len >
last_task_pos[queue_idx]);
// 讀取當前待執行的任務ID(使用relaxed語義平衡性能與可見性)
cur_task_id = ld_relaxed_gpu_u64(
&worker_queues[queue_idx][cur_task_pos[queue_idx] %
config.per_worker_queue_len]);
}
// 線程塊內同步:確保第0個線程讀取任務ID后,其他線程再繼續執行
__syncthreads();
// 階段2:將任務描述從全局內存拷貝到共享內存(線程塊內并行拷貝,提升訪問效率)
// 轉換任務描述的內存指針類型(按int粒度拷貝,適配線程并行)
int *smem_as_int = reinterpret_cast<int *>(&task_desc);
int const *dmem_as_int = reinterpret_cast<int *>(
config.all_tasks + get_task_position_index(cur_task_id));
// 線程塊內線程分工拷貝:每個線程負責部分數據,步長為線程塊大小
for (int i = threadIdx.x; i * sizeof(int) < sizeof(TaskDesc);
i += blockDim.x) {
smem_as_int[i] = dmem_as_int[i];
}
// 線程塊內同步:確保任務描述拷貝完成后,再執行后續邏輯
__syncthreads();
// 階段3:檢查并等待任務依賴的事件完成(僅第0個線程執行)
if (threadIdx.x == 0) {
// 若任務存在依賴事件(非無效事件ID)
if (task_desc.dependent_event != EVENT_INVALID_ID) {
// 等待依賴事件觸發足夠次數
EventId event_id = task_desc.dependent_event;
assert(!is_nvshmem_event(event_id)); // 斷言:非NVSHMEM遠程事件
assert(get_event_gpu_id(event_id) == config.my_gpu_id); // 斷言:事件屬于當前GPU
size_t event_index = get_event_position_index(event_id); // 獲取事件全局索引
// 計算當前任務所需的事件觸發次數(總觸發次數 × 任務迭代次數)
EventCounter needed_counts =
static_cast<EventCounter>(
config.all_event_num_triggers[event_index]) *
get_task_iteration_num(cur_task_id);
EventCounter actual_counts = 0;
// 循環等待,直至事件觸發次數滿足需求
while (actual_counts < needed_counts) {
actual_counts =
ld_acquire_gpu_u64(&config.all_event_counters[event_index]);
__nanosleep(10);
}
}
}
// 線程塊內同步:確保依賴事件處理完成后,所有線程同步執行任務
__syncthreads();
// 性能分析:記錄任務執行開始(非終止任務才記錄)
#ifdef MPK_ENABLE_PROFILING
if (task_desc.task_type != TASK_TERMINATE) {
PROFILER_EVENT_START(task_desc.task_type, task_counter);
}
#endif
// 階段4:執行具體任務(根據任務類型分支處理)
if (task_desc.task_type == TASK_TERMINATE) {
// 終止任務:退出工作單元循環
return;
} else if (task_desc.task_type == TASK_BEGIN_TASK_GRAPH) {
// 任務圖開始任務:無實際執行邏輯(僅作為流程標記)
} else if (task_desc.task_type == TASK_NVSHMEM_COPY) {
// NVSHMEM拷貝任務(僅啟用USE_NVSHMEM時執行,用于多GPU數據傳輸)
#ifdef USE_NVSHMEM
// 計算拷貝數據的字節大小(基于輸入張量維度)
size_t size_in_bytes = 2;
for (int i = 0; i < task_desc.inputs[0].num_dims; i++) {
size_in_bytes *= task_desc.inputs[0].dim[i];
}
size_t event_index = get_event_position_index(task_desc.trigger_event); // 觸發事件索引
int gpu_id = static_cast<int>(get_event_gpu_id(task_desc.trigger_event)); // 目標GPU ID
assert(gpu_id < config.num_gpus); // 斷言:目標GPU ID有效
assert(gpu_id != config.my_gpu_id); // 斷言:目標為遠程GPU
// 調用NVSHMEM接口執行塊拷貝,并觸發事件信號
nvshmemx_putmem_signal_block(
task_desc.outputs[0].base_ptr, // 目標地址(遠程GPU)
task_desc.inputs[0].base_ptr, // 源地址(本地GPU)
size_in_bytes, // 拷貝字節數
reinterpret_cast<uint64_t *>(&config.all_event_counters[event_index]), // 事件計數器
1 /*信號值*/,
NVSHMEM_SIGNAL_ADD, // 信號操作類型(累加)
gpu_id // 目標GPU ID
);
#endif
} else if (task_desc.task_type == TASK_REDUCE) {
// 歸約任務:支持二維歸約,輸入緩沖區包含多GPU維度
assert(task_desc.inputs[0].num_dims == 2); // 斷言:輸入1為2維張量
assert(task_desc.inputs[1].num_dims == 3); // 斷言:輸入2為3維張量(含GPU維度)
// 調用歸約內核執行計算(數據類型為bfloat16)
kernel::reduction_kernel<bfloat16>(
task_desc.inputs[0].base_ptr, // 輸入1數據指針
task_desc.inputs[1].base_ptr, // 輸入2數據指針
task_desc.outputs[0].base_ptr, // 輸出數據指針
config.num_gpus, // GPU總數
config.my_gpu_id, // 當前GPU ID
task_desc.inputs[0].dim[0], // 輸入1維度0大小
task_desc.inputs[0].dim[1], // 輸入1維度1大小
task_desc.inputs[0].stride[0]); // 輸入1維度0步長
} else {
// 其他類型任務:調用通用任務執行函數
#ifdef MPK_ENABLE_VERBOSE
// 調試日志:僅第0個線程塊的第0個線程打印任務執行信息
if (threadIdx.x == 0 && blockIdx.x == 0) {
printf("[worker] _execute_task EXECUTE_TASK %d\n", task_desc.task_type);
}
#endif
_execute_task(task_desc, config);
}
// 線程塊內同步:確保任務執行完成后,再處理后續事件觸發邏輯
__syncthreads();
// 性能分析:記錄任務執行結束
#ifdef MPK_ENABLE_PROFILING
if (task_desc.task_type != TASK_TERMINATE) {
PROFILER_EVENT_END(task_desc.task_type, task_counter++);
}
#endif
// 階段5:任務執行完成后,觸發對應的事件(僅第0個線程執行)
if (threadIdx.x == 0) {
EventId event_id = task_desc.trigger_event; // 任務對應的觸發事件ID
size_t event_index = get_event_position_index(event_id); // 事件全局索引
// 分支1:處理本地事件(非NVSHMEM事件)
if (!is_nvshmem_event(event_id)) {
size_t gpu_id = get_event_gpu_id(event_id);
assert(gpu_id == config.my_gpu_id); // 斷言:事件屬于當前GPU
// 原子操作累加事件計數器(使用release語義確保可見性)
EventCounter count = atom_add_release_gpu_u64(
&config.all_event_counters[event_index], 1);
int num_triggers = config.all_event_num_triggers[event_index]; // 事件總需觸發次數
// 若事件觸發次數達到當前迭代所需總量,將事件加入調度器隊列
if ((count + 1) == static_cast<EventCounter>(num_triggers) *
get_task_iteration_num(cur_task_id)) {
#ifdef MPK_ENABLE_PROFILING
PROFILER_EVENT_START(TASK_SCHD_EVENTS, task_counter); // 記錄調度事件開始
#endif
EventDesc event_desc = config.all_events[event_index]; // 獲取事件描述
// 空事件不做處理,其他事件加入調度器隊列
if (event_desc.event_type != EVENT_EMPTY) {
bool use_bcast_queue = false;
// 大規模任務、依賴任務事件使用全局廣播隊列
if (event_desc.event_type == EVENT_LAUNCH_MASSIVE_TASKS ||
event_desc.event_type == EVENT_LAUNCH_DEPENDENT_TASKS) {
use_bcast_queue = true;
}
// 確定事件對應的調度器ID
int sched_id =
use_bcast_queue
? config.num_local_schedulers + config.num_remote_schedulers // 全局調度器
: get_rand_sched_id(event_index,
worker_id,
config.num_workers,
config.num_local_schedulers); // 隨機分配本地調度器
// 原子操作獲取調度器隊列的下一個空閑位置
size_t last_event_pos = atom_add_release_gpu_u64(
&config.sched_queue_next_free_event_id[sched_id], 1);
// 將事件索引寫入調度器隊列
st_relaxed_gpu_u64(
&config.sched_queues[sched_id][last_event_pos %
config.per_sched_queue_len],
event_index);
// 使用CAS操作更新調度器隊列的最新就緒事件ID(確保原子性)
size_t old;
do {
old = atom_cas_release_gpu_u64(
&config.sched_queue_last_ready_event_id[sched_id],
last_event_pos,
last_event_pos + 1);
} while (old != last_event_pos);
}
#ifdef MPK_ENABLE_PROFILING
PROFILER_EVENT_END(TASK_SCHD_EVENTS, task_counter++); // 記錄調度事件結束
#endif
}
} else {
// 分支2:處理NVSHMEM遠程事件(僅NVSHMEM拷貝任務觸發)
assert(task_desc.task_type == TASK_NVSHMEM_COPY); // 斷言:任務類型為NVSHMEM拷貝
// 注:NVSHMEM拷貝任務在數據傳輸時已觸發信號計數器,此處無需額外操作
}
// 移動到當前隊列的下一個任務位置
cur_task_pos[queue_idx] += 1;
}
}
}
_execute_task
_execute_task 代碼定義在print_task_graph中,此處從task.second[variant_id]獲取可以執行代碼。具體如下:
code.e("__device__ __forceinline__");
code.e("void _execute_task(TaskDesc const& task_desc,");
code.e(" RuntimeConfig const &runtime_config) {");
TaskRegister *task_register = TaskRegister::get_instance();
bool first_task = true;
for (auto const &task : task_register->all_task_variants) {
for (size_t variant_id = 0; variant_id < task.second.size(); variant_id++) {
std::string cond = first_task ? "if" : "else if";
assert(task_type_to_name.find(task.first) != task_type_to_name.end());
code.e("$ (task_desc.task_type == $ && task_desc.variant_id == $) {",
cond,
task_type_to_name[task.first],
variant_id);
code.e("$", task.second[variant_id]);
code.e("}");
first_task = false;
}
}
code.e("}");
2.5 并發執行
總體并發策略如下:
-
工作線程與調度線程分離。
-
任務流水線:計算任務和通信任務可以流水線執行,當一個任務在等待通信完成時,其它任務可以繼續執行計算。
-
事件驅動:通過事件機制實現任務間的依賴管理,不需要阻塞等待。
__device__ __forceinline__ bool is_nvshmem_event(EventId event_id) { return (event_id & EVENT_NVSHMEM_TAG) > 0; } -
多隊列管理:使用多個工作隊列和調度隊列來管理不同類型的任務。
-
異步通信:利用NVSHMEM的異步通信能力,在執行計算的同時進行數據傳輸。任務為TASK_NVSHMEM_COPY。
這種設計允許計算任務和通信操作重疊執行,從而提高總體執行效率。
流程如下:
- 初始化階段:通過NVSHMEM初始化多GPU環境。
- 任務分發:調度器將大型任務分片分配給不同GPU。
- 本地計算:每個GPU處理分配給自己的數據分片。
- 跨GPU通信:使用NVSHMEM進行數據同步和通信。
- 結果規約:通過TASK_REDUCE任務合并各個GPU的計算結果。
0xFF 參考
如何評價CMU將LLM轉化為巨型內核的Mirage Persistent Kernel(MPK)工作?
Mirage: A Multi-Level Superoptimizer for Tensor Programs 簡記 塵伊光
OSDI2025論文筆記:Mirage: A Multi-Level Superoptimizer for Tensor Programs 畫餅充饑
Mirage: A Compiler for High-Performance Tensor Programs on GPUs
https://mirage-project.readthedocs.io/en/latest/mugraph.html
https://mirage-project.readthedocs.io/en/latest/transpiler.html
浙公網安備 33010602011771號