[轉] 封裝并發任務方法
作者:謝杰
該文章是并發異步操作系列文章第五篇。
有了前面幾篇文章所介紹的知識鋪墊后,本系列最終篇,我們來封裝一個能夠指定并發上限的方法。
需求
先來過一下需求,封裝一個異步方法 runWithConcurrency,如下:
async function runWithConcurrency(items, worker, maxConcurrency) {}
該方法接收 3 個參數:
- items:要處理的任務列表
- worker:處理單個任務的異步函數
- maxConcurrency:最大并發量
先假設這個方法我們已經寫好了,那么外部在使用的時候,大概是這么用的:
// 示例 1:模擬任務
const items = Array.from({ length: 8 }, (_, i) => i + 1);
const worker = async (n) => {
// 模擬耗時 200~800ms 的異步任務
await new Promise((r) => setTimeout(r, 200 + Math.random() * 600));
console.log(`處理完成:task=${n}`);
return n * n; // 返回結果
};
const results = await runWithConcurrency(items, worker, 3);
console.log('結果(順序與輸入一致):', results);
// 示例 2:真實網絡請求(批量拉取)
const urls = [
'/api/user/1',
'/api/user/2',
'/api/user/3',
'/api/user/4',
'/api/user/5',
];
const fetchUser = async (url) => {
const res = await fetch(url);
if (!res.ok) throw new Error(`請求失敗:${res.status}`);
const data = await res.json();
console.log('已獲取:', url);
return data;
};
const users = await runWithConcurrency(urls, fetchUser, 2);
console.log('所有用戶:', users);
分析與實現
明確了需求后,接下來我們來逐步實現。假設外界調用的時候,是這么調用的:
await runWithConcurrency(urls, fetchUser, 2);
那么這里傳遞的 2 是什么?
沒錯,是并發的上限,我們可以將其想象成有兩個工人,如下圖
┌──────────────────────────────────────────────────┐
│ [任務五] [任務四] [任務三] [任務二] [任務一] │ ← i(下一個)
└──────────────────────────────────────────────────┘
│ │
│ │
▼ ▼
工人 2 工人 1
工人1 先領取了任務一,然后開始執行任務一
工人2 領取任務二,然后開始執行任務二
那么任務三交給誰呢?究竟由工人1 領取還是工人2 領取呢?
那得看哪一個工人的工作先完成,假設工人1 先完成了任務一,那么這個工人就去領取任務三;反之,如果是工人2 先完成了任務二,則是由工人2 去領取任務三。依此類推,后面的任務四、任務五也是這樣,哪個工人手上沒活了,就去領取下一個任務。這和我們現實生活中的場景,也是一致的。
這種模式在代碼里對應的是:固定數量的工人 + 一個共享的“下一個任務”指針。
接下來落地到具體的代碼。首先,我們來確定工人的人數:
async function runWithConcurrency(items, worker, maxConcurrency) {
const n = Math.max(1, Math.min(maxConcurrency, items.length));
}
這里的 n 是實際工人的人數。有的同學會覺得很奇怪,為什么不直接用 maxConcurrency?因為:
- 工人數不能超過任務總數(否則有些工人一上來就沒活干)
- 至少要有 1 個工人
接下來這 n 個工人開始干活:
const workers = [];
for (let k = 0; k < n; k++) workers.push(spawn());
這里的 workers 用來收集每個工人(spawn())返回的 Promise。一旦調用 spawn(),工人就會立刻去領取任務并開始執行;數組里收集的是“工人何時收工”的 Promise,而不是每個任務的 Promise。
緊接著是“工人如何領活兒”。該方法需要不斷地把任務分配給工人,直到任務列表為空:
let i = 0; // 共享任務指針:指向“下一個要處理的下標”
async function spawn() {
// 每個工人都需要不斷地從任務隊列中領取任務
while (i < items.length) {
const idx = i++; // 領取當前要處理的任務下標
await worker(items[idx], idx); // 處理單個任務(工人內部串行)
// 處理完繼續回到 while,再領下一單;直到任務列表為空
}
// 跳出 while:說明任務已經被領完,這個工人可以正常收工
}
這里要強調兩點:
- 每個工人內部是串行的:比如工人 1 領了任務一,必須等這單完成(
await返回)后才能去領下一單。并發來自“有n個工人同時在干”,而不是讓單個工人內部再并發。 - 共享指針是安全的:
const idx = i++這一步在同一次調用棧里是同步完成的(讀取→使用→自增),不會被別的工人中途打斷,因此不會出現兩個工人領到同一單。真正“讓出執行權”的地方發生在await worker(...)之后。
最后,等待所有工人完工,這里使用 Promise.allSettled:
await Promise.allSettled(workers);
這一步不會啟動任務,只是“在門口等所有工人跑完自己手上的任務”。
用 allSettled 的好處是:即便某個工人因為某一單失敗而變成 rejected,也不會短路,其他工人仍會把剩下的活干完,更符合“批量任務盡量跑完”的訴求。
最終完整的代碼如下:
async function runWithConcurrency(items, worker, maxConcurrency) {
if (!items?.length) return;
// i 是“下一個要被領取的任務索引”。多個工人共享這個變量。
// 在 JS 的單線程事件循環中,“讀取 i -> 使用 i -> i++”這一小段代碼在一次
// 宏/微任務中是原子的(不會被其他 JS 執行棧打斷),因此可作為簡單的任務分發指針。
let i = 0;
// workers 用來收集每個“工人”的 Promise,后面用 allSettled 等待他們全部結束。
const workers = [];
// spawn = 啟動一個工人:不停從任務池里領取下一個索引并處理,直到沒有任務可領
async function spawn() {
// 當還有未處理的任務(i < items.length)就繼續循環
while (i < items.length) {
// 領取當前要處理的任務索引,然后自增以留給下一個任務
const idx = i++;
// 執行該任務:必須 await,保證這個工人在“串行處理自己的任務隊列”
// 如果不 await,就會在單個工人內部產生更高的并發,超出 maxConcurrency 的約束
await worker(items[idx], idx);
// 若 worker 拋錯(reject),該 spawn() 的 Promise 會變為 rejected;
// 但我們外層會用 Promise.allSettled,所以不會影響其它工人繼續工作。
}
// 退出 while:說明任務已經被領完,這個工人可以正常收工(resolve)
}
// 計算實際需要啟動的工人數:
// - 不能超過任務總數(否則有些工人一上來就沒活干)
// - 至少 1 個
const n = Math.max(1, Math.min(maxConcurrency, items.length));
// 啟動 n 個工人,每個工人都是一個獨立的異步執行體(Promise)
for (let k = 0; k < n; k++) workers.push(spawn());
// 等待所有工人“收工”。使用 allSettled:
// - 不會因為某個工人失敗而中斷等待(all 會短路,allSettled 不會)
// - 適合“多個任務相互獨立,允許部分失敗也要跑完”的場景
await Promise.allSettled(workers);
// 函數到這里 resolve,表示全部任務都已嘗試完成(成功或失敗),所有工人均結束。
}
至此,帶并發上限的并發執行方法就寫完了。
寫在最后
這一次我們實現了通用的 runWithConcurrency(items, worker, maxConcurrency),用固定數量的工人配合共享游標分發任務,工人內部串行、整體受控并發;配套示例也證明它能穩妥跑完批量請求和本地批處理場景。
使用時有幾處要點值得牢牢記住:
- 并發從調用
spawn()的那一刻就開始了,Promise.allSettled(workers)只是在門口把所有工人等到收工,并不會啟動任務、更不會決定順序; - 共享指針用的是
const idx = i++,在單線程調用棧里是同步完成的,不會出現兩個人領到同一單; - 工人內部必須
await worker(...),否則單個工人會把并發繼續堆高,超出你的maxConcurrency。 - 另外,
workers裝的是“工人的promise”而不是“每個任務的promise”,若需要拿回結果,可以在工人里按下標寫回一個out[idx]再返回。空列表可以直接早退,maxConcurrency也最好做下限與取整的校驗。
落到真實項目,還方法可以繼續打磨:
- 給它包上超時與取消,必要時做重試與指數退避;
- 按需暴露進度回調與觀測埋點,用數據調參;
- 在高壓場景里引入優先級、背壓和動態并發,保護上下游。
掌握并靈活運用這套基建,小到腳本批處理,大到服務側限流,你的異步就能既穩又快。
-EOF-

浙公網安備 33010602011771號