<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [轉] 封裝并發任務方法

      作者:謝杰

      該文章是并發異步操作系列文章第五篇。

      有了前面幾篇文章所介紹的知識鋪墊后,本系列最終篇,我們來封裝一個能夠指定并發上限的方法。

      需求

      先來過一下需求,封裝一個異步方法 runWithConcurrency,如下:

      async function runWithConcurrency(items, worker, maxConcurrency) {}
      

      該方法接收 3 個參數:

      • items:要處理的任務列表
      • worker:處理單個任務的異步函數
      • maxConcurrency:最大并發量

      先假設這個方法我們已經寫好了,那么外部在使用的時候,大概是這么用的:

      // 示例 1:模擬任務
      const items = Array.from({ length: 8 }, (_, i) => i + 1);
      
      const worker = async (n) => {
        // 模擬耗時 200~800ms 的異步任務
        await new Promise((r) => setTimeout(r, 200 + Math.random() * 600));
        console.log(`處理完成:task=${n}`);
        return n * n; // 返回結果
      };
      
      const results = await runWithConcurrency(items, worker, 3);
      console.log('結果(順序與輸入一致):', results);
      
      // 示例 2:真實網絡請求(批量拉取)
      const urls = [
        '/api/user/1',
        '/api/user/2',
        '/api/user/3',
        '/api/user/4',
        '/api/user/5',
      ];
      
      const fetchUser = async (url) => {
        const res = await fetch(url);
        if (!res.ok) throw new Error(`請求失敗:${res.status}`);
        const data = await res.json();
        console.log('已獲取:', url);
        return data;
      };
      
      const users = await runWithConcurrency(urls, fetchUser, 2);
      console.log('所有用戶:', users);
      

      分析與實現

      明確了需求后,接下來我們來逐步實現。假設外界調用的時候,是這么調用的:

      await runWithConcurrency(urls, fetchUser, 2);
      

      那么這里傳遞的 2 是什么?

      沒錯,是并發的上限,我們可以將其想象成有兩個工人,如下圖

      ┌──────────────────────────────────────────────────┐
      │ [任務五]  [任務四]  [任務三]  [任務二]  [任務一] │  ← i(下一個)
      └──────────────────────────────────────────────────┘
                                      │         │
                                      │         │
                                      ▼         ▼
                                    工人 2     工人 1
      

      工人1 先領取了任務一,然后開始執行任務一

      工人2 領取任務二,然后開始執行任務二

      那么任務三交給誰呢?究竟由工人1 領取還是工人2 領取呢?

      那得看哪一個工人的工作先完成,假設工人1 先完成了任務一,那么這個工人就去領取任務三;反之,如果是工人2 先完成了任務二,則是由工人2 去領取任務三。依此類推,后面的任務四、任務五也是這樣,哪個工人手上沒活了,就去領取下一個任務。這和我們現實生活中的場景,也是一致的。

      這種模式在代碼里對應的是:固定數量的工人 + 一個共享的“下一個任務”指針

      接下來落地到具體的代碼。首先,我們來確定工人的人數:

      async function runWithConcurrency(items, worker, maxConcurrency) {
        const n = Math.max(1, Math.min(maxConcurrency, items.length));
      }
      

      這里的 n 是實際工人的人數。有的同學會覺得很奇怪,為什么不直接用 maxConcurrency?因為:

      • 工人數不能超過任務總數(否則有些工人一上來就沒活干)
      • 至少要有 1 個工人

      接下來這 n 個工人開始干活:

      const workers = [];
      for (let k = 0; k < n; k++) workers.push(spawn());
      

      這里的 workers 用來收集每個工人(spawn())返回的 Promise。一旦調用 spawn(),工人就會立刻去領取任務并開始執行;數組里收集的是“工人何時收工”的 Promise,而不是每個任務的 Promise。

      緊接著是“工人如何領活兒”。該方法需要不斷地把任務分配給工人,直到任務列表為空:

      let i = 0; // 共享任務指針:指向“下一個要處理的下標”
      
      async function spawn() {
        // 每個工人都需要不斷地從任務隊列中領取任務
        while (i < items.length) {
          const idx = i++;                 // 領取當前要處理的任務下標
          await worker(items[idx], idx);   // 處理單個任務(工人內部串行)
          // 處理完繼續回到 while,再領下一單;直到任務列表為空
        }
        // 跳出 while:說明任務已經被領完,這個工人可以正常收工
      }
      

      這里要強調兩點:

      • 每個工人內部是串行的:比如工人 1 領了任務一,必須等這單完成(await 返回)后才能去領下一單。并發來自“有 n 個工人同時在干”,而不是讓單個工人內部再并發。
      • 共享指針是安全的const idx = i++ 這一步在同一次調用棧里是同步完成的(讀取→使用→自增),不會被別的工人中途打斷,因此不會出現兩個工人領到同一單。真正“讓出執行權”的地方發生在 await worker(...) 之后。

      最后,等待所有工人完工,這里使用 Promise.allSettled

      await Promise.allSettled(workers);
      

      這一步不會啟動任務,只是“在門口等所有工人跑完自己手上的任務”。

      allSettled 的好處是:即便某個工人因為某一單失敗而變成 rejected,也不會短路,其他工人仍會把剩下的活干完,更符合“批量任務盡量跑完”的訴求。

      最終完整的代碼如下:

      async function runWithConcurrency(items, worker, maxConcurrency) {
        if (!items?.length) return;
        
        // i 是“下一個要被領取的任務索引”。多個工人共享這個變量。
        // 在 JS 的單線程事件循環中,“讀取 i -> 使用 i -> i++”這一小段代碼在一次
        // 宏/微任務中是原子的(不會被其他 JS 執行棧打斷),因此可作為簡單的任務分發指針。
        let i = 0;
      
        // workers 用來收集每個“工人”的 Promise,后面用 allSettled 等待他們全部結束。
        const workers = [];
      
        // spawn = 啟動一個工人:不停從任務池里領取下一個索引并處理,直到沒有任務可領
        async function spawn() {
          // 當還有未處理的任務(i < items.length)就繼續循環
          while (i < items.length) {
            // 領取當前要處理的任務索引,然后自增以留給下一個任務
            const idx = i++;
            // 執行該任務:必須 await,保證這個工人在“串行處理自己的任務隊列”
            // 如果不 await,就會在單個工人內部產生更高的并發,超出 maxConcurrency 的約束
            await worker(items[idx], idx);
            // 若 worker 拋錯(reject),該 spawn() 的 Promise 會變為 rejected;
            // 但我們外層會用 Promise.allSettled,所以不會影響其它工人繼續工作。
          }
          // 退出 while:說明任務已經被領完,這個工人可以正常收工(resolve)
        }
      
        // 計算實際需要啟動的工人數:
        // - 不能超過任務總數(否則有些工人一上來就沒活干)
        // - 至少 1 個
        const n = Math.max(1, Math.min(maxConcurrency, items.length));
      
        // 啟動 n 個工人,每個工人都是一個獨立的異步執行體(Promise)
        for (let k = 0; k < n; k++) workers.push(spawn());
      
        // 等待所有工人“收工”。使用 allSettled:
        // - 不會因為某個工人失敗而中斷等待(all 會短路,allSettled 不會)
        // - 適合“多個任務相互獨立,允許部分失敗也要跑完”的場景
        await Promise.allSettled(workers);
        // 函數到這里 resolve,表示全部任務都已嘗試完成(成功或失敗),所有工人均結束。
      }
      

      至此,帶并發上限的并發執行方法就寫完了。

      寫在最后

      這一次我們實現了通用的 runWithConcurrency(items, worker, maxConcurrency),用固定數量的工人配合共享游標分發任務,工人內部串行、整體受控并發;配套示例也證明它能穩妥跑完批量請求和本地批處理場景。

      使用時有幾處要點值得牢牢記住:

      1. 并發從調用 spawn() 的那一刻就開始了,Promise.allSettled(workers)只是在門口把所有工人等到收工,并不會啟動任務、更不會決定順序;
      2. 共享指針用的是 const idx = i++,在單線程調用棧里是同步完成的,不會出現兩個人領到同一單;
      3. 工人內部必須 await worker(...),否則單個工人會把并發繼續堆高,超出你的 maxConcurrency
      4. 另外,workers 裝的是“工人的promise”而不是“每個任務的promise”,若需要拿回結果,可以在工人里按下標寫回一個 out[idx] 再返回。空列表可以直接早退,maxConcurrency 也最好做下限與取整的校驗。

      落到真實項目,還方法可以繼續打磨:

      1. 給它包上超時與取消,必要時做重試與指數退避;
      2. 按需暴露進度回調與觀測埋點,用數據調參;
      3. 在高壓場景里引入優先級、背壓和動態并發,保護上下游。

      掌握并靈活運用這套基建,小到腳本批處理,大到服務側限流,你的異步就能既穩又快。


      -EOF-

      posted @ 2025-10-30 14:43  Zhentiw  閱讀(8)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产一级r片内射免费视频 | 中文字幕自拍偷拍福利视频| 国内自拍偷拍一区二区三区| 国产精品亚洲二区亚瑟| A级毛片无码久久精品免费| 疯狂做受XXXX高潮国产| 国产亚洲无线码一区二区| 人妻精品动漫H无码中字| 久久精品丝袜高跟鞋| 国产成人精品1024免费下载| 久久久久久久久毛片精品| 五月天久久综合国产一区二区| 亚洲成人av在线综合| 强奷乱码中文字幕| 中文字幕亚洲综合久久2020| 亚洲日韩一区二区| 国产精品国产片在线观看| 午夜欧美日韩在线视频播放| 在线看无码的免费网站| 8050午夜二级无码中文字幕| 亚洲成人av免费一区| 国产成人小视频| 日本道高清一区二区三区| 福利一区二区在线观看| 丰满人妻熟妇乱又伦精品软件| 一本色道久久加勒比综合| 四虎永久免费影库二三区| 午夜激情福利一区二区| 人人人澡人人肉久久精品| 国产裸体无遮挡免费精品| 国产高清在线A免费视频观看| 亚洲精品一区二区麻豆| 国产亚洲精品久久久久久久软件| 国产成人精品午夜二三区| 伊人久久大香线蕉AV网禁呦| 欧美丰满熟妇性xxxx| 国产精品久久香蕉免费播放| 国产精品一区在线蜜臀| 午夜福利国产一区二区三区 | 久久国产精品老女人| 國產尤物AV尤物在線觀看|