【gRPC】C++異步服務端優化版,多服務接口樣例
官方的C++異步服務端API樣例可讀性并不好,理解起來非常的費勁,各種狀態機也并不明了,整個運行過程也容易讀不懂,因此此處參考網上的博客進行了重寫,以求順利讀懂。
C++異步服務端實例,詳細注釋版
gRPC使用C++實現異步服務端的基本邏輯:
- 構建數據結構來存儲需要處理的請求及其上下文信息,此處使用HandlerContext,相當于對到達的請求的封裝
- 首先注冊各個接口的HandlerContext,放入完成隊列CompletionQueue中,當請求到達時,根據類型封裝進對應的HandlerContext,由于是異步客戶端,需要保證后面到達的請求也有HandlerContext用,所以用一個就要再創建一個空的放回去
- 運行完的接口,其HandlerContext需要銷毀
以下代碼的關鍵為run()方法中的邏輯以及HandlerContext的設置,每一步都有注釋,可以詳細觀看
//官方樣例的異步服務代碼可讀性太差了,狀態機繞來繞去不直觀,在這里參考網上的博客進行重寫,類名隨意
class AsyncTestServiceImplNew final
{
private:
// 當前服務器的地址
std::string server_address_;
// 當前服務器的完成隊列
std::unique_ptr<ServerCompletionQueue> cq_;
// 當前服務器的異步服務
TestService::AsyncService service_;
// 服務器實例
std::unique_ptr<Server> server_;
struct HandlerContextBase
{
int type_; //請求的接口是哪個,1表示http,2表示download,3表示upload,后續有需要可以再添加
int status_; //當前處理狀態,1表示處理請求構建響應,2表示發送響應
ServerContext ctx_; // rpc服務的上下文信息
};
//請求的上下文結構
template <typename RequestType, typename ResponseType>
struct HandlerContext : public HandlerContextBase
{
RequestType req_; //請求數據類型
ResponseType resp_; //響應數據類型
ServerAsyncResponseWriter<ResponseType> responder_; //響應器
HandlerContext() : responder_(&ctx_) {} //構造方法
};
// 定義好各個接口的上下文
typedef HandlerContext<HttpRequest, HttpResponse> HandlerHttpContext;
typedef HandlerContext<DownloadRequest, DownloadResponse> HandlerDownloadContext;
typedef HandlerContext<UploadRequest, UploadResponse> HandlerUploadContext;
public:
~AsyncTestServiceImplNew()
{
server_->Shutdown();
// 關閉服務器后也要關閉完成隊列
cq_->Shutdown();
}
//構造時傳入IP:Port即可
AsyncTestServiceImplNew(std::string server_address) : server_address_(server_address) {}
// 服務器與隊列的關閉放入了析構函數中
void Run()
{
// std::string server_address = "localhost:50052";
// 服務器構建器
ServerBuilder builder;
// 服務器IP與端口指定,第二個參數表示該通道不經過身份驗證
builder.AddListeningPort(server_address_, grpc::InsecureServerCredentials());
// 注冊服務
builder.RegisterService(&service_);
// 為當前服務器創建完成隊列
cq_ = builder.AddCompletionQueue();
// 構建并啟動服務器
server_ = builder.BuildAndStart();
std::cout << "AysncTestServer_New is listening on " << server_address_ << std::endl;
// 為各個接口創建請求上下文,然后注冊請求到服務端
HandlerHttpContext *http_context = new HandlerHttpContext;
http_context->type_ = 1;
http_context->status_ = 1;
HandlerDownloadContext *download_context = new HandlerDownloadContext;
download_context->type_ = 2;
download_context->status_ = 1;
HandlerUploadContext *upload_context = new HandlerUploadContext;
upload_context->type_ = 3;
upload_context->status_ = 1;
// 注冊服務,參數從前到后分別是:rpc服務上下文,rpc請求對象,異步響應器,新的rpc請求使用的完成隊列,通知完成使用的完成隊列,唯一標識tag標識當前這次請求的上下文
service_.Requesthttp(&http_context->ctx_, &http_context->req_, &http_context->responder_, cq_.get(), cq_.get(), http_context);
service_.Requestdownload(&download_context->ctx_, &download_context->req_, &download_context->responder_, cq_.get(), cq_.get(), download_context);
service_.Requestupload(&upload_context->ctx_, &upload_context->req_, &upload_context->responder_, cq_.get(), cq_.get(), upload_context);
//創建線程池,用于運行請求的接口
ThreadPool pool(THREAD_POOL_SIZE);//THTREAD_POOL_SIZE自行定義
//不斷從完成隊列中取出請求,這里的請求都是在上面注冊過的
while (true)
{
HandlerContextBase *handler_context = nullptr;
bool ok = false;
GPR_ASSERT(cq_->Next((void **)&handler_context, &ok));
GPR_ASSERT(ok);
//請求接口的類型,1是http,2是download,3是upload
int type = handler_context->type_;
//根據狀態分別處理,1表示要進行接口調用,2表示已經完成,可以銷毀該請求上下文了
if (handler_context->status_ == 2)
{
switch (type)
{
case 1:
delete (HandlerHttpContext *)handler_context;
break;
case 2:
delete (HandlerDownloadContext *)handler_context;
break;
case 3:
delete (HandlerUploadContext *)handler_context;
break;
}
continue;
}
//從完成隊列中取出來了一個請求上下文來處理當前請求,就需要再放回去一個給后續到達的請求用
switch (type)
{
case 1:
{
HandlerHttpContext *http_context = new HandlerHttpContext;
http_context->type_ = 1;
http_context->status_ = 1;
// 注冊服務,參數從前到后分別是:rpc服務上下文,rpc請求對象,異步響應器,新的rpc請求使用的完成隊列,通知完成使用的完成隊列,唯一標識tag標識當前這次請求的上下文
service_.Requesthttp(&http_context->ctx_, &http_context->req_, &http_context->responder_, cq_.get(), cq_.get(), http_context);
}
break;
case 2:
{
HandlerDownloadContext *download_context = new HandlerDownloadContext;
download_context->type_ = 2;
download_context->status_ = 1;
service_.Requestdownload(&download_context->ctx_, &download_context->req_, &download_context->responder_, cq_.get(), cq_.get(), download_context);
}
break;
case 3:
{
HandlerUploadContext *upload_context = new HandlerUploadContext;
upload_context->type_ = 3;
upload_context->status_ = 1;
service_.Requestupload(&upload_context->ctx_, &upload_context->req_, &upload_context->responder_, cq_.get(), cq_.get(), upload_context);
}
break;
}
//當前請求上下文的任務進行執行,放入線程池中去運行
pool.enqueue([type, handler_context, this]()
{
switch (type)
{
case 1:
{
HandlerHttpContext *h = (HandlerHttpContext *)handler_context;
Status status = http(&h->ctx_, &h->req_, &h->resp_);
h->status_ = 2; //設置狀態為完成接口調用,準備進行響應
//調用responder_進行異步的響應發送,三個參數分別為發送的響應、狀態碼、請求處理在服務端的唯一tag
h->responder_.Finish(h->resp_, status, handler_context);
}
break;
case 2:
{
HandlerDownloadContext *h = (HandlerDownloadContext *)handler_context;
Status status = download(&h->ctx_, &h->req_, &h->resp_);
h->status_ = 2;
h->responder_.Finish(h->resp_, status, handler_context);
}
break;
case 3:
{
HandlerUploadContext *h = (HandlerUploadContext *)handler_context;
Status status = upload(&h->ctx_, &h->req_, &h->resp_);
h->status_ = 2;
h->responder_.Finish(h->resp_, status, handler_context);
}
break;
}
});
}
}
private:
Status http(ServerContext *context, const HttpRequest *request,
HttpResponse *response)
{
response->set_httpresult("http is ok");
return Status::OK;
}
Status download(ServerContext *context, const DownloadRequest *request,
DownloadResponse *response)
{
response->set_downloadresult("download is ok");
return Status::OK;
}
Status upload(ServerContext *context, const UploadRequest *request,
UploadResponse *response)
{
response->set_uploadresult("upload is ok");
return Status::OK;
}
};
參考博文:http://www.rzrgm.cn/oloroso/p/11345266.html
線程池源碼
其中可以使用線程池同時運行多個RPC請求的接口,線程池的代碼此處也一并放出來了,來源于github
github地址:https://github.com/progschj/ThreadPool.git
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool
{
public:
ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F &&f, Args &&...args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]
{ return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&...args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]()
{ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}
#endif

浙公網安備 33010602011771號