<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      ChannelPipeline----貫穿io事件處理的大動脈

      ChannelPipeline貫穿io事件處理的大動脈

      上一篇,我們分析了NioEventLoop及其相關類的主干邏輯代碼,我們知道netty采用線程封閉的方式來避免多線程之間的資源競爭,最大限度地減少并發問題,減少鎖的使用,因而能夠有效減低線程切換的開銷,減少cpu的使用時間。此外,我們還簡單分析了netty對于線程組的封裝EventLoopGroup,目前一般采用roundRobin的方式在多個線程上均勻地分配channel。通過前面幾篇文章的分析,我們已經對channel的初始化,注冊到EventLoop上,SingleThreadEventLoop的線程啟動過程以及線程中運行的代碼邏輯有了一些了解,此外我們也分析了用于處理基于TCP協議的io事件的NioEventLoop類的具體的循環邏輯,通過對代碼的詳細分析,我們了解了對于connect,write,read,accept事件的不同處理邏輯,但是對于write和read事件的處理邏輯我們并沒有分析的很詳細,因為這些事件的處理涉及到netty中另一個很重要的模塊,ChannelPipeline以及一系列相關的類如Channel, ChannelHandler, ChannelhandlerContext等的理解,netty中的事件處理采用了經典的責任鏈(responsbility chain)的的設計模式,這種設計模式使得netty的io事件處理框架易于擴展,并且為業務邏輯提供了一個很好的抽象模型,大大降低了netty的使用難度,使得io事件的處理變得更符合思維習慣。
      好了,廢話了那么多,其實主要是想把前面分析的幾篇的文章做一個小結和回顧,然后引出本篇的主題--netty的io事件處理鏈模式。
      因為netty的代碼結構相對來說還是很規整,它的模塊之間的邊界劃分比較明確,EventLoop作為io事件的“發源地”,與其交互的對象是Channel類,而ChannelPipeline,ChannelhandlerContext, ChannelHandler等幾個類則是與Channel交互,他們并不直接與EventLoop交互。

      ChannelPipeline的結構圖

      首先每一個Channel在初始化的時候就會創建一個ChannelPipeline,這點我們在前面分析NioSocketChannel的初始化時也分析到了。目前ChannelPipeline的實現只有DefaultChannelPipeline一種,所以我們也以DefaultChannelPipeline來分析。DefaultChannelPipeline內部有一個雙向鏈表結構,這個鏈表的每個節點都是一個AbstractChannelHandlerContext類型的節點,DefaultChannelPipeline剛初始化時就會創建兩個初始節點,分別是HeadContext和TailContext,這兩個節點也并不完全是標記節點,他們都有各自實際的作用,

      • HeadContext,實現了bind,connect,disconnect,close,write,flush等等幾個方法,基本都是通過直接調用unsafe的相關方法實現的。而對于其他的方法基本都是通過調用AbstractChannelHandlerContext的fire方法將事件傳給下一個節點。
      • TailContext, 主要用于處理寫數據幾乎沒有實現任何邏輯,它的功能幾乎全部繼承自AbstractChannelHandlerContext,而AbstractChannelHandlerContext對于大部分事件處理的實現都是簡單地將事件向下一個節點傳遞。注意,這里下一個節點不一定是前一個還是后一個,要根據具體事件類型或者具體的操作而定,對于ChannelOutboundInvoker接口中的方法都是從尾節點向首節點傳遞事件,而對于ChannelInboundInvoker接口中的方法都是從首節點往尾節點傳遞。我們可以形象地理解為,首節點是最靠近socket的,而尾節點是最原理socket的,所以有數據進來時,產生的讀事件最先從首節點開始向后傳遞,當有寫數據的動作時,則會從尾節點向頭結點傳遞。

      下面,我們以兩個最重要的事件讀事件和寫事件,來分析netty的這種鏈式處理結構到底是怎么運轉的。

      讀事件

      首先,我們需要找到一個產生讀事件并調用相關方法使得讀事件開始傳遞的例子,很自然我們應該想到在EventLoop中會產生讀事件。
      如下,就是NioEventLoop中對于讀事件的處理,通過調用NioUnsafe.read方法

             // 處理read和accept事件
              if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                  unsafe.read();
              }
      

      我們繼續看NioByteUnsafe.read方法,這個方法我們之前在分析NioEventLoop事件處理邏輯時提到過,這個方法首先會通過緩沖分配器分配一個緩沖,然后從channel(也就是socket)中將數據讀到緩沖中,每讀一個緩沖,就會觸發一個讀事件,我們看具體的觸發讀事件的調用:

                  do {
                      // 分配一個緩沖
                      byteBuf = allocHandle.allocate(allocator);
                      // 將通道的數據讀取到緩沖中
                      allocHandle.lastBytesRead(doReadBytes(byteBuf));
                      // 如果沒有讀取到數據,說明通道中沒有待讀取的數據了,
                      if (allocHandle.lastBytesRead() <= 0) {
                          // nothing was read. release the buffer.
                          // 因為沒讀取到數據,所以應該釋放緩沖
                          byteBuf.release();
                          byteBuf = null;
                          // 如果讀取到的數據量是負數,說明通道已經關閉了
                          close = allocHandle.lastBytesRead() < 0;
                          if (close) {
                              // There is nothing left to read as we received an EOF.
                              readPending = false;
                          }
                          break;
                      }
      
                      // 更新Handle內部的簿記量
                      allocHandle.incMessagesRead(1);
                      readPending = false;
                      // 向channel的處理器流水線中觸發一個事件,
                      // 讓取到的數據能夠被流水線上的各個ChannelHandler處理
                      pipeline.fireChannelRead(byteBuf);
                      byteBuf = null;
                      // 這里根據如下條件判斷是否繼續讀:
                      // 上一次讀取到的數據量大于0,并且讀取到的數據量等于分配的緩沖的最大容量,
                      // 此時說明通道中還有待讀取的數據
                  } while (allocHandle.continueReading());
      

      為了代碼邏輯的完整性,我這里把整個循環的代碼都貼上來,其實我們要關注的僅僅是pipeline.fireChannelRead(byteBuf)這一句,好了,現在我們找到ChannelPipeline觸發讀事件的入口方法,我們順著這個方法,順藤摸瓜就能一步步理清事件的傳遞過程了。

      DefaultChannelPipeline.fireChannelRead

      如果我們看一下ChannelPipeline接口,這里面的方法名都是以fire開頭的,實際就是想表達這些方法都是觸發了一個事件,然后這個事件就會在內部的處理器鏈表中傳遞。
      我們看到這里調用了一個靜態方法,并且以頭結點為參數,也就是說事件傳遞是從頭結點開始的。

      public final ChannelPipeline fireChannelRead(Object msg) {
          AbstractChannelHandlerContext.invokeChannelRead(head, msg);
          return this;
      }
      

      AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

      可以看到,這個方法中通過調用invokeChannelRead執行處理邏輯

      static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
          // 維護引用計數,主要是為了偵測資源泄漏問題
          final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
          EventExecutor executor = next.executor();
          if (executor.inEventLoop()) {
              // 調用invokeChannelRead執行處理邏輯
              next.invokeChannelRead(m);
          } else {
              executor.execute(new Runnable() {
                  @Override
                  public void run() {
                      next.invokeChannelRead(m);
                  }
              });
          }
      }
      

      AbstractChannelHandlerContext.invokeChannelRead(Object msg)

      這里可以看到,AbstractChannelHandlerContext通過自己內部的handler對象來實現讀數據的邏輯。這也體現了ChannelHandlerContext在整個結構中的作用,其實它是起到了在ChannelPipeline和handler之間的一個中間人的角色,那我們要問:既然ChannelHandlerContext不起什么實質性的作用,那為什么要多這一個中間層呢,這樣設計的好處是什么?我認為這樣設計其實是為了盡最大可能對使用者屏蔽netty框架的細節,試想如果沒有這個context的中間角色,使用者必然要詳細地了解ChannelPipeline,并且還要考慮事件傳遞是找下一個節點,還要考慮下一個節點應該沿著鏈表的正序找還是沿著鏈表 倒敘找,所以這里ChannelHandlerContext的角色我認為最大的作用就是封裝了鏈表的邏輯,并且封裝了不同類型操作的傳播方式。當然也起到了一些引用傳遞的作用,如channel引用可以簡介地傳遞給用戶。
      好了,回到正題,從前面的方法中我們知道讀事件最先是從HeadContext節點開始的,所以我們看一下HeadContext的channelRead方法(因為HeadContext也實現了handler方法,并且返回的就是自身)

      private void invokeChannelRead(Object msg) {
          // 如果這個handler已經準備就緒,那么就執行處理邏輯
          // 否則將事件傳遞給下一個處理器節點
          if (invokeHandler()) {
              try {
                  // 調用內部的handler的channelRead方法
                  ((ChannelInboundHandler) handler()).channelRead(this, msg);
              } catch (Throwable t) {
                  notifyHandlerException(t);
              }
          } else {
              fireChannelRead(msg);
          }
      }
      

      HeadContext.channelRead

      這里的調用也是一個重要的注意點,這里調用了ChannelHandlerContext.fireChannelRead方法,這正是事件傳播的方法,fire開頭的方法的作用就是將當前的操作(或者叫事件)從當前處理節點傳遞給下一個處理節點。這樣就實現了事件在鏈表中的傳播。

         public void channelRead(ChannelHandlerContext ctx, Object msg) {
              ctx.fireChannelRead(msg);
          }
      

      小結

      到這里我們先暫停一下,總結一下讀事件(或者是讀操作)在ChannelPipeline內部的傳播機制,其實很簡單,

      • 首先外部調用者會通過unsafe最終調用ChannelPipeline.fireChannelRead方法,并將從channel中讀取到的數據作為參數傳進來
      • 以頭結點作為參數調用靜態方法AbstractChannelHandlerContext.fireChannelRead
      • 然后頭結點HeadContext開始調用節點的invokeChannelRead方法(即ChannelHandlerContext的invokeChannelRead方法),
      • invokeChannelRead方法會調用當前節點的handler對象的channelRead方法執行處理邏輯
      • handler對象的channelRead方法中可以調用AbstractChannelHandlerContext.fireChannelRead將這個事件傳遞到下一個節點
      • 這樣事件就能夠沿著鏈條不斷傳遞下去,當然如果業務處理需要,完全可以在某個節點將事件的傳遞終止,也就是在這個節點不調用ChannelHandlerContext.fireChannelRead

      寫事件

      此外,我們分析一下寫數據的操作是怎么傳播的。分析寫數據操作的入口并不想讀事件那么好找,在netty中用戶的代碼中寫數據最終都是被放到內部的緩沖中,當NioEventLoop中監聽到底層的socket可以寫數據的事件時,實際上是吧當前緩沖中的數據發送到socket中,而對于用戶來講,是接觸不到socketChannel這一層的。
      根據前面的分析,我們知道,用戶一般都會與Channel,ChannelHandler, ChannelhandlerContext這幾種類打交道,寫數據的操作也是通過Channel的write和writeAndFlush觸發的,這兩個方法區別在于writeAndFlush在寫完數據后還會觸發一次刷寫操作,將緩沖中的數據實際寫入到socket中。

      AbstractChannel.write

      仍然是將操作交給內部的ChannelPipeline,觸發流水線操作

      public ChannelFuture write(Object msg, ChannelPromise promise) {
          return pipeline.write(msg, promise);
      }
      

      DefaultChannelPipeline.write

      這里可以很清楚地看出來,寫數據的操作從為節點開始,但是TailContext并未重寫write方法,所以最終調用的還是AbstractChannelHandlerContext中的相應方法。
      我們沿著調用鏈往下走,發現write系列的方法其實是將寫操作傳遞給了下一個ChannelOutboundHandler類型的處理節點,注意這里是從尾節點向前找,遍歷鏈表的順序和讀數據正好相反。
      真正調用

      public final ChannelFuture write(Object msg, ChannelPromise promise) {
          return tail.write(msg, promise);
      }
      

      AbstractChannelHandlerContext.write

      從這個方法可以明顯地看出來,write方法將寫操作交給了下一個ChannelOutboundHandler類型的處理器節點。

      private void write(Object msg, boolean flush, ChannelPromise promise) {
          ObjectUtil.checkNotNull(msg, "msg");
          try {
              if (isNotValidPromise(promise, true)) {
                  ReferenceCountUtil.release(msg);
                  // cancelled
                  return;
              }
          } catch (RuntimeException e) {
              ReferenceCountUtil.release(msg);
              throw e;
          }
      
          // 沿著鏈表向前遍歷,找到下一個ChannelOutboundHandler類型的處理器節點
          final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                  (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
          final Object m = pipeline.touch(msg, next);
          EventExecutor executor = next.executor();
          if (executor.inEventLoop()) {
              if (flush) {
                  // 調用AbstractChannelHandlerContext.invokeWriteAndFlush方法執行真正的寫入邏輯
                  next.invokeWriteAndFlush(m, promise);
              } else {
                  next.invokeWrite(m, promise);
              }
          } else {
              // 如果當前是異步地寫入數據,那么需要將寫入的邏輯封裝成一個任務添加到EventLoop的任務對隊列中
              final AbstractWriteTask task;
              if (flush) {
                  task = WriteAndFlushTask.newInstance(next, m, promise);
              }  else {
                  task = WriteTask.newInstance(next, m, promise);
              }
              if (!safeExecute(executor, task, promise, m)) {
                  // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                  // and put it back in the Recycler for re-use later.
                  //
                  // See https://github.com/netty/netty/issues/8343.
                  task.cancel();
              }
          }
      }
      

      AbstractChannelHandlerContext.invokeWrite

      我們接著看invokeWrite0方法

      private void invokeWrite(Object msg, ChannelPromise promise) {
          if (invokeHandler()) {
              invokeWrite0(msg, promise);
          } else {
              write(msg, promise);
          }
      }
      

      AbstractChannelHandlerContext.invokeWrite0

      這里可以清楚地看到,最終是調用了handler的write方法執行真正的寫入邏輯,這個邏輯實際上就是有用戶自己實現的。

      private void invokeWrite0(Object msg, ChannelPromise promise) {
          try {
              // 調用當前節點的handler的write方法執行真正的寫入邏輯
              ((ChannelOutboundHandler) handler()).write(this, msg, promise);
          } catch (Throwable t) {
              notifyOutboundHandlerException(t, promise);
          }
      }
      

      到這里,我們已經知道寫入的操作是怎么從尾節點開始,也知道了通過調用當前處理節點的AbstractChannelHandlerContext.write方法可以將寫入操作傳遞給下一個節點,那么數據經過層層傳遞后,最終是怎么寫到socket中的呢?回答這個問題,我們需要看一下HeadContext的代碼!我們知道寫入的操作是從尾節點向前傳遞的,那么頭節點HeadContext就是傳遞的最后一個節點。

      HeadContext.write

      最終調用了unsafe.write方法。
      在AbstractChannel.AbstractUnsafe的實現中,write方法將經過前面一系列處理器處理過的數據存放到內部的緩沖中。

          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
              unsafe.write(msg, promise);
          }
      

      刷寫操作的傳遞

      前面我們提到,寫數據的操作除了write還有writeAndFlush,這個操作除了寫數據,還會緊接著執行一次刷寫操作。刷寫操作也會從尾節點向前傳遞,最終傳遞到頭結點HeadContext,其中的flush方法如下:

          public void flush(ChannelHandlerContext ctx) {
              unsafe.flush();
          }
      

      在AbstractChannel.AbstractUnsafe的實現中,flush操作會將前面存儲在內部緩沖區中的數據吸入到socket中,從而完成刷寫。

      總結

      本節,我們主要通過io事件處理中最重要的兩種事件,即讀事件和寫事件為切入點 詳細分析了netty中對于這兩種事件的處理方法。其中寫數據的事件與我們之前在jdk nio中建立起的印象差別還是不大的,都是對從socket中讀取的數據進行處理,但是寫事件跟jdk nio中的概念就有較大差別了,因為netty對數據的寫入做了很大的改變和優化,用戶代碼中通過channel調用相關的寫數據的方法,這個方法會觸發處理器鏈條上的所有相關的處理器對待寫入的數據進行加工,最后在頭結點HeadCOntext中被寫入channel內部的緩沖區,通過flush操作將緩沖的數據寫入socket中。
      這里面最重要的也是最值得我們學習的一點就是責任鏈模式,顯然,這又是一次對責任鏈模式的成功運用,是的框架的擴展性大大增強,而且面向用戶的接口更加容易理解,簡單易用,向用戶屏蔽了大部分框架實現細節。

      posted on 2019-06-29 04:29  _朱葛  閱讀(922)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 深夜福利资源在线观看| 蜜臀精品一区二区三区四区| 亚洲欧美自偷自拍视频图片| 天天躁日日摸久久久精品| 亚洲国产午夜精品福利| 体态丰腴的微胖熟女的特征| 国产成人欧美一区二区三区在线| 国产精成人品日日拍夜夜| 久久亚洲欧美日本精品| 怡红院一区二区三区在线| 免费又黄又爽1000禁片| 亚洲精品国产精品不乱码| 亚洲男人精品青春的天堂| 无码中文字幕热热久久| 九九热爱视频精品视频| 我国产码在线观看av哈哈哈网站| 18禁黄无遮挡网站免费| 国产乱子影视频上线免费观看| 精品国产亚洲午夜精品av| 风韵丰满妇啪啪区老老熟女杏吧| 国产91小视频在线观看| 99国产精品一区二区蜜臀| 国产成人高清亚洲综合| AV教师一区高清| 蜜臀av日韩精品一区二区| 人人妻人人澡人人爽| 色综合AV综合无码综合网站| 久青草国产综合视频在线| 国产精一品亚洲二区在线播放| 国产精品中文字幕观看| 无码中文字幕人妻在线一区| 成人网站免费观看永久视频下载| 布尔津县| 青春草在线视频观看| 加勒比无码人妻东京热| 三穗县| 深夜福利成人免费在线观看| 99riav国产精品视频| 白丝乳交内射一二三区| 日本熟妇人妻一区二区三区| 国内精品无码一区二区三区|