長文梳理muduo網絡庫核心代碼、剖析優秀編程細節
前言
muduo庫是陳碩個人開發的tcp網絡編程庫,支持Reactor模型,推薦大家閱讀陳碩寫的《Linux多線程服務端編程:使用muduo C++網絡庫》。本人前段時間出于個人學習、找工作的目的用C++11重構了muduo庫中核心的Reactor架構。這篇博文對muduo庫中的Multi-Reactor架構代碼進行邏輯梳理,同時認真剖析了作者每一處精妙的代碼設計思想。目前我只重構并剖析了muduo庫中的核心部分,即Multi-Reactor架構部分,具體細分有以下幾個模塊:
- 網絡相關模塊:如Socket、InetAddress、TcpConnection、Acceptor、TcpServer等
- 事件循環相關模塊:如EventLoop、Channel、Poller、EPollPoller等
- 線程相關模塊:如Thread、EventLoopThread、EventLoopThreadPool等
- 基礎模塊:如用戶態緩沖區Buffer、時間戳Timestamp、日志類Logger等
我的個人Github代碼地址:[??muduo_cpp11],但是這部分已經足夠支撐起一個基本的高并發TCP服務器的運行了。
在下文概述篇中會概述下每一個類的作用,其意圖在于對每一個類建立一個直覺,概述部分不會講的很詳細。重點留在下文主線篇。主線篇會圍繞連接建立、消息讀取、消息發送、連接斷開這幾條主線來進行梳理,同時會對代碼中優秀編程思想和編程細節進行討論。最后在線程篇會講到muduo庫中涉及到的線程控制、線程通信的機制。
概述篇
1. Multi-Reactor概述
muduo庫是基于Reactor模式實現的TCP網絡編程庫。該文章后續篇幅都是圍繞Multi-reactor模型進行展開。Multi-Reactor模型如下所示:
2. Multi-Reactor架構三大核心模塊介紹
muduo庫有三個核心組件支撐一個Reactor實現持續的監聽一組fd,并根據每個fd上發生的事件調用相應的處理函數。這三個組件分別是Channel類、Poller/EpollPoller類以及EventLoop類。
2.1. Channel
Channel類其實相當于一個文件描述符的保姆,它將文件描述符及該文件描述符對應的回調函數綁定在了一起。
在TCP網絡編程中,想要IO多路復用監聽某個文件描述符,就要把這個fd和該fd感興趣的事件通過epoll_ctl注冊到IO多路復用模塊(我管它叫事件監聽器)上。當事件監聽器監聽到該fd發生了某個事件。事件監聽器返回發生事件的fd集合以及每個fd都發生了什么事件。
Channel類則封裝了一個fd和這個fd感興趣事件以及事件監聽器監聽到該fd實際發生的事件。同時Channel類還提供了設置該fd的感興趣事件,以及將該fd及其感興趣事件注冊到事件監聽器或從事件監聽器上移除,以及保存了該fd的每種事件對應的處理函數。
Channel類有以下幾個重要成員變量:
fd_這個Channel對象照看的文件描述符int events_代表fd感興趣的事件類型集合int revents_代表事件監聽器實際監聽到該fd發生的事件類型集合,當事件監聽器監聽到一個fd發生了什么事件,通過Channel::set_revents()函數來設置revents值。EventLoop *loop:這個fd屬于哪個EventLoop對象,這個暫時不解釋。read_callback_、write_callback_、close_callback_、error_callback_:這些是std::function類型,代表著這個Channel為這個文件描述符保存的各事件類型發生時的處理函數。比如這個fd發生了可讀事件,需要執行可讀事件處理函數,這時候Channel類都替你保管好了這些可調用函數,真是貼心啊,要用執行的時候直接管保姆要就可以了。
Channel類的重要成員方法:
向Channel對象注冊各類事件的處理函數:
void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);}
一個文件描述符會發生可讀、可寫、關閉、錯誤事件。當發生這些事件后,就需要調用相應的處理函數來處理。外部通過調用上面這四個函數可以將事件處理函數放進Channel類中,當需要調用的時候就可以直接拿出來調用了。
將Channel中的文件描述符及其感興趣事件注冊事件監聽器上或從事件監聽器上移除:
void enableReading() {events_ |= kReadEvent; upadte();}
void disableReading() {events_ &= ~kReadEvent; update();}
void enableWriting() {events_ |= kWriteEvent; update();}
void disableWriting() {events_ &= ~kWriteEvent; update();}
void disableAll() {events_ |= kNonEvent; update();}
外部通過這幾個函數來告知Channel你所監管的文件描述符都對哪些事件類型感興趣,并把這個文件描述符及其感興趣事件注冊到事件監聽器(IO多路復用模塊)上。這些函數里面都有一個update()私有成員方法,這個update其實本質上就是調用了epoll_ctl()。
-
int set_revents(int revt) {revents_ = revt;}
當事件監聽器監聽到某個文件描述符發生了什么事件,通過這個函數可以將這個文件描述符實際發生的事件封裝進這個Channel中。 -
void handleEvent(TimeStamp receive_time)
當調用epoll_wait()后,可以得知事件監聽器上哪些Channel(文件描述符)發生了哪些事件,事件發生后自然就要調用這些Channel對應的處理函數。Channel::handleEvent,讓每個發生了事件的Channel調用自己保管的事件處理函數。每個Channel會根據自己文件描述符實際發生的事件(通過Channel中的revents_變量得知)和感興趣的事件(通過Channel中的events_變量得知)來選擇調用read_callback_和/或write_callback_和/或close_callback_和/或error_callback_。
2.2. Poller / EPollPoller
負責監聽文件描述符事件是否觸發以及返回發生事件的文件描述符以及具體事件的模塊就是Poller。所以一個Poller對象對應一個事件監聽器(這里我不確定要不要把Poller就當作事件監聽器)。在Multi-Reactor模型中,有多少Reactor就有多少Poller。
muduo提供了epoll和poll兩種IO多路復用方法來實現事件監聽。不過默認是使用epoll來實現,也可以通過選項選擇poll。但是我自己重構的muduo庫只支持epoll。
這個Poller是個抽象虛類,由EpollPoller和PollPoller繼承實現(陳碩的muduo網絡庫中除此之外基本都是基于對象的編程風格,只有這里通過采用面向對象的方式,Poller實際上是一個抽象類,EpollPoller才是對Poller的具體實現,也是對epoll的具體封裝),與監聽文件描述符和返回監聽結果的具體方法也基本上是在這兩個派生類中實現。EpollPoller就是封裝了用epoll方法實現的與事件監聽有關的各種方法,PollPoller就是封裝了poll方法實現的與事件監聽有關的各種方法。以后談到Poller希望大家都知道我說的其實是EpollPoller。
Poller/EpollPoller的重要成員變量:
epollfd_: 就是epoll_create方法返回的epoll句柄。channels_:這個變量是std::unordered_map<int, Channel*>類型,負責記錄文件描述符fd到Channel的映射,也幫忙保管所有注冊在你這個Poller上的Channel。ownerLoop_:所屬的EventLoop對象,看到后面你懂了。
EpollPoller給外部提供的最重要的方法:
TimeStamp poll(int timeoutMs, ChannelList *activeChannels):這個函數可以說是Poller的核心了,當外部調用poll方法的時候,該方法底層其實是通過epoll_wait獲取這個事件監聽器上發生事件的fd及其對應發生的事件,我們知道每個fd都是由一個Channel封裝的,通過哈希表channels_可以根據fd找到封裝這個fd的Channel。將事件監聽器監聽到該fd發生的事件寫進這個Channel中的revents成員變量中。然后把這個Channel裝進activeChannels中(它是一個vector<Channel*>)。這樣,當外界調用完poll之后就能拿到事件監聽器的監聽結果(activeChannels_),【后面會經常提到這個“監聽結果”這四個字,希望你明白這代表什么含義】
2.3. EventLoop
剛才的Poller是封裝了和事件監聽有關的方法和成員,調用一次Poller::poll方法它就能給你返回事件監聽器的監聽結果(發生事件的fd 及其發生的事件)。作為一個網絡服務器,需要有持續監聽、持續獲取監聽結果、持續處理監聽結果對應的事件的能力,也就是我們需要循環的去調用Poller::poll方法獲取實際發生事件的Channel集合,然后調用這些Channel里面保管的不同類型事件的處理函數(調用Channel::handleEvent方法)。
EventLoop就是負責實現“循環”,負責驅動“循環”的重要模塊!Channel和Poller其實相當于EventLoop的手下,EventLoop整合封裝了二者并向上提供了更方便的接口來使用。
2.3.1. 全局概覽Poller、Channel和EventLoop在整個Multi-Reactor通信架構中的角色
EventLoop起到一個驅動循環的功能,Poller負責從事件監聽器上獲取監聽結果。而Channel類則在其中起到了將fd及其相關屬性封裝的作用,將fd及其感興趣事件和發生的事件以及不同事件對應的回調函數封裝在一起,這樣在各個模塊中傳遞更加方便。接著EventLoop調用。

另外上面這張圖我沒有畫出Acceptor,因為Acceptor和EventLoop和Poller之間有點錯雜,可能畫出來效果不好。
2.3.2. One Loop Per Thread 含義介紹
有沒有注意到上面圖中,每一個EventLoop都綁定了一個線程(一對一綁定),這種運行模式是muduo庫的特色!充份利用了多核CPU的能力,每一個核的線程負責循環監聽一組文件描述符的集合。至于這個One Loop Per Thread是怎么實現的,后面還會交代。
2.3.3. EventLoop重要方法 EventLoop::loop()
我將EventLoop核心邏輯給出,省略了非核心代碼:
void EventLoop::loop() { // EventLoop 所屬線程執行
while (!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //此時activeChannels已經填好了事件發生的channel
for(Channel *channel : activeChannels_)
channel->handleEvent(pollReturnTime_);
}
LOG_INFO("EventLoop %p stop looping. \n", t_loopInThisThread);
}
每個EventLoop對象都唯一綁定了一個線程,這個線程其實就在一直執行這個函數里面的while循環,這個while循環的大致邏輯比較簡單。就是調用Poller::poll方法獲取事件監聽器上的監聽結果。接下來在loop里面就會調用監聽結果中每一個Channel的處理函數handleEvent()。每一個Channel的處理函數會根據Channel類中封裝的實際發生的事件,執行Channel類中封裝的各事件處理函數。(比如一個Channel發生了可讀事件,可寫事件,則這個Channel的handleEvent()就會調用提前注冊在這個Channel的可讀事件和可寫事件處理函數,又比如另一個Channel只發生了可讀事件,那么handleEvent()就只會調用提前注冊在這個Channel中的可讀事件處理函數)。上面講Channel時也提到了Channel對應的四種事件處理函數(回調函數):讀回調函數、寫回調函數、關閉回調函數、錯誤回調函數等四種回調函數。
看完上面的代碼,感受到EventLoop的主要功能了嗎?其實EventLoop所做的事情就是:持續循環的獲取監聽結果并且根據結果調用處理函數。
3. 其他主要類介紹
3.1. Acceptor類
Acceptor用于接受新用戶連接并分發連接給SubReactor(Sub EventLoop),封裝了服務器監聽套接字fd以及相關處理方法。Acceptor類內部其實沒有貢獻什么核心的處理函數,主要是對其他類的方法調用進行封裝。但是Acceptor中核心的點在于將監聽套接字和Channel綁定在了一起,并為其注冊了讀回調handRead。當有連接到來的時候,讀事件發生,就會調用相應讀回調handleRead,讀回調會間接調用TcpServer的newConnection(),該函數負責以輪詢的方式把連接分發給sub EventLoop去處理。
3.1.1. Acceptor封裝的重要成員變量:
acceptSocket_:這個是服務器監聽套接字的文件描述符,即socket套接字返回的監聽套接字acceptChannel_:這是個Channel類,把acceptSocket_及其感興趣事件和事件對應的處理函數都封裝進去。EventLoop *loop:監聽套接字的fd由哪個EventLoop負責循環監聽以及處理相應事件,其實這個EventLoop就是main EventLoop。newConnectionCallback_: TcpServer構造函數中將TcpServer::newConnection()函數注冊給了這個成員變量。這個TcpServer::newConnection函數的功能是公平的選擇一個subEventLoop,并把已經接受的連接分發給這個subEventLoop。
3.1.2. Acceptor封裝的重要成員方法:
listen():該函數底層調用了linux的函數listen(),開啟對acceptSocket_的監聽同時將acceptChannel及其感興趣事件(可讀事件)注冊到main EventLoop的事件監聽器上。換言之就是讓main EventLoop事件監聽器去監聽acceptSocket_。handleRead():這是一個私有成員方法,這個方法是要注冊到acceptChannel_上的, 同時handleRead()方法內部還調用了成員變量newConnectionCallback_保存的函數。當main EventLoop監聽到acceptChannel_上發生了可讀事件時(新用戶連接事件),就是調用這個handleRead()方法。
簡單說一下這個handleRead()最終實現的功能是什么,接受新連接,并且以負載均衡的選擇方式選擇一個sub EventLoop,并把這個新連接分發到這個subEventLoop上,這里是需要理解的重點。
3.2. Socket類
#pragma once
#include "noncopyable.h"
class InetAddress;
// 封裝socket fd
class Socket : noncopyable
{
public:
explicit Socket(int sockfd)
: sockfd_(sockfd)
{
}
~Socket();
int fd() const { return sockfd_; }
void bindAddress(const InetAddress &localaddr); //調用bind綁定服務器IP和端口
void listen(); //調用listen監聽套接字
int accept(InetAddress *peeraddr); //調用accept接收新客戶連接請求 只不過這里封裝的是accept4
void shutdownWrite(); //調用shutdown關閉服務端寫通道
/* 下面四個函數都是調用setsockopt來設置一些socket選項 */
void setTcpNoDelay(bool on); //不啟動naggle算法 增大對小數據包的支持
void setReuseAddr(bool on);
void setReusePort(bool on);
void setKeepAlive(bool on); //服務器監聽套接字文件描述符
private:
const int sockfd_;
};
這個類也確實沒啥好說的,都是TCP編程常規函數的封裝,直接看代碼注釋就可。
3.3. Buffer類
Buffer類其實是封裝了一個用戶緩沖區,以及向這個緩沖區寫數據讀數據等一系列控制方法。
3.3.1. Buffer類主要設計思想 (讀寫配合,緩沖區內部調整以及動態擴容)
我個人覺得這個緩沖區類的實現值得參考和借鑒,以前自己寫的只支持一次性全部讀出和寫入,而這個Buffer類可以讀一點,寫一點,內部邏輯穩定。這個Buffer類是vector(方便動態擴容),對外表現出std::queue的特性,它的內部原理大概就是下圖這樣子的,用兩個游標(readerIndex_和writerIndex_)標記可讀緩沖區的起始位置和空閑空間的起始位置。

其中需要關注的一個思想就是,隨著寫入數據和讀入數據,藍色的空閑空間會越來越少,prependable空間會越來越大,當什么時候空用空間耗盡了,就會向步驟4一樣,把所有數據拷貝前移,重新調整。另外當整個緩沖區的prependable空間和藍色的空閑空間都無法裝下新來的數據時,那就會調用vector的resize,實現擴容機制。
3.3.2. 重要的成員函數:
append(const char* data, size_t len):將data數據添加到緩沖區中。retrieveAsString(size_t len):獲取緩沖區中長度為len的數據,并以string返回。retrieveAllString():獲取緩沖區所有數據,并以string返回。ensureWritableByts(size_t len):當你打算向緩沖區寫入長度為len的數據之前,先調用這個函數,這個函數會檢查你的緩沖區可寫空間能不能裝下長度為len的數據,如果不能,就進行動態擴容操作。
下面兩個方法主要是封裝了調用了上面幾個方法:
ssize_t Buffer::readFd(int fd, int* saveErrno):客戶端發來數據,readFd從該TCP接收緩沖區中將數據讀出來并放到Buffer中。ssize_t Buffer::writeFd(int fd, int* saveErrno):服務端要向這條TCP連接發送數據,通過該方法將Buffer中的數據拷貝到TCP發送緩沖區中。
其實readFd和writeFd函數的設計還有一些值得討論的地方,這個放在以后討論。
3.4. TcpConnection類
在上面講Acceptor的時候提到了這個TcpConnection類。這個類主要封裝了一個已建立的TCP連接,以及控制該TCP連接的方法(連接建立和關閉和銷毀),以及該連接發生的各種事件(讀/寫/錯誤/連接)對應的處理函數,以及這個TCP連接的服務端和客戶端的套接字地址信息等。
我個人覺得TcpConnection類和Acceptor類是兄弟關系,Acceptor用于main EventLoop中,對服務器監聽套接字fd及其相關方法進行封裝(監聽、接受連接、分發連接給Sub EventLoop等),TcpConnection用于Sub EventLoop中,對連接套接字fd及其相關方法進行封裝(讀消息事件、發送消息事件、連接關閉事件、錯誤事件等)。
3.4.1. TcpConnection的重要變量:
socket_:用于保存已連接套接字文件描述符。channel_:封裝了上面的socket_及其各類事件的處理函數(讀、寫、錯誤、關閉等事件處理函數)。這個Channel中保存的各類事件的處理函數是在TcpConnection對象構造函數中注冊的。loop_:這是一個EventLoop*類型,該Tcp連接的Channel注冊到了哪一個sub EventLoop上。這個loop_就是那一個sub EventLoop。inputBuffer_:這是一個Buffer類,是該TCP連接對應的用戶接收緩沖區。outputBuffer_:也是一個Buffer類,不過是用于暫存那些暫時發送不出去的待發送數據。因為Tcp發送緩沖區是有大小限制的,假如達到了高水位線,就沒辦法把發送的數據通過send()直接拷貝到Tcp發送緩沖區,而是暫存在這個outputBuffer_中,等TCP發送緩沖區有空間了,觸發可寫事件了,再把outputBuffer_中的數據拷貝到Tcp發送緩沖區中。state_:這個成員變量標識了當前TCP連接的狀態(Connected、Connecting、Disconnecting、Disconnected)connetionCallback_、messageCallback_、writeCompleteCallback_、closeCallback_:用戶會自定義 [連接建立/關閉后的處理函數] 、[收到消息后的處理函數]、[消息發送完后的處理函數]以及Muduo庫中定義的[連接關閉后的處理函數]。這四個函數都會分別注冊給這四個成員變量保存。
3.4.2. TcpConnection的重要成員方法:
handleRead()、handleWrite()、handleClose()、handleError():這四個函數都是私有成員方法,在一個已經建立好的TCP連接上主要會發生四類事件:可讀事件、可寫事件、連接關閉事件、錯誤事件。當事件監聽器監聽到一個連接發生了以上的事件,那么就會在EventLoop中調用這些事件對應的處理函數,同時accept返回已連接套接字所綁定的Channel中注冊了這四種回調函數。
handleRead():負責處理TCP連接的可讀事件,它會將客戶端發送來的數據拷貝到用戶緩沖區中(inputBuffer_),然后再調用connectionCallback_保存的連接建立后的處理函數messageCallback_。這個messageCallback_由上層用戶注冊,之后muduo庫會在TcpServer中會對其設置。handleWrite():負責處理Tcp連接的可寫事件。這個函數的情況有些復雜,留到下一篇講解。handleClose():負責處理Tcp連接關閉的事件。大概的處理邏輯就是將這個TcpConnection對象中的channel_從事件監聽器中移除。然后調用connectionCallback_和closeCallback_保存的回調函數。這closeCallback_中保存的函數是由muduo庫提供的,connectionCallback_保存的回調函數則由用戶提供的(可有可無其實)。
主線篇
本來一開始寫這個博客的時候,很想把每一行代碼全給分析一遍,后來越整理越亂,最后直接放棄了,所以這里決定對Muduo庫的幾條大的主線進行脈絡梳理。
TCP網絡編程的本質其實是處理下面這幾個事件:
- 連接的建立。
- 連接的斷開:包括主動斷開和被動斷開。
- 消息到達,客戶端連接文件描述符可讀。
- 消息發送,向客戶端連接文件描述符寫數據。
所以我們這一篇內容也是圍繞上面四個主線展開!
1. 用muduo庫實現簡易echo服務器
要理順muduo庫的代碼邏輯,當然還是先學會一下怎么使用吧,這一小節就看代碼注釋就好了。一定要會最簡單的使用。
#include <string>
#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>
class EchoServer
{
public:
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注冊回調函數 將用戶定義的連接事件處理函數注冊進TcpServer中,TcpServer發生連接事件時會執行onConnection函數。
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1));
//將用戶定義的可讀事件處理函數注冊進TcpServer中,TcpServer發生可讀事件時會執行onMessage函數。
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 設置合適的subloop線程數量 你這里設置為3,就和概述篇圖中的EventLoop2 EventLoop3 EventLoop4對應,有三個sub EventLoop
server_.setThreadNum(3);
}
void start()
{
server_.start();
}
private:
// 連接建立或斷開的回調函數
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}
// 可讀寫事件回調
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown(); // 關閉寫端 底層響應EPOLLHUP => 執行closeCallback_
}
EventLoop *loop_;
TcpServer server_;
};
int main() {
EventLoop loop;
InetAddress addr(8002, "192.168.194.130"); //InetAddress其實是對socket編程中的sockaddr_in進行封裝,使其變為更友好簡單的接口而已
EchoServer server(&loop, addr, "EchoServer");
server.start(); //啟動TcpServer服務器
loop.loop(); //執行EventLoop::loop()函數,這個函數在概述篇的EventLoop小節有提及
return 0;
}
主要做了一下幾件事:
- 建立事件循環器EventLoop:
EventLoop loop; - 創建服務器對象,即TcpServer類對象:
TcpServer server_; - 向TcpServer注冊各類事件的用戶自定義的處理函數:
setMessageCallback()、setConnectionCallback(); - 啟動server:
server.start(); - 開啟事件循環:
loop.loop();
2. 建立連接
2.1. 連接建立的代碼邏輯

注意下面的標號分別對應上圖中的代碼方框標號!!
1:TcpServer::TcpServer()
當我們創建一個TcpServer對象,即執行代碼TcpServer server(&loop, listenAddr);調用了TcpServer的構造函數,TcpServer構造函數最主要的就是類的內部實例化了一個Acceptor對象,并往這個Acceptor對象注冊了一個回調函數TcpServer::newConnection()。
5:Acceptor::Acceptor()
當我們在TcpServer構造函數實例化Acceptor對象時,Acceptor的構造函數中實例化了一個Channel對象,即acceptChannel_,該Channel對象封裝了服務器監聽套接字文件描述符(尚未注冊到main EventLoop的事件監聽器上)。接著Acceptor構造函數將Acceptor::handleRead()方法注冊進acceptChannel_中,這也意味著,日后如果事件監聽器監聽到acceptChannel_發生可讀事件,將會調用Acceptor::handleRead()函數。
至此,TcpServer對象創建完畢,用戶調用TcpServer::start()方法,開啟TcpServer。我們來直接看一下TcpServer::start()方法都干了什么,我省略了一些非核心的代碼:
/******** TcpServer.cc *********/
void TcpServer::start() // 開啟服務器監聽
{
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
/******** Acceptor.cc *********/
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_注冊至Poller !重要
}
其實就是將其實主要就是調用Acceptor::listen()函數(底層是調用了linux的函數listen())監聽服務器套接字,以及將acceptChannel_注冊到main EventLoop的事件監聽器上監聽它的可讀事件(新用戶連接事件)
接著用戶調用loop.loop(),即調用了EventLoop::loop()函數,該函數就會循環的獲取事件監聽器的監聽結果,并且根據監聽結果調用注冊在事件監聽器上的Channel對象的事件處理函數。
6:Acceptor::handleRead()
當程序如果執行到了這個函數里面,說明acceptChannel_發生可讀事件,程序處理新客戶連接請求。該函數首先調用了Linux的函數accept()接受新客戶連接。接著調用了TcpServer::newConnection()函數,這個函數是在步驟1中注冊給Acceptor并由成員變量newConnectionCallback_保存。
7:TcpServer::newConnection()
該函數的主要功能就是將建立好的連接進行封裝(封裝成TcpConnection對象),并使用選擇算法公平的選擇一個sub EventLoop,并調用TcpConnection::connectEstablished()將TcpConnection::channel_注冊到剛剛選擇的sub EventLoop上。
2.2. 編程細節啟發,什么時候用智能指針管理對象最合適!
我平時對智能指針的使用缺乏權衡感。在一些情況下使用智能指針會帶來額外的性能開銷,所以不能無腦梭哈。但是智能指針又能保護內存安全。這里的編程細節也給了我一些啟發。來看下下面的核心邏輯代碼,非核心以刪除:
/******** Callbacks.h ********/
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
/******** TcpServer.cc ********/
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
ioLoop->runInLoop(
std::bind(&TcpConnection::connectEstablished, conn));
}
在TcpServer::newConnection()函數中,當接受了一個新用戶連接,就要把這個Tcp連接封裝成一個TcpConnection對象,也就是上面代碼中的new TcpConnection(…)。然后用一個共享型智能指針來管理這個對象。所以為什么這里要把TcpConnection用智能指針來管理啊?
這里使用智能指針管理TcpConnection的最重要原因在于防止指針懸空,而指針懸空可能會來自以下這三個方面:
- TcpConnection會和用戶直接交互,用戶可能會手欠刪除。在我們編寫echo服務器的時候,我們用戶可以自定義連接事件發生后的處理函數(如下所示),并將這個函數注冊到TcpServer中。
/**** 用戶自定義的連接事件發生后的處理函數 *****/
void onConnection(const TcpConnectionPtr &conn)
{
...
}
假如這里的onConnection函數傳入的是TcpConnection而不是TcpConnectionPtr,用戶在onConnection函數中把TcpConnection對象給delete了怎么辦?刪除了之后,程序內部還要好幾處地方都在使用TcpConnection對象。結果這個對象的內存突然消失了,服務器訪問非法內存崩潰。雖然這一系列連鎖反應會讓人覺得用戶很笨。但是作為設計者的我們必須要保證,編程設計不可以依賴用戶行為,一定要盡可能地封死用戶的誤操作。所以這里用了共享智能指針。
- TcpConnection對象的多線程安全問題:假如服務器要關閉了,這個時候Main EventLoop線程中的TcpServer::~TcpServer()函數開始把所有TcpConnection對象都刪掉。那么其他線程還在使用這個TcpConnection對象,如果你把它的內存空間都釋放了,其他線程訪問了非法內存,會直接崩潰。
你可能會覺得,反正我都要把服務器給關了,崩就崩了吧。這種想法是錯的!因為可能在你關閉服務器的時候,其他線程正在處理TcpConnection的發送消息任務,這個時候你應該等它發完才釋放TcpConnection對象的內存才對!
第三種情況我們留到將連接關閉的時候再來討論,這一部分也是有很好的編程啟發的!
3. 消息讀回

在Main EventLoop中接受新連接請求之后,將這條TCP連接封裝成TcpConnection對象。TcpConnection對象的內容如上圖所示,主要就是封裝了已連接套接字的fd(上圖中的socket_,即accept返回的已連接套接字)、已連接套接字的channel_等。在TcpConnection的構造函數中會將TcpConnection::handleRead()等四個上圖中的藍色方法注冊進這個channel_內。
當TcpConnection對象建立完畢之后,Main EventLoop的Acceptor會將這個TcpConnection對象中的channel_注冊到某一個Sub EventLoop中。如何注冊的呢?就是TcpConnection對象的EventLoop *loop_指針指向子線程綁定的EvenLoop對象上即可,標志著當前的TcpConnection屬于該具體的EventLoop。
3.1 消息讀取邏輯

如上圖所示,Sub EventLoop中的EventLoop::loop()函數內部會循環的執行上圖中的步驟1和步驟2。步驟1就是調用Poller::poll()方法獲取事件監聽結果,這個事件監聽結果是一個Channel集合,每一個Channel封裝著一個fd、fd感興趣的事件和事件監聽器監聽到該fd實際發生的事件。步驟2就是調用每一個Channel的Channel::handleEvent方法。該方法會根據每一個Channel的感興趣事件以及實際發生的事件調用提前注冊在Channel內的對應的事件處理函數:readCallback_、writeCallback_、closeCallback_、errorCallback_。
readCallback_保存的函數其實是TcpConnection::handleRead(),消息讀取的處理邏輯也就是由這個函數提供的,我們稍微剖析一下這個函數:
// 讀是相對服務器而言的 當對端客戶端有數據到達 服務器端檢測到EPOLLIN 就會觸發該fd上的回調 handleRead取讀走對端發來的數據
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) // 有數據到達
{
// 已建立連接的用戶有可讀事件發生了 調用用戶傳入的回調操作onMessage shared_from_this就是獲取了TcpConnection的智能指針
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0) // 客戶端斷開
{
handleClose();
}
else // 出錯了
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
TcpConnection::handleRead()函數首先調用Buffer_.readFd(channel_->fd(), &saveErrno),該函數底層調用Linux的函數readv(),將TCP接收緩沖區數據拷貝到用戶定義的緩沖區中(inputBuffer_)。如果在讀取拷貝的過程中發生了什么錯誤,這個錯誤信息就會保存在savedErrno中。
- readFd()返回值>0,說明從接收緩沖區中讀取到了數據,那么會接著調用messageCallback_中保存的用戶自定義的讀取消息后的處理函數。
- readFd()返回值=0,說明客戶端連接關閉,這時候應該調用TcpConnection::handleClose()來處理連接關閉事件。
- readFd()返回值=-1,說明發生了錯誤,調用TcpConnection::handleError()來處理savedErrno的錯誤事件。muduo庫只支持LT模式,所以讀事件不會出現EAGAIN的錯誤,所以一旦出現錯誤,說明肯定是不好的非正常錯誤了。而EAGAIN錯誤只不過是非阻塞IO調用時的一種常見錯誤而已,關于EAGAIN更多的細節,可以參考我的另一篇文章[??IO模型及高性能網絡架構分析]。
3.2 Buffer::readFd()函數剖析:
剖析這個函數是因為這個函數的設計有可取之處。這個readFd巧妙的設計,可以讓用戶一次性把所有TCP接收緩沖區的所有數據全部都讀出來并放到用戶自定義的緩沖區Buffer中。
用戶自定義緩沖區Buffer是有大小限制的,我們一開始不知道TCP接收緩沖區中的數據量有多少,如果一次性讀出來會不會導致Buffer裝不下而溢出。所以在readFd()函數中會在棧上創建一個臨時空間extrabuf,然后使用readv的分散讀特性,將TCP緩沖區中的數據先拷貝到Buffer中,如果Buffer容量不夠,就把剩余的數據都拷貝到extrabuf中,然后再調整Buffer的容量(動態擴容),再把extrabuf的數據拷貝到Buffer中。當這個函數結束后,extrabuf也會被釋放。另外extrabuf是在棧上開辟的空間,速度比在堆上開辟還要快。
/**
* 從fd上讀取數據 Poller工作在LT模式
* Buffer緩沖區是有大小的! 但是從fd上讀取數據的時候 卻不知道tcp數據的最終大小
*
* @description: 從socket讀到緩沖區的方法是使用readv先讀至buffer_,
* Buffer_空間如果不夠會讀入到棧上65536個字節大小的空間,然后以append的
* 方式追加入buffer_。既考慮了避免系統調用帶來開銷,又不影響數據的接收。
**/
ssize_t Buffer::readFd(int fd, int *saveErrno)
{
// 棧額外空間,用于從套接字往出讀時,當buffer_暫時不夠用時暫存數據,待buffer_重新分配足夠空間后,在把數據交換給buffer_。
char extrabuf[65536] = {0}; // 棧上內存空間 65536/1024 = 64KB
/*
struct iovec {
ptr_t iov_base; // iov_base指向的緩沖區存放的是readv所接收的數據或是writev將要發送的數據
size_t iov_len; // iov_len在各種情況下分別確定了接收的最大長度以及實際寫入的長度
};
*/
// 使用iovec分配兩個連續的緩沖區
struct iovec vec[2];
const size_t writable = writableBytes(); // 這是Buffer底層緩沖區剩余的可寫空間大小 不一定能完全存儲從fd讀出的數據
// 第一塊緩沖區,指向可寫空間
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二塊緩沖區,指向棧空間
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof(extrabuf);
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
// 這里之所以說最多128k-1字節,是因為若writable為64k-1,那么需要兩個緩沖區 第一個64k-1 第二個64k 所以做多128k-1
// 如果第一個緩沖區>=64k 那就只采用一個緩沖區 而不使用棧空間extrabuf[65536]的內容
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable) // Buffer的可寫緩沖區已經夠存儲讀出來的數據了
{
writerIndex_ += n;
}
else // extrabuf里面也寫入了n-writable長度的數據
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // 對buffer_擴容 并將extrabuf存儲的另一部分數據追加至buffer_
}
return n;
}
4. 消息發送
4.1 消息發送邏輯
當用戶調用了TcpConnetion::send(buf)函數時,相當于要求muduo庫把數據buf發送給該TCP連接的客戶端。此時該TcpConnection注冊在事件監聽器上的感興趣事件中是沒有可寫事件的。TcpConnection::send(buf)函數內部其實是調用了Linux的函數write()。
- 如果TCP發送緩沖區能一次性容納buf,那這個write()函數將buf全部拷貝到發送緩沖區中。
- 如果TCP發送緩沖區內不能一次性容納buf:
-
這時候write()函數buf數據盡可能地拷貝到TCP發送緩沖區中,并且將errno設置為EWOULDBLOCK(等效于EAGAIN)。
-
剩余未拷貝到TCP發送緩沖區中的buf數據會被存放在TcpConnection::outputBuffer_中。并且向事件監聽器上注冊該TcpConnection::channel_的可寫事件。
-
事件監聽器監聽到該TCP連接可寫事件,就會調用
TcpConnection::handleWrite()函數把TcpConnection::outputBuffer_中剩余的數據發送出去。 -
在
TcpConnection::handleWrite()函數中,通過調用Buffer::writeFd()函數將outputBuffer_的數據寫入到TCP發送緩沖區,如果TCP發送緩沖區能容納全部剩余的未發送數據,那最好不過了。如果TCP發送緩沖區依舊沒法容納剩余的未發送數據,那就盡可能地將數據拷貝到TCP發送緩沖區中,繼續保持可寫事件的監聽。
-
當數據全部拷貝到TCP發送緩沖區之后,就會調用用戶自定義的【寫完后的事件處理函數】,并且移除該TcpConnection在事件監聽器上的可寫事件。(移除可寫事件是為了提高效率,不會讓epoll_wait() 毫無意義的頻繁觸發可寫事件。因為大多數時候是沒有數據需要發送的,頻繁觸發可寫事件但又沒有數據可寫。)
這里補充一下我對可讀可寫的理解,由于muduo底層是epoll驅動的,可讀可寫狀態是由文件描述符對應的內核緩沖區決定的,若要發生可讀或者可寫事件,首先,前提是要向epoll注冊(調用epoll_ctl)需要關注的事件類型。當關注了可讀事件時,若文件描述符對應的緩沖區從空變為非空,則觸發讀事件;當關注了可寫事件時,若文件描述符對應的緩沖區由滿變為不滿,則觸發寫事件。當然epoll的LT模式會多次觸發,而ET模式僅觸發一次,這里不過多介紹。回到上面的項目來說,一般我們的文件描述符對應的緩沖區都不是滿的,所以一開始不會關注寫事件,否則如果說緩沖區不滿就觸發寫事件,那就會一直輪詢觸發,對系統資源是一種浪費,所以一般情況下只關注讀事件。只有當系統要向文件描述符對應的緩沖區寫數據時,再關注寫事件,這樣一來,當緩沖區有“空位”,我就立刻往里面丟數據,反而在這種關鍵時刻關注寫事件,寫效率會更高一些,寫完數據記得再通過epoll_ctl取消對寫事件的關注。
5. 連接斷開
5.1. 連接被動斷開
服務端TcpConnection::handleRead()中感知到客戶端把連接斷開了。TcpConnection::handleRead()函數內部調用了Linux的函數readv(),當readv()返回0的時候,服務端就知道客戶端斷開連接了。然后就接著調用TcpConnection::handleClose()。

上圖中的標號1、2、3是函數調用順序,我們可以看到:
- 在執行
TcpConnection::handle_Close()的時候,該函數還是在Sub EventLoop線程中運行的,接著調用closeCallback_(connPtr)回調函數,該函數保存的其實是TcpServer::removeConnection()函數 TcpServer::removeConnection()函數調用了remvoveConnectionInLoop()函數,該函數的運行是在Main EventLoop線程中執行的,這里涉及到線程切換技術,后面再講。removeConnectionInLoop()函數:TcpServer對象中有一個connections_成員變量,這是一個unordered_map,負責保存string到TcpConnection的映射,其實就是保存著TCP連接的名字到TcpConnection對象的映射。因為這個TCP連接要關閉了,所以也要把這個TcpConnection對象從connections_中刪掉。然后再調用TcpConnection::connectDestroyed函數。另外為什么removeConnectionInLoop()要在Main EventLoop中運行,因為該函數主要是從TcpServer對象中刪除某條數據。而TcpServer對象是屬于Main EventLoop的。這也是貫徹了One Loop Per Thread的理念。TcpConnection::connectDestroyed()函數的執行是又跳回到了sub EventLoop線程中。該函數就是將TCP連接的監聽描述符從事件監聽器中移除。另外Sub EventLoop中的Poller類對象還保存著這條TCP連接的channel_,所以調用channel_.remove()將這個TCP連接的channel對象從Poller內的數據結構中刪除。
再提醒一下,線程切換后面會講,這里不知道為什么函數的執行為什么可以在線程之間來回跳來跳去的話,先將就將就。
5.2. 服務器主動關閉導致連接斷開
當服務器主動關閉時,調用TcpServer::~TcpServer()析構函數。
我們之前討論了為什么TcpConnection要用智能指針管理,留下了一個TcpConnection對象的多線程安全問題沒討論。這里涉及到兩個很好的編程設計思想。好好學起來!
5.2.1. TcpConnection對象的析構問題:
這里在提示一下EventLoop::runInLoop()函數的意義,假如你有一個EventLoop對象 loop_,當你調用了loop_->runInLoop(function)函數時,這個function函數的執行會在這個loop_綁定的線程上運行!
所以我們畫了下面這幅圖,在創建TcpConnection對象時,Acceptor都要將這個對象分發給一個Sub EventLoop來管理。這個TcpConnection對象的一切函數執行都要在其管理的Sub EventLoop線程中運行。再一次貫徹One Loop Per Thread的設計模式。比如要想徹底刪除一個TcpConnection對象,就必須要調用這個對象的connecDestroyed()方法,這個方法執行完后才能釋放這個對象的堆內存。每個TcpConnection對象的connectDestroyed()方法都必須在這個TcpConnection對象所屬的Sub EventLoop綁定的線程中執行。

所有上面的TcpServer::~TcpServer()函數就是干這事兒的,不斷循環的讓這個TcpConnection對象所屬的Sub EventLoop線程執行TcpConnection::connectDestroyed()函數,同時在Main EventLoop的TcpServer::~TcpServer()函數中調用item.second.reset()釋放保管TcpConnection對象的共享智能指針,以達到釋放TcpConnection對象的堆內存空間的目的。
但是這里面其實有一個問題需要解決,TcpConnection::connectDestroyed()函數的執行以及這個TcpConnection對象的堆內存釋放操作不在同一個線程中運行,所以要考慮怎么保證一個TcpConnectino對象的堆內存釋放操作是在TcpConnection::connectDestroyed()調用完后。
這個析構函數巧妙利用了共享智能指針的特點,當沒有共享智能指針指向這個TcpConnection對象時(引用計數為0),這個TcpConnection對象就會被析構刪除(堆內存釋放)。
我們解讀一下TcpServer::~TcpServer()中的代碼邏輯:
TcpServer::~TcpServer()
{
for(auto &item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset(); // 把原始的智能指針復位 讓棧空間的TcpConnectionPtr conn指向該對象 當conn出了其作用域 即可釋放智能指針指向的對象
// 銷毀連接
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
}
-
首先
TcpServer::connections_是一個unordered_map<string, TcpConnectionPtr>,其中TcpConnectionPtr的含義是指向TcpConnection的shared_ptr。 -
在一開始,每一個TcpConnection對象都被一個共享智能指針TcpConnetionPtr持有,當執行了
TcpConnectionPtr conn(item.second)時,這個TcpConnetion對象就被conn和這個item.second共同持有,但是這個conn的生存周期很短,只要離開了當前的這一次for循環,conn就會被釋放。 -
緊接著調用
item.second.reset()釋放掉TcpServer中保存的該TcpConnectino對象的智能指針。此時在當前情況下,只剩下conn還持有這個TcpConnection對象,因此當前TcpConnection對象還不會被析構。 -
接著調用了
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));這句話的含義是讓Sub EventLoop線程去執行TcpConnection::connectDestroyed()函數。當你把這個conn的成員函數傳進去的時候,conn所指向的資源的引用計數會加1。因為傳給runInLoop的不只有函數,還有這個函數所屬的對象conn。 -
Sub EventLoop線程開始運行
TcpConnection::connectDestroyed()函數。 -
Main EventLoop線程當前這一輪for循環跑完,共享智能指針conn離開代碼塊,因此被析構,但是TcpConnection對象還不會被釋放,因為還有一個共享智能指針指向這個TcpConnection對象,而且這個智能指針在
TcpConnection::connectDestroyed()中,只不過這個智能指針你看不到,它在這個函數中是一個隱式的this的存在。當這個函數執行完后,智能指針就真的被釋放了。到此,就沒有任何智能指針指向這個TcpConnection對象了。TcpConnection對象就徹底被析構刪除了。
5.2.2. 如果TcpConnection中有正在發送的數據,怎么保證在觸發TcpConnection關閉機制后,能先讓TcpConnection先把數據發送完再釋放TcpConnection對象的資源?
這個問題就要好好參考這部分代碼的設計了,這部分代碼也是很值得吸收的精華。
/******* TcpConnection.cc *******/
// 連接建立
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 向poller注冊channel的EPOLLIN讀事件
// 新連接建立 執行回調
connectionCallback_(shared_from_this());
}
我們先了解一下shared_from_this()是什么意思,首先TcpConnection類繼承了一個類,繼承了這個類之后才能使用shared_from_this()函數。
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
假如我們在TcpConnection對象(我們管這個對象叫TCA)中的成員函數中調用了shared_from_this(),該函數可以返回一個shared_ptr,并且這個shared_ptr指向的對象是TCA。接著這個shared_ptr就作為channel_的Channel::tie()函數的函數參數。
/******* Channel.h ********/
std::weak_ptr<void> tie_;
/******* Channel.cc ********/
// channel的tie方法什么時候調用過? TcpConnection => channel
/**
* TcpConnection中注冊了Chnanel對應的回調函數,傳入的回調函數均為TcpConnection
* 對象的成員方法,因此可以說明一點就是:Channel的結束一定早于TcpConnection對象!
* 此處用tie去解決TcoConnection和Channel的生命周期時長問題,從而保證了Channel對
* 象能夠在TcpConnection銷毀前銷毀。
**/
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}
void Channel::handleEvent(Timestamp receiveTime)
{
if (tied_)
{
std::shared_ptr<void> guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
// 如果提升失敗了 就不做任何處理 說明Channel的TcpConnection對象已經不存在了
}
else
{
handleEventWithGuard(receiveTime); //我實在不知道啥情況下會走到這里 希望有知道的大佬可以評論區告訴我
}
}
當事件監聽器返回監聽結果,就要對每一個發生事件的channel對象調用他們的HandlerEvent()函數。在這個HandlerEvent函數中,會先把tie_這個weak_ptr提升為強共享智能指針。這個強共享智能指針會指向當前的TcpConnection對象。就算你外面調用刪除析構了其他所有的指向該TcpConnection對象的智能指針。你只要handleEventWithGuard()函數沒執行完,你這個TcpConnetion對象都不會被析構釋放堆內存。而handleEventWithGuard()函數里面就有負責處理消息發送事件的邏輯。當handleEventWithGuard()函數調用完畢,這個guard智能指針就會被釋放。
線程篇
One Loop Per Thread的含義就是,一個EventLoop和一個線程唯一綁定,和這個EventLoop有關的,被這個EventLoop管轄的一切操作都必須在這個EventLoop綁定線程中執行,比如在Main EventLoop中,負責新連接建立的操作都要在Main EventLoop線程中運行。已建立的連接分發到某個Sub EventLoop上,這個已建立連接的任何操作,比如接收數據發送數據,連接斷開等事件處理都必須在這個Sub EventLoop線程上運行,還不準跑到別的Sub EventLoop線程上運行。那這到底到底怎么實現的呢?這里終于有機會講了。
1. 預備知識:eventfd()的使用
/* eventfd - create a file descriptor for event notification */
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
調用函數eventfd()會創建一個eventfd對象,或者也可以理解打開一個eventfd類型的文件,類似普通文件的open操作。eventfd的在內核空間維護一個無符號64位整型計數器, 初始化為initval的值。
flags是以下三個標志位或運算的結果:
EFD_CLOEXEC(since Linux 2.6.27):eventfd()返回一個文件描述符,如果該進程被fork的時候,這個文件描述符也會被復制過去,這個時候就會有多個描述符指向同一個eventfd對象,如果設置了這個標志,則子進程在執行exec的時候,會自動清除掉父進程的這個文件描述符。EFD_NONBLOCK(since Linux 2.6.27):文件描述符會被設置為O_NONBLOCK,如果沒有設置這個標志位,read操作的時候將會阻塞直到計數器中有值,如果設置了這個這個標志位,計數器沒有值得時候也會立刻返回-1。EFD_SEMAPHORE(since Linux 2.6.30):這個標志位會影響read操作。后面講。
使用方法:
- write向eventfd中寫值
- 如果寫入的值和 <
0xFFFFFFFFFFFFFFFE,則寫入成功。 - 如果 >
0xFFFFFFFFFFFFFFFE:- 設置了
EFD_NONBLOCK標志位,則直接返回-1。 - 沒有設置
EFD_NONBLOCK標志位,就會一直阻塞到read操作執行。
- 設置了
- 如果寫入的值和 <
- read讀取eventfd的值:
- 如果計數器中的值 > 0
- 設置了
EFD_SEMAPHORE標志位,則返回1,且計數器的值減去1。 - 沒有設置
EFD_SEMAPHORE標志位,則返回計數器中的值,并且設置計數器值為0。
- 設置了
- 如果計數器中的值 = 0
- 設置了
EFD_NONBLOCK標志位就直接返回-1。 - 沒有設置
EFD_NONBLOCK標志位就會一直阻塞直到計數器中的值大于0。
- 設置了
- 如果計數器中的值 > 0
2. 如何保證一個EventLoop對象和一個線程唯一綁定(該線程只能綁定一個EventLoop對象,該EventLoop對象也必須綁定一個線程)
下面是EventLoop構造函數(我把不相關的代碼全刪了,只保留部分邏輯代碼)
/****** EventLoop.cc ******/
// 防止一個線程創建多個EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;
EventLoop::EventLoop()
: wakeupFd_(createEventfd()) //生成一個eventfd,每個EventLoop對象,都會有自己的eventfd
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d\n", this, threadId_);
if (t_loopInThisThread) //如果當前線程已經綁定了某個EventLoop對象了,那么該線程就無法創建新的EventLoop對象了
{
LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this)); // 設置wakeupfd的事件類型以及發生事件后的回調操作
wakeupChannel_->enableReading(); // 每一個EventLoop都將監聽wakeupChannel_的EPOLL讀事件了
}
介紹一下這個__thread,這個__thread是一個關鍵字,被這個關鍵字修飾的全局變量t_loopInThisThread會具備一個屬性,那就是該變量在每一個線程內都會有一個獨立的實體。因為一般的全局變量都是被同一個進程中的多個線程所共享,但是這里我們不希望這樣。
在EventLoop對象的構造函數中,如果當前線程沒有綁定EventLoop對象,那么t_loopInThisThread為nullptr,然后就讓該指針變量指向EventLoop對象的地址。如果t_loopInThisThread不為nullptr,說明當前線程已經綁定了一個EventLoop對象了,這時候EventLoop對象構造失敗!
3. Muduo庫如何實現每個EventLoop線程只運行隸屬于該EventLoop的操作?
這個小標題有點拗口,這一小節主要解決這樣的一個問題,比如在MainEventLoop中,負責新連接建立的操作都要在MainEventLoop線程中運行。已建立的連接分發到某個SubEventLoop上之后,這個已建立連接的任何操作,比如接收數據發送數據,連接斷開等事件處理都必須在這個SubEventLoop線程上運行,不準跑到別的SubEventLoop線程上運行。
EventLoop構造函數的初始化列表中,如下所示(省略了部分代碼):
/* 創建線程之后主線程和子線程誰先運行是不確定的。
* 通過一個eventfd在線程之間傳遞數據的好處是多個線程無需上鎖就可以實現同步。
* eventfd支持的最低內核版本為Linux 2.6.27,在2.6.26及之前的版本也可以使用eventfd,但是flags必須設置為0。
* 函數原型:
* #include <sys/eventfd.h>
* int eventfd(unsigned int initval, int flags);
* 參數說明:
* initval,初始化計數器的值。
* flags, EFD_NONBLOCK,設置socket為非阻塞。
* EFD_CLOEXEC,執行fork的時候,在父進程中的描述符會自動關閉,子進程中的描述符保留。
* 場景:
* eventfd可以用于同一個進程之中的線程之間的通信。
* eventfd還可以用于同親緣關系的進程之間的通信。
* eventfd用于不同親緣關系的進程之間通信的話需要把eventfd放在幾個進程共享的共享內存中(沒有測試過)。
*/
// 創建wakeupfd 用來notify喚醒subReactor處理新來的channel
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) //eventfd創建失敗,一般不會失敗,除非一個進程把文件描述符(Linux一個進程最多1024個)全用光了。
{
LOG_FATAL("eventfd error:%d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: wakeupFd_(createEventfd()) //生成一個eventfd,每個EventLoop對象,都會有自己的eventfd
...
{...}
在EventLoop的初始化列表中:
createEventfd()返回一個eventfd文件描述符,并且該文件描述符設置為非阻塞和子進程不拷貝模式。該eventfd文件描述符賦給了EventLoop對象的成員變量wakeupFd_。- 隨即將
wakeupFd_用Channel封裝起來,得到wakeupChannel_。接著在EventLoop構造函數中。
我們來描繪一個情景,我們知道每個EventLoop線程主要就是在執行其EventLoop對象的loop函數(該函數就是一個while循環,循環的獲取事件監聽器的結果以及調用每一個發生事件的Channel的事件處理函數)。此時Sub EventLoop上注冊的TCP連接都沒有任何動靜,整個Sub EventLoop線程就阻塞在epoll_wait()上。
此時Main EventLoop接受了一個新連接請求,并把這個新連接封裝成一個TcpConnection對象,并且希望在Sub EventLoop線程中執行TcpConnection::connectEstablished()函數,因為該函數的目的是將TcpConnection注冊到Sub EventLoop的事件監聽器上,并且調用用戶自定義的連接建立后的處理函數。當該TcpConnection對象注冊到Sub EventLoop之后,這個TcpConnection對象的任何操作(包括調用用戶自定義的連接建立后的處理函數。)都必須要在這個Sub EventLoop線程中運行,所以TcpConnection::connectEstablished()函數必須要在Sub EventLoop線程中運行。
那么我們怎么在Main EventLoop線程中通知Sub EventLoop線程起來執行TcpConnection::connectEstablished()函數呢?這里就要好好研究一下EventLoop::runInLoop()函數了。
// 在當前loop中執行cb
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) // 如果當前調用runInLoop的線程正好是EventLoop的運行線程,則直接執行此回調函數
{
cb();
}
else // 在非當前EventLoop線程中執行cb,就需要喚醒EventLoop所在線程執行cb
{
queueInLoop(cb);
}
}
// 把cb放入隊列中 喚醒loop所在的線程執行cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
/**
* || callingPendingFunctors的意思是 當前loop正在執行回調中 但是loop的pendingFunctors_中又加入了新的回調 需要通過wakeup寫事件
* 喚醒相應的需要執行上面回調操作的loop的線程 讓loop()下一次poller_->poll()不再阻塞(阻塞的話會延遲前一次新加入的回調的執行),然后
* 繼續執行pendingFunctors_中的回調函數
**/
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 喚醒loop所在線程
}
}
// 用來喚醒loop所在線程 向wakeupFd_寫一個數據 wakeupChannel就發生讀事件 當前loop線程就會被喚醒
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
}
// 開啟事件循環
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping\n", this);
while (!quit_)
{
activeChannels_.clear();
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// Poller監聽哪些channel發生了事件 然后上報給EventLoop 通知channel處理相應的事件
channel->handleEvent(pollRetureTime_);
}
/**
* 執行當前EventLoop事件循環需要處理的回調操作 對于線程數 >=2 的情況 IO線程 mainloop(mainReactor) 主要工作:
* accept接收連接 => 將accept返回的connfd打包為Channel => TcpServer::newConnection通過輪詢將TcpConnection對象分配給subloop處理
*
* mainloop調用queueInLoop將回調加入subloop(該回調需要subloop執行 但subloop還在poller_->poll處阻塞) queueInLoop通過wakeup將subloop喚醒
**/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n", this);
looping_ = false;
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_); // 交換的方式減少了鎖的臨界區范圍 提升效率 同時避免了死鎖 如果執行functor()在臨界區內 且functor()中調用queueInLoop()就會產生死鎖
}
for (const Functor &functor : functors)
{
functor(); // 執行當前loop需要執行的回調操作
}
callingPendingFunctors_ = false;
}
首先EventLoop::runInLoop函數接受一個可調用的函數對象Functor cb,如果當前CPU正在運行的線程就是該EventLoop對象綁定的線程,那么就直接執行cb函數。否則就把cb傳給queueInLoop()函數。
在queueInLoop()函數中主要就是把cb這個可調用對象保存在EventLoop對象的pendingFunctors_這個數組中,我們希望這個cb能在某個EventLoop對象所綁定的線程上運行,但是由于當前CPU執行的線程不是我們期待的這個EventLoop線程,我們只能把這個可調用對象先存在這個EventLoop對象的數組成員pendingFunctors_中。
再把目光轉移到上面代碼中的EventLoop::loop()函數中,我們知道EventLoop::loop()肯定運行在其所綁定的EventLoop線程內,在該函數內會調用doPendingFunctors()函數,這個函數就是把自己這個EventLoop對象中的pendingFunctors_數組中保存的可調用對象拿出來執行。pendingFunctors_中保存的是其他線程希望你這個EventLoop線程執行的函數。那么問題來了,假如EventLoop A線程阻塞在EventLoop::loop()中的epoll_wait()調用上(EventLoop A上監聽的文件描述符沒有任何事件發生),這時候EventLoop線程要求EventLoop A趕緊執行某個函數,那其他線程要怎么喚醒這個阻塞住的EventLoop A線程呢?這時候我們就要把目光聚焦在上面的wakeup()函數了。
wakeup()函數就是向我們想喚醒的線程所綁定的EventLoop對象持有的wakeupFd_隨便寫入一個8字節數據,因為wakeupFd_已經注冊到了這個EventLoop中的事件監聽器上,這時候事件監聽器監聽到有文件描述符的事件發生,epoll_wait()阻塞結束而返回。這就相當于起到了喚醒線程的作用!你這個EventLoop對象既然阻塞在事件監聽上,那我就通過wakeup()函數給你這個EventLoop對象一個事件,讓你結束監聽阻塞。
關于這個EventLoop::doPendingFunctors()函數這部分實現,也值得去學習!
試想一個問題,如果當前線程要去執行消費該線程EventLoop對應的任務隊列里的回調函數,此時又有新的回調函數想加入到該隊列中,那我們肯定沒辦法一邊去遍歷隊列來執行消費任務隊列中的回調函數,一邊向任務隊列加入新的任務,我們需要首先對隊列加鎖,執行完隊列中的任務后再去解鎖,然后再讓其他回調函數能夠加入到任務隊列。但是這樣肯定是不可行的,其效率是非常低的,如下圖所示:
所以引發了我的思考,我認為如何優化鎖這件事情對提升程序執行效率是非常重要的,需要考慮到以下兩點:
- 如何減少鎖的臨界區大小(鎖住的代碼范圍)
- 如何用盡可能少的時間去持有鎖
以上兩種方式的目的就是盡可能少的在代碼里產生互斥,這樣的話,代碼的執行效率就會得到提升。那么陳碩是如何解決上面對整體隊列加鎖的情況呢?我們來看下代碼吧:
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_); // 交換的方式減少了鎖的臨界區范圍 提升效率 同時避免了死鎖 如果執行functor()在臨界區內 且functor()中調用queueInLoop()就會產生死鎖
}
for (const Functor &functor : functors)
{
functor(); // 執行當前loop需要執行的回調操作
}
callingPendingFunctors_ = false;
}
陳碩在doPendingFunctors()函數中,創建了一個棧空間的任務隊列functors,然后對pendingFunctors_進行加鎖僅僅只是為了將其隊列中的回調函數換出到棧空間任務隊列functors當中。換出完畢后遍歷棧空間任務隊列,并在棧空間任務隊列中執行相應的回調任務,而pendingFunctors_則很快的解除互斥鎖的持有,能夠更加迅速的響應外部,讓回調函數盡快加入到該任務隊列中。
當然還有一種情況我還沒有說到,那就是如果我們沒有按照陳碩的做法去做,而是回到最開始所想的,先加鎖,在持有鎖的過程中遍歷任務隊列挨個兒執行注冊的回調函數,此時如果某個回調函數需要持有鎖,那豈不是當下就會發生死鎖了?這個地方需要仔細想想,因為你既要在遍歷任務隊列前持有鎖,又要在執行任務的時候持有鎖,顯然前者已經搶占到了該鎖,那回調函數就會一直阻塞,既然如此,任務隊列的遍歷執行操作也會一直阻塞下去。所以陳碩的解決方案不僅僅提高了鎖的效率,也避免了死鎖的發生,這個地方還是非常值得學習的!
總結
那么以上就是我在學習陳碩muduo網絡庫的時候,覺得非常精髓的一些知識點,想著就對其進行下匯總和梳理,避免遺忘的同時,也能拿出來經常復習品味,好的代碼值得反復去學習。那么上述總結和梳理中如果哪里有問題,也歡迎大家評論區指正,我會虛心學習和交流。

浙公網安備 33010602011771號