Thrift之TProcess類體系原理及源碼詳細(xì)解析
我的新浪微博:http://weibo.com/freshairbrucewoo。
歡迎大家相互交流,共同提高技術(shù)。
之前對(duì)Thrift自動(dòng)生成代碼的實(shí)現(xiàn)細(xì)節(jié)做了詳細(xì)的分析,下面進(jìn)行處理層的實(shí)現(xiàn)做詳細(xì)分析了!會(huì)利用到自動(dòng)代碼生成的知識(shí)。
這部分是協(xié)議層和用戶提供的服務(wù)實(shí)現(xiàn)之間的紐帶,定義了調(diào)用服務(wù)實(shí)現(xiàn)的接口框架,真正實(shí)現(xiàn)某種服務(wù)接口是通過上一章介紹的代碼生成工具生成的代碼。本章將介紹這個(gè)框架的基本原理,然后通過生成的一個(gè)實(shí)例來具體介紹怎樣完成一次完整的服務(wù),這個(gè)可能涉及到下面章節(jié)的一些知識(shí),對(duì)于這些知識(shí)不詳細(xì)分析其功能,只是介紹它在其中起什么作用。選擇的實(shí)例是Facebook內(nèi)部用這個(gè)框架實(shí)現(xiàn)的一個(gè)分布式日志收集系統(tǒng)scribe。下面是這部分相關(guān)類的類關(guān)系圖:

從上圖中可以看出TProcessor是這個(gè)部分的頂層基類,其他之類基本上都是通過Thrift代碼生成工具生成的,只有少數(shù)是為了擴(kuò)展一些功能而直接寫代碼實(shí)現(xiàn),如PeekProcessor類就增加了一些對(duì)原始數(shù)據(jù)處理的功能。scribeProcessor和FacebookServiceProcessor類就是用代碼生成器根據(jù)IDL文件生成的,也是我們后面需要分析的一個(gè)實(shí)例。
第一節(jié) 服務(wù)接口調(diào)用框架分析
這個(gè)基本的框架包括三個(gè)類,一個(gè)就是抽象類TProcessor,負(fù)責(zé)調(diào)用用戶定義的服務(wù)接口,從一個(gè)接口讀入數(shù)據(jù),寫入一個(gè)輸出接口。一個(gè)最主要的函數(shù)定義如下:
1 virtual bool process(boost::shared_ptr<protocol::TProtocol> in, 2 3 boost::shared_ptr<protocol::TProtocol> out, void* connectionContext) = 0;
這個(gè)函數(shù)是一個(gè)純虛函數(shù),所以繼承這個(gè)類的子類都必須實(shí)現(xiàn)這個(gè)函數(shù),這個(gè)函數(shù)就是最主要的數(shù)據(jù)傳輸功能。
第二個(gè)類就是負(fù)責(zé)處理TProcessor類產(chǎn)生的事件的類TProcessorEventHandler,主要定義了一些當(dāng)某事件發(fā)生時(shí)的處理函數(shù),例如當(dāng)讀取參數(shù)之前可以做一些處理功能。下面是這個(gè)類定義的各個(gè)成員函數(shù),每一個(gè)函數(shù)都處理一種事件發(fā)送時(shí)的情況:
|
函數(shù)名稱 |
函數(shù)功能 |
|
getContext |
調(diào)用其他回調(diào)函數(shù)之前調(diào)用,期望返回一些有序的上下文對(duì)象以便傳遞給其他回調(diào)函數(shù)使用 |
|
freeContext |
期望釋放一個(gè)上下文有關(guān)的資源 |
|
preRead |
在讀參數(shù)以前調(diào)用 |
|
postRead |
在讀參數(shù)和處理函數(shù)之間調(diào)用 |
|
preWrite |
在處理和寫響應(yīng)之間調(diào)用 |
|
postWrite |
在寫響應(yīng)之后調(diào)用 |
|
asyncComplete |
當(dāng)一個(gè)異步函數(shù)成功完成調(diào)用時(shí)調(diào)用 |
|
handlerError |
如果處理函數(shù)拋出沒有定義的異常就會(huì)調(diào)用此函數(shù) |
最后一個(gè)類就是TProcessorContextFreer類,這個(gè)類是一個(gè)幫助類,幫助生成的代碼來釋放上下文資源。
第二節(jié) 基于框架生成的服務(wù)實(shí)例分析
本節(jié)將對(duì)scribe服務(wù)器采用的服務(wù)實(shí)現(xiàn)進(jìn)行詳細(xì)分析。
1 接口定義語言文件(IDL)
(1)Facebook內(nèi)部共用服務(wù)協(xié)議
主要有兩個(gè)文件,一個(gè)是在Thrift中定義,是用于Facebook內(nèi)部的一些接口服務(wù)定義,這個(gè)不僅僅用于scribe服務(wù)器,可能還用于Facebook內(nèi)部其他系統(tǒng),這個(gè)文件內(nèi)容如下:
1 namespace java com.facebook.fb303 2 3 namespace cpp facebook.fb303 4 5 namespace perl Facebook.FB303 6 7 enum fb_status { 8 9 DEAD = 0, 10 11 STARTING = 1, 12 13 ALIVE = 2, 14 15 STOPPING = 3, 16 17 STOPPED = 4, 18 19 WARNING = 5, 20 21 } 22 23 service FacebookService { 24 25 string getName(), 26 27 string getVersion(), 28 29 fb_status getStatus(), 30 31 string getStatusDetails(), 32 33 map<string, i64> getCounters(), 34 35 i64 getCounter(1: string key), 36 37 void setOption(1: string key, 2: string value), 38 39 string getOption(1: string key), 40 41 map<string, string> getOptions(), 42 43 string getCpuProfile(1: i32 profileDurationInSec), 44 45 i64 aliveSince(), 46 47 oneway void reinitialize(), 48 49 oneway void shutdown(), 50 51 }
上面這個(gè)IDL文件定義了一個(gè)枚舉類型用于表示服務(wù)的狀態(tài),還定義了一個(gè)名位FacebookService的服務(wù),里面定義了各種操作,如獲取服務(wù)狀態(tài)的操作、得到計(jì)數(shù)的操作等等。
下面我們來看看根據(jù)這個(gè)IDL文件生成的C++代碼是什么樣的一個(gè)架構(gòu)。首先生成了一個(gè)基于上面服務(wù)定義的抽象類如下:
class FacebookServiceIf { public: virtual ~FacebookServiceIf() {} virtual void getName(std::string& _return) = 0; virtual void getVersion(std::string& _return) = 0; virtual fb_status getStatus() = 0; virtual void getStatusDetails(std::string& _return) = 0; virtual void getCounters(std::map<std::string, int64_t> & _return) = 0; virtual int64_t getCounter(const std::string& key) = 0; virtual void setOption(const std::string& key, const std::string& value) = 0; virtual void getOption(std::string& _return, const std::string& key) = 0; virtual void getOptions(std::map<std::string, std::string> & _return) = 0; virtual void getCpuProfile(std::string& _return, const int32_t profileDurationInSec) = 0; virtual int64_t aliveSince() = 0; virtual void reinitialize() = 0; virtual void shutdown() = 0; };
注意觀察,除了這個(gè)類多了一個(gè)虛析構(gòu)函數(shù),其他函數(shù)就是IDL中定義的。接著定義了類FacebookServiceNull,這個(gè)是上面那個(gè)抽象類的空實(shí)現(xiàn)(就是所有方法都沒有做具體的事情),這樣做的好處就是我們需要重寫一些函數(shù)的時(shí)候只需要關(guān)注我們需要寫的函數(shù),而不是重寫所有函數(shù)。接著又定義了封裝每一個(gè)函數(shù)參數(shù)的相應(yīng)類,就是一個(gè)函數(shù)的參數(shù)都用一個(gè)類來封裝定義,函數(shù)的返回值也是這樣處理。這樣做的目的是統(tǒng)一遠(yuǎn)程調(diào)用的實(shí)現(xiàn)接口,因?yàn)閭鬟f參數(shù)都只需要這個(gè)封裝類的對(duì)象就可以了。所以你會(huì)看到每一個(gè)服務(wù)里面定義的函數(shù)都有下面一組類的定義:
1 (1)class FacebookService_getName_args {…} 2 3 (2)class FacebookService_getName_pargs {…} 4 5 (3)typedef struct _FacebookService_getName_result__isset {…} _FacebookService_getName_result__isset; 6 7 (4)class FacebookService_getName_result{…} 8 9 (5)typedef struct _FacebookService_getName_presult__isset {…} _FacebookService_getName_presult__isset; 10 11 (6)class FacebookService_getName_presult{…}
上面這六個(gè)類定義就是為服務(wù)中的getName函數(shù)服務(wù)的,相應(yīng)的每一個(gè)函數(shù)都會(huì)有這種類似的定義和實(shí)現(xiàn)。接下來就會(huì)定義三個(gè)具體實(shí)現(xiàn)IDL定義的功能的類,一個(gè)客戶端的類,它繼承定義的服務(wù)抽象類,每一個(gè)具體的函數(shù)實(shí)現(xiàn)都是同樣的方式和思路,同樣我結(jié)合getName函數(shù)的實(shí)現(xiàn)來看看這個(gè)過程,其他函數(shù)都是這樣實(shí)現(xiàn)的,代碼如下:
1 send_getName(); 2 3 recv_getName(_return);
由上面代碼可以看出首先調(diào)用函數(shù)發(fā)送函數(shù)名稱及相關(guān)信息到遠(yuǎn)程,然后接受函數(shù)調(diào)用的返回值,發(fā)送函數(shù)send_getName()的代碼如下:
1 int32_t cseqid = 0; 2 3 oprot_->writeMessageBegin("getName", ::apache::thrift::protocol::T_CALL, cseqid);//寫一個(gè)函數(shù)調(diào)用消息RPC 4 5 FacebookService_getName_pargs args; 6 7 args.write(oprot_);//寫入?yún)?shù) 8 9 oprot_->writeMessageEnd(); 10 11 oprot_->getTransport()->writeEnd(); 12 13 oprot_->getTransport()->flush();//保證這次寫入過程立即生效
上面代碼就完成了函數(shù)名稱以及參數(shù)的傳輸,調(diào)用的是TProtocol相關(guān)的類的函數(shù)實(shí)現(xiàn),具體的實(shí)現(xiàn)內(nèi)容和方式會(huì)在TProtocol部分介紹。下面接著看一下接收返回值的函數(shù)recv_getName的代碼:
1 int32_t rseqid = 0;//接收的消息序列號(hào) 2 3 std::string fname;//函數(shù)名稱 4 5 ::apache::thrift::protocol::TMessageType mtype;//消息的類型(調(diào)用(T_CALL)、異常(T_EXCEPTION)等) 6 7 iprot_->readMessageBegin(fname, mtype, rseqid);//從返回消息讀取函數(shù)名稱、消息類型 8 9 if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {//處理異常消息 10 11 ::apache::thrift::TApplicationException x; 12 13 x.read(iprot_); 14 15 iprot_->readMessageEnd(); 16 17 iprot_->getTransport()->readEnd(); 18 19 throw x; 20 21 } 22 23 if (mtype != ::apache::thrift::protocol::T_REPLY) {//處理返回消息 24 25 iprot_->skip(::apache::thrift::protocol::T_STRUCT); 26 27 iprot_->readMessageEnd(); 28 29 iprot_->getTransport()->readEnd(); 30 31 } 32 33 if (fname.compare("getName") != 0) {//看是否是我們需要的函數(shù)名,不是就跳過消息讀取 34 35 iprot_->skip(::apache::thrift::protocol::T_STRUCT); 36 37 iprot_->readMessageEnd(); 38 39 iprot_->getTransport()->readEnd(); 40 41 } 42 43 FacebookService_getName_presult result; 44 45 result.success = &_return; 46 47 result.read(iprot_);//讀取函數(shù)返回值 48 49 iprot_->readMessageEnd(); 50 51 iprot_->getTransport()->readEnd(); 52 53 if (result.__isset.success) {//成功就返回結(jié)果(已經(jīng)在_return里面),否則拋出異常 54 55 return; 56 57 } 58 59 throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "getName failed: unknown result");
上面代碼就是處理遠(yuǎn)程調(diào)用的返回結(jié)果,代碼里面有注釋。一個(gè)服務(wù)函數(shù)的實(shí)現(xiàn)大概流程已經(jīng)展現(xiàn)在我們面前了,處理的過程也已經(jīng)清晰。這個(gè)只是用于客戶端的處理流程,必須通過有效的機(jī)制來通知服務(wù)器端調(diào)用相應(yīng)的函數(shù)(這就是RPC)在服務(wù)器端完成相應(yīng)功能并將結(jié)果返回。這種機(jī)制就是通過我們這部分介紹的TProcessor類實(shí)現(xiàn),這就是上面提到三個(gè)類中的第二個(gè)類,在這個(gè)實(shí)例中是FacebookServiceProcessor類,它從TProcessor類繼承,重點(diǎn)實(shí)現(xiàn)兩個(gè)函數(shù)process和process_fn,其中process會(huì)調(diào)用process_fn函數(shù)來處理客戶端具體調(diào)用的那個(gè)服務(wù)函數(shù),process函數(shù)定義如下:
1 bool FacebookServiceProcessor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, 2 3 boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot, void* callContext) { 4 5 ::apache::thrift::protocol::TProtocol* iprot = piprot.get(); 6 7 ::apache::thrift::protocol::TProtocol* oprot = poprot.get(); 8 9 std::string fname; 10 11 ::apache::thrift::protocol::TMessageType mtype; 12 13 int32_t seqid; 14 15 iprot->readMessageBegin(fname, mtype, seqid);//讀取得到函數(shù)名稱、消息類型和函數(shù)序列號(hào) 16 17 //處理不是函數(shù)調(diào)用消息的情況 18 19 if (mtype != ::apache::thrift::protocol::T_CALL && mtype != ::apache::thrift::protocol::T_ONEWAY) { 20 21 iprot->skip(::apache::thrift::protocol::T_STRUCT); 22 23 iprot->readMessageEnd(); 24 25 iprot->getTransport()->readEnd(); 26 27 ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE); 28 29 //寫入(返回)一個(gè)異常信息給調(diào)用客戶端,客戶端會(huì)根據(jù)返回結(jié)果處理異常 30 31 oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid); 32 33 x.write(oprot); 34 35 oprot->writeMessageEnd(); 36 37 oprot->getTransport()->writeEnd(); 38 39 oprot->getTransport()->flush(); 40 41 return true; 42 43 } 44 45 return process_fn(iprot, oprot, fname, seqid, callContext);//調(diào)用實(shí)際的函數(shù)處理 46 47 }
上面代碼有比較詳細(xì)的注釋,還需要說明一點(diǎn)的就是如果傳遞的不是函數(shù)調(diào)用的消息類型就會(huì)返回給客戶端一個(gè)異常的消息,客戶端的接收返回值的函數(shù)就會(huì)根據(jù)收到的異常消息做相應(yīng)處理,上面getName函數(shù)的接收返回值函數(shù)就是拋出一個(gè)服務(wù)器端給的異常信息。下面繼續(xù)看最終服務(wù)器端調(diào)用相應(yīng)映射函數(shù)的處理,這個(gè)是通過process_fn函數(shù)實(shí)現(xiàn):具體定義如下:
1 bool FacebookServiceProcessor::process_fn(::apache::thrift::protocol::TProtocol* iprot, 2 3 ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid, void* callContext) { 4 5 //定義個(gè)map的迭代器,用于接收在函數(shù)映射查找到的映射函數(shù) 6 7 std::map<std::string, void (FacebookServiceProcessor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, 8 9 ::apache::thrift::protocol::TProtocol*, void*)>::iterator pfn; 10 11 pfn = processMap_.find(fname);//根據(jù)函數(shù)名稱查找對(duì)應(yīng)的映射處理函數(shù) 12 13 if (pfn == processMap_.end()) {//如果沒有找到,做下面的處理 14 15 iprot->skip(::apache::thrift::protocol::T_STRUCT); 16 17 iprot->readMessageEnd(); 18 19 iprot->getTransport()->readEnd(); 20 21 //拋出一個(gè)不知道的方法的異常 22 23 ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, 24 25 "Invalid method name: '"+fname+"'"); 26 27 //寫入到調(diào)用客戶端 28 29 oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid); 30 31 x.write(oprot); 32 33 oprot->writeMessageEnd(); 34 35 oprot->getTransport()->writeEnd(); 36 37 oprot->getTransport()->flush(); 38 39 return true; 40 41 } 42 43 (this->*(pfn->second))(seqid, iprot, oprot, callContext);//調(diào)用具體的函數(shù)(RPC過程完成) 44 45 return true; 46 47 }
上面這個(gè)函數(shù)最終完成了RPC的過程,那個(gè)函數(shù)與映射函數(shù)的對(duì)應(yīng)關(guān)系的map結(jié)構(gòu)是在構(gòu)造函數(shù)中初始化的,所以可以找到,例如我們舉例的getName函數(shù)是下面這樣初始化的:
1 processMap_["getName"] = &FacebookServiceProcessor::process_getName;
和getName函數(shù)一樣,對(duì)于IDL定義的每一個(gè)函數(shù)在FacebookServiceProcessor類中都有一個(gè)映射的處理函數(shù),為了展示一個(gè)完整的處理過程我們?cè)诳纯磄etName函數(shù)的映射處理函數(shù)process_getName,它的定義如下:
1 void FacebookServiceProcessor::process_getName(int32_t seqid, 2 3 ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) 4 5 { 6 7 void* ctx = NULL; 8 9 if (eventHandler_.get() != NULL) { 10 11 //得到上下文調(diào)用環(huán)境 12 13 ctx = eventHandler_->getContext("FacebookService.getName", callContext); 14 15 } 16 17 //定義并初始化一個(gè)用于釋放資源的幫助類對(duì)象 18 19 ::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, "FacebookService.getName"); 20 21 if (eventHandler_.get() != NULL) { 22 23 eventHandler_->preRead(ctx, "FacebookService.getName");//讀之前事件處理 24 25 } 26 27 FacebookService_getName_args args; 28 29 args.read(iprot); 30 31 iprot->readMessageEnd(); 32 33 uint32_t bytes = iprot->getTransport()->readEnd(); 34 35 if (eventHandler_.get() != NULL) { 36 37 eventHandler_->postRead(ctx, "FacebookService.getName", bytes);//讀取和讀完之間的事件處理 38 39 } 40 41 FacebookService_getName_result result; 42 43 try { 44 45 iface_->getName(result.success);//這是重點(diǎn):調(diào)用服務(wù)器端的getName函數(shù) 46 47 result.__isset.success = true; 48 49 } catch (const std::exception& e) { 50 51 if (eventHandler_.get() != NULL) { 52 53 eventHandler_->handlerError(ctx, "FacebookService.getName");//錯(cuò)誤處理 54 55 } 56 57 //寫入具體的異常到客戶端 58 59 ::apache::thrift::TApplicationException x(e.what()); 60 61 oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_EXCEPTION, seqid); 62 63 x.write(oprot); 64 65 oprot->writeMessageEnd(); 66 67 oprot->getTransport()->writeEnd(); 68 69 oprot->getTransport()->flush(); 70 71 return; 72 73 } 74 75 if (eventHandler_.get() != NULL) { 76 77 eventHandler_->preWrite(ctx, "FacebookService.getName");//寫入之前事件處理 78 79 } 80 81 //寫入調(diào)用返回值(T_REPLY)消息到調(diào)用客戶端 82 83 oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_REPLY, seqid); 84 85 result.write(oprot); 86 87 oprot->writeMessageEnd(); 88 89 bytes = oprot->getTransport()->writeEnd(); 90 91 oprot->getTransport()->flush(); 92 93 if (eventHandler_.get() != NULL) { 94 95 eventHandler_->postWrite(ctx, "FacebookService.getName", bytes);//寫相應(yīng)之后處理 96 97 } 98 99 }
上面這個(gè)函數(shù)就是真正完成服務(wù)器端調(diào)用客戶端傳遞過來的函數(shù)的處理過程,有事件處理類處理相應(yīng)的事件(不過,目前都還是空實(shí)現(xiàn),以后可以繼承這個(gè)處理類重寫需要處理事件的函數(shù),例如:在調(diào)用服務(wù)器真正的處理函數(shù)之前可以先處理一下參數(shù),驗(yàn)證參數(shù)是否正確之類的),也有幫助釋放資源的幫助類。
(2)scribe服務(wù)IDL文件
1 include "/home/brucewoo/thrift-0.6.1/contrib/fb303/if/fb303.thrift" 2 3 namespace cpp scribe.thrift 4 5 namespace java scribe.thrift 6 7 namespace perl Scribe.Thrift 8 9 enum ResultCode 10 11 { 12 13 OK, 14 15 TRY_LATER 16 17 } 18 19 struct LogEntry 20 21 { 22 23 1: string category, 24 25 2: string message 26 27 } 28 29 service scribe extends fb303.FacebookService 30 31 { 32 33 ResultCode Log(1: list<LogEntry> messages); 34 35 }
這個(gè)IDL文件只定義了一個(gè)服務(wù)接口,就是用完成日志文件傳輸?shù)膸讉€(gè)Log,不過這個(gè)服務(wù)繼承FacebookService服務(wù),所以上面介紹FacebookService服務(wù)的功能它也具備,傳輸日志的結(jié)構(gòu)就是分類和具體的消息。這個(gè)服務(wù)的具體實(shí)現(xiàn)和上面介紹的FacebookService流程都是一樣的,不在詳細(xì)介紹,只要知道一點(diǎn)就是:客戶端在調(diào)用Log寫日志到scribe服務(wù)器的時(shí)候就會(huì)傳遞到服務(wù)器端來調(diào)用同名的函數(shù)處理日志。
第三節(jié) 總結(jié)
TProcessor類體系主要定義一個(gè)服務(wù)生產(chǎn)的框架,通過這個(gè)框架生產(chǎn)的各種語言的代碼可以實(shí)現(xiàn)RPC調(diào)用,具體的傳輸細(xì)節(jié)、協(xié)議和方式是通過后面講解的內(nèi)容實(shí)現(xiàn)的。
第二節(jié)對(duì)一個(gè)具體服務(wù)的實(shí)現(xiàn)內(nèi)容做詳細(xì)分析,不過都是基于文字描述和代碼分析,下面根據(jù)scribe服務(wù)提供的Log函數(shù)怎樣完成一次具體的處理過程用下面的圖形展示:

這個(gè)圖形并沒有展示內(nèi)部數(shù)據(jù)通信的細(xì)節(jié),只是簡(jiǎn)單的說明了一個(gè)客戶端的調(diào)用是怎樣完成的,服務(wù)器處理還涉及到很多相關(guān)細(xì)節(jié),將在后面章節(jié)中詳細(xì)分析。
浙公網(wǎng)安備 33010602011771號(hào)