<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      深入淺出NIO Socket實現機制

      前言

      Java NIO 由以下幾個核心部分組成:

      • Buffer
      • Channel
      • Selector

      以前基于net包進行socket編程時,accept方法會一直阻塞,直到有客戶端請求的到來,并返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取并處理后面的請求;當然我們可以把獲取socket和處理socket的過程分開,一個線程負責accept,線程池負責處理請求。

      NIO為我們提供了更好的解決方案,采用選擇器(Selector)找出已經準備好讀寫的socket,并按順序處理,基于通道(Channel)和緩沖區(Buffer)來傳輸和保存數據。

      Buffer和Channel已經介紹過深入淺出NIO Channel和Buffer,本文主要介紹NIO的Selector和Socket的實踐以及實現原理。

      Selector是什么?

      在養雞場,有這一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來。這樣,如果負責人想知道雞場情況,只需要到那個人查詢即可,當然前提是,負責得讓那個人知道需要記錄哪些情況。

      Selector的作用相當這個人的工作,每個雞籠相當于一個SocketChannel,單個線程通過Selector可以管理多個SocketChannel。


      A Thread uses a Selector to handle 3 Channels

      為了實現Selector管理多個SocketChannel,必須將多個具體的SocketChannel對象注冊到Selector對象,并聲明需要監聽的事件,目前有4種類型的事件:

      connect:客戶端連接服務端事件,對應值為SelectionKey.OP_CONNECT(8)
      accept:服務端接收客戶端連接事件,對應值為SelectionKey.OP_ACCEPT(16)
      read:讀事件,對應值為SelectionKey.OP_READ(1)
      write:寫事件,對應值為SelectionKey.OP_WRITE(4)

      當SocketChannel有對應的事件發生時,Selector能夠覺察到并進行相應的處理。

      為了更好地理解NIO Socket,先來看一段服務端的示例代碼

      ServerSocketChannel serverChannel = ServerSocketChannel.open();
      serverChannel.configureBlocking(false);
      serverChannel.socket().bind(new InetSocketAddress(port));
      Selector selector = Selector.open();
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
      while(true){
          int n = selector.select();
          if (n == 0) continue;
          Iterator ite = this.selector.selectedKeys().iterator();
          while(ite.hasNext()){
              SelectionKey key = (SelectionKey)ite.next();
              if (key.isAcceptable()){
                  SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
                  clntChan.configureBlocking(false);
                  //將選擇器注冊到連接到的客戶端信道,
                  //并指定該信道key值的屬性為OP_READ,
                  //同時為該信道指定關聯的附件
                  clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
              }
              if (key.isReadable()){
                  handleRead(key);
              }
              if (key.isWritable() && key.isValid()){
                  handleWrite(key);
              }
              if (key.isConnectable()){
                  System.out.println("isConnectable = true");
              }
            ite.remove();
          }
      }

       

      服務端連接過程
      1、創建ServerSocketChannel實例serverSocketChannel,并bind到指定端口。
      2、創建Selector實例selector;
      3、將serverSocketChannel注冊到selector,并指定事件OP_ACCEPT。
      4、while循環執行:
      4.1、調用select方法,該方法會阻塞等待,直到有一個或多個通道準備好了I/O操作或等待超時。
      4.2、獲取選取的鍵列表;
      4.3、循環鍵集中的每個鍵:
      4.3.a、獲取通道,并從鍵中獲取附件(如果添加了附件);
      4.3.b、確定準備就緒的操縱并執行,如果是accept操作,將接收的信道設置為非阻塞模式,并注冊到選擇器;
      4.3.c、如果需要,修改鍵的興趣操作集;
      4.3.d、從已選鍵集中移除鍵

      在步驟3中,selector只注冊了serverSocketChannel的OP_ACCEPT事件

      • 如果有客戶端A連接服務,執行select方法時,可以通過serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊socketChannel的OP_READ事件。
      • 如果客戶端A發送數據,會觸發read事件,這樣下次輪詢調用select方法時,就能通過socketChannel讀取數據,同時在selector上注冊該socketChannel的OP_WRITE事件,實現服務器往客戶端寫數據。

      NIO Socket實現原理

      SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。

      public static SelectorProvider provider() {
          synchronized (lock) {
              if (provider != null)
                  return provider;
              return AccessController.doPrivileged(
                  new PrivilegedAction<SelectorProvider>() {
                      public SelectorProvider run() {
                              if (loadProviderFromProperty())
                                  return provider;
                              if (loadProviderAsService())
                                  return provider;
                              provider = sun.nio.ch.DefaultSelectorProvider.create();
                              return provider;
                          }
                      });
          }
      }

      SelectorProvider在windows和linux下有不同的實現,provider方法會返回對應的實現。

      Selector分析

      Selector是如何做到同時管理多個socket?

      Selector初始化時,會實例化PollWrapper、SelectionKeyImpl數組和Pipe。

      WindowsSelectorImpl(SelectorProvider sp) throws IOException {
          super(sp);
          pollWrapper = new PollArrayWrapper(INIT_CAP);
          wakeupPipe = Pipe.open();
          wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
      
          // Disable the Nagle algorithm so that the wakeup is more immediate
          SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
          (sink.sc).socket().setTcpNoDelay(true);
          wakeupSinkFd = ((SelChImpl)sink).getFDVal();
          pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
      }

       

      pollWrapper用Unsafe類申請一塊物理內存,存放注冊時的socket句柄fdVal和event的數據結構pollfd,其中pollfd共8位,0~3位保存socket句柄,4~7位保存event。


      pollfd

      pollWrapper


      pollWrapper提供了fdVal和event數據的相應操作,如添加操作通過Unsafe的putInt和putShort實現。

      void putDescriptor(int i, int fd) {
          pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
      }
      void putEventOps(int i, int event) {
          pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
      }

       

      SelectionKeyImpl保存注冊時的channel、selector、event以及保存在pollWrapper的偏移位置index。

      先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何實現的:

      public final SelectionKey register(Selector sel, int ops, Object att)
          throws ClosedChannelException {
          synchronized (regLock) {
              SelectionKey k = findKey(sel);
              if (k != null) {
                  k.interestOps(ops);
                  k.attach(att);
              }
              if (k == null) {
                  // New registration
                  synchronized (keyLock) {
                      if (!isOpen())
                          throw new ClosedChannelException();
                      k = ((AbstractSelector)sel).register(this, ops, att);
                      addKey(k);
                  }
              }
              return k;
          }
      }

       

      1. 如果該channel和selector已經注冊過,則直接添加事件和附件。
      2. 否則通過selector實現注冊過程。
      protected final SelectionKey register(AbstractSelectableChannel ch,
            int ops,  Object attachment) {
          if (!(ch instanceof SelChImpl))
              throw new IllegalSelectorException();
          SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
          k.attach(attachment);
          synchronized (publicKeys) {
              implRegister(k);
          }
          k.interestOps(ops);
          return k;
      }
      
      protected void implRegister(SelectionKeyImpl ski) {
          synchronized (closeLock) {
              if (pollWrapper == null)
                  throw new ClosedSelectorException();
              growIfNeeded();
              channelArray[totalChannels] = ski;
              ski.setIndex(totalChannels);
              fdMap.put(ski);
              keys.add(ski);
              pollWrapper.addEntry(totalChannels, ski);
              totalChannels++;
          }
      }

       

      1. 以當前channel和selector為參數,初始化 SelectionKeyImpl 對象selectionKeyImpl ,并添加附件attachment。
      2. 如果當前channel的數量totalChannels等于SelectionKeyImpl數組大小,對SelectionKeyImpl數組和pollWrapper進行擴容操作。
      3. 如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
      4. pollWrapper.addEntry將把selectionKeyImpl中的socket句柄添加到對應的pollfd。
      5. k.interestOps(ops)方法最終也會把event添加到對應的pollfd。

      所以,不管serverSocketChannel,還是socketChannel,在selector注冊事件后,最終都保存在pollArray中。

      接著,再來看看selector中的select是如何實現一次獲取多個有事件發生的channel的。
      底層由selector實現類的doSelect方法實現,如下:

       protected int doSelect(long timeout) throws IOException {
              if (channelArray == null)
                  throw new ClosedSelectorException();
              this.timeout = timeout; // set selector timeout
              processDeregisterQueue();
              if (interruptTriggered) {
                  resetWakeupSocket();
                  return 0;
              }
              // Calculate number of helper threads needed for poll. If necessary
              // threads are created here and start waiting on startLock
              adjustThreadsCount();
              finishLock.reset(); // reset finishLock
              // Wakeup helper threads, waiting on startLock, so they start polling.
              // Redundant threads will exit here after wakeup.
              startLock.startThreads();
              // do polling in the main thread. Main thread is responsible for
              // first MAX_SELECTABLE_FDS entries in pollArray.
              try {
                  begin();
                  try {
                      subSelector.poll();
                  } catch (IOException e) {
                      finishLock.setException(e); // Save this exception
                  }
                  // Main thread is out of poll(). Wakeup others and wait for them
                  if (threads.size() > 0)
                      finishLock.waitForHelperThreads();
                } finally {
                    end();
                }
              // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
              finishLock.checkForException();
              processDeregisterQueue();
              int updated = updateSelectedKeys();
              // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
              resetWakeupSocket();
              return updated;
          }

       

      其中 subSelector.poll() 是select的核心,由native函數poll0實現,readFds、writeFds 和exceptFds數組用來保存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其余位置存放發生事件的socket句柄fd。

      private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
      private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
      private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
      private int poll() throws IOException{ // poll for the main thread
           return poll0(pollWrapper.pollArrayAddress,
                Math.min(totalChannels, MAX_SELECTABLE_FDS),
                   readFds, writeFds, exceptFds, timeout);
      }

      執行 selector.select() ,poll0函數把指向socket句柄和事件的內存地址傳給底層函數。

      1. 如果之前沒有發生事件,程序就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間內如果沒有事件,也會返回。
      2. 一旦有對應的事件發生,poll0方法就會返回。
      3. processDeregisterQueue方法會清理那些已經cancelled的SelectionKey
      4. updateSelectedKeys方法統計有事件發生的SelectionKey數量,并把符合條件發生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續使用。

      在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實現,是基于IO復用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。

      epoll原理

      epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。

      先看看使用c封裝的3個epoll系統調用:

      • int epoll_create(int size)
        epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。
      • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
        epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。
      • int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
        epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。

      大概看看epoll內部是怎么實現的:

      1. epoll初始化時,會向內核注冊一個文件系統,用于存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開辟自己的內核高速緩存區,以紅黑樹的結構保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用于存儲準備就緒的事件。
      2. 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后,就把socket插入到就緒鏈表里。
      3. 當epoll_wait調用時,僅僅觀察就緒鏈表里有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。

      epoll的兩種工作模式:

      • LT:level-trigger,水平觸發模式,只要某個socket處于readable/writable狀態,無論什么時候進行epoll_wait都會返回該socket。
      • ET:edge-trigger,邊緣觸發模式,只有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會返回該socket。

      socket讀數據


      socket讀數據


      socket寫數據


      socket寫數據

      read實現

      通過遍歷selector中的SelectionKeyImpl數組,獲取發生事件的socketChannel對象,其中保存了對應的socket句柄,實現如下。

      public int read(ByteBuffer buf) throws IOException {
          if (buf == null)
              throw new NullPointerException();
          synchronized (readLock) {
              if (!ensureReadOpen())
                  return -1;
              int n = 0;
              try {
                  begin();
                  synchronized (stateLock) {
                      if (!isOpen()) {         
                          return 0;
                      }
                      readerThread = NativeThread.current();
                  }
                  for (;;) {
                      n = IOUtil.read(fd, buf, -1, nd);
                      if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                          // The system call was interrupted but the channel
                          // is still open, so retry
                          continue;
                      }
                      return IOStatus.normalize(n);
                  }
              } finally {
                  readerCleanup();        // Clear reader thread
                  // The end method, which 
                  end(n > 0 || (n == IOStatus.UNAVAILABLE));
      
                  // Extra case for socket channels: Asynchronous shutdown
                  //
                  synchronized (stateLock) {
                      if ((n <= 0) && (!isInputOpen))
                          return IOStatus.EOF;
                  }
                  assert IOStatus.check(n);
              }
          }
      }

      通過Buffer的方式讀取socket的數據。

      wakeup實現

      public Selector wakeup() {
          synchronized (interruptLock) {
              if (!interruptTriggered) {
                  setWakeupSocket();
                  interruptTriggered = true;
              }
          }
          return this;
      }
      
      // Sets Windows wakeup socket to a signaled state.
      private void setWakeupSocket() {
         setWakeupSocket0(wakeupSinkFd);
      }
      private native void setWakeupSocket0(int wakeupSinkFd);

       

      看來wakeupSinkFd這個變量是為wakeup方法使用的。
      其中interruptTriggered為中斷已觸發標志,當pollWrapper.interrupt()之后,該標志即為true了;因為這個標志,連續兩次wakeup,只會有一次效果。

      為了實現client和server的數據交互,Linux下采用管道pipe實現,windows下采用兩個socket之間的通信進行實現,它們都有這樣的特性:

        1. 都有兩個端,一個 是read端,一個是write端,windows中兩個socket也是read和write的角色。
        2. 當往write端寫入 數據,則read端即可以收到數據。
      posted @ 2017-04-24 15:07  _1900  閱讀(3411)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 老司机久久99久久精品播放免费| 香港日本三级亚洲三级| 无码综合天天久久综合网| 亚洲精品日韩在线观看| 亚洲综合色婷婷中文字幕| 26uuu另类亚洲欧美日本| 日本视频一两二两三区| 国产日韩精品欧美一区灰| 亚洲成人av高清在线| 国产无套精品一区二区| 国产成人自拍小视频在线| 日本一本无道码日韩精品| 午夜国产精品福利一二| 午夜天堂精品久久久久| 四虎成人精品在永久免费| 国产精品免费中文字幕| 国产精品久久久久久福利| 免费播放一区二区三区| 欧美在线人视频在线观看| 久久99精品久久久久久9| 中文国产人精品久久蜜桃| 成人啪精品视频网站午夜 | 永久免费av网站可以直接看的 | 国产一级三级三级在线视| 精品国产成人国产在线观看| 国产午夜精品一区二区三| 九九成人免费视频| 婷婷综合久久中文字幕| 免费大片av手机看片高清| av在线播放日韩亚洲欧| 国产精品成人一区二区三区| 国内自拍第一区二区三区| 国产福利一区二区三区在线观看| 国产一区二区三区粉嫩av| 福利一区二区在线播放| 亚洲制服无码一区二区三区| 中文字幕av一区| 98精品全国免费观看视频| 精品国产一国产二国产三| 99国产欧美另类久久久精品| 麻豆国产传媒精品视频|