<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      從零開始實現(xiàn)簡易版Netty(二) MyNetty pipeline流水線

      從零開始實現(xiàn)簡易版Netty(二) MyNetty pipeline流水線

      1. Netty pipeline流水線介紹

      在上一篇博客中l(wèi)ab1版本的MyNetty參考netty實現(xiàn)了一個極其精簡的reactor模型。按照計劃,lab2版本的MyNetty需要實現(xiàn)pipeline流水線,以支持不同的邏輯處理模塊的解耦。

      由于本文屬于系列博客,讀者需要對之前的博客內(nèi)容有所了解才能更好地理解本文內(nèi)容。

      在lab1版本中,MyNetty的EventLoop處理邏輯中,允許使用者配置一個EventHandler,并在處理read事件時調(diào)用其實現(xiàn)的自定義fireChannelRead方法。
      這一機制在實現(xiàn)demo中的簡易echo服務(wù)器時是夠用的,但在實際的場景中,一個完備的網(wǎng)絡(luò)程序,業(yè)務(wù)想要處理的IO事件有很多類型,并且不希望在一個大而全的臃腫的處理器中處理所有的IO事件,而是能夠模塊化的拆分不同的處理邏輯,實現(xiàn)架構(gòu)上的靈活解耦。
      因此netty提供了pipeline流水線機制,允許用戶在使用netty時能按照自己的需求,按順序組裝自己的處理器鏈條。

      1.1 netty的IO事件

      在實際的網(wǎng)絡(luò)環(huán)境中,有非常多不同類型的IO事件,最典型的就是讀取來自遠端的數(shù)據(jù)(read)以及向遠端寫出發(fā)送數(shù)據(jù)(write)。
      netty對這些IO事件進行了抽象,并允許用戶編寫自定義的處理器監(jiān)聽或是主動觸發(fā)這些事件。
      netty按照事件數(shù)據(jù)流的傳播方向?qū)O事件分成了入站(InBound)與出站(OutBound)兩大類,由遠端輸入傳播到本地應(yīng)用程序的事件被叫做入站事件,從本地應(yīng)用程序觸發(fā)向遠端傳播的事件叫出站事件。
      主要的入站事件有channelRead、channelActive等,主要的出站事件有write、connect、bind等。

      1.2 netty的IO事件處理器與pipeline流水線

      針對InBound入站IO事件,netty抽象出了ChannelInboundHandler接口;針對OutBound出站IO事件,netty抽象出了ChannelOutboundHandler接口。
      用戶可以編寫一系列繼承自對應(yīng)ChannelHandler接口的自定義處理器,將其綁定到ChannelPipeline中。每一個Channel都對應(yīng)一個ChannelPipeline,ChannelPipeline實例是獨屬于某個特定channel連接的。
      netty_pipeline

      2. MyNetty實現(xiàn)pipeline流水線

      經(jīng)過上述對于netty的IO事件與pipeline流水線簡要介紹后,讀者對netty的流水線雖然有了一定的概念,但對具體的細節(jié)還是知之甚少。下面我們結(jié)合MyNetty的源碼,展開介紹netty的流水線機制實現(xiàn)。

      2.1 MyNetty的事件處理器

      /**
       * 事件處理器(相當(dāng)于netty中ChannelInboundHandler和ChannelOutboundHandler合在一起)
       * */
      public interface MyChannelEventHandler {
      
          // ========================= inbound入站事件 ==============================
          void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception;
      
          void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) throws Exception;
      
          // ========================= outbound出站事件 ==============================
          void close(MyChannelHandlerContext ctx) throws Exception;
      
          void write(MyChannelHandlerContext ctx, Object msg) throws Exception;
      }
      
      • 前面說到,netty將入站與出站事件用兩個不同的ChannelEventHandler接口進行了抽象,而在MyNetty中因為最終要支持的IO事件沒有netty那么多,所以出站、入站的處理接口進行了合并。
        這樣做雖然在架構(gòu)上不如netty那樣拆分開來的設(shè)計優(yōu)雅,但相對來說理解起來會更加簡單。
      • 未來MyChannelEventHandler還會隨著迭代支持更多的IO事件,但這是個漸進的過程,目前l(fā)ab2中只需要支持少數(shù)幾個IO事件便能滿足需求。

      2.2 MyNetty的pipeline流水線與ChannelHandler上下文

      MyNetty pipeline流水線實現(xiàn)
      public interface MyChannelEventInvoker {
      
          // ========================= inbound入站事件 ==============================
          void fireChannelRead(Object msg);
      
          void fireExceptionCaught(Throwable cause);
      
          // ========================= outbound出站事件 ==============================
          void close();
      
          void write(Object msg);
      }
      
      /**
       * pipeline首先自己也是一個Invoker
       *
       * 包括head和tail兩個哨兵節(jié)點
       * */
      public class MyChannelPipeline implements MyChannelEventInvoker {
      
          private static final Logger logger = LoggerFactory.getLogger(MyChannelPipeline.class);
      
          private final MyNioChannel channel;
      
          /**
           * 整條pipeline上的,head和tail兩個哨兵節(jié)點
           *
           * inbound入站事件默認都從head節(jié)點開始向tail傳播
           * outbound出站事件默認都從tail節(jié)點開始向head傳播
           * */
          private final MyAbstractChannelHandlerContext head;
          private final MyAbstractChannelHandlerContext tail;
      
          public MyChannelPipeline(MyNioChannel channel) {
              this.channel = channel;
      
              head = new MyChannelPipelineHeadContext(this);
              tail = new MyChannelPipelineTailContext(this);
      
              head.setNext(tail);
              tail.setPrev(head);
          }
      
          @Override
          public void fireChannelRead(Object msg) {
              // 從head節(jié)點開始傳播讀事件(入站)
              MyChannelPipelineHeadContext.invokeChannelRead(head,msg);
          }
      
          @Override
          public void fireExceptionCaught(Throwable cause) {
              // 異常傳播到了pipeline的末尾,打印異常信息
              onUnhandledInboundException(cause);
          }
      
          @Override
          public void close() {
              // 出站事件,從尾節(jié)點向頭結(jié)點傳播
              tail.close();
          }
      
          @Override
          public void write(Object msg) {
              tail.write(msg);
          }
      
          public void addFirst(MyChannelEventHandler handler){
              // 非sharable的handler是否重復(fù)加入的校驗
              checkMultiplicity(handler);
      
              MyAbstractChannelHandlerContext newCtx = newContext(handler);
      
              MyAbstractChannelHandlerContext oldFirstCtx = head.getNext();
              newCtx.setPrev(head);
              newCtx.setNext(oldFirstCtx);
              head.setNext(newCtx);
              oldFirstCtx.setPrev(newCtx);
          }
      
          public void addLast(MyChannelEventHandler handler){
              // 非sharable的handler是否重復(fù)加入的校驗
              checkMultiplicity(handler);
      
              MyAbstractChannelHandlerContext newCtx = newContext(handler);
      
              // 加入鏈表尾部節(jié)點之前
              MyAbstractChannelHandlerContext oldLastCtx = tail.getPrev();
              newCtx.setPrev(oldLastCtx);
              newCtx.setNext(tail);
              oldLastCtx.setNext(newCtx);
              tail.setPrev(newCtx);
          }
      
          private static void checkMultiplicity(MyChannelEventHandler handler) {
              if (handler instanceof MyChannelEventHandlerAdapter) {
                  MyChannelEventHandlerAdapter h = (MyChannelEventHandlerAdapter) handler;
      
                  if (!h.isSharable() && h.added) {
                      // 一個handler實例不是sharable,但是被加入到了pipeline一次以上,有問題
                      throw new MyNettyException(
                              h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
                  }
      
                  // 第一次被引入,當(dāng)前handler實例標記為已加入
                  h.added = true;
              }
          }
      
          public MyNioChannel getChannel() {
              return channel;
          }
      
          private void onUnhandledInboundException(Throwable cause) {
              logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                      "It usually means the last handler in the pipeline did not handle the exception.", cause);
          }
      
          private MyAbstractChannelHandlerContext newContext(MyChannelEventHandler handler) {
              return new MyDefaultChannelHandlerContext(this,handler);
          }
      }
      
      • pipeline實現(xiàn)了ChannelEventInvoker接口,ChannelEventInvoker與ChannelEventHandler中對應(yīng)IO事件的方法是一一對應(yīng)的,唯一的區(qū)別在于其方法中缺失了(MyChannelHandlerContext ctx)參數(shù)。
        Invoker接口用于netty內(nèi)部觸發(fā)流水線的事件傳播,而Handler接口用于用戶自定義IO事件觸發(fā)時的事件處理器。
      • 同時,pipeline流水線中定義了兩個關(guān)鍵屬性,head和tail,其都是AbstractChannelHandlerContext類型的,其內(nèi)部工作原理我們在下一小節(jié)展開。
        pipeline提供了addFirst和addLast兩個方法(netty中提供了非常多功能類似的方法,MyNetty簡單起見只實現(xiàn)了最常用的兩個),允許將用戶自定義的ChannelHandler掛載在pipeline中,與head、tail組成一個雙向鏈表,而入站出站事件會按照雙向鏈表中節(jié)點的順序進行傳播。
      • 對于入站事件(比如fireChannelRead),事件從head節(jié)點開始,從前到后的在流水線的handler鏈表中傳播;而出站事件(比如write), 事件則從tail節(jié)點開始,從后往前的在流水線的handler鏈表中傳播。

      2.3 MyNetty ChannelHandlerContext上下文實現(xiàn)

      下面我們來深入講解ChannelHandlerContext上下文原理,看看一個具體的事件在pipeline的雙向鏈表中的傳播是如何實現(xiàn)的。

      MyChannelHandlerContext上下文接口定義
      public interface MyChannelHandlerContext extends MyChannelEventInvoker {
      
          MyNioChannel channel();
      
          MyChannelEventHandler handler();
      
          MyChannelPipeline getPipeline();
      
          MyNioEventLoop executor();
      }
      
      MyAbstractChannelHandlerContext上下文骨架類
      public abstract class MyAbstractChannelHandlerContext implements MyChannelHandlerContext{
      
          private static final Logger logger = LoggerFactory.getLogger(MyAbstractChannelHandlerContext.class);
      
          private final MyChannelPipeline pipeline;
      
          private final int executionMask;
      
          /**
           * 雙向鏈表前驅(qū)/后繼節(jié)點
           * */
          private MyAbstractChannelHandlerContext prev;
          private MyAbstractChannelHandlerContext next;
      
          public MyAbstractChannelHandlerContext(MyChannelPipeline pipeline, Class<? extends MyChannelEventHandler> handlerClass) {
              this.pipeline = pipeline;
      
              this.executionMask = MyChannelHandlerMaskManager.mask(handlerClass);
          }
      
          @Override
          public MyNioChannel channel() {
              return pipeline.getChannel();
          }
      
          public MyAbstractChannelHandlerContext getPrev() {
              return prev;
          }
      
          public void setPrev(MyAbstractChannelHandlerContext prev) {
              this.prev = prev;
          }
      
          public MyAbstractChannelHandlerContext getNext() {
              return next;
          }
      
          public void setNext(MyAbstractChannelHandlerContext next) {
              this.next = next;
          }
      
          @Override
          public MyNioEventLoop executor() {
              return this.pipeline.getChannel().getMyNioEventLoop();
          }
      
          @Override
          public void fireChannelRead(Object msg) {
              // 找到當(dāng)前鏈條下最近的一個支持channelRead方法的MyAbstractChannelHandlerContext(inbound事件,從前往后找)
              MyAbstractChannelHandlerContext nextHandlerContext = findContextInbound(MyChannelHandlerMaskManager.MASK_CHANNEL_READ);
      
              // 調(diào)用找到的那個ChannelHandlerContext其handler的channelRead方法
              MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
              if(myNioEventLoop.inEventLoop()){
                  invokeChannelRead(nextHandlerContext,msg);
              }else{
                  // 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
                  myNioEventLoop.execute(()->{
                      invokeChannelRead(nextHandlerContext,msg);
                  });
              }
          }
      
          @Override
          public void fireExceptionCaught(Throwable cause) {
              // 找到當(dāng)前鏈條下最近的一個支持exceptionCaught方法的MyAbstractChannelHandlerContext(inbound事件,從前往后找)
              MyAbstractChannelHandlerContext nextHandlerContext = findContextInbound(MyChannelHandlerMaskManager.MASK_EXCEPTION_CAUGHT);
      
              // 調(diào)用找到的那個ChannelHandlerContext其handler的exceptionCaught方法
      
              MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
              if(myNioEventLoop.inEventLoop()){
                  invokeExceptionCaught(nextHandlerContext,cause);
              }else{
                  // 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
                  myNioEventLoop.execute(()->{
                      invokeExceptionCaught(nextHandlerContext,cause);
                  });
              }
          }
      
          @Override
          public void close() {
              // 找到當(dāng)前鏈條下最近的一個支持close方法的MyAbstractChannelHandlerContext(outbound事件,從后往前找)
              MyAbstractChannelHandlerContext nextHandlerContext = findContextOutbound(MyChannelHandlerMaskManager.MASK_CLOSE);
      
              MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
              if(myNioEventLoop.inEventLoop()){
                  doClose(nextHandlerContext);
              }else{
                  // 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
                  myNioEventLoop.execute(()->{
                      doClose(nextHandlerContext);
                  });
              }
          }
      
          private void doClose(MyAbstractChannelHandlerContext nextHandlerContext){
              try {
                  nextHandlerContext.handler().close(nextHandlerContext);
              } catch (Throwable t) {
                  logger.error("{} do close error!",nextHandlerContext,t);
              }
          }
      
          @Override
          public void write(Object msg) {
              // 找到當(dāng)前鏈條下最近的一個支持write方法的MyAbstractChannelHandlerContext(outbound事件,從后往前找)
              MyAbstractChannelHandlerContext nextHandlerContext = findContextOutbound(MyChannelHandlerMaskManager.MASK_WRITE);
      
              MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
              if(myNioEventLoop.inEventLoop()){
                  doWrite(nextHandlerContext,msg);
              }else{
                  // 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
                  myNioEventLoop.execute(()->{
                      doWrite(nextHandlerContext,msg);
                  });
              }
          }
      
          private void doWrite(MyAbstractChannelHandlerContext nextHandlerContext, Object msg) {
              try {
                  nextHandlerContext.handler().write(nextHandlerContext,msg);
              } catch (Throwable t) {
                  logger.error("{} do write error!",nextHandlerContext,t);
              }
          }
      
          @Override
          public MyChannelPipeline getPipeline() {
              return pipeline;
          }
      
          public static void invokeChannelRead(MyAbstractChannelHandlerContext next, Object msg) {
              try {
                  next.handler().channelRead(next, msg);
              }catch (Throwable t){
                  // 處理拋出的異常
                  next.invokeExceptionCaught(t);
              }
          }
      
          public static void invokeExceptionCaught(MyAbstractChannelHandlerContext next, Throwable cause) {
              next.invokeExceptionCaught(cause);
          }
      
          private void invokeExceptionCaught(final Throwable cause) {
              try {
                  this.handler().exceptionCaught(this, cause);
              } catch (Throwable error) {
                  // 如果捕獲異常的handler依然拋出了異常,則打印debug日志
                  logger.error(
                      "An exception {}" +
                          "was thrown by a user handler's exceptionCaught() " +
                          "method while handling the following exception:",
                      ThrowableUtil.stackTraceToString(error), cause);
              }
          }
      
          private MyAbstractChannelHandlerContext findContextInbound(int mask) {
              MyAbstractChannelHandlerContext ctx = this;
              do {
                  // inbound事件,從前往后找
                  ctx = ctx.next;
              } while (needSkipContext(ctx, mask));
      
              return ctx;
          }
      
          private MyAbstractChannelHandlerContext findContextOutbound(int mask) {
              MyAbstractChannelHandlerContext ctx = this;
              do {
                  // outbound事件,從后往前找
                  ctx = ctx.prev;
              } while (needSkipContext(ctx, mask));
      
              return ctx;
          }
      
          private static boolean needSkipContext(MyAbstractChannelHandlerContext ctx, int mask) {
              // 如果與運算后為0,說明不支持對應(yīng)掩碼的操作,需要跳過
              return (ctx.executionMask & (mask)) == 0;
          }
      }
      
      MyChannelPipelineHeadContext pipeline哨兵頭結(jié)點
      /**
       * pipeline的head哨兵節(jié)點
       * */
      public class MyChannelPipelineHeadContext extends MyAbstractChannelHandlerContext implements MyChannelEventHandler {
      
          public MyChannelPipelineHeadContext(MyChannelPipeline pipeline) {
              super(pipeline,MyChannelPipelineHeadContext.class);
          }
      
          @Override
          public void channelRead(MyChannelHandlerContext ctx, Object msg) {
              ctx.fireChannelRead(msg);
          }
      
          @Override
          public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) {
              ctx.fireExceptionCaught(cause);
          }
      
          @Override
          public void close(MyChannelHandlerContext ctx) throws Exception {
              // 調(diào)用jdk原生的channel方法,關(guān)閉掉連接
              ctx.getPipeline().getChannel().getJavaChannel().close();
          }
      
          @Override
          public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
              // 往外寫的操作,一定是socketChannel
              SocketChannel socketChannel = (SocketChannel) ctx.getPipeline().getChannel().getJavaChannel();
      
              if(msg instanceof ByteBuffer){
                  socketChannel.write((ByteBuffer) msg);
              }else{
                  // msg走到head節(jié)點的時候,必須是ByteBuffer類型
                  throw new Error();
              }
          }
      
          @Override
          public MyChannelEventHandler handler() {
              return this;
          }
      }
      
      MyChannelPipelineHeadContext pipeline哨兵尾結(jié)點
      /**
       * pipeline的tail哨兵節(jié)點
       * */
      public class MyChannelPipelineTailContext extends MyAbstractChannelHandlerContext implements MyChannelEventHandler {
      
          private static final Logger logger = LoggerFactory.getLogger(MyChannelPipelineTailContext.class);
      
          public MyChannelPipelineTailContext(MyChannelPipeline pipeline) {
              super(pipeline, MyChannelPipelineTailContext.class);
          }
      
          @Override
          public void channelRead(MyChannelHandlerContext ctx, Object msg) {
              // 如果channelRead事件傳播到了tail節(jié)點,說明用戶自定義的handler沒有處理好,但問題不大,打日志警告下
              onUnhandledInboundMessage(ctx,msg);
          }
      
          @Override
          public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) {
              // 如果exceptionCaught事件傳播到了tail節(jié)點,說明用戶自定義的handler沒有處理好,但問題不大,打日志警告下
              onUnhandledInboundException(cause);
          }
      
          @Override
          public void close(MyChannelHandlerContext ctx) throws Exception {
              // do nothing
              logger.info("close op, tail context do nothing");
          }
      
          @Override
          public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
              // do nothing
              logger.info("write op, tail context do nothing");
          }
      
          @Override
          public MyChannelEventHandler handler() {
              return this;
          }
      
          private void onUnhandledInboundException(Throwable cause) {
              logger.warn(
                  "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                      "It usually means the last handler in the pipeline did not handle the exception.",
                  cause);
          }
      
          private void onUnhandledInboundMessage(MyChannelHandlerContext ctx, Object msg) {
              logger.debug(
                  "Discarded inbound message {} that reached at the tail of the pipeline. " +
                      "Please check your pipeline configuration.", msg);
      
              logger.debug("Discarded message pipeline : {}. Channel : {}.",
                  ctx.getPipeline(), ctx.channel());
          }
      }
      
      • AbstractChannelHandlerContext作為ChannelHandlerContext子類的基礎(chǔ)骨架,是理解Netty中IO事件傳播機制的重中之重。AbstractChannelHandlerContext做為ChannelPipeline的實際節(jié)點,其擁有prev和next兩個屬性,用于關(guān)聯(lián)鏈表中的前驅(qū)和后繼。
      • 在觸發(fā)IO事件時,AbstractChannelHandlerContext會按照一定的規(guī)則(具體原理在下一節(jié)展開)找到下一個需要處理當(dāng)前類型IO事件的事件處理器(findContextInbound與findContextOutBound方法)。
      • 在找到后會先判斷當(dāng)前線程與目標MyAbstractChannelHandlerContext的執(zhí)行器線程是否相同(inEventLoop),如果是則直接觸發(fā)對應(yīng)handler的回調(diào)方法;如果不是則將當(dāng)前事件包裝成一個任務(wù)交給next節(jié)點的executor執(zhí)行。
        這樣設(shè)計的主要原因是netty作為一個高性能網(wǎng)絡(luò)框架,是非常忌諱使用同步鎖的。EventLoop線程是按照引入taskQueue隊列多寫單讀的方式消費IO事件以及相關(guān)任務(wù)的,這樣可以避免處理IO事件時防止不同線程間并發(fā)而大量加鎖。
      • 舉個例子,一個聊天服務(wù)器,用戶a通過連接A發(fā)送了一條消息給服務(wù)端,而服務(wù)端需要通過連接b將消息同步給用戶b,連接a和連接b屬于不同的EventLoop線程。
        連接a所在的EventLoop在接受到讀事件后,需要往連接b寫出數(shù)據(jù),此時不能直接由連接a的線程執(zhí)行channel的寫出操作(inEventLoop為false),而必須通過execute方法寫入taskQueue交給管理連接b的EventLoop線程,讓它異步的處理。
        試想如果能允許別的EventLoop線程來回調(diào)觸發(fā)不屬于它的channel的IO事件,那么所有的ChannelHandler都必須考慮多線程并發(fā)的問題而被迫引入同步機制,導(dǎo)致性能大幅降低。
      • netty中可以在ChannelHandler中主動的觸發(fā)一些IO事件,比如write寫出事件。如果是使用ChannelHandlerContext.write寫出,則傳播的起點是當(dāng)前Handler節(jié)點;而如果是ChannelHandlerContext.channel.write的方式寫出,其底層就是調(diào)用的是pipeline.write,其傳播的起點則是tail哨兵節(jié)點。
        結(jié)合MyNetty中上述pipeline相關(guān)的代碼,相信讀者應(yīng)該能更好的理解netty中的這一傳播機制。

      2.4 ChannelHandler mask掩碼過濾機制

      通常情況,用戶自定義的IO事件處理器一般都是各司其職的,不會對每一種IO事件都感興趣。比如最經(jīng)典的編解碼handler,一般來說encode編碼處理器只關(guān)心寫出到遠端的出站事件,而decode解碼處理器只關(guān)心讀取到數(shù)據(jù)的入站事件。
      但編碼、解碼處理器都是位于pipeline的同一個鏈表中的,因此IO事件理論上會在鏈表中的所有處理器中傳播。同時由于netty允許ChannelHandler在內(nèi)部自行決定是否將事件往下一個handler節(jié)點傳播,因此如果不引入特別的機制,則意味著用戶自定義的每一個ChannelHandler都必須實現(xiàn)所有的接口方法,并在內(nèi)部添加模版代碼來確保事件能夠繼續(xù)在pipeline中傳播(比如都必須實現(xiàn)fireChannelRead方法,并且都調(diào)用ctx.fireChannelRead方法讓事件能向后傳播)。
      netty中為ChannelHandler定義了非常多的IO事件接口,如果每個ChannelHandler都必須實現(xiàn)所有的IO事件接口,netty的用戶在實現(xiàn)自定義處理器時會非常痛苦,同時在高并發(fā)下不必要的方法調(diào)用也會對性能有所影響。
      為了解決上述問題,netty提供了Skip機制,允許用戶在編寫自定義處理器時僅關(guān)心自己感興趣的IO事件,而其它事件在進行傳播時能自動的跳過當(dāng)前handler節(jié)點在pipeline中繼續(xù)傳播。

      在2.3的AbstractChannelHandlerContext實現(xiàn)中,可以發(fā)現(xiàn)事件傳播的過程中關(guān)鍵的兩個方法(findContextInbound/findContextOutbound)都是基于needSkipContext方法來實現(xiàn)的。
      needSkipContext方法中基于AbstractChannelHandlerContext中的一個屬性executionMask來決定是否需要跳過某個ChannelHandler。

      下面我們結(jié)合MyNetty的源碼來看看這個executionMask屬性是如何被計算得出,又是如何基于該掩碼進行handler過濾的。

      /**
       * 計算并緩存每一個類型的handler所需要處理方法掩碼的管理器
       *
       * 參考自netty的ChannelHandlerMask類
       * */
      public class MyChannelHandlerMaskManager {
      
          private static final Logger logger = LoggerFactory.getLogger(MyChannelHandlerMaskManager.class);
      
          public static final int MASK_EXCEPTION_CAUGHT = 1;
      
          // ==================== inbound ==========================
          public static final int MASK_CHANNEL_REGISTERED = 1 << 1;
          public static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
          public static final int MASK_CHANNEL_ACTIVE = 1 << 3;
          public static final int MASK_CHANNEL_INACTIVE = 1 << 4;
          public static final int MASK_CHANNEL_READ = 1 << 5;
          public static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
      //    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
      //    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
      
          // ===================== outbound =========================
          public static final int MASK_BIND = 1 << 9;
          public static final int MASK_CONNECT = 1 << 10;
      //    static final int MASK_DISCONNECT = 1 << 11;
          public static final int MASK_CLOSE = 1 << 12;
      //    static final int MASK_DEREGISTER = 1 << 13;
          public static final int MASK_READ = 1 << 14;
          public static final int MASK_WRITE = 1 << 15;
          public static final int MASK_FLUSH = 1 << 16;
      
          private static final ThreadLocal<Map<Class<? extends MyChannelEventHandler>, Integer>> MASKS =
              ThreadLocal.withInitial(() -> new WeakHashMap<>(32));
      
          public static int mask(Class<? extends MyChannelEventHandler> clazz) {
              // 對于非共享的handler,會隨著channel的創(chuàng)建而被大量創(chuàng)建
              // 為了避免反復(fù)的計算同樣類型handler的mask掩碼而引入緩存,優(yōu)先從緩存中獲得對應(yīng)處理器類的掩碼
              Map<Class<? extends MyChannelEventHandler>, Integer> cache = MASKS.get();
              Integer mask = cache.get(clazz);
              if (mask == null) {
                  // 緩存中不存在,計算出對應(yīng)類型的掩碼值
                  mask = calculateChannelHandlerMask(clazz);
                  cache.put(clazz, mask);
              }
              return mask;
          }
      
          private static int calculateChannelHandlerMask(Class<? extends MyChannelEventHandler> handlerType) {
              int mask = 0;
      
              // MyChannelEventHandler中的方法一一對應(yīng),如果支持就通過掩碼的或運算將對應(yīng)的bit位設(shè)置為1
      
              if(!needSkip(handlerType,"channelRead", MyChannelHandlerContext.class,Object.class)){
                  mask |= MASK_CHANNEL_READ;
              }
      
              if(!needSkip(handlerType,"exceptionCaught", MyChannelHandlerContext.class,Throwable.class)){
                  mask |= MASK_EXCEPTION_CAUGHT;
              }
      
              if(!needSkip(handlerType,"close", MyChannelHandlerContext.class)){
                  mask |= MASK_CLOSE;
              }
      
              if(!needSkip(handlerType,"write", MyChannelHandlerContext.class,Object.class)){
                  mask |= MASK_WRITE;
              }
      
              return mask;
          }
      
          private static boolean needSkip(Class<?> handlerType, String methodName, Class<?>... paramTypes) {
              try {
                  Method method = handlerType.getMethod(methodName, paramTypes);
      
                  // 如果有skip注解,說明需要跳過
                  return method.isAnnotationPresent(Skip.class);
              } catch (NoSuchMethodException e) {
                  // 沒有這個方法,就不需要設(shè)置掩碼
                  return false;
              }
          }
      }
      
      /**
       * 用于簡化用戶自定義的handler的適配器
       *
       * 由于所有支持的方法都加上了@Skip注解,子類只需要重寫想要關(guān)注的方法即可,其它未重寫的方法將會在事件傳播時被跳過
       * */
      public class MyChannelEventHandlerAdapter implements MyChannelEventHandler{
      
          /**
           * 當(dāng)前是否已經(jīng)被加入sharable緩存
           * */
          public volatile boolean added;
      
          @Skip
          @Override
          public void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception {
              ctx.fireChannelRead(msg);
          }
      
          @Skip
          @Override
          public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) throws Exception {
              ctx.fireExceptionCaught(cause);
          }
      
          @Skip
          @Override
          public void close(MyChannelHandlerContext ctx) throws Exception {
              ctx.close();
          }
      
          @Skip
          @Override
          public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
              ctx.write(msg);
          }
      
      
          private static ConcurrentHashMap<Class<?>, Boolean> isSharableCacheMap = new ConcurrentHashMap<>();
      
          public boolean isSharable() {
              /**
               * MyNetty中直接用全局的ConcurrentHashMap來緩存handler類是否是sharable可共享的,實現(xiàn)起來很簡單
               * 而netty中利用FastThreadLocal做了優(yōu)化,避免了不同線程之間的鎖爭搶
               * 高并發(fā)下每分每秒都會創(chuàng)建大量的鏈接以及所屬的Handler,優(yōu)化后性能會有很大提升
               *
               * See <a >#2289</a>.
               */
              Class<?> clazz = getClass();
              Boolean sharable = isSharableCacheMap.computeIfAbsent(
                      clazz, k -> clazz.isAnnotationPresent(Sharable.class));
              return sharable;
          }
      }
      
      • 在ChannelHandler被加入到pipeline時,會被包裝成AbstractChannelHandlerContext節(jié)點加入鏈表。在AbstractChannelHandlerContext的構(gòu)造方法中,計算出對應(yīng)ChannelHandler的掩碼。
      • ChannelHandler中的每個IO事件的方法都對應(yīng)mask掩碼的一個bit位,bit位為1則代表對該IO事件感興趣,為0則代表不感興趣需要跳過。在IO事件傳播時,通過對應(yīng)掩碼進行與操作快速的判斷是否需要跳過該節(jié)點。
      • 具體每一位的掩碼值是通過方法上是否含有@Skip注解來判斷的,帶上了該注解就表示對當(dāng)前IO事件不感興趣,傳播時需要跳過該ChannelHandler。
      • 掩碼的計算引入了map緩存,相同類型的ChannelHandler實例的掩碼不需要重復(fù)計算,在創(chuàng)建大量連接時,其對應(yīng)pipeline中的AbstractChannelHandlerContext實例也會被大量創(chuàng)建,使用緩存能很好的提高性能。
      • Netty為入站,出站的ChannelHandler分別提供了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter兩個適配器,其方法中默認都帶上了@Skip注解。
        在實際開發(fā)時,用戶可以選擇令自己的自定義ChannelHandler繼承對應(yīng)的Adapter,重寫感興趣的IO事件的方法。重寫后的方法不會帶@Skip注解,會在IO事件傳播時觸發(fā)其自定義的方法邏輯。

      2.5 @Sharable防共享檢測原理簡單介紹

      netty還提供了防共享檢測機制,用來避免用戶錯誤的使用共享ChannelHandler。

      • 正常情況下,每個ChannelPipeline中對應(yīng)的ChannelEventHandler實例都是互相獨立的,但在一些場景下使用共享的ChannelHandler能帶來更好的性能。對于一些無狀態(tài)的,或者架構(gòu)上就是全局唯一的handler(比如dubbo中維護業(yè)務(wù)線程池的Handler),令其在不同的Channel中共享是一個好的選擇。
      • netty會在ChannelHandler加入到pipeline時對其進行檢查,如果存在一個ChannelHandler實例被不止一次的注冊到netty中,netty會認為其被錯誤的注冊。因為默認情況下,一個ChannelHandler實例不能同時被注冊到一個以上的channel中,否則其將出現(xiàn)并發(fā)問題,netty會拋出異常來警告用戶。
        而只有當(dāng)用戶在對應(yīng)的ChannelHandler上顯式標記上@Sharable注解,明確了其就是可以共享,已經(jīng)考慮過并發(fā)的可能性時,才能在重復(fù)注冊時通過校驗。
      • 從個人的經(jīng)歷來說,我在初次使用netty時曾對@Sharable注解的功能有過誤解。第一感覺是在構(gòu)造流水線時,被打上了@Sharable注解的Handler會類似spring的單例模式一樣,即使重復(fù)注冊也會被netty自動的弄成全局唯一。
        但在了解了其工作原理后發(fā)現(xiàn)是反過來的,@Sharable更多的是起到一個檢查的作用,避免用戶錯誤的重復(fù)注冊并發(fā)不安全的ChannelHandler。

      2.6 EventLoop改造接入pipeline流水線

      目前l(fā)ab2版本的EventLoop還比較簡單,只是在處理讀事件的時候從原來的直接調(diào)用EventHandler的fireChannelRead方法,改造成了調(diào)用pipeline的fireChannelRead方法,令讀事件在整個ChannelHandler流水線中傳播。

          private void processReadEvent(SelectionKey key) throws Exception {
              SocketChannel socketChannel = (SocketChannel)key.channel();
      
              // 目前所有的attachment都是MyNioChannel
              MyNioSocketChannel myNioChannel = (MyNioSocketChannel) key.attachment();
      
              // 簡單起見,buffer不緩存,每次讀事件來都新創(chuàng)建一個
              // 暫時也不考慮黏包/拆包場景(Netty中靠ByteToMessageDecoder解決,后續(xù)再分析其原理),理想的認為每個消息都小于1024,且每次讀事件都只有一個消息
              ByteBuffer readBuffer = ByteBuffer.allocate(1024);
      
              int byteRead = socketChannel.read(readBuffer);
              logger.info("processReadEvent byteRead={}",byteRead);
              if(byteRead == -1){
                  // 簡單起見不考慮tcp半連接的情況,返回-1直接關(guān)掉連接
                  socketChannel.close();
              }else{
                  // 將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作
                  readBuffer.flip();
                  // 根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
                  byte[] bytes = new byte[readBuffer.remaining()];
                  // 將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
                  readBuffer.get(bytes);
      
                  if(myNioChannel != null) {
                      // 觸發(fā)pipeline的讀事件
                      myNioChannel.getChannelPipeline().fireChannelRead(bytes);
                  }else{
                      logger.error("processReadEvent attachment myNioChannel is null!");
                  }
              }
          }
      

      3.MyNettyBootstrap與新版本Echo服務(wù)器demo實現(xiàn)

      在實現(xiàn)了pipeline流水線功能后,配置自定義事件處理器的方式也要有所改變,MyNetty參考netty實現(xiàn)了一個簡單的Client/Server的Bootstrap。
      其中構(gòu)建pipeline的方式與netty有所不同,netty中使用了一個特殊的ChannelInboundHandler,即ChannelInitializer。ChannelInitializer會在連接被注冊時觸發(fā)initChannel方法,執(zhí)行用戶自定義的組裝pipeline的邏輯,然后再將這個特殊的Handler從鏈表中remove掉以完成最終channel鏈表的構(gòu)建。
      而MyNetty簡單起見,并沒有支持用戶自定義channel的惰性創(chuàng)建,也不支持在運行時動態(tài)的增加或刪除pipeline中鏈表中的handler(所以沒有那些handler狀態(tài)的臨界值判斷),而是直接設(shè)計了一個MyChannelPipelineSupplier接口,在MyNIOChannel被創(chuàng)建時,也一并創(chuàng)建pipeline中的handler鏈表。

      服務(wù)端Bootstrap
      public class MyNioServerBootstrap {
      
          private static final Logger logger = LoggerFactory.getLogger(MyNioServerBootstrap.class);
      
          private final InetSocketAddress endpointAddress;
      
          private final MyNioEventLoopGroup bossGroup;
      
          public MyNioServerBootstrap(InetSocketAddress endpointAddress,
                                      MyChannelPipelineSupplier childChannelPipelineSupplier,
                                      int bossThreads, int childThreads) {
              this.endpointAddress = endpointAddress;
      
              MyNioEventLoopGroup childGroup = new MyNioEventLoopGroup(childChannelPipelineSupplier,childThreads);
              this.bossGroup = new MyNioEventLoopGroup(childChannelPipelineSupplier, bossThreads, childGroup);
          }
      
          public void start() throws IOException {
              ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
              serverSocketChannel.configureBlocking(false);
      
              MyNioEventLoop myNioEventLoop = this.bossGroup.next();
      
              myNioEventLoop.execute(()->{
                  try {
                      Selector selector = myNioEventLoop.getUnwrappedSelector();
                      serverSocketChannel.socket().bind(endpointAddress);
                      SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
                      // 監(jiān)聽accept事件
                      selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
                      logger.info("MyNioServer do start! endpointAddress={}",endpointAddress);
                  } catch (IOException e) {
                      logger.error("MyNioServer do bind error!",e);
                  }
              });
          }
      }
      
      客戶端Bootstrap
      public class MyNioClientBootstrap {
      
          private static final Logger logger = LoggerFactory.getLogger(MyNioClientBootstrap.class);
      
          private final InetSocketAddress remoteAddress;
      
          private final MyNioEventLoopGroup eventLoopGroup;
      
          private MyNioSocketChannel myNioSocketChannel;
      
          private final MyChannelPipelineSupplier myChannelPipelineSupplier;
      
          public MyNioClientBootstrap(InetSocketAddress remoteAddress, MyChannelPipelineSupplier myChannelPipelineSupplier) {
              this.remoteAddress = remoteAddress;
      
              this.eventLoopGroup = new MyNioEventLoopGroup(myChannelPipelineSupplier, 1);
      
              this.myChannelPipelineSupplier = myChannelPipelineSupplier;
          }
      
          public void start() throws IOException {
              SocketChannel socketChannel = SocketChannel.open();
              socketChannel.configureBlocking(false);
      
              MyNioEventLoop myNioEventLoop = this.eventLoopGroup.next();
      
              myNioEventLoop.execute(()->{
                  try {
                      Selector selector = myNioEventLoop.getUnwrappedSelector();
      
                      myNioSocketChannel = new MyNioSocketChannel(selector,socketChannel,myChannelPipelineSupplier);
      
                      myNioEventLoop.register(myNioSocketChannel);
      
                      // doConnect
                      // Returns: true if a connection was established,
                      //          false if this channel is in non-blocking mode and the connection operation is in progress;
                      if(!socketChannel.connect(remoteAddress)){
                          // 簡單起見也監(jiān)聽READ事件,相當(dāng)于netty中開啟了autoRead
                          int clientInterestOps = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
      
                          myNioSocketChannel.getSelectionKey().interestOps(clientInterestOps);
      
                          // 監(jiān)聽connect事件
                          logger.info("MyNioClient do start! remoteAddress={}",remoteAddress);
                      }else{
                          logger.info("MyNioClient do start connect error! remoteAddress={}",remoteAddress);
      
                          // connect操作直接失敗,關(guān)閉連接
                          socketChannel.close();
                      }
                  } catch (IOException e) {
                      logger.error("MyNioClient do connect error!",e);
                  }
              });
          }
      }
      

      原來的Echo服務(wù)端/客戶端demo也對邏輯進行了拆分,將業(yè)務(wù)邏輯和編解碼邏輯拆分成了不同的ChannelHandler。

      Echo服務(wù)器與客戶端編解碼處理器實現(xiàn)
      public class EchoMessageEncoder extends MyChannelEventHandlerAdapter {
      
          private static final Logger logger = LoggerFactory.getLogger(EchoMessageEncoder.class);
      
          @Override
          public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
              // 寫事件從tail向head傳播,msg一定是string類型
              String message = (String) msg;
      
              ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
              writeBuffer.put(message.getBytes(StandardCharsets.UTF_8));
              writeBuffer.flip();
      
              logger.info("EchoMessageEncoder message to byteBuffer, " +
                  "message={}, writeBuffer={}",message,writeBuffer);
      
              ctx.write(writeBuffer);
          }
      }
      
      public class EchoMessageDecoder extends MyChannelEventHandlerAdapter {
      
          private static final Logger logger = LoggerFactory.getLogger(EchoMessageDecoder.class);
      
          @Override
          public void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception {
              // 讀事件從head向tail傳播,msg一定是string類型
              String receivedStr = new String((byte[]) msg, StandardCharsets.UTF_8);
      
              logger.info("EchoMessageDecoder byteBuffer to message, " +
                  "msg={}, receivedStr={}",msg,receivedStr);
      
              // 當(dāng)前版本,不考慮黏包拆包等各種問題,decoder只負責(zé)將byte轉(zhuǎn)為string
              ctx.fireChannelRead(receivedStr);
          }
      }
      
      Echo服務(wù)器與客戶端demo
      public class ServerDemo {
          public static void main(String[] args) throws IOException {
              MyNioServerBootstrap myNioServerBootstrap = new MyNioServerBootstrap(
                  new InetSocketAddress(8080),
                  // 先簡單一點,只支持childEventGroup自定義配置pipeline
                  new MyChannelPipelineSupplier() {
                      @Override
                      public MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {
                          MyChannelPipeline myChannelPipeline = new MyChannelPipeline(myNioChannel);
                          // 注冊自定義的EchoServerEventHandler
                          myChannelPipeline.addLast(new EchoMessageEncoder());
                          myChannelPipeline.addLast(new EchoMessageDecoder());
                          myChannelPipeline.addLast(new EchoServerEventHandler());
                          return myChannelPipeline;
                      }
                  },1,5);
              myNioServerBootstrap.start();
          }
      }
      
      public class ClientDemo {
          public static void main(String[] args) throws IOException {
              MyNioClientBootstrap myNioClientBootstrap = new MyNioClientBootstrap(new InetSocketAddress(8080),new MyChannelPipelineSupplier() {
                  @Override
                  public MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {
                      MyChannelPipeline myChannelPipeline = new MyChannelPipeline(myNioChannel);
                      // 注冊自定義的EchoClientEventHandler
                      myChannelPipeline.addLast(new EchoMessageEncoder());
                      myChannelPipeline.addLast(new EchoMessageDecoder());
                      myChannelPipeline.addLast(new EchoClientEventHandler());
                      return myChannelPipeline;
                  }
              });
              myNioClientBootstrap.start();
          }
      }
      

      總結(jié)

      • 在lab2中,MyNetty實現(xiàn)了pipeline流水線機制,允許用戶構(gòu)造自定義處理器鏈條,進行功能的解耦。同時也提供了一個Bootstrap腳手架幫助用戶更快捷的實現(xiàn)自己的網(wǎng)絡(luò)應(yīng)用程序。
        相信在了解了MyNetty的簡易版本流水線功能實現(xiàn)后,能幫助讀者更好的理解netty中更加復(fù)雜的pipeline工作原理。
      • 目前為止,受限于MyNetty現(xiàn)版本的簡陋功能,我們的Echo服務(wù)應(yīng)用程序還非常原始,大量極端場景下的臨界條件都沒有處理。比如分配的ByteBuffer是固定大小,無法動態(tài)擴容,接受的消息體過大就會出錯;發(fā)送和接受的消息也存在黏包、拆包的問題,等等。千里之行始于足下,在后續(xù)的lab中,MyNetty會逐步的完善上述提到的問題。
      • 在迭代MyNetty的過程中,讀者也將能夠體會到Netty的強大之處。因為在普通使用者無法直接感知的地方,netty底層處理了大量的邊界情況,這才使得普通開發(fā)者能夠基于netty高效的構(gòu)建起一個健壯的網(wǎng)絡(luò)應(yīng)用程序。

      博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab2_pipeline_handle 分支),內(nèi)容如有錯誤,還請多多指教。

      posted on 2025-07-03 20:40  小熊餐館  閱讀(289)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 99精品视频在线观看免费蜜桃| 久热这里只有精品12| 隆林| 国产精品亚洲五月天高清| 日韩精品一区二区三区不卡| 中文字幕久久人妻熟人妻 | 亚洲国产精品日韩在线 | 岳阳县| 日韩欧激情一区二区三区| 午夜福利精品国产二区| 性色av无码久久一区二区三区 | 男女激情一区二区三区| 国产成人AV在线免播放观看新| 成人天堂资源www在线| 国内自拍小视频在线看| 亚洲精品无码日韩国产不卡av| 精品人妻人人做人人爽夜夜爽| 欧美三级欧美成人高清| 久久精品一区二区三区综合| 麻豆精品久久久久久久99蜜桃| 亚洲AV永久无码嘿嘿嘿嘿| 最新国产精品拍自在线观看| 国产区免费精品视频| 精品中文人妻在线不卡| 亚洲综合一区二区精品导航| 国产成人午夜福利院| 国产精品麻豆欧美日韩ww | 亚洲色大成网站WWW永久麻豆| 亚洲中文无码手机永久| 人妻少妇偷人精品一区| 国产精品理论片| 国产亚洲亚洲国产一二区| 亚洲欧美人成电影在线观看| 灌阳县| 产综合无码一区| 国产精品成人一区二区三区| 国内精品久久久久影院网站 | 久久人搡人人玩人妻精品| 中文字幕在线精品国产| 黑人巨大无码中文字幕无码| 精品国产中文字幕在线|