<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【gRPC】C++異步服務端客戶端API實例及代碼解析

      對于同步API而言,程序的吞吐量并不高。因為在每次發送一個gRPC請求時,會阻塞整個線程,必須等待服務端的ack回到客戶端才能繼續運行或者發送下一個請求,因此異步API是提升程序吞吐量的必要手段。

      gRPC異步操作依賴于完成隊列CompletionQueue
      官網教程:https://grpc.io/docs/languages/cpp/async/
      參考博客1:https://www.luozhiyun.com/archives/671
      參考博客2:https://blog.miigon.net/posts/cn-so-difference-between-sync-and-async-grpc/

      整體思路概述:

      • 將一個完成隊列CompletionQueue綁定到RPC調用
      • 在客戶端與服務器兩端執行寫入或讀取之類的操作,同時帶有唯一的void*標簽
      • 調用CompletionQueue::Next以等待操作完成。如果出現標簽,則表示相應的操作完成

      異步客戶端:

      要點

      • 就像同步客戶端一樣,我們需要創建一個存根stub用于進行gRPC方法的調用,但是此時我們需要調用的是異步方法,需要為其綁定一個完成隊列
      //客戶端的類實例在初始化時就需要創建Channel和Stub了,具體看官方實例代碼中的greeter_async_client.cc
      CompletionQueue cq;//創建完成隊列
      std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
          stub_->AsyncSayHello(&context, request, &cq));//將完成隊列綁定到存根,進而創建出客戶端異步響應讀取器
      
      • rpc->StartCall()初始化RPC請求
      • rpc->Finish()有三個參數,分別是gRPC響應、最終狀態、唯一標簽。一旦RPC請求完成,響應的結果、最終狀態會被封裝到前兩個傳入的參數中,同時會附帶唯一標簽。唯一標簽可以使用RPC請求的地址,這樣一來,當最終的響應到來時,我們可以拿著該地址進行相應處理。
      • 異步處理響應,拿著唯一標簽可以獲取對應RPC請求的地址,進而進行相關處理。
      • 為了使得請求的發出與響應的處理相互之間不阻塞,需要把響應的處理獨立出一個線程

      代碼示例

      class GreeterClient {
       public:
        explicit GreeterClient(std::shared_ptr<Channel> channel)
            : stub_(Greeter::NewStub(channel)) {}
      
        // 客戶端SayHello
        void SayHello(const std::string& user) {
          // RPC請求數據封裝
          HelloRequest request;
          request.set_name(user);
      
          // 異步客戶端請求,存儲請求響應的狀態和數據的結構體等,在下方進行的定義
          AsyncClientCall* call = new AsyncClientCall;
      
          // 初始化response_reader
          // stub_->PrepareAsyncSayHello()創建一個RPC對象,但是不會立即啟動RPC調用
          call->response_reader =
              stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
      
          // StartCall()方法發起真正的RPC請求
          call->response_reader->StartCall();
      
          // Finish()方法前兩個參數用于指定響應數據的存儲位置,第三個參數指定了該次RPC異步請求的地址
          call->response_reader->Finish(&call->reply, &call->status, (void*)call);
        }
      
        // 不斷循環監聽完成隊列,對響應進行處理
        void AsyncCompleteRpc() {
          void* got_tag;
          bool ok = false;
      
          // 在隊列為空時阻塞,隊列中有響應結果時讀取到got_tag和ok兩個參數
          // 前者是結果對應的RPC請求的地址,后者是響應的狀態
          while (cq_.Next(&got_tag, &ok)) {
            // 類型轉換,獲取到的實際上是此響應結果對應的RPC請求的地址,在這個地址下保存了實際的響應結果數據
            AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
      
            // 驗證請求是否真的完成了
            GPR_ASSERT(ok);
      
            if (call->status.ok())
              std::cout << "Greeter received: " << call->reply.message() << std::endl;
            else
              std::cout << "RPC failed" << std::endl;
      
            // 完成了響應的處理后,清除該RPC請求
            delete call;
          }
        }
      
       private:
        // 異步客戶端通話,存儲一次RPC通話的信息,里面包含響應的狀態和數據的結構
        struct AsyncClientCall {
          // 服務器返回的響應數據
          HelloReply reply;
      
          // 客戶端的上下文信息,可以被用于向服務器傳達額外信息或調整某些RPC行為
          ClientContext context;
      
          // RPC響應的狀態
          Status status;
      
          // 客戶端異步響應讀取器
          std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
        };
      
        // 存根,在我們的視角里就是服務器端暴露的服務接口
        std::unique_ptr<Greeter::Stub> stub_;
      
        // 完成隊列,一個用于gRPC異步處理的生產者消費者隊列
        CompletionQueue cq_;
      };
      
      int main(int argc, char** argv) {
        // 實例化一個客戶端,需要一個信道,第二個參數表明該通道未經過身份驗證
        GreeterClient greeter(grpc::CreateChannel(
            "localhost:50051", grpc::InsecureChannelCredentials()));
      
        // 獨立的異步響應處理線程
        // 由于該方法是客戶端類內的非靜態方法,所以需要傳入客戶端類的實例表明歸屬
        std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
      
        //發送異步的請求SayHello()
        for (int i = 0; i < 100; i++) {
          std::string user("world " + std::to_string(i));
          greeter.SayHello(user);  // The actual RPC call!
        }
      
        std::cout << "Press control-c to quit" << std::endl << std::endl;
        thread_.join();  //永遠會阻塞,因為異步響應處理線程永遠不會停止,必須ctrl+c才能退出
      
        return 0;
      }
      

      異步服務器

      要點

      • 客戶端在發起請求時附帶了標簽(此次RPC請求會話的地址),因此服務器端也需要將該標簽妥善處理再返回
      • 官方API中是準備一個CallData對象作為容器,gRPC通過ServerCompletionQueue將各種事件發送到CallData對象中,然后讓這兒對象根據自身的狀態進行處理。處理完成后還需要再手動創建一個CallData對象,這個對象是為下一個Client請求準備的,整個過程就像流水線一樣
        • CREATE:CallData對象被創建處理之前處于CREATE狀態
        • PROCESS:請求到達后,轉換為PROCESS狀態
        • FINISH:響應完成后,轉換為FINISH狀態
      • 整體服務器端流程
        • 啟動服務時,預分配 一個 CallData 實例供未來客戶端請求使用。
        • 該 CallData 對象構造時,service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this) 將被調用,通知gRPC開始準備接收恰好是一個SayHello請求。
        • 這時候我們還不知道請求會由誰發出,何時到達,我們只是告訴 gRPC 說我們已經準備好接收了,讓 gRPC 在真的接收到時通知我們。
        • 供給 RequestSayHello的參數告訴了gRPC將上下文信息、請求體以及回復器放在哪里、使用哪個完成隊列來通知、以及通知的時候,用于鑒別請求的 tag(在這個例子中,this 被作為 tag 使用)。
          HandleRpcs() 運行到 cq->Next() 并阻塞。等待下一個事件發生
        • 客戶端發送一個 SayHello 請求到服務器,gRPC 開始接收并解碼該請求(IO 操作)
        • 一段時間后….
        • gRPC接收請求完成了。它將請求體放入CallData對象的request_成員中(通過我們之前提供的指針),然后創建一個事件(使用指向CallData 對象的指針作為 tag),并 將該事件放到完成隊列 cq_ 中.
        • HandleRpcs() 中的循環接收到了該事件(之前阻塞住的 cq->Next() 調用此時也返回),并調用 CallData::Proceed() 來處理請求。
        • CallData 的 status_ 屬性此時是 PROCESS,它做了如下事情:
          • 創建一個新的 CallData 對象,這樣在這個請求后的新請求才能被新對象處理。
          • 生成當前請求的回復,告訴 gRPC 我們處理完成了,將該回復發送回客戶端
          • gRPC 開始回復的傳輸 (IO 操作)
          • HandleRpcs() 中的循環迭代一次,再次阻塞在 cq->Next(),等待新事件的發生。
        • 一段時間后….
        • gRPC 完成了回復的傳輸,再次通過在完成隊列里放入一個以 CallData 指針為 tag 的事件的方式通知我們。
        • cq->Next() 接收到該事件并返回,CallData::Proceed() 將 CallData 對象釋放(使用 delete this;)。HandleRpcs() 循環并重新阻塞在 cq->Next() 上,等待新事件的發生。
          整個過程看似和同步 API 很相似,只是多了對完成隊列的控制。然而,通過這種方式,每一個 一段時間后.... (通常是在等待 IO 操作的完成或等待一個請求出現) cq->Next() 不僅可以接收到當前處理的請求的完成事件,還可以接收到其他請求的事件。
          所以假設第一個請求正在等待它的回復數據傳輸完成時,一個新的請求到達了,cq->Next() 可以獲得新請求產生的事件,并開始并行處理新請求,而不用等待第一個請求的傳輸完成

      代碼示例

      //服務器實現類
      class ServerImpl final {
       public:
        ~ServerImpl() {
          server_->Shutdown();
          // 關閉服務器后也要關閉完成隊列
          cq_->Shutdown();
        }
      
        // There is no shutdown handling in this code.
        void Run() {
          // 服務器地址和端口
          std::string server_address("0.0.0.0:50051");
          // 服務器構建器
          ServerBuilder builder;
          // 服務器IP與端口指定,第二個參數表示該通道未經過身份驗證
          builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
          // 注冊服務
          builder.RegisterService(&service_);
          // 為當前服務器創建完成隊列
          cq_ = builder.AddCompletionQueue();
          // 構建并啟動服務器
          server_ = builder.BuildAndStart();
          std::cout << "Server listening on " << server_address << std::endl;
      
          // 運行服務器的主流程
          HandleRpcs();
        }
      
       private:
        // 處理一個請求所需要保存的狀態、邏輯、數據被封裝成了CallData類
        class CallData {
         public:
          // 傳入service實例和服務器端的完成隊列,創建后status_是CREATE狀態
          CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
              : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
            // 立即調用服務邏輯
            Proceed();
          }
      
          void Proceed() {
            if (status_ == CREATE) {
              // 轉換為PROCESS狀態
              status_ = PROCESS;
      
              // 作為初始化的一部分,請求系統開始處理SayHello請求。
              // 此處的this指代此CallData實例
              service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                        this);
            } else if (status_ == PROCESS) {
              // 在執行當前CallData的任務時,創建一個新的CallData實例去為新的請求服務
              // 當前CallData會在FINISH階段自行銷毀
              new CallData(service_, cq_);
      
              // 實際的邏輯處理
              std::string prefix("Hello ");
              reply_.set_message(prefix + request_.name());
      
              // 在完成后將狀態置為FINISH。使用當前CallData的地址作為該事件的標簽
              status_ = FINISH;
              responder_.Finish(reply_, Status::OK, this);
            } else {
              GPR_ASSERT(status_ == FINISH);
              // 銷毀當前CallData
              delete this;
            }
          }
      
         private:
          // 異步服務
          Greeter::AsyncService* service_;
          //服務器端的完成隊列,是一個生產者-消費者隊列
          ServerCompletionQueue* cq_;
          // 服務器端上下文信息,可以被用于向客戶端傳達額外信息、數據或調整某些RPC行為
          ServerContext ctx_;
      
          // 客戶端發來的請求
          HelloRequest request_;
          // 服務端的響應
          HelloReply reply_;
      
          // 發送服務端響應的工具
          ServerAsyncResponseWriter<HelloReply> responder_;
      
          // 狀態機定義
          enum CallStatus { CREATE, PROCESS, FINISH };
          CallStatus status_;  // 當前的狀態
        };
      
        // 如果有需求的話,服務器的處理可以是多線程的
        void HandleRpcs() {
          // 創建一個新的CallData,將完成隊列中的數據封裝進去
          new CallData(&service_, cq_.get());
          void* tag;  // 請求特有的標簽,實際上是請求的RPC會話對象在客戶端的地址信息
          bool ok;
          while (true) {
            // 阻塞等待讀取完成隊列中的事件,每個事件使用一個標簽進行標識,該標簽是CallData實例的地址
            // 完成隊列的Next()方法應該每次都檢查返回值,來確保事件和完成隊列的狀態正常
            GPR_ASSERT(cq_->Next(&tag, &ok));
            GPR_ASSERT(ok);
            // 從標簽轉換為CallData*類型,進而訪問CallData中的方法
            static_cast<CallData*>(tag)->Proceed();
          }
        }
      
        // 當前服務器的完成隊列
        std::unique_ptr<ServerCompletionQueue> cq_;
        // 當前服務器的異步服務
        Greeter::AsyncService service_;
        // 服務器實例
        std::unique_ptr<Server> server_;
      };
      
      int main(int argc, char** argv) {
        ServerImpl server;
        server.Run();
        return 0;
      }
      
      
      posted @ 2022-09-06 23:43  縉云燒餅  閱讀(3557)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国精品无码一区二区三区左线| 亚洲天码中文字幕第一页| 亚洲国产高清精品线久久| 中文字幕在线无码一区二区三区| 久热天堂在线视频精品伊人| 久视频久免费视频久免费| 丰满熟女人妻一区二区三| 精品一区二区三区四区色| 精品人妻中文字幕在线| 国产不卡免费一区二区| 精品久久久久无码| 国产盗摄xxxx视频xxxx| 99久久er热在这里只有精品99| 亚洲av成人无网码天堂| 999福利激情视频| 天堂a无码a无线孕交| 国产精品播放一区二区三区| 国产97色在线 | 免| 亚洲一区二区色情苍井空| 国产精品一区二区在线欢| 亚洲欧美国产免费综合视频| 亚洲天堂亚洲天堂亚洲色图| 亚洲欧美成人一区二区在线电影| 麻豆一区二区中文字幕| 国产精品一二二区视在线| 亚洲第一区二区快射影院| 国产在线精品中文字幕| 国产不卡一区二区在线| 日韩有码av中文字幕| 国产精品户外野外| 又湿又紧又大又爽a视频| 国产中文三级全黄| 日韩一区二区三区高清视频| 亚洲综合无码一区二区| 亚洲日本韩国欧美云霸高清 | 国产第一区二区三区精品| 欧美成人免费一区二区三区视频| 亚洲一区二区av偷偷| 亚洲成av人片乱码色午夜| 91精品国产免费人成网站| 成人国产精品一区二区不卡|