【gRPC】C++異步服務端客戶端API實例及代碼解析
對于同步API而言,程序的吞吐量并不高。因為在每次發送一個gRPC請求時,會阻塞整個線程,必須等待服務端的ack回到客戶端才能繼續運行或者發送下一個請求,因此異步API是提升程序吞吐量的必要手段。
gRPC異步操作依賴于完成隊列CompletionQueue
官網教程:https://grpc.io/docs/languages/cpp/async/
參考博客1:https://www.luozhiyun.com/archives/671
參考博客2:https://blog.miigon.net/posts/cn-so-difference-between-sync-and-async-grpc/
整體思路概述:
- 將一個完成隊列CompletionQueue綁定到RPC調用
- 在客戶端與服務器兩端執行寫入或讀取之類的操作,同時帶有唯一的void*標簽
- 調用CompletionQueue::Next以等待操作完成。如果出現標簽,則表示相應的操作完成
異步客戶端:
要點
- 就像同步客戶端一樣,我們需要創建一個存根stub用于進行gRPC方法的調用,但是此時我們需要調用的是異步方法,需要為其綁定一個完成隊列
//客戶端的類實例在初始化時就需要創建Channel和Stub了,具體看官方實例代碼中的greeter_async_client.cc
CompletionQueue cq;//創建完成隊列
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));//將完成隊列綁定到存根,進而創建出客戶端異步響應讀取器
- rpc->StartCall()初始化RPC請求
- rpc->Finish()有三個參數,分別是gRPC響應、最終狀態、唯一標簽。一旦RPC請求完成,響應的結果、最終狀態會被封裝到前兩個傳入的參數中,同時會附帶唯一標簽。唯一標簽可以使用RPC請求的地址,這樣一來,當最終的響應到來時,我們可以拿著該地址進行相應處理。
- 異步處理響應,拿著唯一標簽可以獲取對應RPC請求的地址,進而進行相關處理。
- 為了使得請求的發出與響應的處理相互之間不阻塞,需要把響應的處理獨立出一個線程
代碼示例
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// 客戶端SayHello
void SayHello(const std::string& user) {
// RPC請求數據封裝
HelloRequest request;
request.set_name(user);
// 異步客戶端請求,存儲請求響應的狀態和數據的結構體等,在下方進行的定義
AsyncClientCall* call = new AsyncClientCall;
// 初始化response_reader
// stub_->PrepareAsyncSayHello()創建一個RPC對象,但是不會立即啟動RPC調用
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall()方法發起真正的RPC請求
call->response_reader->StartCall();
// Finish()方法前兩個參數用于指定響應數據的存儲位置,第三個參數指定了該次RPC異步請求的地址
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
// 不斷循環監聽完成隊列,對響應進行處理
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// 在隊列為空時阻塞,隊列中有響應結果時讀取到got_tag和ok兩個參數
// 前者是結果對應的RPC請求的地址,后者是響應的狀態
while (cq_.Next(&got_tag, &ok)) {
// 類型轉換,獲取到的實際上是此響應結果對應的RPC請求的地址,在這個地址下保存了實際的響應結果數據
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
// 驗證請求是否真的完成了
GPR_ASSERT(ok);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// 完成了響應的處理后,清除該RPC請求
delete call;
}
}
private:
// 異步客戶端通話,存儲一次RPC通話的信息,里面包含響應的狀態和數據的結構
struct AsyncClientCall {
// 服務器返回的響應數據
HelloReply reply;
// 客戶端的上下文信息,可以被用于向服務器傳達額外信息或調整某些RPC行為
ClientContext context;
// RPC響應的狀態
Status status;
// 客戶端異步響應讀取器
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
};
// 存根,在我們的視角里就是服務器端暴露的服務接口
std::unique_ptr<Greeter::Stub> stub_;
// 完成隊列,一個用于gRPC異步處理的生產者消費者隊列
CompletionQueue cq_;
};
int main(int argc, char** argv) {
// 實例化一個客戶端,需要一個信道,第二個參數表明該通道未經過身份驗證
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// 獨立的異步響應處理線程
// 由于該方法是客戶端類內的非靜態方法,所以需要傳入客戶端類的實例表明歸屬
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
//發送異步的請求SayHello()
for (int i = 0; i < 100; i++) {
std::string user("world " + std::to_string(i));
greeter.SayHello(user); // The actual RPC call!
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //永遠會阻塞,因為異步響應處理線程永遠不會停止,必須ctrl+c才能退出
return 0;
}
異步服務器
要點
- 客戶端在發起請求時附帶了標簽(此次RPC請求會話的地址),因此服務器端也需要將該標簽妥善處理再返回
- 官方API中是準備一個CallData對象作為容器,gRPC通過ServerCompletionQueue將各種事件發送到CallData對象中,然后讓這兒對象根據自身的狀態進行處理。處理完成后還需要再手動創建一個CallData對象,這個對象是為下一個Client請求準備的,整個過程就像流水線一樣
- CREATE:CallData對象被創建處理之前處于CREATE狀態
- PROCESS:請求到達后,轉換為PROCESS狀態
- FINISH:響應完成后,轉換為FINISH狀態
- 整體服務器端流程
- 啟動服務時,預分配 一個 CallData 實例供未來客戶端請求使用。
- 該 CallData 對象構造時,service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this) 將被調用,通知gRPC開始準備接收恰好是一個SayHello請求。
- 這時候我們還不知道請求會由誰發出,何時到達,我們只是告訴 gRPC 說我們已經準備好接收了,讓 gRPC 在真的接收到時通知我們。
- 供給 RequestSayHello的參數告訴了gRPC將上下文信息、請求體以及回復器放在哪里、使用哪個完成隊列來通知、以及通知的時候,用于鑒別請求的 tag(在這個例子中,this 被作為 tag 使用)。
HandleRpcs() 運行到 cq->Next() 并阻塞。等待下一個事件發生 - 客戶端發送一個 SayHello 請求到服務器,gRPC 開始接收并解碼該請求(IO 操作)
- 一段時間后….
- gRPC接收請求完成了。它將請求體放入CallData對象的request_成員中(通過我們之前提供的指針),然后創建一個事件(使用指向CallData 對象的指針作為 tag),并 將該事件放到完成隊列 cq_ 中.
- HandleRpcs() 中的循環接收到了該事件(之前阻塞住的 cq->Next() 調用此時也返回),并調用 CallData::Proceed() 來處理請求。
- CallData 的 status_ 屬性此時是 PROCESS,它做了如下事情:
- 創建一個新的 CallData 對象,這樣在這個請求后的新請求才能被新對象處理。
- 生成當前請求的回復,告訴 gRPC 我們處理完成了,將該回復發送回客戶端
- gRPC 開始回復的傳輸 (IO 操作)
- HandleRpcs() 中的循環迭代一次,再次阻塞在 cq->Next(),等待新事件的發生。
- 一段時間后….
- gRPC 完成了回復的傳輸,再次通過在完成隊列里放入一個以 CallData 指針為 tag 的事件的方式通知我們。
- cq->Next() 接收到該事件并返回,CallData::Proceed() 將 CallData 對象釋放(使用 delete this;)。HandleRpcs() 循環并重新阻塞在 cq->Next() 上,等待新事件的發生。
整個過程看似和同步 API 很相似,只是多了對完成隊列的控制。然而,通過這種方式,每一個 一段時間后.... (通常是在等待 IO 操作的完成或等待一個請求出現) cq->Next() 不僅可以接收到當前處理的請求的完成事件,還可以接收到其他請求的事件。
所以假設第一個請求正在等待它的回復數據傳輸完成時,一個新的請求到達了,cq->Next() 可以獲得新請求產生的事件,并開始并行處理新請求,而不用等待第一個請求的傳輸完成
代碼示例
//服務器實現類
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// 關閉服務器后也要關閉完成隊列
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
// 服務器地址和端口
std::string server_address("0.0.0.0:50051");
// 服務器構建器
ServerBuilder builder;
// 服務器IP與端口指定,第二個參數表示該通道未經過身份驗證
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// 注冊服務
builder.RegisterService(&service_);
// 為當前服務器創建完成隊列
cq_ = builder.AddCompletionQueue();
// 構建并啟動服務器
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// 運行服務器的主流程
HandleRpcs();
}
private:
// 處理一個請求所需要保存的狀態、邏輯、數據被封裝成了CallData類
class CallData {
public:
// 傳入service實例和服務器端的完成隊列,創建后status_是CREATE狀態
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// 立即調用服務邏輯
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// 轉換為PROCESS狀態
status_ = PROCESS;
// 作為初始化的一部分,請求系統開始處理SayHello請求。
// 此處的this指代此CallData實例
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// 在執行當前CallData的任務時,創建一個新的CallData實例去為新的請求服務
// 當前CallData會在FINISH階段自行銷毀
new CallData(service_, cq_);
// 實際的邏輯處理
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// 在完成后將狀態置為FINISH。使用當前CallData的地址作為該事件的標簽
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
// 銷毀當前CallData
delete this;
}
}
private:
// 異步服務
Greeter::AsyncService* service_;
//服務器端的完成隊列,是一個生產者-消費者隊列
ServerCompletionQueue* cq_;
// 服務器端上下文信息,可以被用于向客戶端傳達額外信息、數據或調整某些RPC行為
ServerContext ctx_;
// 客戶端發來的請求
HelloRequest request_;
// 服務端的響應
HelloReply reply_;
// 發送服務端響應的工具
ServerAsyncResponseWriter<HelloReply> responder_;
// 狀態機定義
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // 當前的狀態
};
// 如果有需求的話,服務器的處理可以是多線程的
void HandleRpcs() {
// 創建一個新的CallData,將完成隊列中的數據封裝進去
new CallData(&service_, cq_.get());
void* tag; // 請求特有的標簽,實際上是請求的RPC會話對象在客戶端的地址信息
bool ok;
while (true) {
// 阻塞等待讀取完成隊列中的事件,每個事件使用一個標簽進行標識,該標簽是CallData實例的地址
// 完成隊列的Next()方法應該每次都檢查返回值,來確保事件和完成隊列的狀態正常
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
// 從標簽轉換為CallData*類型,進而訪問CallData中的方法
static_cast<CallData*>(tag)->Proceed();
}
}
// 當前服務器的完成隊列
std::unique_ptr<ServerCompletionQueue> cq_;
// 當前服務器的異步服務
Greeter::AsyncService service_;
// 服務器實例
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}

浙公網安備 33010602011771號