thrift之TTransport層的分幀傳輸類TFramedTransport
幀傳輸類就是按照一幀的固定大小來傳輸數(shù)據(jù),所有的寫操作首先都是在內(nèi)存中完成的直到調(diào)用了flush操作,然后傳輸節(jié)點在flush操作之后將所有數(shù)據(jù)根據(jù)數(shù)據(jù)的有效載荷寫入數(shù)據(jù)的長度的二進(jìn)制塊發(fā)送出去,允許在接收的另一端按照固定的長度來讀取。
幀傳輸類同樣還是從緩存基類TBufferBase繼承而來,實現(xiàn)的接口當(dāng)然也基本相同,只是實現(xiàn)的方式不同而已,下面就來看看具體的實現(xiàn)過程和原理。
這個類所采用的默認(rèn)緩存長度是512(static const int DEFAULT_BUFFER_SIZE = 512;),兩個基本構(gòu)造函數(shù)一個采用默認(rèn)的緩存長度,另一個可以指定一個需要的緩存長度。下面還是重點分析慢讀、讀幀等操作的實現(xiàn)過程:
(1)慢讀實現(xiàn)如下:
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { uint32_t want = len;//想要讀取的長度 uint32_t have = rBound_ - rBase_;//內(nèi)存緩存中已經(jīng)有的數(shù)據(jù)的長度 assert(have < want);//如果以后數(shù)據(jù)長度滿足需要讀的長度就不需要采用慢讀 // 如果我們有一些數(shù)據(jù)在緩存,拷貝出來并且返回它。 // 我們沒有試圖讀取更多的數(shù)據(jù)而是不得不返回它,因為我們不能保證在下面的 // 傳輸層實際上有更多的數(shù)據(jù),因此應(yīng)該嘗試阻塞式讀它。 if (have > 0) { memcpy(buf, rBase_, have);//拷貝出緩存中已有的數(shù)據(jù) setReadBuffer(rBuf_.get(), 0);//重新設(shè)置緩存基地址 return have;//返回 } // 讀取另一幀。 if (!readFrame()) { // EOF. No frame available. return 0; } // 處理我們已有的數(shù)據(jù) uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));//已有數(shù)據(jù)想要讀取長度取短的 memcpy(buf, rBase_, give);//拷貝 rBase_ += give;//調(diào)整緩存基地址 want -= give;//計算還有多少想要的數(shù)據(jù)沒有得到 return (len - want);//返回實際讀取長度 }
緩存中沒有數(shù)據(jù)的時候就會調(diào)用讀取幀的函數(shù)readFrame,這個函數(shù)實現(xiàn)如下:
bool TFramedTransport::readFrame() { //首先讀下一幀數(shù)據(jù)的長度 int32_t sz;//存放長度的變量 uint32_t size_bytes_read = 0;//讀取長度數(shù)據(jù)的字節(jié)數(shù) while (size_bytes_read < sizeof(sz)) {//表示長度的數(shù)據(jù)小于存放長度數(shù)據(jù)的字節(jié)數(shù) uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;//長度變量轉(zhuǎn)換為指針 uint32_t bytes_read = transport_->read(szp, sizeof(sz) - size_bytes_read);//讀取 if (bytes_read == 0) {//如果返回為0表示沒有數(shù)據(jù)了 if (size_bytes_read == 0) {//沒有任何數(shù)據(jù)讀到,返回false return false; } else { // 部分的幀頭部,拋出異常。 throw TTransportException(TTransportException::END_OF_FILE, "No more data to read after " "partial frame header."); } } size_bytes_read += bytes_read;//以讀取的長度 } sz = ntohl(sz);//長整數(shù)的網(wǎng)絡(luò)字節(jié)序轉(zhuǎn)換為主機(jī)字節(jié)序 if (sz < 0) {//幀的長度不能是負(fù)數(shù)澀,拋出異常 throw TTransportException("Frame size has negative value"); } // 讀取有效數(shù)據(jù)負(fù)載,重新設(shè)置緩存標(biāo)記。 if (sz > static_cast<int32_t>(rBufSize_)) { rBuf_.reset(new uint8_t[sz]);//接收基地址 rBufSize_ = sz;//緩存大小 } transport_->readAll(rBuf_.get(), sz);//調(diào)用readAll讀取sz長度的數(shù)據(jù) setReadBuffer(rBuf_.get(), sz);//設(shè)置讀緩存基地址 return true; }
從上面實現(xiàn)代碼看出,在按幀讀取的過程中,首先需要讀取這一幀的頭部信息,而這個頭部信息就是這一幀的長度,后面就根據(jù)頭部信息中給定的長度來讀取數(shù)據(jù)部分,讀出來的數(shù)據(jù)放入緩存中。讀取頭部信息時注意處理異常的情況,還有就是讀出來的數(shù)據(jù)需要經(jīng)過網(wǎng)絡(luò)字節(jié)序到主機(jī)字節(jié)序的轉(zhuǎn)換。下面繼續(xù)看慢寫函數(shù)和flush刷新函數(shù)的實現(xiàn)過程,慢寫函數(shù)實現(xiàn)如下(快讀和快寫基類TBufferBase的實現(xiàn)已經(jīng)滿足要求了,所以不需要再去單獨實現(xiàn)了):
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { // 直到有足夠的雙緩沖大小 uint32_t have = wBase_ - wBuf_.get();//緩存空間已經(jīng)有多少數(shù)據(jù) uint32_t new_size = wBufSize_; if (len + have < have /* overflow */ || len + have > 0x7fffffff) {//如果長度溢出或大于2GB了 throw TTransportException(TTransportException::BAD_ARGS, "Attempted to write over 2 GB to TFramedTransport.");//拋出異常 } while (new_size < len + have) {//緩存空間的長度小于已有數(shù)據(jù)的長度和需要寫入數(shù)據(jù)長度的和 new_size = new_size > 0 ? new_size * 2 : 1;如果緩存空間長度是大于0的話就擴(kuò)容一倍的空間 } uint8_t* new_buf = new uint8_t[new_size];// 分配新空間 memcpy(new_buf, wBuf_.get(), have);// 拷貝已有的數(shù)據(jù)到新空間. wBuf_.reset(new_buf);// 緩存地址重新設(shè)置 wBufSize_ = new_size;// 緩存新長度 wBase_ = wBuf_.get() + have;//新的開始寫入地址 wBound_ = wBuf_.get() + wBufSize_;//寫入界限 memcpy(wBase_, buf, len);//拷貝數(shù)據(jù)到新緩存地址 wBase_ += len;//更新緩存基地址 }
上面代碼就是實現(xiàn)把從上層傳輸?shù)臄?shù)據(jù)寫入緩存中以供下層發(fā)送使用,這段代碼需要注意的是while循環(huán),這個while循環(huán)保證有足夠的緩存來存放寫入的數(shù)據(jù)到緩存中,每次增長的長度是上次的一倍;還需要注意的是,分配了新的空間需要把原來還沒有真正寫入的數(shù)據(jù)拷貝到新緩存中來,不然就會造成內(nèi)容丟失;最后就是更新緩存的基地址和長度等描述緩存的信息。繼續(xù)看flush函數(shù)的實現(xiàn)代碼:
void TFramedTransport::flush() { int32_t sz_hbo, sz_nbo; assert(wBufSize_ > sizeof(sz_nbo));//斷言緩存長度應(yīng)該大于個字節(jié)sizeof(int32_t) sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));// 滑動到第一幀數(shù)據(jù)的開始位置。 sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));//主機(jī)字節(jié)序轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序 memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));//頭部長度拷貝寫緩存 if (sz_hbo > 0) {//保證緩存有需要寫入的數(shù)據(jù) //如果底層傳輸寫拋出了異常注意確保我們處于安全的狀態(tài) //(例如內(nèi)部緩沖區(qū)清理),重置我們寫入前的狀態(tài)(因為底層沒有傳輸成功) wBase_ = wBuf_.get() + sizeof(sz_nbo);//得到 // 寫入長度和幀 transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo); } // 刷新底層傳輸. transport_->flush(); }
刷新函數(shù)就是把緩存中的數(shù)據(jù)真正的發(fā)送出去,但是在寫入到底層時,底層傳輸可能不會真正成功(如網(wǎng)絡(luò)突然斷了),這個時候底層會拋出異常,那么我們需要捕獲異常,以便重新處理這些數(shù)據(jù),只有數(shù)據(jù)真正寫入成功的時候我們才計算我們寫如數(shù)據(jù)的長度。所以還有寫結(jié)束和讀結(jié)束函數(shù)writeEnd、readEnd,它們都只有簡單的一句代碼就是計算真正完成讀寫數(shù)據(jù)的長度。
整個按幀傳輸?shù)念惖墓δ芙榻B完畢了,主要需要注意的就是緩存的操作,保證數(shù)據(jù)不丟失。將來實現(xiàn)考慮分配內(nèi)存使用c語言的malloc類函數(shù),而不是使用new操作,這樣也能提高不少的效率。
浙公網(wǎng)安備 33010602011771號