<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼解析] Pytorch 如何實現(xiàn)后向傳播 (3)---- 引擎動態(tài)邏輯

      [源碼解析] Pytorch 如何實現(xiàn)后向傳播 (3)---- 引擎動態(tài)邏輯

      0x00 摘要

      前文我們提到了 autograd 引擎的靜態(tài)架構(gòu),本文開始我們從動態(tài)角度看看引擎是如何運作的。

      本系列其他文章如下:

      深度學(xué)習(xí)利器之自動微分(1)

      深度學(xué)習(xí)利器之自動微分(2)

      [源碼解析]深度學(xué)習(xí)利器之自動微分(3) --- 示例解讀

      [源碼解析]PyTorch如何實現(xiàn)前向傳播(1) --- 基礎(chǔ)類(上)

      [源碼解析]PyTorch如何實現(xiàn)前向傳播(2) --- 基礎(chǔ)類(下)

      [源碼解析] PyTorch如何實現(xiàn)前向傳播(3) --- 具體實現(xiàn)

      [源碼解析] Pytorch 如何實現(xiàn)后向傳播 (1)---- 調(diào)用引擎

      [源碼解析] Pytorch 如何實現(xiàn)后向傳播 (2)---- 引擎靜態(tài)結(jié)構(gòu)

      0x01 前文回顧

      前文我們從靜態(tài)角度來分析了引擎,最后得出一個動態(tài)圖如下,本文會對這個圖進(jìn)行進(jìn)一步細(xì)化。

      • 1)主線程往CPU ReadyQueue放入一個 NodeTask。
      • 2)工作線程 1 從 CPU ReadyQueue 取出 NodeTask,開始執(zhí)行。
      • 3)工作線程 1 結(jié)束之后,往 device_ready_queues_ 的 某一個 ReadyQueue 插入一個 NodeTask。
      • 4)ReadyQueue 對應(yīng)的 工作線程 2 取出 NodeTask,開始執(zhí)行。
                                  +-------------------------+
                                  | GraphTask               |
                                  |                         |
                                  |        cpu_ready_queue_ |
                                  |            +            |
                                  |            |            |
                                  +-------------------------+
                                               |
      +--------------+                         |                           +-----------------+
      | Main Thread  |                         v                           | Worker Thread 1 |
      |              |       1         +-------+---------+       2         |                 |
      |              | push(NodeTask)  |                 |  pop(NodeTask)  |                 |
      |          +-------------------> | CPU ReadyQueue  +----------------------->           |
      |              |                 |                 |                 |                 |
      |              |                 +-----------------+                 |                 |
      |              |              +----------------------+               |                 |
      |              |              | Device ReadyQueues   |               |                 |
      |              |              |                      |               |                 |
      |              |              |                      |    3          |                 |
      |              |              |    +-------------+   | push(NodeTask)|                 |
      |              |              |    | ReadyQueue 1| <-----------------------+           |
      |              |              |    +------+------+   |               |                 |
      |              |              |           |          |               |                 |
      +--------------+              |           |          |               +-----------------+
                                    |           +------------------+
                                    |                      |       |       +-----------------+
      +------------------------+    |          .           |       |       | Worker Thread 2 |
      | Engine                 |    |          .           |       |       |                 |
      |                        |    |          .           |       |       |                 |
      |                        |    |                      |       |       |                 |
      |  device_ready_queues_ +---> |    +-------------+   |       +------------->           |
      |                        |    |    | ReadyQueue 2|   | pop(NodeTask) |                 |
      |                        |    |    +-------------+   |     4         |                 |
      +------------------------+    |                      |               |                 |
                                    |                      |               |                 |
                                    |    +-------------+   |               |                 |
                                    |    | ReadyQueue 3|   |               |                 |
                                    |    +-------------+   |               |                 |
                                    |                      |               |                 |
                                    +----------------------+               +-----------------+
      

      0x02 引擎總體架構(gòu)

      我們從動態(tài)運行的角度看看引擎的總體架構(gòu)。Engine::execute 其總體邏輯如下:

      • 啟動引擎。
        • 初始化local ready_queue。
        • 構(gòu)建一個GraphTask。
        • 構(gòu)建GraphRoot,就是根節(jié)點。
        • 計算最小拓?fù)鋽?shù)。
        • 計算每個節(jié)點的依賴,目的是計算出所有的節(jié)點的依賴個數(shù)。
        • 如果輸出不為空,則調(diào)用 graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr) 將graph_task初始化。
        • 配置工作線程的各種輸入。
        • 啟動工作線程。
      • 運行引擎,即使用 execute_with_graph_task(graph_task, graph_root, ...) 啟動工作線程。
        • 每個線程對應(yīng)一個ReadyQueue,把 Root 節(jié)點放入queue。
        • 初始化完成之后,子線程調(diào)用thread_main(nullptr)開始工作。
        • 在thread_main中反復(fù)調(diào)用evaluate_function(task)計算每個Node的梯度,通過 next_edges 的不斷查找下一個Edge,直到所有節(jié)點的梯度都計算完成,最終完成了整個圖的計算。
          • 進(jìn)行 NodeTask 的計算。
          • 遍歷 當(dāng)前 Function 的 所有 next_function, 將它們的 dependencies 減一, 看看他們是否已經(jīng) ready。
          • 如果 ready, 通過 InputBuffer 的 device 來確定 將其 放入到 那個 ReadyQueue 中。
          • 如果沒有準(zhǔn)備好, 就放在 GraphTask 中的 not_ready 中。
          • 如果 graph_task->outstanding_tasks <= 0 則退出循環(huán)。即執(zhí)行完了 GraphTask 所有的 Node。
      • 主進(jìn)程進(jìn)行阻塞等待,等待 graph_task->future_result_,即工作線程結(jié)束。

      具體代碼如下:

      auto Engine::execute(const edge_list& roots, // 反向傳播的根節(jié)點
                           const variable_list& inputs, // 根節(jié)點的梯度
                           bool keep_graph, // 計算圖是否需要保留
                           bool create_graph, // 是否需要構(gòu)建微分圖以進(jìn)行高階求導(dǎo)
                           bool accumulate_grad,
                           const edge_list& outputs // 需要輸出梯度的節(jié)點
                          ) -> variable_list {
        validate_outputs(roots, const_cast<variable_list&>(inputs), [](const std::string& msg) {
          return msg;
        });
      
        // A fresh first time Engine::execute call should start on the CPU device, initialize
        // a new thread local ready queue on CPU or reuse the existing one (if there is one
        // allocated already, i.e. consecutive backward calls, re-entrant backward calls),
        // then memoize the local_ready_queue in GraphTask
        init_local_ready_queue(); // 初始化local ready_queue
        bool not_reentrant_backward_call = worker_device == NO_DEVICE;
      	// 構(gòu)建一個GraphTask
        auto graph_task = std::make_shared<GraphTask>(
            /* keep_graph */ keep_graph,
            /* create_graph */ create_graph,
            /* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
            /* cpu_ready_queue */ local_ready_queue);
      
        // 構(gòu)建GraphRoot
        // If we receive a single root, skip creating extra root node
        bool skip_dummy_node = roots.size() == 1;
        auto graph_root = skip_dummy_node ?
          roots.at(0).function :
          std::make_shared<GraphRoot>(roots, inputs);
      
        // 計算最小拓?fù)鋽?shù)
        auto min_topo_nr = compute_min_topological_nr(outputs);
        // Now compute the dependencies for all executable functions
        // 計算依賴
        compute_dependencies(graph_root.get(), *graph_task, min_topo_nr);
      
        // 如果輸出不為空,則調(diào)用 *graph_root, outputs 將graph_task初始化
        if (!outputs.empty()) {
          graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr);
        }
      
        // Queue the root
        if (skip_dummy_node) {
          // 配置工作進(jìn)程的各種輸入
          InputBuffer input_buffer(roots.at(0).function->num_inputs());
          auto input = inputs.at(0);
      
          const auto input_stream = InputMetadata(input).stream();
          const auto opt_next_stream = roots.at(0).function->stream(c10::DeviceType::CUDA);
          input_buffer.add(roots.at(0).input_nr,
                            std::move(input),
                            input_stream,
                            opt_next_stream);
          // 啟動工作進(jìn)程
          execute_with_graph_task(graph_task, graph_root, std::move(input_buffer));
        } else {
          // 啟動工作進(jìn)程
          execute_with_graph_task(graph_task, graph_root, InputBuffer(variable_list()));
        }
        // Avoid a refcount bump for the Future, since we check for refcount in
        // DistEngine (see TORCH_INTERNAL_ASSERT(futureGrads.use_count() == 1)
        // in dist_engine.cpp).
        // 主進(jìn)程進(jìn)行阻塞等待,等待 graph_task->future_result_。
        auto& fut = graph_task->future_result_;
        fut->wait();
        return fut->value().toTensorVector();
      }
      

      我們接下來按照步驟來分析。

      0x03 啟動引擎

      啟動引擎部分包括:

      • 初始化local ready_queue。
      • 構(gòu)建一個GraphTask。
      • 構(gòu)建GraphRoot,就是根節(jié)點。
      • 計算最小拓?fù)鋽?shù)。
      • 計算每個節(jié)點的依賴。
      • 如果輸出不為空,則調(diào)用 graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr) 將graph_task初始化。
      • 配置工作線程的各種輸入。

      我們接下來一一分析。

      3.1 初始化local ready queue

      前文提到,每個autogard 工作線程都與一個就緒隊列相關(guān)聯(lián),該隊列指定該線程要執(zhí)行的工作流,這個隊列定義如下。

      static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;
      

      這個shared_ptr是一個thread_local指針,其指向每個線程的ready_queue,在執(zhí)行之前,應(yīng)該通過每個對應(yīng)線程中的 Engine::init_local_ready_queue() 調(diào)用對其進(jìn)行初始化。

      另外,GraphTask 也有一個 CPU queue 成員變量 cpu_ready_queue_,專用于處理反向傳播相關(guān)CPU工作。

      init_local_ready_queue 代碼有兩個執(zhí)行路徑:

      • 主線程執(zhí)行路徑 :參數(shù) ready_queue 沒有配置,則此時 Engine::execute 是全新調(diào)用,則應(yīng)該在CPU設(shè)備上啟動。所以需要在CPU上初始化一個新的線程本地就緒隊列或重用現(xiàn)有的線程本地就緒隊列(比如可重入的后向傳播),然后在工作線程的 local_ready_queue 之上保存。這就通過如下代碼完成,此時 local_ready_queue 是主線程的線程本地變量
      • 工作線程執(zhí)行路徑 :參數(shù) ready_queue 有配置,是通過 std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true),這時候 local_ready_queue 就是工作線程的本地變量。

      我們這個階段介紹的就是主線程執(zhí)行路徑,init_local_ready_queue 調(diào)用沒有參數(shù),生成工作線程的 local_ready_queue。

      void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
        if (ready_queue) {
          // 工作線程執(zhí)行路徑
          // if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
          local_ready_queue = std::move(ready_queue);
        } else if (!local_ready_queue){
          // 主線程執(zhí)行路徑。
          // otherwise if local_ready_queue not allocated, allocate a new ready_queue
          local_ready_queue = std::make_shared<ReadyQueue>();
        }
      }
      

      3.2 構(gòu)建GraphTask

      接下來就構(gòu)建了 GraphTask,這時候就把主線程的 local_ready_queue 傳入。

      	// 構(gòu)建一個GraphTask
        auto graph_task = std::make_shared<GraphTask>(
            /* keep_graph */ keep_graph,
            /* create_graph */ create_graph,
            /* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
            /* cpu_ready_queue */ local_ready_queue);
      

      在構(gòu)建函數(shù)內(nèi)部,就把 local_ready_queue 賦值給了 GraphTask 的成員變量 cpu_ready_queue_。

      GraphTask(
          bool keep_graph,
          bool grad_mode,
          int reentrant_depth,
          std::shared_ptr<ReadyQueue> cpu_ready_queue,
          bool exit_on_error = false)
          : keep_graph_(keep_graph),
            grad_mode_(grad_mode),
            owner_(NO_DEVICE),
            reentrant_depth_(reentrant_depth),
            exit_on_error_(exit_on_error),
            cpu_ready_queue_(std::move(cpu_ready_queue)),
            future_result_(std::make_shared<at::ivalue::Future>(c10::ListType::create(c10::TensorType::get()))) {}
      

      目前邏輯如下,生成了 GraphTask 內(nèi)部的 queue,但是引擎內(nèi)部的 device_ready_queues_ 還沒有生成

      +------------------------+                                     +-----------------------+
      | GraphTask              |                                     | Main Thread           |
      |                        |       |-----------------|           |                       |
      |     cpu_ready_queue_+------->  | CPU ReadyQueue  | <-----------+ local_ready_queue   |
      |                        |       +-----------------+           |                       |
      |                        |                                     |                       |
      +------------------------+                                     +-----------------------+
      
      
      +------------------------+
      | Engine                 |
      |                        |
      |                        |
      |  device_ready_queues_  |
      |                        |
      |                        |
      +------------------------+
      

      3.3 構(gòu)建根節(jié)點

      接下來構(gòu)建根節(jié)點。關(guān)于構(gòu)建根節(jié)點我們有一個問題:如果在前向計算中,有多個輸出怎么辦?就是后向傳播時候有多個輸入根,這時候怎么辦?答案如下:

      • 如果只有一個root節(jié)點,就跳過創(chuàng)建其他根,直接返回 roots.at(0).function 作為 GraphRoot,就是一個Node節(jié)點。

      • 如果有多個root輸入根,就構(gòu)造一個GraphRoot,用它來驅(qū)動后向傳播。

        • 把這些 root 作為參數(shù)構(gòu)建一個GraphRoot,這個 GraphRoot 作為真正的根節(jié)點。
        • root 就是 Node 的邊。即,把這些根對應(yīng)的 edge_list 轉(zhuǎn)換為 Node 里面的 next_edges_,這個GraphRoot可以認(rèn)為是一個虛擬Root。

      具體代碼如下:

        // If we receive a single root, skip creating extra root node
        bool skip_dummy_node = roots.size() == 1;
        auto graph_root = skip_dummy_node ?
          roots.at(0).function : // 單個root,直接使用
          std::make_shared<GraphRoot>(roots, inputs); // 多個root輸入根,就構(gòu)造一個GraphRoot,用它來驅(qū)動后向傳播
      

      回憶一下 GraphRoot 的定義,大家可以印證一下。

      using edge_list = std::vector<Edge>;
      
      struct TORCH_API GraphRoot : public Node {
        GraphRoot(edge_list functions, variable_list inputs)
            : Node(std::move(functions)),
            outputs(std::move(inputs)) {
          // Ensures calls to stream() on a GraphRoot instance reflect current stream(s)
          // on devices of root grad tensors at the time the instance is constructed.
          for (const auto& t : outputs) {
            add_input_metadata(t);
          }
        }
      
        variable_list apply(variable_list&& inputs) override {
          return outputs; // 直接把梯度透傳給后續(xù)節(jié)點
        }
      
        variable_list outputs; // 這個是梯度
      };
      

      inline static uint64_t compute_min_topological_nr(const edge_list& outputs) { // Computes the mininum topological number among all the outputs if (outputs.empty()) { return 0; } auto min_topo_nr = std::numeric_limits<uint64_t>::max(); for (auto & output_edge : outputs) { auto topo_nr = output_edge.function.get()->topological_nr(); min_topo_nr = (min_topo_nr < topo_nr) ? min_topo_nr : topo_nr; } return min_topo_nr; }

      topological_nr_ 是 “節(jié)點”的拓?fù)漤樞蛱?,表示從該?jié)點到任何葉節(jié)點的最長可能路徑的長度。如果某個節(jié)點是葉節(jié)點,即AccumulateGrad,topological_nr_ 將是零。topological_nr_ 用于在autograd發(fā)現(xiàn)期間對DAG中的分支進(jìn)行修剪,維護(hù)拓?fù)?topological_nr_有助于我們在兩個節(jié)點之間不存在有向路徑時,在O(1) 時間完成檢查。

      topological_nr_ 具有以下屬性:

      • 對于G中的每一對節(jié)點X,Y,如果存在從X到Y(jié)的有向路徑,則意味著 topo_nr(X) > topo_nr(Y)。然而,事實并非如此,因此我們無法證明從X到Y(jié)的路徑的存在性,只能證明不存在。

      • 我們在使用 topological_nr_ 時所做的一個假設(shè)是:一旦使用了一個節(jié)點,即,它有一個父節(jié)點,那么它自己的topological_nr_ 就不會改變。我們在“has_parent_”字段中添加了一些檢查來強(qiáng)制執(zhí)行這一點。

      具體大家也可以通過代碼中的注釋來印證。

      // NOTE [ Topological Number ]
      //
      // topological_nr is used to prune branches in the DAG during autograd discovery as
      // maintaining topological_nr helps us check in O(1) if there does NOT exist
      // a directed path between two nodes.
      //
      // The topological order number of this `Node` representing the length of the
      // longest possible path from this Node to any leaf node. If you are leaf node,
      // aka AccumulateGrad, this will be zero. This value has the property that
      // For every pair of nodes X, Y in G, existence of a directed path from X to Y
      // implies topo_nr(X) > topo_nr(Y). The converse is not true, however, so we
      // cannot prove existence of a path from X to Y, only non-existence.
      //
      // One assumption we make when using topo_nr is that once a node
      // has been used, i.e., has a parent node, its own topo_nr does not change
      // we have added some checks with the `has_parent_` field to enforce this.
      //
      // What NOT to do:
      //
      //   1) 2 -> 1 -> 0               In this diagram we label nodes with their topo_nr.
      //      2 -> 1 -> 0               We have two simple graphs that can each arise from
      //                                `t.exp().exp()`, for example.
      //   2)        2 -> 1 -> 0
      //            /
      //      2 -> 1 -> 0               We add 2 as a next edge to 1 even though 1 already
      //                                has a parent.
      //   3)        2 -> 1 -> 0
      //            /
      //      2 -> 3 -> 0               2 < 3, yet there exists a path from 2 to 3!
      //
      uint64_t topological_nr() const noexcept {
        has_parent_ = true;
        return topological_nr_;
      }
      
      

      3.5 計算依賴

      GraphTask 的 dependencies_ 成員變量 用來判斷后續(xù)節(jié)點是否已經(jīng)可以被執(zhí)行,其類型如下:

      std::unordered_map<Node*, int> dependencies_;
      

      dependencies 的 key 數(shù)目就是微分圖中Node數(shù)目,dependencies 計算的就是每一個Node的入度。

      dependencies成員在compute_dependencies調(diào)用中被初始化,只要某一個grad_fn函數(shù)在其他人的 next_edges() 中出現(xiàn)過一次,那么dependencies[this_grad_fn] 就自增1。如果dependencies[this_grad_fn]大于0,說明this_grad_fn有一個后向傳播的依賴,即 this_grad_fn 需要等 被依賴者 完成,才能進(jìn)行自己的反向傳播相關(guān)計算。

      compute_dependencies 就是計算GraphTask的dependencies_。其邏輯是:從 graph_root 開始,對微分圖中每個node的依賴進(jìn)行計算,計算從根節(jié)點開始,通過廣度優(yōu)先的算法進(jìn)行。如果一個grad_fn函數(shù)在別人的next_edges()中出現(xiàn)過一次,那么dependencies[grad_fn] 就自增1。具體代碼如下:

      auto Engine::compute_dependencies(Node* root, GraphTask& task, uint64_t min_topo_nr) -> void {
        // Computes the number of dependencies for each function which requires grad
        std::unordered_set<Node*> seen;
        std::vector<Node*> queue { root };
      
        // Queue contains all nodes that will start propagating gradients.
        // We no longer have to expand functions that don't require grad.
        auto& dependencies = task.dependencies_;
        while (!queue.empty()) {
          auto fn = queue.back(); queue.pop_back();
          if (fn->topological_nr() < min_topo_nr) {
            continue;
          }
          for (const auto& edge : fn->next_edges()) {
            if (auto next_ptr = edge.function.get()) { 
              dependencies[next_ptr] += 1;
              const bool was_inserted = seen.insert(next_ptr).second;
              if (was_inserted) queue.push_back(next_ptr);
            }
          }
        }
      }
      
      

      比如我們的代碼

      a = torch.tensor(2., requires_grad=True)
      b = torch.tensor(6., requires_grad=True)
      Q = 3*a**3 - b**2
      
      

      對應(yīng)計算圖是

      img

      得到依賴是:

      dependencies[PowBackward0] = 1 # 被 MulBackward0 的 next_edges 引用
      dependencies[MulBackward0] = 1 # 被 SubBackward0 引用
      dependencies[PowBackward0_2] = 1 # 被 SubBackward0 引用
      

      如果某個節(jié)點的 dependencies 為0,才能執(zhí)行。

      3.6 初始化GraphTask ExecInfo

      如果出邊不為空,則會調(diào)用 init_to_execute 對GraphTask.exec_info_進(jìn)行初始化。

        if (!outputs.empty()) {
          graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr);
        }
      

      GraphTask.exec_info_ 的作用就是給 GraphTask 的每一個 Node 配置一個 ExecInfo,就是該 Node 的執(zhí)行信息。

      • 如果exec_info_為空,說明該task運行在默認(rèn)模式,即,所有遇到的 next_edges 都需要執(zhí)行。

      • 如果 exec_info_ 非空,說明只有特定 functions 才會被執(zhí)行,這些 Functions 的特點是:擁有 entry,并且這個 entry 的 “has needed == True”。

      exec_info_ 何時為空?何時非空?

      • 當(dāng)圖被用 .backward() 執(zhí)行,并且沒有傳遞輸入?yún)?shù),則 exec_info 為空。
      • 如果只是使用用 .grad() 執(zhí)行,或者使用.backward() 執(zhí)行時候并且給定輸入?yún)?shù),那么 exec_info_ 非空。

      所以,exec 和 captured_vars_ 就是針對 grad() 和指定參數(shù)的 backward(),就是標(biāo)注在這種情況下需要計算哪些梯度。在這種情況下,只有某些節(jié)點需要執(zhí)行,從這些節(jié)點開始,有一條路徑通向 outpus

      init_to_execute 的作用是給 exec_info 填充數(shù)據(jù),目的是對于應(yīng)該執(zhí)行的節(jié)點,設(shè)置其成員變量exec_info[node].needed_ = true。只有某些特定節(jié)點會得到執(zhí)行,這些節(jié)點有一條出邊,出邊的另一端在“outputs”之中。

      其主要算法邏輯為:使用遞歸思路來填充 exec_info,但是對于實際代碼是使用iterative方式進(jìn)行。在iterative版本中,當(dāng)你操作當(dāng)前節(jié)點時候,在你所有孩子節(jié)點被更新之后,你需要更新你父親節(jié)點。

      void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs, bool accumulate_grad, uint64_t min_topo_nr) {
        // Populates exec_info so nodes that should be executed have `exec_info[node].needed_ = true`
        // Only nodes that have a path to any edge in `outputs` should be executed.
        // The code below populates exec_info using recursion, but the actual code does this
        // iteratively. Refer to the numbering to see how the actual code corresponds.
        // A difference to note is that in the iterative version, when you are working with
        // the current Node, you are reponsible to update your parent's is_needed after all your
        // children have been updated.
        //
        // is_needed = {fn: True for fn in outputs}             # (0)
        // seen = {}
        // def compute_is_needed(fn):
        //   for next_edge in fn.next_edges:
        //     child_fn = next_edge.fn
        //     if child_fn in seen and is_needed[child_fn]:     # (1)
        //       is_needed[fn] = true
        //     else:
        //       seen.add(child_fn)
        //       if compute_is_needed(child_fn):
        //         is_needed[fn] = true                         # (2)
        //                                                      # (3) exit for-loop
        //   return is_needed[fn]
        // compute_is_needed(graph_root)
        //
        // NB: you might be wondering why we don't populate `seen` with outputs. We cannot
        // because in the case where two outputs lie on the same path, we still need to explore past
        // the first output or we would miss the nodes that are required to compute the second output.
        int output_idx = 0;
        for (auto & output_edge : outputs) {
          // (0) `is_needed` above corresponds to `exec_info_[fn].needed_`
          Node *output = output_edge.function.get();
          auto & info = exec_info_[output];
          if (accumulate_grad) {
            // if called through `.backward()` we directly set `needed_` for all the outputs to true
            info.needed_ = true;
          } else {
            // otherwise it is `.grad()` and we set exec_info[fn].captures_ instead
            // In terms of populating the rest of exec_info though, you can basically
            // think of this as the same as setting `needed_` is true directly.
            if (!info.captures_) {
              info.captures_ = make_unique<std::vector<ExecInfo::Capture>>();
            }
            // 第 i 個輸入對應(yīng)的輸出
            info.captures_->emplace_back(output_edge.input_nr, output_idx++); 
          }
        }
        captured_vars_.resize(output_idx);
      
        struct Frame {
          Frame (Node *fn) : fn_(fn), next_next_fn_(0) {}
          Node *fn_;
          size_t next_next_fn_;
      
          Node* get_next_fn() {
            const auto & next = fn_->next_edges();
            auto num_next = next.size();
            while (next_next_fn_ < num_next) {
              auto fn = next[next_next_fn_++].function.get();
              if (fn) return fn;
            }
            return nullptr;
          }
        };
      
        auto nodeShouldExecute = [this](Node *fn) {
          auto it = exec_info_.find(fn);
          return it != exec_info_.end() && it->second.should_execute();
        };
      
        std::vector<Frame> stack;
        std::unordered_set<Node*> seen;
        stack.emplace_back(&graph_root);
        exec_info_.emplace(stack.back().fn_, ExecInfo()); //有多個exec_info
      
        while (!stack.empty()) {
          auto &frame = stack.back();
          const auto fn = frame.fn_;
      
          Node *child_fn = nullptr;
          while((child_fn = frame.get_next_fn()) && !seen.emplace(child_fn).second) {
            // (1) next child exists AND has already been seen
            if (nodeShouldExecute(child_fn)) {
              exec_info_[fn].needed_ = true;
            }
          }
      
          if (child_fn) {
            // (2) next child exists but has not been seen
            if (child_fn->topological_nr() < min_topo_nr) {
              // child created before the first output means this child cannot have
              // an edge to output
              continue;
            }
            stack.emplace_back(child_fn);
          } else {
            // (3) no next child exists for `fn` means its `needed` has already been
            // finalized. pop stack and update parent
            stack.pop_back();
            if (nodeShouldExecute(fn) && !stack.empty()) {
              exec_info_[stack.back().fn_].needed_ = true;
            }
          }
        }
      }
      

      3.7 配置工作線程輸入

      接下來在主線程中,會配置工作線程的輸入,就是構(gòu)建了 InputMetadata。

        // Queue the root
        if (skip_dummy_node) {
          // 如果是單節(jié)點,則直接使用 CUDA queue
          InputBuffer input_buffer(roots.at(0).function->num_inputs());
          auto input = inputs.at(0);
          // 構(gòu)建InputMetadata
          const auto input_stream = InputMetadata(input).stream();
          const auto opt_next_stream = roots.at(0).function->stream(c10::DeviceType::CUDA);
          input_buffer.add(roots.at(0).input_nr,
                            std::move(input),
                            input_stream,
                            opt_next_stream);
      
          execute_with_graph_task(graph_task, graph_root, std::move(input_buffer));
        } else {
          // 如果是多輸入根節(jié)點,之前構(gòu)建了虛擬根節(jié)點,后續(xù)就對應(yīng)了 CPU queue
          execute_with_graph_task(graph_task, graph_root, InputBuffer(variable_list()));
        }
      

      3.8 開始運行

      接下來會調(diào)用 execute_with_graph_task。execute_with_graph_task具體做就是啟動后續(xù)各種線程,關(guān)于 線程 具體我們后續(xù)詳解。

      這里就是知道,根據(jù)當(dāng)前設(shè)備來走不同路徑,具體邏輯如下:

      • 如果 worker_device == NO_DEVICE,說明這是主線程,則:
        • 獲取到相關(guān)queue,具體是利用 input_buffer.device() 獲得的,這就用到了上節(jié)的 InputBuffer。如果獲取到的設(shè)備是 CPU,就獲取 GraphTask.cpu_ready_queue_,如果是 GPU,就用到了對應(yīng)的GPU設(shè)備對應(yīng)的 queue,具體我們后文詳述。
        • 在 queue 之中插入 NodeTask。
        • 調(diào)用 thread_main 運行 GraphTask。
      • 否則是可重入反向傳播情況下的主線程,則:
        • 利用 graph_task->owner_ = worker_device 指定當(dāng)前設(shè)備是哪個設(shè)備,GPU 或者 CPU。
          • 如果已經(jīng)達(dá)到了最大遞歸深度,則采用add_thread_pool_task 啟動 GPU線程 或者 CPU線程。
          • 否則運行 thread_main。

      具體代碼如下:

      std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
          const std::shared_ptr<GraphTask>& graph_task,
          std::shared_ptr<Node> graph_root,
          InputBuffer&& input_buffer) {
        
        initialize_device_threads_pool(); // 啟動設(shè)備工作線程
        
        // Lock mutex for GraphTask.
        std::unique_lock<std::mutex> lock(graph_task->mutex_);
      
        // 獲取到相關(guān)queue,具體是利用 input_buffer.device() 獲得的。
        auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());
      
        // worker_device == NO_DEVICE it's a CPU thread and it's trying to drive the
        // autograd engine with corresponding GraphTask, and its NOT a re-entrant call
        if (worker_device == NO_DEVICE) {
          // 如果到了這里,必然是沒有活躍工作設(shè)備  
          
          // 主線程
            
          // We set the worker_device to CPU_DEVICE only if worker_device was previously
          // NO_DEVICE. Setting it to CPU afterwards allow us to detect whether this is
          // a re-entrant call or not.
          set_device(CPU_DEVICE);
      
          // set the graph_task owner to the current device
          graph_task->owner_ = worker_device; // 就是 CPU 設(shè)備
      
          // Now that all the non-thread safe fields of the graph_task have been populated,
          // we can enqueue it.
          // 給 queue 之上插入 NodeTask,這樣就會喚醒對應(yīng)線程開始工作
          queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
      
          // The owning thread start to drive the engine execution for any CPU task that
          // was just pushed or will be added later from other worker threads
          lock.unlock();
          thread_main(graph_task); // 在主線程運行,這里會在queue之上阻塞
          
          TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
          // reset the worker_device after the completion of the graph_task, this is so
          // that the initial state of the engine remains the same across every backward()
          // or grad() call, we don't need to reset local_ready_queue as we could possibly
          // reuse it for new backward calls.
          worker_device = NO_DEVICE;
          
          // 如果到了這里,必然是沒有活躍工作設(shè)備,就是所有 GraphTask都結(jié)束了,如果沒有結(jié)束,就是reentrant,必須走下面的case
          
          // 主線程  
        } else {
            
          // 重入后向傳播狀況下的主線程
            
          // If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
          //    backward call from that device.
          graph_task->owner_ = worker_device;
      
          // Now that all the non-thread safe fields of the graph_task have been populated,
          // we can enqueue it.
          // 向 queue 插入第一個NodeTrask,就是 graph_root
          queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
      
          if (current_depth >= max_recursion_depth_) {
            // See Note [Reentrant backwards]
            // If reached the max depth, switch to a different thread
            // 達(dá)到最大重入深度,這里會啟動一個新的線程  
            add_thread_pool_task(graph_task); // 啟動GPU或者CPU線程
          } else {
            // Total depth needs to be updated only in this codepath, since it is
            // not used in the block above (when we call add_thread_pool_task).
            // In the codepath above, GraphTask.reentrant_depth_ is used to
            // bootstrap total_depth in the other thread.
            ++total_depth;
      
            // Get back to work while we wait for our new graph_task to
            // complete!
            ++current_depth;
            lock.unlock();
            thread_main(graph_task); // 在主線程運行,這里會在queue之上阻塞
            --current_depth;
            --total_depth;
      
            // The graph task should have completed and the associated future should
            // be marked completed as well since 'thread_main' above is a call
            // blocking an autograd engine thread.
            TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
          }
            
           // 重入后向傳播狀況下的主線程  
        }
        // graph_task_exec_post_processing is done when the Future is marked as
        // completed in mark_as_completed_and_run_post_processing.
        return graph_task->future_result_;
      }
      
      

      3.9 配置設(shè)備和ReadyQueue

      上節(jié)代碼之中,有如下代碼進(jìn)行配置設(shè)備。

      set_device(CPU_DEVICE);
      

      3.9.1 CPU_DEVICE

      可以看到,在 set_device 時候,如果不是 CPU_DEVICE,就設(shè)置 impl->setDevice

      void set_device(int device) {
        // NB: We MUST NOT construct the guard for device CPU,
        // as in some settings we compile with cuda, but
        // have lazy stubs for CUDA functionality (so actually
        // attempting to setup a guard(CPU_DEVICE) will cause an
        // error, because it will still query cudaGetDevice).
        //
        // Don't use DeviceGuard here because its destructor may be called before the
        // device is reset. This is fine because the device is thread local.
        if (device != CPU_DEVICE) {
          for (size_t i = 0; i < static_cast<size_t>(c10::DeviceType::COMPILE_TIME_MAX_DEVICE_TYPES); i++) {
            auto* impl = c10::impl::device_guard_impl_registry[i].load();
            if (impl && device < impl->deviceCount()) {
              impl->setDevice(at::Device(static_cast<c10::DeviceType>(i), device));
            }
          }
        }
        worker_device = device;
      }
      

      除了初始化時候調(diào)用 set_device(CPU_DEVICE),在Engine::thread_init也會調(diào)用,就是啟動設(shè)備線程時候用到的,設(shè)置了設(shè)備序列號。這個序列號就可以和 ReadyQueue 對應(yīng)

      auto Engine::start_device_threads() -> void {
        for (int i = 0; i < num_devices; ++i) {
          std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
          t.detach();
        }
      }
      
      void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
        ...
      	set_device(device);
        ...
      }
      

      3.9.2 Ready Queue

      上節(jié)代碼之中,有如下代碼獲取 queue。

      // 獲取到cpu相關(guān)queue,具體是利用 input_buffer.device() 獲得的。
      auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());
      

      我們具體看看是如何獲取當(dāng)前queue的,這是根據(jù)本GraphTask的 CPU queue 和 配置的輸入device 一起計算得出的,即:

      • 調(diào)用 InputBuffer::device() 從輸入獲取設(shè)定的設(shè)備,如果設(shè)定了,就使用這個設(shè)備,否則使用 CPU。
      • 如果是CPU,就使用 cpu_ready_queue,否則使用 device_ready_queues_。

      進(jìn)一步解析如下:

      每個GraphTask都有自己的CPU queue,但是 GPU Queues 被所有GraphTask共享。

      // CPU ready queue is per GraphTask, but CUDA device ready queues are shared across all graph tasks
      auto Engine::ready_queue(std::shared_ptr<ReadyQueue> cpu_ready_queue, at::Device device) -> std::shared_ptr<ReadyQueue>{
        if (device.type() == at::kCPU || device.type() == at::DeviceType::Meta) {
          // return the cpu ready queue passed in
          return cpu_ready_queue;
        } else {
          // See Note [Allocating GPUs to autograd threads]
          return device_ready_queues_.at(device.index());
        }
      }
      

      InputBuffer::device 是從輸入?yún)?shù)中獲取設(shè)備。這里如果有配置設(shè)備,就返回,否則返回 at::kCPU。

      auto InputBuffer::device() const -> at::Device {
        // Since we pick the first non-CPU tensor, this won't work with
        // mixed device-type operations (e.g., an op that is both CUDA
        // and XLA).  This is *incredibly* unlikely, so we don't worry
        // about it.
        for (auto& var : buffer) {
          if (var.defined()) {
            auto device = var.device();
            if (device.type() != at::kCPU) {
              return device;
            }
          }
        }
        // Only report to the CPU thread if there really were no tensors
        // from other devices.
        return at::kCPU;
      }
      

      3.10 主進(jìn)程等待

      在 Engine::execute 最后,通過如下代碼,主進(jìn)程進(jìn)入了等待狀態(tài)

        // Avoid a refcount bump for the Future, since we check for refcount in
        // DistEngine (see TORCH_INTERNAL_ASSERT(futureGrads.use_count() == 1)
        // in dist_engine.cpp).
        // 主進(jìn)程進(jìn)行阻塞等待,等待 graph_task->future_result_。
        auto& fut = graph_task->future_result_;
        fut->wait();
        return fut->value().toTensorVector();
      
      

      這里使用 std::shared_ptr<at::ivalue::Future> future_result_; 來進(jìn)行線程間通信。

      主線程用wait等待,工作線程用markCompleted來通知主進(jìn)程結(jié)束。

      0x04 啟動線程

      因為線程部分比較復(fù)雜,所以我們從啟動部分提出來單獨分析。

      因為大型模型往往梯度數(shù)目太多,所以PyTorch 采用了多線程處理。為了應(yīng)對各種情況,PyTorch 定義了一個線程變量 worker_device。引擎生成的線程都被分配一個 "worker_device",指定它們?yōu)槟膫€設(shè)備處理工作。此變量在以下位置初始化:

      • 在CUDA,XLA設(shè)備線程的創(chuàng)建時間初始化,因為他們正等待在自己的設(shè)備上工作。
      • 在CPU線程的圖任務(wù)執(zhí)行之前初始化,因為對于每個后向調(diào)用,我們使用調(diào)用線程(caller thread)來驅(qū)動引擎執(zhí)行。
      static constexpr int NO_DEVICE = -2;
      static constexpr int CPU_DEVICE = -1;
      
      // Threads spawned by the engine are assigned a 'worker_device' specifying
      // what device they process work for. This variable is initialized at:
      // 1. thread creation time for CUDA, XLA device threads, as they are
      //    spinning threads waiting for works on their device.
      // 2. before the graph task execution for CPU threads, as for each
      //    backward call we use the caller thread to drive engine execution.
      // This is used when handling reentrant backwards calls;
      // See Note [Reentrant backwards]
      static thread_local int worker_device = NO_DEVICE;
      

      4.1 啟動設(shè)備線程

      4.1.1 調(diào)用

      execute_with_graph_task 方法之中,使用 initialize_device_threads_pool 啟動 start_device_threads。

      std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
          const std::shared_ptr<GraphTask>& graph_task,
          std::shared_ptr<Node> graph_root,
          InputBuffer&& input_buffer) {
        
        // 這里首先會啟動工作線程
        initialize_device_threads_pool();
      

      這里使用std::call_once來確保在整個進(jìn)程周期之內(nèi),start_device_threads成員函數(shù)只被調(diào)用了一次,即設(shè)備線程只生成一次。

      void Engine::initialize_device_threads_pool() {
        track_bad_autograd_forks();
        std::call_once(start_device_threads_flag_, &Engine::start_device_threads, this);
      }
      

      4.1.2 線程數(shù)目

      在引擎之中,工作線程的數(shù)目是依據(jù)設(shè)備數(shù)量決定的。如果有n個設(shè)備,就會啟動n個設(shè)備工作線程。比如,如果有2個GPU,則啟動2個設(shè)備工作線程。但是每一個GraphTask都有自己的CPU工作線程(我們接下來馬上介紹)。

      GPU工作線程對應(yīng)的 ReadyTask 是 Engine 之中的 成員變量。

      std::vector<std::shared_ptr<ReadyQueue>> device_ready_queues_;
      

      此時兩個GPU工作線程對應(yīng)的 ready queue index 分別是 0, 1。

      device_ready_queues_ 定義在引擎之中,也說明 devices queues 是在所有 GraphTask 之間共享,而CPU queue是每個 GraphTask 獨有。設(shè)備線程的啟動是在 start_device_threads 函數(shù),可以看到其調(diào)用了 std::thread 啟動了線程,并且用 detach 讓其獨立運行。

      4.1.3 啟動設(shè)備線程

      start_device_threads 用來啟動設(shè)備線程,設(shè)備線程數(shù)目與設(shè)備數(shù)目相關(guān),這樣 NUM_DEVICES 個線程在后臺一起處理 GraphTask 中的任務(wù)。

      • 使用deviceCount得到設(shè)備數(shù)量 num_devices。

      • 然后根據(jù)設(shè)備的數(shù)量來決定要啟動的設(shè)備線程數(shù)量。

      • 創(chuàng)建多個ReadyQueue,ReadyQueue數(shù)目和工作線程數(shù)目一樣。這些ReadyQueue 在 Engine 的 device_ready_queues_ 之上被管理。device_ready_queues_ 就是管理GPU。

      • 創(chuàng)建設(shè)備線程。每個線程都是std::thread,構(gòu)建時候,把對應(yīng)的ReadyQueue,就是device_ready_queues_[i] 和 Engine(整個進(jìn)程生命周期只有一個Engine實例)都傳遞進(jìn)去。這樣Queue可以依靠Engine對于device_ready_queues_的共享來實現(xiàn)線程間工作對象傳輸。

      • 作為對比,GraphTask 專門有一個ReadyQueue(cpu_ready_queue_)是用來跑CPU相關(guān)工作線程。因為CPU工作線程專門用來處理反向傳播的CPU工作。當(dāng) GraphTask 在某一個GPU之上的工作結(jié)束了,下一個 NodeTask 應(yīng)該切換到 CPU 之上,所以GraphTask需要記住自己的cpu_ready_queue_ ,從而給 cpu_ready_queue_ 發(fā)送消息。

        注意,cpu_ready_queue_ 這是 GraphTask 專有的 ReadyQueue。

      auto Engine::start_device_threads() -> void {
        // See Note [Allocating GPUs to autograd threads]
        // 使用deviceCount得到 設(shè)備數(shù)量 num_devices。  
        c10::DeviceIndex num_devices = 0;
        for (const auto& impl_atomic : c10::impl::device_guard_impl_registry) {
          auto* impl = impl_atomic.load();
          if (impl) {
            num_devices = std::max(num_devices, impl->deviceCount());
          }
        }
      
        // allocate one thread for every GPU device (but colocate GPUs of different
        // types), and pre-allocate the device_ready_queues_ to ensure safe reading on it.
        // 創(chuàng)建多個ReadyQueue,ReadyQueue數(shù)目和工作線程數(shù)目一樣
        device_ready_queues_ = std::vector<std::shared_ptr<ReadyQueue>>(num_devices);
        for (auto& queue : device_ready_queues_)    {
          queue.reset(new ReadyQueue());
        }
      
        thread_pool_shared_ = std::make_shared<ThreadPoolShared>();
        // 創(chuàng)建設(shè)備線程
        for (int i = 0; i < num_devices; ++i) {
          std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
          t.detach(); // 讓工作線程獨立運行
        }
        // Wait for the threads to start
        {
          std::unique_lock<std::mutex> lk(non_reentrant_device_thread_mutex_);
          while(non_reentrant_device_thread_count_.load() != static_cast<uint32_t>(num_devices)) {
            non_reentrant_device_thread_condvar_.wait(lk);
          }
        }
      }
      

      設(shè)備線程啟動之后,都使用 wait 阻塞在自己對應(yīng)的 ReadyQueue 之中,主線程或者其他worker線程通過對 某一個設(shè)備線程的ReadyQueue 采用 push(NodeTask) 操作來喚醒該設(shè)備線程進(jìn)行工作。

      4.2 線程初始化

      線程會調(diào)用 thread_init 進(jìn)行初始化,這里會:

      • 配置線程的設(shè)備。
      • 調(diào)用 init_local_ready_queue 來初始化局部queue。
      • 調(diào)用 thread_main 作為線程主體函數(shù)來執(zhí)行。

      初始化本地queue的代碼如下:

      void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
        if (ready_queue) {
          // if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
          local_ready_queue = std::move(ready_queue);
        } else if (!local_ready_queue){
          // otherwise if local_ready_queue not allocated, allocate a new ready_queue
          local_ready_queue = std::make_shared<ReadyQueue>();
        }
      }
      

      每個autograd工作線程都會與一個ready queue關(guān)聯(lián),這個queue就是這個線程的工作流。local_ready_queue 使用 std::shared_ptr 來作為 本地線程指針。

      // Every autograd worker thread is associated with a ready queue, which specifies
      // the stream of work of this thread to do. This shared_ptr is a thread_local
      // pointer to each thread's ready_queue, and it should be initialized via the
      // Engine::init_local_ready_queue() call in each corresponding thread before execution.
      //
      // The CUDA, XLA threads are shared among all invocations of backwards via
      // device_ready_queues_, while CPU threads are dedicated to processing CPU work for
      // the backward they invoked. So any given graph task maintains its own cpu_ready_queue_
      // where you should send work for it to be done
      //
      // For reentrant backward calls, if we spawn new thread from the current thread
      // because we reached the maximum depth, the new thread will just reuse the same
      // ReadyQueue with the parent thread for performance improvement.
      // see Note [Reentrant backwards] for more details.
      // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
      static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;
      

      具體初始化的代碼如下:

      void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
        if (should_increment) {
          increment_non_reentrant_thread_count();
        }
      
        at::init_num_threads();
      
        // Note [Allocating GPUs to autograd threads]
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        // What's our strategy here?  Originally, the autograd engine was written
        // with only CUDA in mind.  We allocate one thread to handle all CPU
        // operations, and a thread per CUDA device.
        //
        // But what if we have OTHER devices?  There are two plausible
        // strategies:
        //
        //  - We can allocate threads equal to max(num_cuda_devices, num_xla_devices,
        //    ...) and colocate cuda device 0 with xla device 0
        //  - We can allocate threads equal to sum(num_cuda_devices, num_xla_devices,
        //    ...) keeping everyone separate.
        //
        // We don't have any good reason to prefer one or the other, so we've
        // arbitrarily picked to colocate devices.  Maybe the other approach is
        // better.
        set_device(device);
      
        // initialize each device thread's thread local ready queue with the ready queue
        // that is created before the thread initialization
        init_local_ready_queue(ready_queue);
      
        std::shared_ptr<GraphTask> graph_task = nullptr;
        thread_main(graph_task);
        if (should_increment) {
          // Decrement the count during shutdown if we incremented earlier.
          decrement_non_reentrant_thread_count();
        }
      }
      

      目前邏輯如下,生成了一系列工作線程,也生成了device_ready_queues_:

      +------------------------+                                     +-----------------------+
      | GraphTask              |                                     | Main Thread           |
      |                        |       |-----------------|           |                       |
      |     cpu_ready_queue_+------->  | CPU ReadyQueue  | <-----------+ local_ready_queue   |
      |                        |       +-----------------+           |                       |
      |                        |                                     |                       |
      +------------------------+                                     +-----------------------+
      
      
      
      
      +------------------------+
      | Engine                 |    +----------------------+         +-----------------------+
      |                        |    | Device ReadyQueues   |         | Worker Thread 1       |
      |                        |    |                      |         |                       |
      |  device_ready_queues_ +---> |                      |         |                       |
      |                        |    |    +-------------+   |         |                       |
      |                        |    |    | ReadyQueue 1| <-------------+ local_ready_queue   |
      +------------------------+    |    +-------------+   |         |                       |
                                    |                      |         |                       |
                                    |                      |         |                       |
                                    |                      |         +-----------------------+
                                    |                      |
                                    |          .           |
                                    |          .           |
                                    |          .           |         +-----------------------+
                                    |                      |         | Worker Thread 2       |
                                    |    +-------------+   |         |                       |
                                    |    | ReadyQueue 2| <-------------+ local_ready_queue   |
                                    |    +-------------+   |         |                       |
                                    |                      |         |                       |
                                    |                      |         +-----------------------+
                                    |    +-------------+   |
                                    |    | ReadyQueue 3|   |
                                    |    +-------------+   |
                                    |                      |
                                    +----------------------+
      
      

      然后會調(diào)用 thread_main 進(jìn)行線程主體,我們下文會分析。

      4.3 可重入反向傳播

      4.3.1 示例

      從PyTorch 的測試代碼之中可以看到,在反向傳播之中也會調(diào)用反向傳播。

      在這種情況下,Engine 之內(nèi)會存在多個 GraphTask。

             class MyFunction(Function):
                  @staticmethod
                  def forward(ctx, x):
                      return x
      
                  @staticmethod
                  def backward(ctx, x):
                      order.append("MyFunction")
                      return x
      
              class Reentrant(Function):
                  @staticmethod
                  def forward(ctx, x):
                      with torch.enable_grad():
                          ctx.x = Variable(x.detach(), requires_grad=True)
                          ctx.x = ctx.x - 1
                      return ctx.x.detach()
      
                  @staticmethod
                  def backward(ctx, x):
                      order.append("Reentrant")
                      if ctx.x < 0:
                          return x
                      with torch.enable_grad():
                          Reentrant.apply(ctx.x).backward()
                      return x
      
              a = MyFunction.apply(torch.tensor(6.0, requires_grad=True))
              b = Reentrant.apply(torch.tensor(9.0, requires_grad=True))
              v = a * b
              v.backward()
      
      

      4.3.2 設(shè)計理念

      以下是 PyTorch 的設(shè)計理念

      為了理解可重入向后問題,我們必須注意autograd引擎目前的實現(xiàn)方式的兩個方面:

        1. 當(dāng)您調(diào)用 Engine::execute() 時,您希望阻塞直到微分完成,以便可以獲得向后傳遞的最終結(jié)果變量。
        1. 引擎運行時,每個工作隊列之上有一個工作線程來運行,每個工作隊列固定到執(zhí)行操作的特定設(shè)備上。

      問題是,假設(shè)您在工作線程內(nèi)部調(diào)用 backward()。

      根據(jù)屬性 (1),我們應(yīng)該阻塞,直到嵌套任務(wù)完成。但是,根據(jù)屬性(2),這個工作線程負(fù)責(zé)處理分配給它的任務(wù);我們最好不要被阻塞。因為那種情況下,我們所有的反向執(zhí)行(包括我們剛剛開始的那一次)都會死鎖!

      所以,我們維護(hù)一個等待分配工作的線程池。

      當(dāng)發(fā)生可重入向后調(diào)用時,當(dāng)前線程將被阻塞,池中的一個線程將被喚醒,以完成阻塞任務(wù)和分配給該輔助線程的任何其他任務(wù)。如果沒有可用的線程,將生成一個新線程。新線程將繼續(xù)處理與父工作線程同一個ReadyQueue中的任務(wù)。當(dāng)目前GraphTask完成后,將通知正在等待任務(wù)的父工作線程,并且當(dāng)前線程將被返回給線程池。

      4.3.3 實現(xiàn)

      當(dāng)發(fā)現(xiàn)是可重入后向傳播時,而且超出最大遞歸深度,Engine::execute_with_graph_task 會調(diào)用如下代碼向線程池加入一個新線程。

          if (current_depth >= max_recursion_depth_) {
            // See Note [Reentrant backwards]
            // If reached the max depth, switch to a different thread
            add_thread_pool_task(graph_task);
          }
      

      相關(guān)數(shù)據(jù)結(jié)構(gòu)如下:

        struct ThreadPoolShared {
          // Data structures used by the threads for executing reentrant backwards
          // tasks. See Note [Reentrant backwards]
          // Number of available threads for processing new GraphTasks.
          unsigned int num_workers_;
          // The threads will wait on work_ to be notified of GraphTasks
          std::condition_variable work_;
          // To protect reads and writes to graphtask_queue_ and num_workers_
          // and for synchronizing creating new threads when needed
          std::mutex mutex_;
          // Workers will process the GraphTasks added to this queue. A GraphTask is
          // allocated inside Engine::execute and lives for the duration of execute
          std::queue<std::weak_ptr<GraphTask>> graphtasks_queue_;
      
          ThreadPoolShared() : num_workers_(0) {}
       };
      

      add_thread_pool_task 代碼如下。

      • 這里判斷是否 graphtask 隊列達(dá)到最大值,如果沒有達(dá)到,則建立一個新線程。
      • 把 graph_task 放入隊列 graphtasks_queue_。
      • 新線程的執(zhí)行函數(shù)是 reentrant_thread_init,其會等待在 thread_pool_shared_->work_ 之上。
      • 這里會 thread_pool_shared_->work_.notify_one() 讓新線程運行。
      void Engine::add_thread_pool_task(const std::weak_ptr<GraphTask>& graph_task) {
        std::unique_lock<std::mutex> lck(thread_pool_shared_->mutex_);
        // There may already be some items on the graphtasks_queue_ added by other
        // threads but not enough workers to get to the new task that will be
        // added
        bool create_thread = (thread_pool_shared_->num_workers_ <= thread_pool_shared_->graphtasks_queue_.size());
        thread_pool_shared_->graphtasks_queue_.push(graph_task);
        // Don't need to be holding the lock while actually creating the thread
        lck.unlock();
        if (create_thread) {
          std::thread t(&Engine::reentrant_thread_init, this);
          t.detach();
        }
        // This works even if new thread is created because wait() will test the
        // predicate before waiting
        thread_pool_shared_->work_.notify_one();
      }
      

      新線程執(zhí)行函數(shù) reentrant_thread_init 如下:

      • 與graph_task's 原線程共享 cpu_ready_queue。
      • 其從 graphtasks_queue_ 獲取 GraphTask,賦值給 graph_task。
      • 然后用 thread_main(graph_task) 來執(zhí)行。
      // Reentrant call will re-use the graph_task's owner thread ready_queue for
      // queueing tasks (NOTE: this is not true in the async_mode of the engine).
      // While we can create separate ready queue for each new reentrant
      // thread, but sharing the same cpu_ready_queue with parent thread is a
      // performance improvement and cuda thread still have to do the same thing.
      void Engine::reentrant_thread_init() {
        at::init_num_threads();
        auto tp_shared = thread_pool_shared_;
        while(true) {
          std::unique_lock<std::mutex> lk(tp_shared->mutex_);
          ++thread_pool_shared_->num_workers_;
          tp_shared->work_.wait(lk, [&tp_shared]{ return !tp_shared->graphtasks_queue_.empty();});
          --thread_pool_shared_->num_workers_;
          auto task = tp_shared->graphtasks_queue_.front();
          tp_shared->graphtasks_queue_.pop();
          lk.unlock();
          std::shared_ptr<GraphTask> graph_task;
          if (!(graph_task = task.lock())) {
            continue;
          }
          set_device(graph_task->owner_);
          // set the local_ready_queue to the ready queue on the graph_task->owner_ device
          local_ready_queue = ready_queue_by_index(graph_task->cpu_ready_queue_, graph_task->owner_);
          total_depth = graph_task->reentrant_depth_;
          thread_main(graph_task); // 這里調(diào)用了線程函數(shù)
        }
      }
      

      4.4 主線程

      除了上述線程之外,引擎還有一個主線程。這里使用 NO_DEVICE 來標(biāo)識。如前所示,也會用 CPU_DEVICE 來臨時做重入判別,但是依然是主線程。

      static constexpr int CPU_DEVICE = -1;
      static constexpr int NO_DEVICE = -2;
      

      4.5 流程解析

      我們通過 execute_with_graph_task 來解析一下線程之間的生成關(guān)系。

      std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
          const std::shared_ptr<GraphTask>& graph_task,
          std::shared_ptr<Node> graph_root,
          InputBuffer&& input_buffer) {
          
        initialize_device_threads_pool(); // 這里生成了設(shè)備線程
      
        // 這里指定了后續(xù)究竟是GPU還是CPU上運行,因為 input_buffer.device() 里面指定了運行的設(shè)備,所以依據(jù)這個設(shè)備,獲取到了對應(yīng)的 queue
        auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());
      
        if (worker_device == NO_DEVICE) { // 判斷是否已經(jīng)運行了反向傳播
      
          // 主線程  
            
          set_device(CPU_DEVICE);
          graph_task->owner_ = worker_device; // set the graph_task owner to the current device
          queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
       
          thread_main(graph_task); // thread_main 依然是被主線程執(zhí)行,內(nèi)部通過 pop 阻塞等待
      
          // 主線程  
          worker_device = NO_DEVICE;
        } else {
          // 主線程,可重入的反向傳播  
      
          // If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
          //    backward call from that device.     
          graph_task->owner_ = worker_device; // 指定是哪個設(shè)備,是 GPU 或者 CPU
          queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
      
          if (current_depth >= max_recursion_depth_) {
            // 達(dá)到最大重入深度,這里會啟動一個新的線程  
            add_thread_pool_task(graph_task); // add_thread_pool_task 里面是GPU線程 或者 CPU線程取決于 worker_device
          } else {
            ++total_depth;
            ++current_depth;
            thread_main(graph_task); // thread_main 依然是被主線程執(zhí)行,內(nèi)部通過 pop 阻塞等待
            --current_depth;
            --total_depth;
          }
           
          // 主線程,可重入的反向傳播   
        }
      
        return graph_task->future_result_;
      }
      

      具體線程關(guān)系如下:

      1. 主線程使用 push(NodeTask) 往 GraphTask.cpu_ready_queue_ 插入 NodeTask 0。
      2. 主線程使用 thread_main 從 GraphTask.cpu_ready_queue_ 取出 NodeTask 0,假設(shè)這個 NodeTask 0 的設(shè)備index 是 1。
      3. 主線程使用 thread_main 往 device 1 對應(yīng)的 ReadyQueue 插入 NodeTask 1。
      4. 設(shè)備線程 1 阻塞在 device 1 對應(yīng)的 ReadyQueue 1,這時候被喚醒,取出 NodeTask 1。
      5. 設(shè)備線程 1 處理 NodeTask 1,得到其后續(xù)的邊,如果這個邊的設(shè)備是 device 2, 那么生成一個 NodeTask 2,這個NodeTask 2 設(shè)備就是 2。然后把 NodeTask 2 插入 ReadyQueue 2。
      6. 設(shè)備線程 2 阻塞在 device 2 對應(yīng)的 ReadyQueue 2,這時候被喚醒,取出 NodeTask 2,繼續(xù)處理。
                                     +-------------------------+
                                     | GraphTask               |
                                     |                         |
                                     |        cpu_ready_queue_ |
                                     |            +            |
                                     |            |            |
                                     +-------------------------+
                                                  |
      +------------------+                        |
      | Main Thread      |                        v
      |                  |      1        +--------+---------+
      |                  |push(NodeTask0)|                  |                 +-------------------+
      |           +--------------------->+  CPU ReadyQueue  |                 | Worker Thread 1   |
      |                  |               |                  |      4          |                   |
      | local_ready_queue|               +---+--------------+ pop(NodeTask1)  |                   |
      |                  |   2 pop()         |                   +------------------>             |
      |           +--------------------------+                   |            |                   |
      |           |      |               +--------------------+  |            |                   |
      |           |      |               | Device ReadyQueues |  |            | local_ready_queue |
      |           |      |               |                    |  |            |                   |
      |           |      |    3          |                    |  |            |                   |
      |           |      |push(NodeTask1)|   +-------------+  |  |            |                   |
      |           +------------------------> | ReadyQueue 1+-----+            |                   |
      |                  |               |   +-------------+  |      5        |                   |
      |                  |               |                    |push(NodeTask2)|                   |
      +------------------+               |                    |     +---------------+             |
                                         |                    |     |         |                   |
                                         |                    |     |         +-------------------+
                                         |                    |     |
                                         |              +-----------+         +-------------------+
       +---------------------------+     |              |     |               | Worker Thread 2   |
       | Engine                    |     |              |     |               |                   |
       |                           |     |              v     |               |                   |
       |                           |     |   +----------+--+  |               |                   |
       |   device_ready_queues_ +----->  |   | ReadyQueue 2+------------------------>             |
       |                           |     |   +-------------+  |pop(NodeTask2) |                   |
       |                           |     |                    |    6          | local_ready_queue |
       +---------------------------+     |                    |               |                   |
                                         |                    |               |                   |
                                         |   +-------------+  |               |                   |
                                         |   | ReadyQueue 3|  |               |                   |
                                         |   +-------------+  |               |                   |
                                         |                    |               |                   |
                                         +--------------------+               +-------------------+
      
      

      手機(jī)如下:

      0xFF 參考

      詳解Pytorch中的網(wǎng)絡(luò)構(gòu)造

      PyTorch的優(yōu)化器

      PyTorch的分布式

      PyTorch的Tensor(下)

      PyTorch的Tensor(中)

      PyTorch的Tensor(上)

      PyTorch的動態(tài)圖(下)

      PyTorch的動態(tài)圖(上)

      posted @ 2021-10-29 16:48  羅西的思考  閱讀(1965)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 不卡乱辈伦在线看中文字幕| 国产卡一卡二卡三免费入口| 亚洲中文日韩一区二区三区| 亚洲乱理伦片在线观看中字| 国产情侣一区二区三区| 国产伦精品一区二区亚洲| 五月天中文字幕mv在线| 亚洲色成人网站www永久下载| 日韩AV片无码一区二区不卡| 丁青县| 西盟| 欧美成人免费一区二区三区视频 | 亚洲香蕉网久久综合影视| 日韩有码国产精品一区| 亚洲日本国产精品一区| 无码人妻丰满熟妇啪啪网不卡| 国产日韩欧美| 国产成人精品无码免费看| 毛茸茸性xxxx毛茸茸毛茸茸| 无码福利写真片视频在线播放| 精品亚洲一区二区三区四区| 久久不卡精品| 国产亚洲精品AA片在线爽| 亚洲国产综合精品2020| 日韩一区二区三区精彩视频| 蜜桃av亚洲精品一区二区| 真实国产老熟女无套中出| 人妻少妇偷人无码视频| 久草热大美女黄色片免费看| 在线中文字幕国产一区| 9久9久热精品视频在线观看| 午夜在线观看成人av| 天堂…中文在线最新版在线| 日韩av高清在线看片| 韩国V欧美V亚洲V日本V| 根河市| 91精品国产老熟女在线| 成年女人碰碰碰视频播放| 97色伦97色伦国产| 精品中文字幕人妻一二| 亚洲色婷婷久久精品av蜜桃久久|