<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      muduo的事件處理(Reactor模型關鍵結構)

      muduo的Reactor模式主要有3個類實現-Channel、Poller、EventLoop、定時器

      1. 事件分發類 Channel (最終干活的)

      Channel是selectable IO channel,自始至終只負責一個 fd 的(注冊與響應) IO 事件,但是不擁有該 fd ,所以也就在析構的時候不關閉它.
      Channel自始至終都屬于一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),自始至終只負責一個文件描述符的IO事件

        EventLoop* loop_;
        const int fd_;
        int events_;
        int revents_;
        int index_;
      
        ReadEventCallback readCallback_;
        EventCallback writeCallback_;
        EventCallback errorCallback_;
        EventCallback closeCallback_;
      
      

      幾個callback函數都是c++新標準里面的function對象(muduo里面是Boost::function),它們會在handleEvent這個成員函數中根據不同的事件被調用。index_是poller類中pollfds_數組的下標。events_和revents_明顯對應了struct pollfd結構中的成員。需要注意的是,Channel并不擁有該fd,它不會在析構函數中去關閉這個fd(fd是由Socket類的析構函數中關閉,即RAII的方法),Channel的生命周期由其owner負責。

      如何工作?
      在Channel類中保存這IO事件的類型以及對應的回調函數,當IO事件發生時,最終會調用到Channel類中的回調函數

      具體流程如下:
      首先給定Channel所屬的 loop,及其要處理的 fd;接著注冊 fd 上需要監聽的事件,如果是常用的讀寫事件的話,可以直接調用接口函數enableReading或enableWriting來注冊對應fd上的事件,disable* 是銷毀指定的事件;然后通過 setCallback 來設置事件發生時的回調即可
      注冊事件時函數調用關系,如下:Channel::update()->EventLoop::updateChannel(Channel)->Poller::updateChannel(Channel),最終向 poll 系統調用的監聽事件表注冊或修改事件。

      2. IO multiplexing 類 Poller

      Poller是個基類,具體可以是EPollPoller(默認) 或者PollPoller,對應 epoll 和 poll.需要去實現(唯一使用面向對象的一個類)

        typedef std::vector<struct pollfd> PollFdList;
        typedef std::map<int, Channel*> ChannelMap;  // fd to Channel
        PollFdList pollfds_;
        ChannelMap channels_;
      
      

      ChannelMap是fd到Channel類的映射,PollFdList保存了每一個fd所關心的事件,用作參數傳遞到poll函數中,Channel類里面的index_即是這里的下標。Poller類有下面四個函數

      Timestamp poll(int timeoutMs, ChannelList* activeChannels);
      void updateChannel(Channel* channel);
      void removeChannel(Channel* channel);
      private:
      void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
      
      

      updateChannel和removeChannel都是對上面兩個數據結構的操作,poll函數是對::poll的封裝。私有的fillActiveChannels函數負責把返回的活動時間添加到activeChannels(vector<Channel*>)這個結構中,返回給用戶。Poller的職責也很簡單,負責IO multiplexing,一個EventLoop有一個Poller,Poller的生命周期和EventLoop一樣長。

      是eventloop的成員,它的職責僅僅是IO復用,事件分發交給 Channel 完成,生命期和 EventLoop 一樣長。
      poll函數調用 epoll_wait/poll 來監聽注冊了的文件描述符,然后通過fillActiveChannels函數將返回的就緒事件裝入 activeChannels 數組

      3. EventLoop 類

      EventLoop類是核心,大多數類都會包含一個EventLoop*的成員,因為所有的事件都會在EventLoop::loop()中通過Channel分發。先來看一下這個loop循環:

      while (!quit_)
      {
          activeChannels_.clear();
          pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
          for (ChannelList::iterator it = activeChannels_.begin();
              it != activeChannels_.end(); ++it)
          {
            (*it)->handleEvent(pollReturnTime_);
          }
          doPendingFunctors();
      }
      
      

      handleEvent是Channel類的成員函數,它會根據事件的類型去調用不同的Callback。循環末尾還有一個doPendingFunctors(),這個函數的作用在后面多線程的部分去說明。
      EventLoop類是Reactor模式的核心,一個線程一個事件循環,即one loop per thread,EventLoop 對象的生命周期通常與其所屬的線程一樣長。EventLoop對象構造的時候,會檢查當前線程是否已經創建了其他EventLoop對象,如果已創建,終止程序(LOG_FATAL),EventLoop類的構造函數會記錄本對象所屬線程(threadld_),創建了EventLoop對象的線程稱為IO線程.其主要功能是運行事件循環,等待事件發生,然后調用回調處理發生的事件。EventLoop::loop() -> Poller::poll()填充就緒事件集合 activeChannels,然后遍歷該容器,執行每個 channel的 Channel::handleEvent() 完成對應就緒事件回調。

      由上面三個類已經可以構成Reactor的核心,整個流程如下:
      用戶通過Channel向Poller類注冊fd和關心的事件
      EventLoop從poll中返回活躍的fd和對應的Channel
      通過Channel去回調相應的時間。

      4. TimerQueue

      typedef std::shared_ptr<Timer> TimerPtr;
      typedef std::pair<Timestamp, TimerPtr> Entry;
      typedef std::set<Entry> TimerList;
      Channel timerfdChannel_;
      const int timerfd_;
      TimerList timers_;
      

      采用std::pair<Timestamp, TimerPtr> 加上set的 的形式是為了處理兩個Timer同事到期的情況,即使到期時間相同,它們的地址也不同。timerfdChannel_是用來管理timerfd_create函數創建的fd。Timer類里面包含了一個回調函數和一個到期時間。expiration_就是上面Entry中的Timestamp。

      const TimerCallback callback_;
      Timestamp expiration_;
      

      用一個set來保存所有的事件和時間
      根據set集合里面最早的時間來更新timerfd_的到期時間(用timerfd_settime函數)
      時間到期后,EventLoop的poll函數會返回,并調用timerfdChannel_里面的handleEvent回調函數。
      通過handleEvent這個回調函數,再去處理到期的所有事件。

      timerfdChannel_.setReadCallback(
          std::bind(&TimerQueue::handleRead,this));
      timerfdChannel_.enableReading();
      

      timerfdChannel_的callback函數注冊了TimerQueue的handleRead函數。在handleRead中應該干什么就很明顯了,自然是撈出所有到期的timer,一次去執行對應的事件

      void TimerQueue::handleRead()
      {
        loop_->assertInLoopThread();
        Timestamp now(Timestamp::now());
        readTimerfd(timerfd_, now);
        std::vector<Entry> expired = getExpired(now);
        // safe to callback outside critical section
        for (std::vector<Entry>::iterator it = expired.begin();
            it != expired.end(); ++it)
        {
          it->second->run();
        }
        reset(expired, now);
      }
      
      

      多線程的技巧

      一個線程一個EventLoop,每個線程都有自己管理的各種ChannelList和TimerQueue。有時候,我們總有一些需求,要在各個線程之間調配任務。比如添加一個定時時間到IO線程中,這樣TimerQueue就有兩個線程同時訪問。

      1. runInLoop()和queueInLoop()

      先來看幾個EventLoop里面重要的函數和成員:

      std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
      void EventLoop::runInLoop(Functor&& cb) {
        if (isInLoopThread()) {
          cb();
        } else {
          queueInLoop(std::move(cb));
        }
      }
      void EventLoop::queueInLoop(Functor&& cb) {
        {
          MutexLockGuard lock(mutex_);
          pendingFunctors_.push_back(cb);
        }
        if (!isInLoopThread() || callingPendingFunctors_) {
          wakeup();
        }
      }
      
      

      注意這里的函數參數,我用到了C++11的右值引用。
      在前面的EventLoop::loop里面,我們已經看到了doPendingFunctors()這個函數,EventLoop還有一個重要的成員pendingFunctors_,該成員是暴露給其他線程的。這樣,其他線程向IO線程添加定時時間的流程就是:

      其他線程調用runInLoop(),
      如果不是當前IO線程,再調用queueInLoop()
      在queueLoop中,將時間push到pendingFunctors_中,并喚醒當前IO線程
      注意這里的喚醒條件:不是當前IO線程肯定要喚醒;此外,如果正在調用Pending functor,也要喚醒;(為什么?,因為如果正在執行PendingFunctor里面,如果也執行了queueLoop,如果不喚醒的話,新加的cb就不會立即執行了。)

      2.doPendingFunctors()

      void EventLoop::doPendingFunctors() {
        std::vector<Functor> functors;
        callingPendingFunctors_ = true;
        {
          // reduce the lenth of the critical section
          // avoid the dead lock cause the functor can call queueInloop(;)
          MutexLockGuard lock(mutex_);
          functors.swap(pendingFunctors_);
        }
        for (size_t i = 0; i < functors.size(); ++i) {
          functors[i]();
        }
        callingPendingFunctors_ = false;
      }
      

      doPendingFunctors并沒有直接在臨界區去執行functors,而是利用了一個棧對象,把事件swap到棧對象中,再去執行。這樣做有兩個好處:
      減少了臨界區的長度,其它線程調用queueInLoop對pendingFunctors加鎖時,就不會被阻塞
      避免了死鎖,可能在functors里面也會調用queueInLoop(),從而造成死鎖。
      回過頭來看,muduo在處理多線程加鎖訪問共享數據的策略上,有一個很重要的原則:拼命減少臨界區的長度
      試想一下,如果沒有pendingFunctors_這個數據成員,我們要想往TimerQueue中添加timer,肯定要對TimerQueue里面的insert函數加鎖,造成鎖的爭用,而pendingFunctors_這個成員將鎖的范圍減少到了一個vector的push_back操作上。此外,在doPendingFunctors中,利用一個棧對象減少臨界區,也是很巧妙的一個重要技巧。

      3.wake()

      前面說到喚醒IO線程,EventLoop阻塞在poll函數上,怎么去喚醒它?以前的做法是利用pipe,向pipe中寫一個字節,監視在這個pipe的讀事件的poll函數就會立刻返回。在muduo中,采用了linux中eventfd調用

      static int createEventfd() {
        int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        if (evtfd < 0) {
          LOG_SYSERR << "Failed in eventfd";
          abort();
        }
        return evtfd;
      }
      void EventLoop::wakeup() {
        uint64_t one = 1;
        ssize_t n = ::write(wakeupFd_, &one, sizeof one);
        if (n != sizeof one) {
          LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
        }
      }
      
      

      把eventfd得到的fd和前面一樣,通過Channel注冊到poll里面,喚醒的時候,只需要向wakeupFd中寫入一個字節,就能達到喚醒的目的。eventfd、timerfd都體現了linux的設計哲學,Everyting is a fd。

      參考
      [2]基本網絡編程范式 https://segmentfault.com/a/1190000005910673

      posted @ 2021-04-01 23:49  demianzhang  閱讀(473)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国内精品久久久久影院薰衣草| 国产高清自产拍av在线| 久久亚洲精品天天综合网| 亚洲蜜臀av乱码久久| 久久久综合九色合综| 精品无码久久久久国产动漫3d| 国产精品免费看久久久| 狠狠色噜噜狠狠狠狠色综合久av| 国产一区二区三区亚洲精品| 亚洲 一区二区 在线| 99精品热在线在线观看视| 麻豆蜜桃av蜜臀av色欲av| 少妇高潮喷水惨叫久久久久电影 | 日韩乱码人妻无码中文字幕| 无码av岛国片在线播放| 亚洲精品av一二三区无码| 色综合久久综合香蕉色老大| 国产成人av电影在线观看第一页| 福利视频一区二区在线| 思思99热精品在线| 秋霞人妻无码中文字幕| 成人精品视频一区二区三区| 国产精品中文第一字幕| 久久涩综合一区二区三区| 日本va欧美va精品发布| 激情综合网激情综合网五月| 国产目拍亚洲精品二区| 又爽又黄又无遮掩的免费视频| 国产精品无遮挡又爽又黄| 成全影视大全在线观看| 国产欧美日韩精品第二区| 末发育娇小性色xxxxx视频| 99久久无色码中文字幕| 肉大捧一进一出免费视频| 欧美成人午夜在线观看视频| 免费极品av一视觉盛宴| 欧洲无码一区二区三区在线观看 | 亚洲高潮喷水无码AV电影| 日韩精品一区二区三区日韩| 麻花传媒在线观看免费| 亚洲色成人网站www永久下载|