<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      netty服務端啟動--ServerBootstrap源碼解析

      netty服務端啟動--ServerBootstrap源碼解析

      前面的第一篇文章中,我以spark中的netty客戶端的創建為切入點,分析了netty的客戶端引導類Bootstrap的參數設置以及啟動過程。顯然,我們還有另一個重要的部分--服務端的初始化和啟動過程沒有探究,所以這一節,我們就來從源碼層面詳細分析一下netty的服務端引導類ServerBootstrap的啟動過程。

      spark中netty服務端的創建

      我們仍然以spark中對netty的使用為例,以此為源碼分析的切入點,首先我們看一下spark的NettyRpc模塊中創建netty服務端引導類的代碼:

      TransportServer.init

      TransportServer的構造方法中會調用init方法,ServerBootstrap類就是在init方法中被創建并初始化以及啟動的。
      這個方法主要分為三塊:

      • 創建ServerBootstrap對象,并設置各種參數。我們看到,這里的bossGroup和workerGroup是同一個線程組,此外還設置了socket的一些參數如排隊的連接數,接收緩沖區,發送緩沖區大小等。
      • 設置childHandler參數,之所以把這個參數的設置單獨拿出來就是為了凸顯這個參數的重要性,childHandler參數是用戶實現時間處理邏輯的地方
      • 最后將服務端綁定到某個端口,同時在綁定的過程中也會啟動服務端,開始監聽io事件。

      很顯然,ServerBootstrap的啟動入口就是bind方法。

            // 初始化netty服務端
            private void init(String hostToBind, int portToBind) {
            
              // io模式,有兩種選項NIO, EPOLL
              IOMode ioMode = IOMode.valueOf(conf.ioMode());
              // 創建bossGroup和workerGroup,即主線程組合子線程組
              EventLoopGroup bossGroup =
                NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
              EventLoopGroup workerGroup = bossGroup;
            
              // 緩沖分配器,分為堆內存和直接內存
              PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
                conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
            
              // 創建一個netty服務端引導對象,并設置相關參數
              bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NettyUtils.getServerChannelClass(ioMode))
                .option(ChannelOption.ALLOCATOR, allocator)
                .childOption(ChannelOption.ALLOCATOR, allocator);
            
              // 內存使用的度量對象
              this.metrics = new NettyMemoryMetrics(
                allocator, conf.getModuleName() + "-server", conf);
            
              // 排隊的連接數
              if (conf.backLog() > 0) {
                bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
              }
            
              // socket接收緩沖區大小
              if (conf.receiveBuf() > 0) {
                bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
              }
            
              // socket發送緩沖區大小
              if (conf.sendBuf() > 0) {
                bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
              }
            
              // 子channel處理器
              bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                  RpcHandler rpcHandler = appRpcHandler;
                  for (TransportServerBootstrap bootstrap : bootstraps) {
                    rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
                  }
                  context.initializePipeline(ch, rpcHandler);
                }
              });
            
              InetSocketAddress address = hostToBind == null ?
                  new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
              // 綁定到ip地址和端口
              channelFuture = bootstrap.bind(address);
              // 同步等待綁定成功
              channelFuture.syncUninterruptibly();
            
              port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
              logger.debug("Shuffle server started on port: {}", port);
            }
      

      AbstractBootstrap.init(SocketAddress localAddress)

      這里的校驗主要是對group和channelFactory的非空校驗
      public ChannelFuture bind(SocketAddress localAddress) {
      validate();
      return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
      }

      AbstractBootstrap.doBind

      這個方法,我們之前在分析Bootstrap的啟動過程時提到過,它的主要作用如下:

      • 通過反射根據傳入的channel類型創建一個具體的channel對象
      • 調用init方法對這個channel對象進行初始化
      • 將初始化完成的channel對象注冊到一個EventLoop線程上

      之前,我們分析了NioSocketChannel的構造過程,以及Bootstarp中對channel的初始化過程,
      本節我們要分析NioServerSocketChannel的構造過程,以及ServerBootstrap的init方法的實現。

      private ChannelFuture doBind(final SocketAddress localAddress) {
          // 創建一個channel,并對這個channel做一些初始化工作
          final ChannelFuture regFuture = initAndRegister();
          final Channel channel = regFuture.channel();
          if (regFuture.cause() != null) {
              return regFuture;
          }
      
          if (regFuture.isDone()) {
              // At this point we know that the registration was complete and successful.
              ChannelPromise promise = channel.newPromise();
              // 將這個channel綁定到指定的地址
              doBind0(regFuture, channel, localAddress, promise);
              return promise;
          } else {// 對于尚未注冊成功的情況,采用異步的方式,即添加一個回調
              // Registration future is almost always fulfilled already, but just in case it's not.
              final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              regFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      Throwable cause = future.cause();
                      if (cause != null) {
                          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                          // IllegalStateException once we try to access the EventLoop of the Channel.
                          promise.setFailure(cause);
                      } else {
                          // Registration was successful, so set the correct executor to use.
                          // See https://github.com/netty/netty/issues/2586
                          promise.registered();
      
                          doBind0(regFuture, channel, localAddress, promise);
                      }
                  }
              });
              return promise;
          }
      }
      

      NioServerSocketChannel的構造方法

      這里通過調用jdk的api創建了一個ServerSocketChannel。
      public NioServerSocketChannel() {
      this(newSocket(DEFAULT_SELECTOR_PROVIDER));
      }

      與NioSocketChannelConfig類似,NioServerSocketChannelConfig也是一種門面模式,是對NioServerSocketChannel中的參數接口的封裝。
      此外,我們注意到,這里規定了NioServerSocketChannel的初始的感興趣的事件是ACCEPT事件,即默認會監聽請求建立連接的事件。
      而在NioSocketChannel中的初始感興趣的事件是read事件。
      所以,這里與NioSocketChannel構造過程最主要的不同就是初始的感興趣事件不同。

      public NioServerSocketChannel(ServerSocketChannel channel) {
          super(null, channel, SelectionKey.OP_ACCEPT);
          config = new NioServerSocketChannelConfig(this, javaChannel().socket());
      }
      

      這里首先調用了父類的構造方法,最終調用了AbstractNioChannel類的構造方法,這個過程我們在之前分析NioSocketChannel初始化的時候已經詳細說過,主要就是創建了內部的Unsafe對象和ChannelPipeline對象。

      ServerBootstrap.init

      分析完了channel的構造過程,我們再來看一下ServerBootstrap是怎么對channel對象進行初始化的。

      • 設置參數,設置屬性
      • 獲取子channel的參數和屬性,以便在有新的連接時給新創建的channel設置參數和屬性
      • 給serverChannel中添加一個重要的handler,這個handler中實現了對新創建的channel的處理邏輯。

      所以,很顯然,我們接下來就要看一下這個特殊的handler,ServerBootstrapAcceptor的read方法。

      void init(Channel channel) throws Exception {
          final Map<ChannelOption<?>, Object> options = options0();
          // 設置參數
          synchronized (options) {
              setChannelOptions(channel, options, logger);
          }
      
          // 設置屬性
          final Map<AttributeKey<?>, Object> attrs = attrs0();
          synchronized (attrs) {
              for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                  @SuppressWarnings("unchecked")
                  AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                  channel.attr(key).set(e.getValue());
              }
          }
      
          ChannelPipeline p = channel.pipeline();
      
          // 子channel的group和handler參數
          final EventLoopGroup currentChildGroup = childGroup;
          final ChannelHandler currentChildHandler = childHandler;
          final Entry<ChannelOption<?>, Object>[] currentChildOptions;
          final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
          synchronized (childOptions) {
              currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
          }
          synchronized (childAttrs) {
              currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
          }
      
          // 添加處理器
          p.addLast(new ChannelInitializer<Channel>() {
              @Override
              public void initChannel(final Channel ch) throws Exception {
                  final ChannelPipeline pipeline = ch.pipeline();
                  // 一般情況下,對于ServerBootstrap用戶無需設置handler
                  ChannelHandler handler = config.handler();
                  if (handler != null) {
                      pipeline.addLast(handler);
                  }
      
                  // 這里添加了一個關鍵的handler,并且順手啟動了對應的EventLoop的線程
                  ch.eventLoop().execute(new Runnable() {
                      @Override
                      public void run() {
                          pipeline.addLast(new ServerBootstrapAcceptor(
                                  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                      }
                  });
              }
          });
      }
      

      NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)

      在分析ServerBootstrapAcceptor之前,我們首先來回顧一下NioEventLoop的循環中,對于accept事件的處理邏輯,這里截取其中的一小段代碼:

              // 處理read和accept事件
              if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                  unsafe.read();
              }
      

      可見,對于accept事件和read事件一樣,調用NioUnsafe的read方法

      AbstractNioMessageChannel.NioMessageUnsafe.read

      因為NioServerSocketChannel繼承了AbstractNioMessageChannel,并且read方法的實現也是在AbstractNioMessageChannel中,

      • doReadMessages是一個抽象方法,在NioServerSocketChannel的實現中,這個方法調用jdk的api接收一個連接,并包裝成NioSocketChannel對象
      • 以讀取到的channel對象作為消息,在channelPipeline中觸發一個讀事件

      根據前面對channelPipeline的分析,我們知道,讀事件對從頭結點開始,向尾節點傳播。上面我們也提到了,對于初始的那個NioServerSocketChannel,會在ServerBootstarp的init方法中向這個channel的處理鏈中加入一個ServerBootstrapAcceptor處理器,所以,很顯然,接下來我們應該分析ServerBootstrapAcceptor中對讀事件的處理。

          public void read() {
              // 確認當前代碼的執行是在EventLoop的線程中
              assert eventLoop().inEventLoop();
              final ChannelConfig config = config();
              final ChannelPipeline pipeline = pipeline();
              final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
              allocHandle.reset(config);
      
              boolean closed = false;
              Throwable exception = null;
              try {
                  try {
                      do {
                          // 這里讀取到的是建立的連接對應的channel,
                          // jdk的socketChannel被包裝成了netty的NioSocketChannel
                          int localRead = doReadMessages(readBuf);
                          if (localRead == 0) {
                              break;
                          }
                          if (localRead < 0) {
                              closed = true;
                              break;
                          }
      
                          allocHandle.incMessagesRead(localRead);
                      } while (allocHandle.continueReading());
                  } catch (Throwable t) {
                      exception = t;
                  }
      
                  int size = readBuf.size();
                  for (int i = 0; i < size; i ++) {
                      readPending = false;
                      // 把接收到的每一個channel作為消息,在channelPipeline中觸發一個讀事件
                      pipeline.fireChannelRead(readBuf.get(i));
                  }
                  readBuf.clear();
                  allocHandle.readComplete();
                  // 最后觸發一個讀完成的事件
                  pipeline.fireChannelReadComplete();
      
                  if (exception != null) {
                      closed = closeOnReadError(exception);
      
                      pipeline.fireExceptionCaught(exception);
                  }
      
                  if (closed) {
                      inputShutdown = true;
                      if (isOpen()) {
                          close(voidPromise());
                      }
                  }
              } finally {
                  // Check if there is a readPending which was not processed yet.
                  // This could be for two reasons:
                  // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                  // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                  //
                  // See https://github.com/netty/netty/issues/2254
                  if (!readPending && !config.isAutoRead()) {
                      removeReadOp();
                  }
              }
          }
      }
      

      ServerBootstrapAcceptor.channelRead

      代碼邏輯還是比較簡單的,因為有了前面的鋪墊,即在ServerBootstrap的init方法對創始的那個serverChannel進行初始化時,將用戶設置的子channel的參數,屬性,子channel的handler和子group等參數作為構造參數全部傳給了ServerBootstrapAcceptor,所以在這里直接用就行了。
      其實這里的子channel的初始化和注冊過程和Bootstrap中對一個新創建的channel的初始化過程基本一樣,區別在于Bootstrap中channel是用戶代碼通過調用connect方法最終在initAndregistry中通過反射構造的一個對象;而在服務端,通過監聽ServerSocketChannel的accept事件,當有新的連接建立請求時,會自動創建一個SocketChannel(jdk的代碼實現),然后NioServerSocketChannel將其包裝成一個NioSocketChannel,并作為消息在傳遞給處理器,所以在ServerSocketChannel中的子channel的創建是由底層的jdk的庫實現的。

       public void channelRead(ChannelHandlerContext ctx, Object msg) {
              // 類型轉換,這里的強制轉換是安全的的,
              // 是由各種具體的AbstractNioMessageChannel子類型的實現保證的
              // 各種具體的AbstractNioMessageChannel子類型的讀方法確保它們讀取并最終返回的是一個Channel類型
              final Channel child = (Channel) msg;
      
              // 給子channel添加handler
              child.pipeline().addLast(childHandler);
      
              // 給子channel設置參數
              setChannelOptions(child, childOptions, logger);
      
              // 給子channel設置屬性
              for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                  child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
              }
      
              try {
                  // 將子channel注冊到子group中
                  childGroup.register(child).addListener(new ChannelFutureListener() {
                      @Override
                      public void operationComplete(ChannelFuture future) throws Exception {
                          if (!future.isSuccess()) {
                              forceClose(child, future.cause());
                          }
                      }
                  });
              } catch (Throwable t) {
                  forceClose(child, t);
              }
          }
      

      AbstractBootstrap.doBind0

      回到doBind方法中,在完成了channel的構造,初始化和注冊邏輯后,接下來就要把這個server類型的channel綁定到一個地址上,這樣才能接受客戶端建立連接的請求。
      從代碼中可以看出,調用了channel的bind方法實現綁定的邏輯。

       private static void doBind0(
              final ChannelFuture regFuture, final Channel channel,
              final SocketAddress localAddress, final ChannelPromise promise) {
      
          // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
          // the pipeline in its channelRegistered() implementation.
          channel.eventLoop().execute(new Runnable() {
              @Override
              public void run() {
                  if (regFuture.isSuccess()) {
                      // 調用了channel.bind方法完成綁定的邏輯
                      channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                  } else {
                      promise.setFailure(regFuture.cause());
                  }
              }
          });
      }
      

      AbstractChannel.bind

      bind操作的傳遞是從尾節點開始向前傳遞,所以我們直接看Headcontext對于bind方法的實現
      public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
      return pipeline.bind(localAddress, promise);
      }

      DefaultChannelPipeline.bind

      public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
          return tail.bind(localAddress, promise);
      }
      

      HeadContext.bind

      調用了unsafe的bind方法。

          public void bind(
                  ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
              unsafe.bind(localAddress, promise);
          }
      

      因為后面右有幾個事件的觸發,每個觸發事件都是通過channel的相關方法來觸發,然后又是通過channelpipeline的傳遞事件,這些事件最后基本都是由HeadContext處理了,所以這里我只簡單地敘述一下后面的 大概邏輯,代碼比較繁瑣,而且很多都是相同的調用過程,所以就不貼代碼了。

      • 通過前面的分析,我們知道首先通過channel觸發了一個bind操作,這個操作的實現最終由HeadCOntex實現,HeadContex的實現中是調用了unsafe.bind
      • bind的實現邏輯中,首先通過jdk的api完成了ServerSocketChannel的綁定,然后又觸發了一個channelActive的事件,這個事件的處理最終也是有HeadContext實現
      • 在HeadContext對channelActive操作的實現中,觸發了一個read()操作,注意這里的這個read方法是不帶參數的,是ChannelOutboundInvoker接口中定義的一個方法,也是有HeadContext實現
      • HeadContext對read操作的實現中,調用了Unsafe.beginRead方法,經過幾個子類的具體實現后,最終由AbstractNioChannel.doBeginRead實現具體的開始讀的邏輯,

      從代碼中可以看出來,最終調用了jdk的api,將感興趣的事件添加到selectionKey中。通過前面的 分析,我們知道對于NioSocketChannel,它的感興趣的讀事件類型是SelectionKey.OP_READ,也就是讀事件;
      而對于NioServerSocketChannel,根據前面對其構造方法的分析,它的感興趣的事件是SelectionKey.OP_ACCEPT,也就是建立連接的事件。

      protected void doBeginRead() throws Exception {
          // Channel.read() or ChannelHandlerContext.read() was called
          final SelectionKey selectionKey = this.selectionKey;
          if (!selectionKey.isValid()) {
              return;
          }
      
          readPending = true;
      
          // 將讀事件類型加入到selectionKey的感興趣的事件中
          // 這樣jdk底層的selector就會監聽相應類型的事件
          final int interestOps = selectionKey.interestOps();
          if ((interestOps & readInterestOp) == 0) {
              selectionKey.interestOps(interestOps | readInterestOp);
          }
      }
      

      總結

      到這里,我們就把ServerBootstrap的主要功能代碼分析完了,這里面主要包括三個方面:

      • ServerBootstrap中對server類型的channel的初始化,包括最重要的handler----ServerBootstrapAcceptor的添加
      • ServerBootstrapAcceptor中對于新創建的子channel的處理,包括初始化和注冊的邏輯
      • 將serverChannel綁定到具體的地址上,綁定過程中也啟動了對應的注冊的線程。

      posted on 2019-06-30 01:52  _朱葛  閱讀(759)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 一色屋精品视频在线观看| 亚洲日韩精品一区二区三区| 人妻无码中文专区久久app| 日韩精品卡一卡二卡三卡四| 又黄又无遮挡AAAAA毛片| 亚洲国产成人资源在线| 亚洲色成人网站www永久下载| 亚洲成人av免费一区| 熟女熟妇伦av网站| 国产av丝袜旗袍无码网站| 国产精品国产精品国产专区| 久久综合亚洲色一区二区三区| 日韩深夜福利视频在线观看 | 在线a级毛片无码免费真人| 大胸美女吃奶爽死视频| 婷婷99视频精品全部在线观看| 九九热在线视频免费观看| 国产在线视频一区二区三区| 精品无码久久久久国产| 成人做爰www网站视频| 精品人妻系列无码天堂| 国产999精品2卡3卡4卡| 濮阳市| 日本夜爽爽一区二区三区| 国产一区一一区高清不卡| 好吊妞人成视频在线观看| 亚洲最大成人在线播放| 日本老熟女一二三区视频| 18禁男女爽爽爽午夜网站免费| 亚洲免费一区二区av| 国产免费福利网站| 暖暖影院日本高清...免费| 国产不卡一区二区在线| www久久只有这里有精品| 亚洲一区二区三区丝袜| 韩国午夜理伦三级| 91精品午夜福利在线观看 | 东京热加勒比无码少妇| 欧美人与zoxxxx另类| 国产亚洲精品黑人粗大精选| 亚洲精品国自产拍影院|