<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      grpc使用記錄(三)簡單異步服務實例

      grpc使用記錄(三)簡單異步服務實例

      編寫異步服務和編寫同步服務的基本流程都差不多,稍有點區別。

      同步服務你只需要實現相關服務接口的實現即可,不需要管理太多東西。異步服務GRPC運行時會把讀取到的客戶端請求放入CompletionQueue中,需要主動從中取出,然后進行相關的處理,可以多線程也可以單線程。

      1、編寫proto文件,定義服務

      這里和grpc使用記錄(二)簡單同步服務實例中的一樣,這里就不多說了。

      2、編譯proto文件,生成代碼

      這里也是和grpc使用記錄(二)簡單同步服務實例中的一樣的。

      3、編寫服務端代碼

      這里可以復用前面同步服務的代碼,只需要做簡單的修改即可。

      簡單說一下創建一個GRPC異步服務的要點:

      • 1、創建服務對象的時候要創建AsyncService,而不是Service
      • 2、至少需要添加一個grpc::ServerCompletionQueue用于異步任務操作。
      • 3、必須要通過AsyncService::RequestXXXX來注冊XXXX接口的處理。
      • 4、一個客戶端請求的處理可簡單的分為兩個步驟:1、構建返回給客戶端的響應數據;2、發送響應數據給客戶端。
      • 5、完成隊列和注冊請求處理都可以有多個,不一定非得是一個。

      async_service.cpp

      下面代碼簡單的創建了3個HandlerContext的結構體類型,用于保存三個接口請求處理過程中的數據,實際的請求處理還是和之前同步服務的一樣,這里只是寫成了Test1Test2Test3三個函數的形式。

      // > g++ -o aservice async_service.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated
      
      #include "simple.grpc.pb.h"
      #include <grpcpp/grpcpp.h>
      
      #include <memory>
      #include <iostream>
      #include <strstream>
      
      struct HandlerContext {
        // 當前處理狀態(處理分為兩步:1處理請求構建響應數據;2發送響應)
        // 這里記錄一下完成到哪一步了,以便進行相關操作
        int                 status_; // (1構建響應完成;2發送完成)
        // rpc的上下文,允許通過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
        grpc::ServerContext ctx_;
      };
      
      struct HandlerTest1Context:public HandlerContext {
        // 用于接收客戶端發送的請求
        Simple::TestRequest req_;
        // 用于發送響應給客戶端
        Simple::TestNull    rep_;
      
        // 發送到客戶端的方法對象
        grpc::ServerAsyncResponseWriter<Simple::TestNull> responder_;
        // 構造函數
        HandlerTest1Context()
          :responder_(&ctx_)
        {}
      };
      
      struct HandlerTest2Context:public HandlerContext  {
        // 用于接收客戶端發送的請求
        Simple::TestNull req_;
        // 用于發送響應給客戶端
        Simple::TestReply   rep_;
      
        // 發送到客戶端的方法對象
        grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
        // 構造函數
        HandlerTest2Context()
          :responder_(&ctx_)
        {}
      };
      
      struct HandlerTest3Context:public HandlerContext {
        // 用于接收客戶端發送的請求
        Simple::TestRequest req_;
        // 用于發送響應給客戶端
        Simple::TestReply   rep_;
      
        // 發送到客戶端的方法對象
        grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
        // 構造函數
        HandlerTest3Context()
          :responder_(&ctx_)
        {}
      };
      
      
      // Test1 實現都是差不都的,這里只是為了測試,就隨便返回點數據了
      grpc::Status Test1(grpc::ServerContext*       context,
                         const Simple::TestRequest* request,
                         Simple::TestNull*          response)
      {
        printf("%s %d\n",__func__,__LINE__);
        std::ostrstream os;
        os << "Client Name = " << request->name() << '\n';
        os << "Clinet ID   = " << request->id()   << '\n';
        os << "Clinet Value= " << request->value()<< '\n';
        std::string message = os.str();
        // grpc狀態可以設置message,所以也可以用來返回一些信息
        return grpc::Status(grpc::StatusCode::OK,message);
      }
      // Test2
      grpc::Status Test2(grpc::ServerContext*       context,
                         const Simple::TestNull*    request,
                         Simple::TestReply*         response)
      {
        printf("%s %d\n",__func__,__LINE__);
        response->set_tid(100);
        response->set_svrname("Simple Server");
        response->set_takeuptime(0.01);
        return grpc::Status::OK;
      }
      // Test3
      grpc::Status Test3(grpc::ServerContext*       context,
                         const Simple::TestRequest* request,
                         Simple::TestReply*         response)
      {
        printf("%s %d\n",__func__,__LINE__);
        std::ostrstream os;
        os << "Client Name = " << request->name() << '\n';
        os << "Clinet ID   = " << request->id()   << '\n';
        os << "Clinet Value= " << request->value()<< '\n';
        std::string message = os.str();
      
        response->set_tid(__LINE__);
        response->set_svrname(__FILE__);
        response->set_takeuptime(1.234);
        // grpc狀態可以設置message
        return grpc::Status(grpc::StatusCode::OK,std::move(message));
      }
      
      int main()
      {
        // 服務構建器,用于構建同步或者異步服務
        grpc::ServerBuilder builder;
        // 添加監聽的地址和端口,后一個參數用于設置認證方式,這里選擇不認證
        builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
        // 創建一個異步服務對象
        Simple::Server::AsyncService service;
        // 注冊服務
        builder.RegisterService(&service);
      
        // 添加一個完成隊列,用于與 gRPC 運行時異步通信
        std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();
      
        // 構建服務器
        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        std::cout<<"Server Runing"<<std::endl;
        // 這里用一個map來記錄一下下面要進行處理的請求
        // 因為這里也是單線程的,所以不加鎖了
        std::map<HandlerContext*,int> handlerMap; // value用于記錄是Test1還是2、3
        {
          // 先創建三個類型接口的請求處理上下文對象
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1; // 設置狀態為1(因為只需要區分是否已經發送響應完成)
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;
      
          // 將三個上下文對象存入map中
          handlerMap[htc1] = 1; // 值用于區分是哪個類型
          handlerMap[htc2] = 2;
          handlerMap[htc3] = 3;
      
          // 進入下面死循環前需要先注冊一下請求
          service.RequestTest1(
              &htc1->ctx_         /*服務上下文對象*/,
              &htc1->req_         /*用于接收請求的對象*/,
              &htc1->responder_   /*異步寫響應對象*/,
              cq_ptr.get()        /*新的調用使用的完成隊列*/,
              cq_ptr.get()        /*通知使用的完成隊列*/,
              htc1                /*唯一標識tag*/);
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
        }
        // 異步服務這里不能使用 server.Wait() 來等待處理,因為是異步服務
        // 服務器會把到達的請求放入隊列,需要自己從完成隊列取出請求進行處理
        // 所以這里需要一個死循環來獲取請求并進行處理
        while(true){
          // 前面已經注冊了請求處理,這里阻塞從完成隊列中取出一個請求進行處理
          HandlerContext* htc = NULL;
          bool ok = false; 
          GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
          GPR_ASSERT(ok);
          // 根據tag判斷是哪一個請求
          // 因為前面注冊請求處理的時候使用的就是對象地址
          // 所以這里直接從map里面取出來判斷即可
          int type = handlerMap[htc];
          // 判斷狀態,看是不是已經響應發送了
          if(htc->status_ == 2) {
            // 從map中移除
            handlerMap.erase(htc);
            // 因為這里并不是多態類,必須根據類型操作
            switch(type) {
              case 1:
                {
                  // 釋放對象(這里未對這個對象進行復用)
                  delete (HandlerTest1Context*)htc;
                }
                break;
              case 2:
                {
                  delete (HandlerTest2Context*)htc;
                }
                break;
              case 3:
                {
                  delete (HandlerTest3Context*)htc;
                }
                break;
            }
            continue; // 回到從完成隊列獲取下一個
          }
      
          // 根據type進行相應的處理
          switch(type) {
            case 1: /*Test1的處理*/
              {
                // 重新創建一個請求處理上下文對象(以便不影響下一個請求的處理)
                HandlerTest1Context* htc1 = new HandlerTest1Context;
                htc1->status_ = 1;    // 設置狀態為1
                handlerMap[htc1] = 1; // 保存到handlerMap中
                service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc1);
      			
                HandlerTest1Context* h = (HandlerTest1Context*)htc;
                grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
            case 2: /*Test2的處理*/
              {
                HandlerTest2Context* htc2 = new HandlerTest2Context;
                htc2->status_ = 1;    // 設置狀態為1
                handlerMap[htc2] = 2; // 保存到handlerMap中
                service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc2);
      			
                HandlerTest2Context* h = (HandlerTest2Context*)htc;
                grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
            case 3: /*Test3的處理*/
              {
                HandlerTest3Context* htc3 = new HandlerTest3Context;
                htc3->status_ = 1;    // 設置狀態為1
                handlerMap[htc3] = 3; // 保存到handlerMap中
                service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc3);
      			
                HandlerTest3Context* h = (HandlerTest3Context*)htc;
                grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
          }
        }
        return 0;
      }
      

      async_service2.cpp

      上面雖然是使用到了grpc的異步服務機制,但是只是為了描述清楚異步服務的創建過程,是一個單線程的簡陋實現。下面寫一個使用線程池的實現。

      // > g++ -o aservice2 async_service2.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated
      
      // 線程池的代碼可見 http://www.rzrgm.cn/oloroso/p/5881863.html
      #include "threadpool.h"
      #include "simple.grpc.pb.h"
      #include <grpcpp/grpcpp.h>
      
      #include <memory>
      #include <iostream>
      #include <strstream>
      #include <chrono>
      
      struct HandlerContextBase {
        // 當前對象類型,用于確定是Test1/2/3哪一個請求的
        int                 type_;
        // 當前處理狀態(處理分為兩步:1處理請求構建響應數據;2發送響應)
        // 這里記錄一下完成到哪一步了,以便進行相關操作
        int                 status_; // (1構建響應完成;2發送完成)
        // rpc的上下文,允許通過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
        grpc::ServerContext ctx_;
      };
      
      template<typename RequestType,typename ReplyType>
      struct HandlerContext:public HandlerContextBase {
        // 用于接收客戶端發送的請求
        RequestType         req_;
        // 用于發送響應給客戶端
        ReplyType           rep_;
        // 發送到客戶端的方法對象
        grpc::ServerAsyncResponseWriter<ReplyType> responder_;
        //================================================
        // 構造函數
        HandlerContext()
          :responder_(&ctx_)
        {}
      
      };
      typedef HandlerContext<Simple::TestRequest,Simple::TestNull>  HandlerTest1Context;
      typedef HandlerContext<Simple::TestNull,Simple::TestReply>    HandlerTest2Context;
      typedef HandlerContext<Simple::TestRequest,Simple::TestReply> HandlerTest3Context;
      
      unsigned long get_tid()
      {
        std::thread::id tid = std::this_thread::get_id();
        std::ostrstream os;
        os << tid;
        unsigned long tidx = std::stol(os.str());
        return tidx;
      }
      
      // Test1 實現都是差不都的,這里只是為了測試,就隨便返回點數據了
      grpc::Status Test1(grpc::ServerContext*       context,
                         const Simple::TestRequest* request,
                         Simple::TestNull*          response)
      {
        printf("%s %d\n",__func__,__LINE__);
        std::ostrstream os;
        os << "Client Name = " << request->name() << '\n';
        os << "Clinet ID   = " << request->id()   << '\n';
        os << "Clinet Value= " << request->value()<< '\n';
        std::string message = os.str();
        // grpc狀態可以設置message,所以也可以用來返回一些信息
        return grpc::Status(grpc::StatusCode::OK,message);
      }
      // Test2
      grpc::Status Test2(grpc::ServerContext*       context,
                         const Simple::TestNull*    request,
                         Simple::TestReply*         response)
      {
        printf("%s %d\n",__func__,__LINE__);
        response->set_tid(100);
        response->set_svrname("Simple Server");
        response->set_takeuptime(0.01);
        return grpc::Status::OK;
      }
      // Test3
      grpc::Status Test3(grpc::ServerContext*       context,
                         const Simple::TestRequest* request,
                         Simple::TestReply*         response)
      {
        printf("%s %d\n",__func__,__LINE__);
        int tid = get_tid();
        std::ostrstream os;
        os << "Client Name = " << request->name() << '\n';
        os << "Clinet ID   = " << request->id()   << '\n';
        os << "Clinet Value= " << request->value()<< '\n';
        os << "Server TID  = " << tid<<'\n';
        std::string message = os.str();
        
        // 休眠0.5秒,以便觀察異步執行的效果
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
      
        response->set_tid(tid);
        response->set_svrname(__FILE__);
        response->set_takeuptime(1.234);
        // grpc狀態可以設置message
        return grpc::Status(grpc::StatusCode::OK,std::move(message));
      }
      
      int main()
      {
        // 服務構建器,用于構建同步或者異步服務
        grpc::ServerBuilder builder;
        // 添加監聽的地址和端口,后一個參數用于設置認證方式,這里選擇不認證
        builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
        // 創建一個異步服務對象
        Simple::Server::AsyncService service;
        // 注冊服務
        builder.RegisterService(&service);
      
        // 添加一個完成隊列,用于與 gRPC 運行時異步通信
        std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();
      
        // 構建服務器
        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        std::cout<<"Server Runing"<<std::endl;
        // 下面可以有幾個工作線程就先注冊幾個,也可以僅注冊一個(至少一個)
        /*for(int i=0;i<4;++i)*/ {
          // 先創建三個類型接口的請求處理上下文對象
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1; // 設置狀態為1(因為只需要區分是否已經發送響應完成)
          htc1->type_   = 1; // 設置類型為1
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;
          htc2->type_   = 2;
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;
          htc3->type_   = 3;
      
          // 進入下面死循環前需要先注冊一下請求
          service.RequestTest1(
              &htc1->ctx_         /*服務上下文對象*/,
              &htc1->req_         /*用于接收請求的對象*/,
              &htc1->responder_   /*異步寫響應對象*/,
              cq_ptr.get()        /*新的調用使用的完成隊列*/,
              cq_ptr.get()        /*通知使用的完成隊列*/,
              htc1                /*唯一標識tag*/);
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
        }
      
        // 創建線程池,使用4個工作線程,用于構建請求的響應
        ThreadPool pool(4);
      
        // 異步服務這里不能使用 server->Wait() 來等待處理,因為是異步服務
        // 服務器會把到達的請求放入隊列,需要自己從完成隊列取出請求進行處理
        // 所以這里需要一個死循環來獲取請求并進行處理
        while(true){
          // 前面已經注冊了請求處理,這里阻塞從完成隊列中取出一個請求進行處理
          HandlerContextBase* htc = NULL;
          bool ok = false; 
          GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
          GPR_ASSERT(ok);
          // 根據tag判斷是哪一個請求
          // 因為前面注冊請求處理的時候使用的就是對象地址
          // 所以這里直接從map里面取出來判斷即可
          int type = htc->type_;
          // 判斷狀態,看是不是已經響應發送了
          if(htc->status_ == 2) {
            // 因為這里并不是多態類,必須根據類型操作
            switch(type) {
              case 1:
                {
                  // 釋放對象(這里未對這個對象進行復用)
                  delete (HandlerTest1Context*)htc;
                }
                break;
              case 2:
                {
                  delete (HandlerTest2Context*)htc;
                }
                break;
              case 3:
                {
                  delete (HandlerTest3Context*)htc;
                }
                break;
            }
            continue; // 回到從完成隊列獲取下一個
          }
          
          // 重新創建一個請求處理上下文對象(以便能夠接受下一個請求進行處理)
          switch(type) {
            case 1:
              {
                HandlerTest1Context* htc1 = new HandlerTest1Context;
                htc1->status_ = 1;    // 設置狀態為1
                htc1->type_   = 1;    // 設置類型為1
                service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc1);
              }
              break;
            case 2:
              {
                HandlerTest2Context* htc2 = new HandlerTest2Context;
                htc2->status_ = 1;    // 設置狀態為1
                htc2->type_   = 1;    // 設置類型為2
                service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc2);
              }
              break;
            case 3:
              {
                HandlerTest3Context* htc3 = new HandlerTest3Context;
                htc3->status_ = 1;    // 設置狀態為1
                htc3->type_   = 3;    // 設置類型為3
                service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                                     cq_ptr.get(),cq_ptr.get(),htc3);
              }
              break;
          }
      
          pool.enqueue([type,htc](){
          // 根據type進行相應的處理
          switch(type) {
            case 1: /*Test1的處理*/
              {
                HandlerTest1Context* h = (HandlerTest1Context*)htc;
                grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
            case 2: /*Test2的處理*/
              {
                HandlerTest2Context* h = (HandlerTest2Context*)htc;
                grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
            case 3: /*Test3的處理*/
              {
                HandlerTest3Context* h = (HandlerTest3Context*)htc;
                grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
                // 設置狀態為發送響應
                h->status_ = 2;
                // 調用responder_進行響應發送(異步)
                h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
              }
              break;
          }
        });
        }
        return 0;
      }
      
      posted @ 2019-08-13 21:22  烏合之眾  閱讀(11326)  評論(0)    收藏  舉報
      clear
      主站蜘蛛池模板: 亚洲成av人片乱码色午夜| 99久久精品国产一区二区蜜芽| 美女自卫慰黄网站| 国产成人不卡一区二区| 中文国产不卡一区二区| 精品国产色情一区二区三区| 亚洲国产av久久久| 久久一日本道色综合久久| 亚洲午夜香蕉久久精品| 国产精品无码aⅴ嫩草| 国产精品不卡一区二区视频| 国产午夜福利免费入口| 乌克兰丰满女人a级毛片右手影院| 成人午夜大片免费看爽爽爽| 国产午夜亚洲精品不卡网站| 婷婷五月综合丁香在线| 国产91特黄特色A级毛片| 玩弄放荡人妻少妇系列| 亚洲夜色噜噜av在线观看| 久久综合色最新久久综合色| 亚洲AVAV天堂AV在线网阿V| 377P欧洲日本亚洲大胆| 国产精品制服丝袜第一页| 久久香蕉国产线看观看亚洲片| 麻豆国产成人AV在线播放| 监利县| 九九热在线免费视频精品| 亚洲AV国产福利精品在现观看| 玛多县| 国产午夜精品无码一区二区| 2021国产在线视频| 在线观看潮喷失禁大喷水无码| 亚洲鸥美日韩精品久久| 美女又黄又免费的视频| 国产成人免费午夜在线观看| 国产一区二区精品久久呦| 忘忧草社区在线www| 色噜噜在线视频免费观看| 东京热人妻丝袜无码AV一二三区观| 久久精品国产中文字幕| 欧美巨大极度另类|