<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【gRPC】C++異步服務端優化版,多服務接口樣例

      官方的C++異步服務端API樣例可讀性并不好,理解起來非常的費勁,各種狀態機也并不明了,整個運行過程也容易讀不懂,因此此處參考網上的博客進行了重寫,以求順利讀懂。

      C++異步服務端實例,詳細注釋版

      gRPC使用C++實現異步服務端的基本邏輯:

      • 構建數據結構來存儲需要處理的請求及其上下文信息,此處使用HandlerContext,相當于對到達的請求的封裝
      • 首先注冊各個接口的HandlerContext,放入完成隊列CompletionQueue中,當請求到達時,根據類型封裝進對應的HandlerContext,由于是異步客戶端,需要保證后面到達的請求也有HandlerContext用,所以用一個就要再創建一個空的放回去
      • 運行完的接口,其HandlerContext需要銷毀

      以下代碼的關鍵為run()方法中的邏輯以及HandlerContext的設置,每一步都有注釋,可以詳細觀看

      //官方樣例的異步服務代碼可讀性太差了,狀態機繞來繞去不直觀,在這里參考網上的博客進行重寫,類名隨意
      class AsyncTestServiceImplNew final
      {
      private:
      
        // 當前服務器的地址
        std::string server_address_;
        // 當前服務器的完成隊列
        std::unique_ptr<ServerCompletionQueue> cq_;
        // 當前服務器的異步服務
        TestService::AsyncService service_;
        // 服務器實例
        std::unique_ptr<Server> server_;
        
        struct HandlerContextBase
        {
          int type_;          //請求的接口是哪個,1表示http,2表示download,3表示upload,后續有需要可以再添加
          int status_;        //當前處理狀態,1表示處理請求構建響應,2表示發送響應
          ServerContext ctx_; // rpc服務的上下文信息
        };
        //請求的上下文結構
        template <typename RequestType, typename ResponseType>
        struct HandlerContext : public HandlerContextBase
        {
          RequestType req_;                                   //請求數據類型
          ResponseType resp_;                                 //響應數據類型
          ServerAsyncResponseWriter<ResponseType> responder_; //響應器
          HandlerContext() : responder_(&ctx_) {}             //構造方法
        };
        // 定義好各個接口的上下文
        typedef HandlerContext<HttpRequest, HttpResponse> HandlerHttpContext;
        typedef HandlerContext<DownloadRequest, DownloadResponse> HandlerDownloadContext;
        typedef HandlerContext<UploadRequest, UploadResponse> HandlerUploadContext;
        
      public:
        ~AsyncTestServiceImplNew()
        {
          server_->Shutdown();
          // 關閉服務器后也要關閉完成隊列
          cq_->Shutdown();
        }
      
        //構造時傳入IP:Port即可
        AsyncTestServiceImplNew(std::string server_address) : server_address_(server_address) {}
      
        // 服務器與隊列的關閉放入了析構函數中
        void Run()
        {
          // std::string server_address = "localhost:50052";
          // 服務器構建器
          ServerBuilder builder;
          // 服務器IP與端口指定,第二個參數表示該通道不經過身份驗證
          builder.AddListeningPort(server_address_, grpc::InsecureServerCredentials());
          // 注冊服務
          builder.RegisterService(&service_);
          // 為當前服務器創建完成隊列
          cq_ = builder.AddCompletionQueue();
          // 構建并啟動服務器
          server_ = builder.BuildAndStart();
          std::cout << "AysncTestServer_New is listening on " << server_address_ << std::endl;
      
          // 為各個接口創建請求上下文,然后注冊請求到服務端
          HandlerHttpContext *http_context = new HandlerHttpContext;
          http_context->type_ = 1;
          http_context->status_ = 1;
          HandlerDownloadContext *download_context = new HandlerDownloadContext;
          download_context->type_ = 2;
          download_context->status_ = 1;
          HandlerUploadContext *upload_context = new HandlerUploadContext;
          upload_context->type_ = 3;
          upload_context->status_ = 1;
      
          // 注冊服務,參數從前到后分別是:rpc服務上下文,rpc請求對象,異步響應器,新的rpc請求使用的完成隊列,通知完成使用的完成隊列,唯一標識tag標識當前這次請求的上下文
          service_.Requesthttp(&http_context->ctx_, &http_context->req_, &http_context->responder_, cq_.get(), cq_.get(), http_context);
          service_.Requestdownload(&download_context->ctx_, &download_context->req_, &download_context->responder_, cq_.get(), cq_.get(), download_context);
          service_.Requestupload(&upload_context->ctx_, &upload_context->req_, &upload_context->responder_, cq_.get(), cq_.get(), upload_context);
      
          //創建線程池,用于運行請求的接口
          ThreadPool pool(THREAD_POOL_SIZE);//THTREAD_POOL_SIZE自行定義
          //不斷從完成隊列中取出請求,這里的請求都是在上面注冊過的
          while (true)
          {
            HandlerContextBase *handler_context = nullptr;
            bool ok = false;
            GPR_ASSERT(cq_->Next((void **)&handler_context, &ok));
            GPR_ASSERT(ok);
      
            //請求接口的類型,1是http,2是download,3是upload
            int type = handler_context->type_;
            //根據狀態分別處理,1表示要進行接口調用,2表示已經完成,可以銷毀該請求上下文了
            if (handler_context->status_ == 2)
            {
              switch (type)
              {
              case 1:
                delete (HandlerHttpContext *)handler_context;
                break;
              case 2:
                delete (HandlerDownloadContext *)handler_context;
                break;
              case 3:
                delete (HandlerUploadContext *)handler_context;
                break;
              }
              continue;
            }
            //從完成隊列中取出來了一個請求上下文來處理當前請求,就需要再放回去一個給后續到達的請求用
            switch (type)
            {
            case 1:
            {
              HandlerHttpContext *http_context = new HandlerHttpContext;
              http_context->type_ = 1;
              http_context->status_ = 1;
              // 注冊服務,參數從前到后分別是:rpc服務上下文,rpc請求對象,異步響應器,新的rpc請求使用的完成隊列,通知完成使用的完成隊列,唯一標識tag標識當前這次請求的上下文
              service_.Requesthttp(&http_context->ctx_, &http_context->req_, &http_context->responder_, cq_.get(), cq_.get(), http_context);
            }
            break;
            case 2:
            {
              HandlerDownloadContext *download_context = new HandlerDownloadContext;
              download_context->type_ = 2;
              download_context->status_ = 1;
              service_.Requestdownload(&download_context->ctx_, &download_context->req_, &download_context->responder_, cq_.get(), cq_.get(), download_context);
            }
            break;
            case 3:
            {
              HandlerUploadContext *upload_context = new HandlerUploadContext;
              upload_context->type_ = 3;
              upload_context->status_ = 1;
              service_.Requestupload(&upload_context->ctx_, &upload_context->req_, &upload_context->responder_, cq_.get(), cq_.get(), upload_context);
            }
            break;
            }
      
            //當前請求上下文的任務進行執行,放入線程池中去運行
            pool.enqueue([type, handler_context, this]()
                         {
                           switch (type)
                           {
                           case 1:
                           {
                             HandlerHttpContext *h = (HandlerHttpContext *)handler_context;
                             Status status = http(&h->ctx_, &h->req_, &h->resp_);
                             h->status_ = 2; //設置狀態為完成接口調用,準備進行響應
                             //調用responder_進行異步的響應發送,三個參數分別為發送的響應、狀態碼、請求處理在服務端的唯一tag
                             h->responder_.Finish(h->resp_, status, handler_context);
                           }
                           break;
                           case 2:
                           {
                             HandlerDownloadContext *h = (HandlerDownloadContext *)handler_context;
                             Status status = download(&h->ctx_, &h->req_, &h->resp_);
                             h->status_ = 2;
                             h->responder_.Finish(h->resp_, status, handler_context);
                           }
                           break;
                           case 3:
                           {
                             HandlerUploadContext *h = (HandlerUploadContext *)handler_context;
                             Status status = upload(&h->ctx_, &h->req_, &h->resp_);
                             h->status_ = 2;
                             h->responder_.Finish(h->resp_, status, handler_context);
                           }
                           break;
                           }
                         });
          }
        }
      
      private:
        Status http(ServerContext *context, const HttpRequest *request,
                    HttpResponse *response)
        {
          response->set_httpresult("http is ok");
          return Status::OK;
        }
      
        Status download(ServerContext *context, const DownloadRequest *request,
                        DownloadResponse *response)
        {
          response->set_downloadresult("download is ok");
          return Status::OK;
        }
        Status upload(ServerContext *context, const UploadRequest *request,
                      UploadResponse *response)
        {
          response->set_uploadresult("upload is ok");
          return Status::OK;
        }
      };
      

      參考博文:http://www.rzrgm.cn/oloroso/p/11345266.html

      線程池源碼

      其中可以使用線程池同時運行多個RPC請求的接口,線程池的代碼此處也一并放出來了,來源于github
      github地址:https://github.com/progschj/ThreadPool.git

      #ifndef THREAD_POOL_H
      #define THREAD_POOL_H
      
      #include <vector>
      #include <queue>
      #include <memory>
      #include <thread>
      #include <mutex>
      #include <condition_variable>
      #include <future>
      #include <functional>
      #include <stdexcept>
      
      class ThreadPool
      {
      public:
          ThreadPool(size_t);
          template <class F, class... Args>
          auto enqueue(F &&f, Args &&...args)
              -> std::future<typename std::result_of<F(Args...)>::type>;
          ~ThreadPool();
      
      private:
          // need to keep track of threads so we can join them
          std::vector<std::thread> workers;
          // the task queue
          std::queue<std::function<void()>> tasks;
      
          // synchronization
          std::mutex queue_mutex;
          std::condition_variable condition;
          bool stop;
      };
      
      // the constructor just launches some amount of workers
      inline ThreadPool::ThreadPool(size_t threads)
          : stop(false)
      {
          for (size_t i = 0; i < threads; ++i)
              workers.emplace_back(
                  [this]
                  {
                      for (;;)
                      {
                          std::function<void()> task;
      
                          {
                              std::unique_lock<std::mutex> lock(this->queue_mutex);
                              this->condition.wait(lock,
                                                   [this]
                                                   { return this->stop || !this->tasks.empty(); });
                              if (this->stop && this->tasks.empty())
                                  return;
                              task = std::move(this->tasks.front());
                              this->tasks.pop();
                          }
      
                          task();
                      }
                  });
      }
      
      // add new work item to the pool
      template <class F, class... Args>
      auto ThreadPool::enqueue(F &&f, Args &&...args)
          -> std::future<typename std::result_of<F(Args...)>::type>
      {
          using return_type = typename std::result_of<F(Args...)>::type;
      
          auto task = std::make_shared<std::packaged_task<return_type()>>(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...));
      
          std::future<return_type> res = task->get_future();
          {
              std::unique_lock<std::mutex> lock(queue_mutex);
      
              // don't allow enqueueing after stopping the pool
              if (stop)
                  throw std::runtime_error("enqueue on stopped ThreadPool");
      
              tasks.emplace([task]()
                            { (*task)(); });
          }
          condition.notify_one();
          return res;
      }
      
      // the destructor joins all threads
      inline ThreadPool::~ThreadPool()
      {
          {
              std::unique_lock<std::mutex> lock(queue_mutex);
              stop = true;
          }
          condition.notify_all();
          for (std::thread &worker : workers)
              worker.join();
      }
      
      #endif
      
      posted @ 2022-09-15 16:33  縉云燒餅  閱讀(2022)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 在线a级毛片无码免费真人| 久久亚洲国产精品五月天| 国产精品亚洲二区在线看| 亚洲综合色丁香婷婷六月图片 | 亚洲欧美日韩综合一区在线 | 久久亚洲国产精品久久| 国产av永久无码天堂影院| 亚洲熟妇丰满多毛xxxx| 亚洲综合网一区中文字幕| 最近免费中文字幕大全| 国产亚洲精品综合99久久| 四房播色综合久久婷婷| 麻豆精品一区二区视频在线| 91网站在线看| 亚洲成人av综合一区| 欧美人成在线播放网站免费| 伊大人香蕉久久网欧美| 久久99精品久久久大学生| 全部免费毛片在线播放| 国产一区二区三区黄色片| 欧美肥老太牲交大战| 97无码人妻福利免费公开在线视频| 亚洲国产av区一区二| 国产久免费热视频在线观看| 人体内射精一区二区三区| 国产无遮挡免费视频免费| 亚洲国产v高清在线观看| 国产欧美日韩精品丝袜高跟鞋| 东方av四虎在线观看| 国产精品久久久久7777| 在线观看免费网页欧美成| 婷婷开心深爱五月天播播| 怀宁县| 日本中文字幕在线播放| 大地资源中文第二页日本| 龙门县| 深夜免费av在线观看| 人妻av无码系列一区二区三区| 泸定县| 国产99在线 | 欧美| 潘金莲高清dvd碟片|