<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(六) MyNetty ByteBuf實(shí)現(xiàn)

      從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(六) MyNetty ByteBuf實(shí)現(xiàn)

      1. jdk Buffer介紹

      在上一篇博客中,lab5版本的MyNetty中實(shí)現(xiàn)了FastThreadLocal,為后續(xù)實(shí)現(xiàn)池化內(nèi)存分配功能打下了基礎(chǔ)。池化內(nèi)存分配是netty中非常核心也非常復(fù)雜的一個(gè)功能,沒(méi)法在一次迭代中完整的實(shí)現(xiàn),MyNetty打算分為4個(gè)迭代逐步的將其實(shí)現(xiàn)。按照計(jì)劃,本篇博客中,lab6版本的MyNetty需要實(shí)現(xiàn)一個(gè)非常基礎(chǔ)的,非池化的ByteBuf作為后續(xù)迭代的基礎(chǔ)。
      由于本文屬于系列博客,讀者需要對(duì)之前的博客內(nèi)容有所了解才能更好地理解本文內(nèi)容。

      在前面的實(shí)驗(yàn)中,MyNetty中用來(lái)承載消息的容器一直是java中nio包下的ByteBuffer。與FastThreadLocal類似,Netty同樣不滿足于jdk自帶的ByteBuffer,而是基于ByteBuffer實(shí)現(xiàn)了性能更好,功能更強(qiáng)大的ByteBuf容器。
      但在學(xué)習(xí)Netty的ByteBuf容器之前,我們還是需要先了解jdk中的ByteBuffer工作原理。只有在理解了jdk原生的ByteBuffer的實(shí)現(xiàn)原理和優(yōu)缺點(diǎn)后,我們才能更好的理解Netty中的ByteBuf和它的優(yōu)勢(shì)。

      jdk中的Buffer是一個(gè)巨大的多維層次體系,按照所存儲(chǔ)的數(shù)據(jù)類型可以分為byte、int、short等,按照底層承載數(shù)據(jù)的內(nèi)存區(qū)域的不同可以分為基于堆內(nèi)存(heap)的和基于堆外內(nèi)存(direct),按照是否僅可讀可以分為普通可讀可寫(xiě)的buffer和只讀buffer。
      Buffer按照類的繼承關(guān)系將這幾個(gè)維度以多層的子類繼承關(guān)系組織起來(lái),其中java.nio.Buffer是最頂層的抽象類。
      第二層按照所存儲(chǔ)的數(shù)據(jù)類型可以分為ByteBuffer、IntBuffer和ShortBuffer等直接子類;按照底層承載數(shù)據(jù)的內(nèi)存區(qū)域可以進(jìn)一步分為HeapByteBuffer、DirectByteBuffer、HeapIntBuffer等;更進(jìn)一步的對(duì)于只讀的Buffer容器又有HeapByteBufferR、DirectByteBufferR、HeapIntBufferR等子類實(shí)現(xiàn)(結(jié)尾的R是ReadOnly的意思)。

      Buffer主要子類示意圖

      image

      計(jì)算機(jī)中數(shù)據(jù)底層基本都是以Byte字節(jié)維度存儲(chǔ)的,而Int、Short、Double等數(shù)據(jù)類型都是基于字節(jié)的,因此整個(gè)Buffer體系中最核心的就是ByteBuffer。
      限于篇幅,本文中將只重點(diǎn)分析最重要的ByteBuffer以及其直接子類HeapByteBuffer的工作原理,相信在理解了整個(gè)Buffer體系中最核心的機(jī)制后,讀者能對(duì)jdk的ByteBuffer體系能有一個(gè)大致的理解,有利于后續(xù)理解Netty的ByteBuf容器。

      Buffer類

      首先我們先看看最頂層的Buffer類,在Buffer類中共定義了四個(gè)非常關(guān)鍵的int類型的屬性,分別是capacity、limit、position和mark。

      • capacity屬性代表著當(dāng)前Buffer容器的可容納的最大總元素個(gè)數(shù),不能為負(fù)數(shù)并且不可變。capacity=10,對(duì)于ByteBuffer代表著最多能存放10個(gè)字節(jié)的數(shù)據(jù);而對(duì)于IntBuffer則代表著最多能存放是10個(gè)整型的數(shù)據(jù)。可以簡(jiǎn)單的將Buffer邏輯上視為有著一個(gè)長(zhǎng)度為capacity的底層數(shù)組,數(shù)組的類型與當(dāng)前Buffer的類型一致(實(shí)際也可能是用Byte數(shù)組模擬Int數(shù)組)。
      • jdk中的Buffer分為兩種模式,一種是寫(xiě)模式,一種是讀模式。寫(xiě)模式下只能往Buffer中put寫(xiě)入數(shù)據(jù),不能讀取數(shù)據(jù);讀模式下只能從Buffer中g(shù)et讀出數(shù)據(jù),不能寫(xiě)入數(shù)據(jù)。
        Buffer在剛被創(chuàng)建初始化時(shí),是空的,屬于寫(xiě)模式。在寫(xiě)入數(shù)據(jù)后,可以通過(guò)flip方法將Buffer容器切換為讀模式(具體的原理下面會(huì)分析)。
      • position屬性在寫(xiě)模式下代表著當(dāng)前的寫(xiě)指針,下一次寫(xiě)入的數(shù)據(jù)將會(huì)被寫(xiě)入底層數(shù)組中index=position的位置;而在讀模式下,則代表著當(dāng)前的讀指針,即下一次讀取的數(shù)據(jù)將會(huì)是底層數(shù)組中index=position的位置。
      • limit屬性用于校驗(yàn)以防止越界,在寫(xiě)模式下代表著第一個(gè)不可寫(xiě)的位置,默認(rèn)情況下寫(xiě)模式中l(wèi)imit的值等于capacity。
        而在讀模式下,代表著第一個(gè)不可讀的位置。舉個(gè)例子,一個(gè)capacity=10的ByteBuffer,在寫(xiě)入了8個(gè)字節(jié)后,limit=8;通過(guò)flip方法轉(zhuǎn)為讀模式后,limit就由寫(xiě)模式下的10,轉(zhuǎn)變?yōu)榱?,代表著最多只能讀取8個(gè)字節(jié)。因?yàn)樵偻笞x,就相當(dāng)于數(shù)組下標(biāo)越界了,讀取了從未寫(xiě)過(guò)的數(shù)據(jù)。
      • mark屬性用于臨時(shí)記錄一下當(dāng)前position的位置,比如處理拆包黏包問(wèn)題時(shí)可以嘗試著向后讀取更多的數(shù)據(jù),但未獲取到完整包時(shí)可以通過(guò)mark指針退回到初始的位置,等待完整的包來(lái)臨。
      • 四個(gè)指針屬性之間存在著嚴(yán)格的大小關(guān)系,即mark <= position <= limit <= capacity。
        capacity用于控制整個(gè)Buffer的大小,任意的讀寫(xiě)都不可越過(guò)capacity的限制。
        limit用于規(guī)范不可讀寫(xiě)的位置,防止越界,其不會(huì)超過(guò)capacity;其無(wú)論是在寫(xiě)模式還是讀模式下作為position的邊界值,都一定大于或等于position。
        position用于記錄當(dāng)前的相對(duì)讀寫(xiě)位置,其無(wú)法超過(guò)limit。
        mark用于記錄當(dāng)前position的位置,其不會(huì)超過(guò)position。當(dāng)limit或者position因?yàn)槟承┰蚨s小,導(dǎo)致其小于當(dāng)前的mark時(shí),mark會(huì)被廢棄被還原為初始值-1。
      • Buffer類中還提供了三個(gè)用于切換容器形態(tài)的重要方法,分別是flip、rewind和clear方法。
      • flip方法用于在寫(xiě)入完成后,令Buffer轉(zhuǎn)換到讀模式。flip操作會(huì)將position歸零,而limit被設(shè)置為之前寫(xiě)模式下的position以避免讀取越界。
      • rewind方法一般用于在讀取數(shù)據(jù)后,重置讀指針以方便重復(fù)的讀取。rewind操作中只會(huì)簡(jiǎn)單的將position歸零,但不修改limit的值。
      • clear方法用于清空Buffer容器,重新寫(xiě)入新的數(shù)據(jù)。clear操作會(huì)將position設(shè)置為0,而limit設(shè)置為等于capacity。設(shè)置完成后的指針位置與Buffer剛被創(chuàng)建時(shí)一樣。
      Buffer結(jié)構(gòu)圖

      Buffer-Structure

      /**
       * 基本copy自jdk的Buffer類,但做了簡(jiǎn)化
       * */
      public abstract class MyBuffer {
      
          /**
           * buffer的capacity代表著總?cè)萘看笮∠拗疲荒転樨?fù)數(shù)并且不可變
           * */
          private int capacity;
      
          /**
           * buffer的limit標(biāo)識(shí)著第一個(gè)不可讀或者寫(xiě)的index位置,不能為負(fù)數(shù)并且不能大于capacity
           * */
          private int limit;
      
          /**
           * buffer的position標(biāo)識(shí)著下一個(gè)讀或者寫(xiě)的元素的index位置,不能為負(fù)數(shù)并且不能大于limit
           * */
          private int position = 0;
      
          /**
           * buffer的mark是用于reset方法(重置)被調(diào)用時(shí),將position的值重試為mark對(duì)應(yīng)下標(biāo).
           * mark并不總是被定義,但當(dāng)它被定義時(shí),它不會(huì)為負(fù)數(shù),并且不會(huì)超過(guò)position
           * 如果mark被定義了,則它將會(huì)在position或者limit被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
           * 如果mark沒(méi)有被定義,則在調(diào)用reset方法時(shí)將會(huì)拋出InvalidMarkException
           * */
          private int mark = -1;
      
          // Used only by direct buffers
          // NOTE: hoisted here for speed in JNI GetDirectBufferAddress
          /**
           * 有兩種Buffer,分別基于堆內(nèi)內(nèi)存和堆外內(nèi)存
           * 堆外內(nèi)存中,這個(gè)屬性標(biāo)示堆外內(nèi)存具體的起始地址, MyNetty中暫時(shí)用不到
           * */
          long address;
      
          MyBuffer(int mark, int pos, int lim, int cap) {       // package-private
              if (cap < 0) {
                  // The capacity of a buffer is never negative
                  throw new IllegalArgumentException("Negative capacity: " + cap);
              }
              this.capacity = cap;
              setLimit(lim);
              setPosition(pos);
              if (mark >= 0) {
                  // The mark is not always defined, but when it is defined it is never negative and is never greater than the position.
                  if (mark > pos) {
                      throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")");
                  }
                  this.mark = mark;
              }
          }
      
          public int getCapacity() {
              return capacity;
          }
      
          public int getPosition() {
              return position;
          }
      
          public int getLimit() {
              return limit;
          }
      
          public int getMark() {
              return mark;
          }
      
          final void discardMark() {
              mark = -1;
          }
      
          /**
           * 是否是只讀的
           * */
          public abstract boolean isReadOnly();
      
          public final MyBuffer setLimit(int newLimit) {
              if ((newLimit > capacity) || (newLimit < 0)) {
                  throw new IllegalArgumentException();
              }
      
              // 給limit賦值
              limit = newLimit;
              if (position > newLimit) {
                  // 確保position不能大于limit
                  position = newLimit;
              }
              if (mark > newLimit) {
                  // 如果mark被定義了,則它將會(huì)在position被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
                  mark = -1;
              }
      
              return this;
          }
      
          public final MyBuffer setPosition(int newPosition) {
              if ((newPosition > limit) || (newPosition < 0)) {
                  throw new IllegalArgumentException("invalid newPosition=" + newPosition + " limit=" + limit);
              }
              if (mark > newPosition) {
                  // 如果mark被定義了,則它將會(huì)在limit被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
                  mark = -1;
              }
              // 給position賦值
              position = newPosition;
              return this;
          }
      
          public final MyBuffer mark() {
              // 將mark記錄為當(dāng)前position的位置
              this.mark = this.position;
              return this;
          }
      
          /**
           * buffer的mark是用于reset方法(重置)被調(diào)用時(shí),將position的值重試為mark對(duì)應(yīng)下標(biāo).
           * */
          public final MyBuffer reset() {
              int m = mark;
              if (m < 0) {
                  // 如果mark沒(méi)有被定義,則在調(diào)用reset方法時(shí)將會(huì)拋出InvalidMarkException
                  throw new InvalidMarkException();
              }
              position = m;
              return this;
          }
      
          /**
           * 令buffer準(zhǔn)備好作為一個(gè)新的序列用于channel讀操作或者說(shuō)讓channel將數(shù)據(jù)put進(jìn)來(lái)
           * 設(shè)置limit為capacity并且將position設(shè)置為0
           *
           * 注意:clear并沒(méi)有真正的將buffer里的數(shù)據(jù)完全清零,而僅僅是通過(guò)修改關(guān)鍵屬性的方式邏輯進(jìn)行了邏輯上的clear,這樣性能更好
           * */
          public final MyBuffer clear() {
              position = 0;
              limit = capacity;
              mark = -1;
              return this;
          }
      
          /**
           * 令buffer準(zhǔn)備好作為一個(gè)新的序列用于channel寫(xiě)操作或者說(shuō)讓channel將寫(xiě)進(jìn)去的數(shù)據(jù)get走:
           * 設(shè)置limit為當(dāng)前的position的值,并且將position設(shè)置為0
           * */
          public final MyBuffer flip() {
              limit = position;
              position = 0;
              mark = -1;
              return this;
          }
      
          /**
           * 讓一個(gè)buffer準(zhǔn)備好重新讀取數(shù)據(jù),即在limit不變的情況下,將position設(shè)置為0
           * 因?yàn)樽x取操作會(huì)不斷地推進(jìn)position的位置,重置position為0,相當(dāng)于允許讀取重頭讀(類似磁帶進(jìn)行了倒帶,即rewind)
           * */
          public final MyBuffer rewind() {
              position = 0;
              mark = -1;
              return this;
          }
      
          /**
           * 返回當(dāng)前buffer還剩余可用的元素個(gè)數(shù)(即limit-position)
           * */
          public final int remaining() {
              int rem = limit - position;
              return Math.max(rem, 0);
          }
      
          /**
           * 是否還有剩余可用的元素個(gè)數(shù)
           * */
          public final boolean hasRemaining() {
              return position < limit;
          }
      
          static void checkBounds(int off, int len, int size) {
              if ((off | len | (off + len) | (size - (off + len))) < 0) {
                  throw new IndexOutOfBoundsException();
              }
          }
      
          final int nextGetIndex() {
              int p = position;
              if (p >= limit) {
                  throw new BufferUnderflowException();
              }
              position = p + 1;
              return p;
          }
      
          final int nextGetIndex(int nb) {
              int p = position;
              if (limit - p < nb){
                  throw new BufferUnderflowException();
              }
              position = p + nb;
              return p;
          }
      
          final int nextPutIndex() {
              int p = position;
              if (p >= limit) {
                  // 相比nextGetIndex,拋出的異常不同
                  throw new BufferOverflowException();
              }
              position = p + 1;
              return p;
          }
      
          final int nextPutIndex(int nb) {
              int p = position;
              if (limit - p < nb) {
                  throw new BufferOverflowException();
              }
              position = p + nb;
              return p;
          }
      
          final int checkIndex(int i) {
              if ((i < 0) || (i >= limit)) {
                  throw new IndexOutOfBoundsException();
              }
              return i;
          }
      
          final int checkIndex(int i, int nb) {
              if ((i < 0) || (nb > limit - i)) {
                  throw new IndexOutOfBoundsException();
              }
              return i;
          }
      }
      

      ByteBuffer類

      • ByteBuffer類顧名思義,是承載Byte類型的數(shù)據(jù)的Buffer容器。前面提到Buffer承載底層數(shù)組的內(nèi)存分為兩種類型,一種是位于jvm堆內(nèi)存的HeapXXXBuffer,一種是位于堆外的直接內(nèi)存的DirectXXXBuffer。
        HeapByteBuffer中底層數(shù)組直接就是成員變量byte[] hb。而DirectByteBuffer中的底層數(shù)組位于堆外,數(shù)組的內(nèi)存起始地址由父類ByteBuffer中的address維護(hù),訪問(wèn)時(shí)通過(guò)Unsafe方法address與實(shí)際的偏移量來(lái)訪問(wèn)對(duì)應(yīng)位置的數(shù)據(jù)。
        在本篇博客中,限于篇幅只介紹基于堆內(nèi)存的ByteBuffer實(shí)現(xiàn)HeapByteBuffer,因?yàn)樽鳛锽uffer其基本的工作原理上HeapByteBuffer與DirectByteBuffer并沒(méi)有特別大的不同,都是在底層數(shù)組上讀寫(xiě)對(duì)應(yīng)的數(shù)據(jù)。
      • ByteBuffer提供了基于各種數(shù)據(jù)類型的讀(get)與寫(xiě)(put)方法,其中分為相對(duì)操作與絕對(duì)操作之分,絕對(duì)操作與相對(duì)操作的區(qū)別在于多了一個(gè)index參數(shù),比如相對(duì)操作get()和絕對(duì)操作get(int index)。
        絕對(duì)操作很好理解,就是在底層數(shù)組指定的index位置讀取或者寫(xiě)入對(duì)應(yīng)的數(shù)據(jù)。而相對(duì)操作其實(shí)也需要一個(gè)index指針,而這個(gè)index指針就是前面介紹過(guò)的位于Buffer內(nèi)部的position指針。相對(duì)操作在進(jìn)行了讀或者寫(xiě)操作后,會(huì)增加或者說(shuō)推進(jìn)position的值。
        絕大多數(shù)情況下,我們都是使用相對(duì)操作來(lái)讀寫(xiě)操作Buffer的,因?yàn)榭梢宰孊uffer內(nèi)部自行維護(hù)讀或者寫(xiě)指針,比較方便。
      • ByteBuffer因?yàn)槠浯鎯?chǔ)的是最基礎(chǔ)的Byte類型,因此其很容易拓展轉(zhuǎn)換為其它數(shù)據(jù)類型的Buffer容器。
        ByteBuffer除了提供byte類型的讀寫(xiě)方法外,也提供了getInt、putInt、getShort、putLong等等其它數(shù)據(jù)類型讀寫(xiě)的方法。其底層本質(zhì)上是將1或N個(gè)字節(jié)看做一個(gè)完整的特定數(shù)據(jù)類型進(jìn)行寫(xiě)入或讀取。
        比如通過(guò)相對(duì)寫(xiě)操作putInt將一個(gè)int類型的數(shù)據(jù)寫(xiě)入Buffer等價(jià)于一次寫(xiě)入了4個(gè)byte,position自增4。讀取操作則是從特定位置開(kāi)始,將包含自身在內(nèi)的共四個(gè)byte視作一個(gè)int類型返回。
        同時(shí)ByteBuffer也提供了諸如asCharBuffer、asIntBuffer等等將自身轉(zhuǎn)換成邏輯上等價(jià)的其它類型Buffer的方法方便使用。
        Buffer容器中,特別是關(guān)于堆外內(nèi)存的使用還有很多細(xì)節(jié)值得研究,但這不是MyNetty系列博客的重點(diǎn),感興趣的讀者可以自行閱讀jdk的對(duì)應(yīng)源碼或者相關(guān)資料,限于個(gè)人能力這里就不再展開(kāi)了。
      大端法與小端法
      • 前面我們提到,Buffer對(duì)于int或者long等多個(gè)byte字節(jié)構(gòu)成的數(shù)據(jù),是通過(guò)將多個(gè)字節(jié)合并在一起視為整體來(lái)實(shí)現(xiàn)的。而實(shí)際上這里面存在一個(gè)問(wèn)題,即從前到后的哪個(gè)字節(jié)代表高位,哪個(gè)字節(jié)代表低位。如果網(wǎng)絡(luò)傳輸?shù)葓?chǎng)景下兩邊的表示方式不一致,則讀取到的字節(jié)流可能會(huì)被錯(cuò)誤的解析。
      • 內(nèi)存地址的示意圖一般是從左到右,從低到高排列的。作為普通人來(lái)說(shuō),看數(shù)字時(shí)也習(xí)慣了高位在左(前),低位在右(后)的模式(1024,千位在前,個(gè)位在后)。這種低位內(nèi)存的字節(jié)代表高位,高位內(nèi)存的字節(jié)代表低位的表示方法叫做大端法。與之相對(duì)的,低位內(nèi)存代表低位,高位內(nèi)存代表高位的表示方法則被叫做小端法。
      • 讀者可能會(huì)有疑問(wèn),既然大端法符合人們的直觀理解,為什么不讓所有的軟硬件系統(tǒng)都統(tǒng)一使用大端法存儲(chǔ)數(shù)據(jù)呢?免得還要互相之間各種約定,避免轉(zhuǎn)換錯(cuò)誤。
        這是因?yàn)樾《朔ㄔ谧鲆恍?qiáng)制類型轉(zhuǎn)換等基礎(chǔ)的底層操作時(shí),硬件性能更好。舉個(gè)例子,有一個(gè)占4字節(jié)的int數(shù)據(jù),想要強(qiáng)制轉(zhuǎn)換成一個(gè)short類型。對(duì)于小端法而言,只需要從起始位置開(kāi)始尋址找到兩個(gè)字節(jié)返回即可,因?yàn)榈臀蛔止?jié)代表低位,強(qiáng)轉(zhuǎn)時(shí)忽略高位即可。而大端法必須基于起始位置四個(gè)字節(jié)中的后兩個(gè)字節(jié)才能轉(zhuǎn)換成功。
        在性能重于一切的底層硬件或操作系統(tǒng)層面,這種效率上的差異不能完全的忽略。所以時(shí)至今日,依然有很多的硬件和操作系統(tǒng)使用小端法維護(hù)int等類型的數(shù)據(jù)。
      • 因此,jdk的ByteBuffer中支持指定以小端法或者大端法來(lái)存儲(chǔ)數(shù)據(jù),這樣可以在網(wǎng)絡(luò)傳輸?shù)刃枰獙?nèi)存數(shù)據(jù)編碼為字節(jié)流的場(chǎng)景下,讓用戶能自由的轉(zhuǎn)換為約定好的,統(tǒng)一的表示方式。
      /**
       * 基本copy自jdk的ByteBuffer類,但做了簡(jiǎn)化
       * */
      public abstract class MyByteBuffer extends MyBuffer {
      
          // These fields are declared here rather than in Heap-X-Buffer in order to
          // reduce the number of virtual method invocations needed to access these
          // values, which is especially costly when coding small buffers.
          //
          final byte[] hb;                  // Non-null only for heap buffers
          final int offset;
          boolean isReadOnly;                 // Valid only for heap buffers
      
          MyByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
              super(mark, pos, lim, cap);
      
              this.hb = hb;
              this.offset = offset;
          }
      
          /**
           * 創(chuàng)建一個(gè)指定了capacity的堆內(nèi)ByteBuffer
           * */
          public static MyByteBuffer allocate(int capacity) {
              if (capacity < 0) {
                  throw new IllegalArgumentException();
              }
      
              // 簡(jiǎn)單起見(jiàn),只支持基于堆內(nèi)存的HeapByteBuffer
              return new MyHeapByteBuffer(capacity, capacity);
          }
      
          /**
           * 相對(duì)讀操作
           * */
          public abstract byte get();
      
          /**
           * 絕對(duì)讀操作
           * */
          public abstract byte get(int index);
      
          /**
           * 相對(duì)寫(xiě)操作
           * */
          public abstract MyByteBuffer put(byte b);
      
          public final MyByteBuffer put(byte[] src) {
              return put(src, 0, src.length);
          }
      
          public MyByteBuffer put(byte[] src, int offset, int length) {
              checkBounds(offset, length, src.length);
              if (length > remaining()) {
                  throw new BufferOverflowException();
              }
              int end = offset + length;
              for (int i = offset; i < end; i++) {
                  // 循環(huán)put寫(xiě)入整個(gè)字節(jié)數(shù)組
                  this.put(src[i]);
              }
              return this;
          }
      
          /**
           * 絕對(duì)寫(xiě)操作
           * */
          public abstract MyByteBuffer put(int index, byte b);
      
          /**
           * 相對(duì)的批量讀操作
           * 將當(dāng)前buffer的length個(gè)字節(jié),寫(xiě)入指定dst數(shù)組。寫(xiě)入的起始下標(biāo)位置是offset
           * */
          public MyByteBuffer get(byte[] dst, int offset, int length) {
              checkBounds(offset, length, dst.length);
              if (length > remaining()) {
                  // 所要讀取的字節(jié)數(shù)不能超過(guò)當(dāng)前buffer總的剩余可讀取數(shù)量
                  throw new BufferUnderflowException();
              }
              int end = offset + length;
              for (int i = offset; i < end; i++) {
                  dst[i] = get();
              }
              return this;
          }
      
          /**
           * 相對(duì)的批量讀操作
           * 將當(dāng)前buffer中的數(shù)據(jù)寫(xiě)入指定dst數(shù)組。寫(xiě)入的起始下標(biāo)是0,length為dst的總長(zhǎng)度
           * */
          public MyByteBuffer get(byte[] dst) {
              return get(dst, 0, dst.length);
          }
      
          /**
           * 壓縮操作(也算寫(xiě)操作)
           * */
          public abstract MyByteBuffer compact();
      
          /**
           * 默認(rèn)是大端
           * */
          boolean bigEndian = true;
      
          public final ByteOrder order() {
              return bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
          }
      
          public final MyByteBuffer order(ByteOrder bo) {
              bigEndian = (bo == ByteOrder.BIG_ENDIAN);
      
              return this;
          }
      
          // 簡(jiǎn)單起見(jiàn)省略掉別的數(shù)據(jù)類型,僅支持int類型
      
          public abstract int getInt();
      
          public abstract int getInt(int index);
      
          public abstract MyByteBuffer putInt(int value);
      
          public abstract MyByteBuffer putInt(int index, int value);
      
          abstract byte _get(int i);                          // package-private
          abstract void _put(int i, byte b);                  // package-private
      
      }
      
      /**
       * 基本copy自jdk的HeapByteBuffer,但做了簡(jiǎn)化
       * */
      public class MyHeapByteBuffer extends MyByteBuffer{
      
          MyHeapByteBuffer(int cap, int lim) {
              super(-1, 0, lim, cap, new byte[cap], 0);
          }
      
          protected int ix(int i) {
              return i + offset;
          }
      
          @Override
          public byte get() {
              // 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
              int nextGetIndex = nextGetIndex();
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(nextGetIndex);
              return hb[finallyIndex];
          }
      
          @Override
          public byte get(int index) {
              // 檢查index的合法性,必須是0 <= index < limit,避免越界
              checkIndex(index);
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(index);
              return hb[finallyIndex];
          }
      
          @Override
          public MyByteBuffer put(byte b) {
              // 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
              int nextPutIndex = nextPutIndex();
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(nextPutIndex);
      
              // 將b放入對(duì)應(yīng)的index處
              hb[finallyIndex] = b;
              return this;
          }
      
          @Override
          public MyByteBuffer put(int index, byte b) {
              // 檢查index的合法性,必須是0 <= index < limit,避免越界
              checkIndex(index);
      
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(index);
      
              // 將b放入對(duì)應(yīng)的index處
              hb[finallyIndex] = b;
              return this;
          }
      
          @Override
          byte _get(int i) {
              return hb[i];
          }
      
          @Override
          void _put(int i, byte b) {
              hb[i] = b;
          }
      
          @Override
          public MyByteBuffer compact() {
              // 把底層數(shù)組中(position+offset)到(limit+offset)之間的內(nèi)容整體往前挪,挪到數(shù)組起始處(0+offset), 實(shí)際內(nèi)容的長(zhǎng)度=remaining
              System.arraycopy(hb, ix(getPosition()), hb, ix(0), remaining());
      
              // 要理解下面position和limit的變化,需要意識(shí)到compact是一次從讀模式切換到寫(xiě)模式的操作(之前讀完了,就把剩下的還沒(méi)有讀完的壓縮整理一下)
      
              // 壓縮后實(shí)際內(nèi)容的長(zhǎng)度=remaining,所以壓縮完后position,也就是后續(xù)開(kāi)始寫(xiě)的位置就是從remaining開(kāi)始
              setPosition(remaining());
      
              // 寫(xiě)模式下,limit當(dāng)然就變成了capacity了
              setLimit(getCapacity());
      
              // 壓縮后,mark沒(méi)意義了,就直接丟棄掉
              discardMark();
              return this;
          }
      
          @Override
          public int getInt() {
              // 相比get方法獲得1個(gè)字節(jié),getInt一次要讀取4個(gè)字節(jié),所以position一次性推進(jìn)4字節(jié)
              int nextGetIndex = nextGetIndex(4);
              int finallyIndex = ix(nextGetIndex);
      
              // 從指定的index處讀取4個(gè)字節(jié),構(gòu)造成1個(gè)int類型返回(基于bigEndian,決定如何解析這4個(gè)字節(jié)(大端還是小端))
              return BitsUtil.getInt(this, finallyIndex, bigEndian);
          }
      
          @Override
          public int getInt(int index) {
              // 檢查index的合法性,必須是0 <= index < limit-4,避免越界
              checkIndex(index,4);
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(index);
      
              // 從指定的index處讀取4個(gè)字節(jié),構(gòu)造成1個(gè)int類型返回(基于bigEndian,決定如何解析這4個(gè)字節(jié)(大端還是小端))
              return BitsUtil.getInt(this, finallyIndex, bigEndian);
          }
      
          @Override
          public MyByteBuffer putInt(int value) {
              // 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
              // 相比put方法寫(xiě)入1個(gè)字節(jié),putInt一次要讀取4個(gè)字節(jié),所以position一次性推進(jìn)4字節(jié)
      
              int nextPutIndex = nextPutIndex(4);
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(nextPutIndex);
      
              // 向指定的index處寫(xiě)入大小為4個(gè)字節(jié)的1個(gè)int數(shù)據(jù)(基于bigEndian,決定寫(xiě)入字節(jié)的順序(大端還是小端))
              BitsUtil.putInt(this, finallyIndex, value, bigEndian);
              return this;
          }
      
          @Override
          public MyByteBuffer putInt(int index, int value) {
              // 檢查index的合法性,必須是0 <= index < limit-4,避免越界
              checkIndex(index,4);
      
              // 加上offset偏移量獲得最終的index值
              int finallyIndex = ix(index);
      
              // 向指定的index處寫(xiě)入大小為4個(gè)字節(jié)的1個(gè)int數(shù)據(jù)(基于bigEndian,決定寫(xiě)入字節(jié)的順序(大端還是小端))
              BitsUtil.putInt(this, finallyIndex, value, bigEndian);
              return this;
          }
      
          @Override
          public boolean isReadOnly() {
              // 可讀/可寫(xiě)
              return false;
          }
      }
      

      jdk中Buffer的體系是一個(gè)非常強(qiáng)大而又復(fù)雜的體系,上面介紹的關(guān)于Buffer體系的內(nèi)容只是其中比較核心的部分內(nèi)容。除此之外還有很多關(guān)于Buffer內(nèi)存映射、零拷貝、切片視圖、堆外內(nèi)存回收等等更多的內(nèi)容限于篇幅在本文中完全沒(méi)有涉及。
      但對(duì)于幫助讀者理解為什么Netty要在jdk的ByteBuffer基礎(chǔ)上再封裝一個(gè)ByteBuf,個(gè)人認(rèn)為這些內(nèi)容已經(jīng)足夠了。

      2. Netty ByteBuf介紹

      前面一節(jié)中對(duì)jdk的Buffer容器的核心工作原理進(jìn)行了介紹。Buffer容器是一個(gè)設(shè)計(jì)精巧又功能強(qiáng)大的工具,但其還是存在著以下明顯缺點(diǎn):

      1. 由于存在讀寫(xiě)兩種模式,在使用時(shí)需要時(shí)刻關(guān)注當(dāng)前的模式,并通過(guò)flip、rewind、compact、clear等方法轉(zhuǎn)換模式,一旦搞錯(cuò)模式就會(huì)釀成大錯(cuò)。在面對(duì)較為復(fù)雜的頻繁切換模式的場(chǎng)景時(shí),開(kāi)發(fā)者的心智負(fù)擔(dān)會(huì)很重。
      2. 不支持自動(dòng)的擴(kuò)容。Buffer容器通過(guò)一個(gè)底層數(shù)組來(lái)存儲(chǔ)元素,其自身與數(shù)組一樣不支持動(dòng)態(tài)的擴(kuò)容,一旦在創(chuàng)建時(shí)指定了capacity,后續(xù)無(wú)法再擴(kuò)大。
        不支持?jǐn)U容的Buffer,要么在創(chuàng)建時(shí)預(yù)設(shè)一個(gè)非常大的capacity,要么就需要在容量不足時(shí)手動(dòng)的將Buffer中的數(shù)據(jù)轉(zhuǎn)移到新的更大空間的Buffer中。前者會(huì)浪費(fèi)內(nèi)存,后者則非常麻煩且性能不高。
      3. jdk的DirectBuffer中的堆外底層數(shù)組的內(nèi)存回收基于PhantomReference,在gc時(shí)觸發(fā)回收動(dòng)作。因此在內(nèi)存回收上存在一定的延時(shí)性,在需要大量創(chuàng)建并銷毀Buffer容器的場(chǎng)景下,性能較差。
      4. 沒(méi)有支持池化復(fù)用的機(jī)制。每個(gè)Buffer在需要時(shí)都需要臨時(shí)的分配內(nèi)存空間,并在不需要時(shí)進(jìn)行釋放。在Buffer被大量使用的場(chǎng)景下,反復(fù)的創(chuàng)建和銷毀基于Buffer會(huì)對(duì)GC造成很大壓力,而對(duì)于基于堆外內(nèi)存的DirectBuffer來(lái)說(shuō)由于堆外內(nèi)存回收機(jī)制的延遲也會(huì)對(duì)堆外內(nèi)存的使用帶來(lái)不小的壓力。
        當(dāng)然,缺點(diǎn)都是比較出來(lái)的,相比起Netty中更強(qiáng)大的ByteBuf,jdk中Buffer的缺點(diǎn)遠(yuǎn)不止此。相信讀者在理解了Netty中ByteBuf體系后,會(huì)加深對(duì)其的理解。
      ByteBuf層次體系

      與jdk的Buffer體系類似,Netty中的ByteBuf同樣是通過(guò)不同層次子類的組合來(lái)實(shí)現(xiàn)不同屬性的各種ByteBuf。

      • Netty作為一個(gè)網(wǎng)絡(luò)框架,其只專注于最通用的Byte類型元素的容器存儲(chǔ),所以底層的容器類直接就是ByteBuf,沒(méi)有IntBuf、ShortBuf這些子類。
      • Netty支持基于引用計(jì)數(shù)的自動(dòng)容器回收機(jī)制,可以在容器不再被使用時(shí),即時(shí)的將容器所占用的內(nèi)存回收掉,所以抽象出了AbstractReferenceCountedByteBuf類。
      • Netty支持池化容器,因此設(shè)計(jì)了PooledByteBuf子類。
      • Netty同樣支持堆內(nèi)和堆外兩種不同內(nèi)存類型,因此更進(jìn)一步的抽象出了PooledHeapByteBuf、PooledDirectByteBuf、UnpooledHeapByteBuf和UnpooledDirectByteBuf這四個(gè)核心子類。
      • 除此之外,Netty還提供了注入繞過(guò)數(shù)組越界檢查的基于Unsafe的PooledUnsafeHeapByteBuf、PooledUnsafeDirectByteBuf;也提供了基于切片,邏輯視圖的零拷貝的CompositeByteBuf,以及各種功能強(qiáng)大,用處各異的子類實(shí)現(xiàn)。
      ByteBuf主要子類示意圖

      ByteBuf

      下面我們來(lái)看看netty是如何優(yōu)化上述jdk的Buffer容器的缺點(diǎn)的。

      允許同時(shí)進(jìn)行讀和寫(xiě)
      • 首先針對(duì)Buffer中讀寫(xiě)模式無(wú)法共存,需要時(shí)刻注意當(dāng)前模式的問(wèn)題。Netty中的ByteBuf設(shè)計(jì)中引入了讀寫(xiě)兩個(gè)指針(AbstractByteBuf中的readerIndex和writerIndex),而非只有一個(gè)position指針。
        ByteBuf同樣支持相對(duì)讀寫(xiě)(比如readByte、writeByte)與參數(shù)中指定index下標(biāo)位置的絕對(duì)讀寫(xiě)(比如getByte、setByte)。在相對(duì)讀操作會(huì)自動(dòng)的推進(jìn)讀指針readerIndex,而在相對(duì)寫(xiě)操作中則會(huì)自動(dòng)推進(jìn)寫(xiě)指針writerIndex。
        因?yàn)橥瑫r(shí)維護(hù)了讀寫(xiě)指針的原因,netty的ByteBuf可以同時(shí)的進(jìn)行讀和寫(xiě),而不用關(guān)心當(dāng)前是屬于什么模式,也無(wú)需使用flip等方法切換形態(tài)。
      • 與ByteBuffer類似,netty的ByteBuf內(nèi)部的指針屬性同樣有一個(gè)嚴(yán)格的大小關(guān)系,用于防止寫(xiě)操作超過(guò)容量,也防止讀操作讀取到未實(shí)際寫(xiě)入的非法區(qū)域。即readerIndex <= writerIndex <= capacity <= maxCapacity。
      支持自動(dòng)擴(kuò)容
      • maxCapacity代表最大容量,與ByteBuffer中capacity類似,是ByteBuf的最大容量限制。但與ByteBuffer不同的是,netty的ByteBuf中的maxCapacity不等于其底層數(shù)組實(shí)際的容量,很多情況下只是起到一個(gè)閾值的作用。
      • capacity才代表ByteBuf實(shí)際底層數(shù)組的大小(capacity方法),在寫(xiě)入數(shù)據(jù)時(shí),會(huì)檢查當(dāng)前的capacity是否足以滿足寫(xiě)入的要求。如果發(fā)現(xiàn)capacity不足時(shí),會(huì)觸發(fā)自動(dòng)擴(kuò)容。
        自動(dòng)擴(kuò)容的capacity無(wú)論如何不能擴(kuò)容到超過(guò)maxCapacity,如果超過(guò)maxCapacity依然無(wú)法放入新寫(xiě)入的數(shù)據(jù)則會(huì)報(bào)錯(cuò)(IndexOutOfBoundsException)。
        未超過(guò)maxCapacity時(shí),但寫(xiě)入的數(shù)據(jù)量超過(guò)當(dāng)前底層數(shù)組容量時(shí)則會(huì)進(jìn)行擴(kuò)容。擴(kuò)容時(shí),如果當(dāng)前底層數(shù)組的大小低于閾值(4M)時(shí),則會(huì)較為激進(jìn)的2倍數(shù)擴(kuò)容,以減少未來(lái)可能繼續(xù)擴(kuò)容的次數(shù);當(dāng)超過(guò)閾值時(shí),則以較為保守的方式進(jìn)行擴(kuò)容,以盡量的節(jié)約內(nèi)存。(ByteBufAllocator.calculateNewCapacity方法)
      • 通過(guò)maxCapacity和capacity兩個(gè)不同容量屬性的設(shè)計(jì),ByteBuf在能控制最大內(nèi)存使用量的前提下,能夠不一口氣申請(qǐng)一個(gè)極大的數(shù)組,而是按需的使用內(nèi)存。
      ByteBuf結(jié)構(gòu)圖

      ByteBuf-Structure

      支持基于引用計(jì)數(shù)的手動(dòng)內(nèi)存釋放
      • Netty的ByteBuf能夠基于引用計(jì)數(shù)機(jī)制來(lái)實(shí)現(xiàn)手動(dòng)的內(nèi)存資源釋放(AbstractReferenceCountedByteBuf)。在ByteBuf容器被創(chuàng)建時(shí),其被引用數(shù)被初始化為1。當(dāng)使用者認(rèn)為不需要再使用容器時(shí),就將被引用數(shù)自減1(release方法)。當(dāng)容器的被引用數(shù)被設(shè)置為0時(shí),則會(huì)觸發(fā)ByteBuf容器的銷毀流程,釋放掉底層數(shù)組所占用的內(nèi)存空間。
        當(dāng)ByteBuf容器需要交給其它線程會(huì)處理時(shí),需要通過(guò)retain方法增加被引用數(shù),避免其因?yàn)槠渌褂谜遰elease而被提前銷毀。
      • 因?yàn)锽yteBuf本身不會(huì)互相引用而出現(xiàn)循環(huán)依賴,所以引用計(jì)數(shù)的內(nèi)存管理機(jī)制是非常高效的。比起依賴jvm的gc機(jī)制,在確定不再使用ByteBuf時(shí)主動(dòng)的釋放,雖然略微的增加了開(kāi)發(fā)者的心智負(fù)擔(dān),但卻可以大幅的減輕gc的壓力。
        Netty作為一個(gè)高性能網(wǎng)絡(luò)框架,實(shí)際工作中都是通過(guò)ByteBuf容器來(lái)進(jìn)行通信,往往會(huì)在短時(shí)間內(nèi)大量創(chuàng)建并銷毀ByteBuf。如果完全依靠gc周期性的回收,那么會(huì)給系統(tǒng)帶來(lái)巨大的壓力。
        基于引用計(jì)數(shù)的內(nèi)存管理能夠主動(dòng)和實(shí)時(shí)的進(jìn)行內(nèi)存回收,將內(nèi)存回收的動(dòng)作較為均勻的分?jǐn)偟矫恳粋€(gè)時(shí)間段內(nèi),大大增強(qiáng)了系統(tǒng)的穩(wěn)定性。
      • 其實(shí)就像需要手動(dòng)進(jìn)行對(duì)象回收的語(yǔ)言(比如C語(yǔ)言)在內(nèi)存回收上比自動(dòng)垃圾回收的語(yǔ)言(java)性能通常更好一樣,自動(dòng)的gc雖然解放了開(kāi)發(fā)者的心智負(fù)擔(dān),但比起精細(xì)的手工管理、實(shí)時(shí)的釋放,其在性能上還是略遜一籌。
      支持ByteBuf容器的池化存儲(chǔ)
      • 池化的ByteBuf容器在創(chuàng)建時(shí),底層數(shù)組所需要的內(nèi)存在絕大多數(shù)情況下都能從已經(jīng)預(yù)先申請(qǐng)好的內(nèi)存區(qū)域中獲得,實(shí)際使用中僅需要進(jìn)行一些標(biāo)記即可,無(wú)需jvm或者操作系統(tǒng)進(jìn)行真實(shí)的內(nèi)存分配。而在ByteBuf容器不再使用而被釋放時(shí),也僅僅需要修改一些針對(duì)內(nèi)存區(qū)域控制權(quán)的即可,不需要進(jìn)行實(shí)際的內(nèi)存回收操作。
        池化的容器機(jī)制對(duì)gc非常友好,與平常接觸到的各種連接池、對(duì)象池一樣,通過(guò)避免大量初始化與銷毀的開(kāi)銷,極大的提高了使用特定對(duì)象的吞吐量。
      • ByteBuf容器池化存儲(chǔ)相關(guān)的工作原理比較復(fù)雜,我們?cè)贛yNetty后續(xù)的迭代中會(huì)逐一實(shí)現(xiàn)并在博客中介紹其工作原理。本期關(guān)于ByteBuf的介紹僅限于Unpooled非池化的實(shí)現(xiàn)。
      public abstract class MyAbstractByteBuf extends MyByteBuf {
          // 。。。 僅保留核心邏輯
          
          int readerIndex;
          int writerIndex;
          private int markedReaderIndex;
          private int markedWriterIndex;
          private int maxCapacity;
      
          /**
           * 是否在編輯讀/寫(xiě)指針的時(shí)候進(jìn)行邊界校驗(yàn)
           * 默認(rèn)為true,設(shè)置為false可以不進(jìn)行校驗(yàn)從而略微的提高性能,但可能出現(xiàn)內(nèi)存越界的問(wèn)題
           */
          private static final boolean checkBounds = SystemPropertyUtil.getBoolean("my.netty.check.bounds",true);
      
          protected MyAbstractByteBuf(int maxCapacity) {
              if (maxCapacity < 0) {
                  throw new IllegalArgumentException("maxCapacity must > 0");
              }
              this.maxCapacity = maxCapacity;
          }
      
          @Override
          public int maxCapacity() {
              return maxCapacity;
          }
      
          protected final void maxCapacity(int maxCapacity) {
              this.maxCapacity = maxCapacity;
          }
      
          @Override
          public int readerIndex() {
              return readerIndex;
          }
      
          @Override
          public MyByteBuf readerIndex(int readerIndex) {
              if (checkBounds) {
                  checkIndexBounds(readerIndex, writerIndex, capacity());
              }
              this.readerIndex = readerIndex;
              return this;
          }
      
          @Override
          public int writerIndex() {
              return writerIndex;
          }
      
          @Override
          public MyByteBuf writerIndex(int writerIndex) {
              if (checkBounds) {
                  checkIndexBounds(readerIndex, writerIndex, capacity());
              }
              this.writerIndex = writerIndex;
              return this;
          }
      
          @Override
          public MyByteBuf markReaderIndex() {
              markedReaderIndex = readerIndex;
              return this;
          }
          
          @Override
          public MyByteBuf markWriterIndex() {
              markedWriterIndex = writerIndex;
              return this;
          }
          
          @Override
          public byte getByte(int index) {
              checkIndex(index,1);
              return _getByte(index);
          }
      
          @Override
          public byte readByte() {
              // 檢查是否可以讀1個(gè)字節(jié)
              checkReadableBytes0(1);
              int i = readerIndex;
              byte b = _getByte(i);
              // 和jdk的實(shí)現(xiàn)一樣,在getByte的基礎(chǔ)上,推進(jìn)讀指針
              readerIndex = i + 1;
              return b;
          }
          
          protected abstract byte _getByte(int index);
      
          @Override
          public MyByteBuf setByte(int index, int value) {
              checkIndex(index,1);
              _setByte(index, value);
              return this;
          }
      
          protected abstract void _setByte(int index, int value);
      
          public abstract MyByteBufAllocator alloc();
      
          @Override
          public MyByteBuf writeByte(int value) {
              ensureWritable0(1);
              _setByte(writerIndex++, value);
              return this;
          }
      
          @Override
          public MyByteBuf writeBytes(byte[] src) {
              return writeBytes(src, 0, src.length);
          }
          
          @Override
          public MyByteBuf readBytes(byte[] dst) {
              readBytes(dst, 0, dst.length);
              return this;
          }
          
          @Override
          public boolean isReadable() {
              return writerIndex > readerIndex;
          }
      }
      
      /**
       * 參考自netty的UnpooledHeapByteBuf,在其基礎(chǔ)上做了簡(jiǎn)化(只實(shí)現(xiàn)了最基礎(chǔ)的一些功能以作參考)
       * */
      public class MyUnPooledHeapByteBuf extends MyAbstractReferenceCountedByteBuf{
      
          private final MyByteBufAllocator alloc;
      
          private ByteBuffer tmpNioBuf;
      
          public static final byte[] EMPTY_BYTES = {};
          private byte[] array;
      
          public MyUnPooledHeapByteBuf(MyByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
              super(maxCapacity);
      
              if (initialCapacity > maxCapacity) {
                  throw new IllegalArgumentException(String.format(
                          "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
              }
      
              this.alloc = alloc;
              this.array = new byte[initialCapacity];
              setIndex(0, 0);
          }
      
          @Override
          public int capacity() {
              // heapByteBuf的capacity就是內(nèi)部數(shù)組的長(zhǎng)度
              return array.length;
          }
      
          @Override
          public MyByteBuf capacity(int newCapacity) {
              checkNewCapacity(newCapacity);
      
              byte[] oldArray = array;
              int oldCapacity = oldArray.length;
              if (newCapacity == oldCapacity) {
                  // 特殊情況,如果與之前的容量一樣則無(wú)事發(fā)生
                  return this;
              }
      
              int bytesToCopy;
              if (newCapacity > oldCapacity) {
                  // 如果新的capacity比之前的大,那么就將原來(lái)內(nèi)部數(shù)組中的內(nèi)容整個(gè)copy到新數(shù)組中
                  bytesToCopy = oldCapacity;
              } else {
                  // 如果新的capacity比之前的小,那么可能需要截?cái)嘀暗臄?shù)組內(nèi)容
                  if (writerIndex() > newCapacity) {
                      // 寫(xiě)指針大于newCapacity,確定需要截?cái)?                this.readerIndex = Math.min(readerIndex(), newCapacity);
                      this.writerIndex = newCapacity;
                  }
                  bytesToCopy = newCapacity;
              }
      
              // 將原始內(nèi)部數(shù)組中的內(nèi)容copy到新數(shù)組中
              byte[] newArray = new byte[newCapacity];
              System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
              this.array = newArray;
              return this;
          }
      
          @Override
          public int readBytes(GatheringByteChannel out, int length) throws IOException {
              int readBytes = getBytes(readerIndex, out, length, true);
              readerIndex += readBytes;
              return readBytes;
          }
      
          @Override
          public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
              return getBytes(index, out, length, false);
          }
      
          @Override
          public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
              try {
                  return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
              } catch (ClosedChannelException ignored) {
                  return -1;
              }
          }
      
          @Override
          public byte[] array() {
              return this.array;
          }
      
          @Override
          public int arrayOffset() {
              // 非Pool的,沒(méi)有偏移量
              return 0;
          }
      
          @Override
          public MyByteBuf getBytes(int index, MyByteBuf dst, int dstIndex, int length) {
              // 帶上dst的偏移量
              getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
              return this;
          }
      
          @Override
          public MyByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
              System.arraycopy(array, index, dst, dstIndex, length);
              return this;
          }
      
          @Override
          public MyByteBuf setBytes(int index, MyByteBuf src, int srcIndex, int length) {
              // 帶上src的偏移量
              setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
              return this;
          }
      
      
          @Override
          public MyByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
              System.arraycopy(src, srcIndex, array, index, length);
              return this;
          }
      
          @Override
          protected byte _getByte(int index) {
              return array[index];
          }
      
          @Override
          protected void _setByte(int index, int value) {
              this.array[index] = (byte) value;
          }
      
          @Override
          public MyByteBufAllocator alloc() {
              return alloc;
          }
      
          @Override
          protected int _getInt(int index) {
              return BitsUtil.getInt(this.array,index);
          }
      
          @Override
          protected int _getIntLE(int index) {
              return BitsUtil.getIntLE(this.array,index);
          }
      
          @Override
          protected void deallocate() {
              // heapByteBuf的回收很簡(jiǎn)單,就是清空內(nèi)部數(shù)組,等待gc回收掉原來(lái)的數(shù)組對(duì)象即可
              array = EMPTY_BYTES;
          }
      
          @Override
          public ByteBuffer internalNioBuffer(int index, int length) {
              checkIndex(index, length);
              return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
          }
      
          private ByteBuffer internalNioBuffer() {
              ByteBuffer tmpNioBuf = this.tmpNioBuf;
              if (tmpNioBuf == null) {
                  this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
              }
              return tmpNioBuf;
          }
      
          private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
              ByteBuffer tmpBuf;
              if (internal) {
                  tmpBuf = internalNioBuffer();
              } else {
                  tmpBuf = ByteBuffer.wrap(array);
              }
              return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
          }
      }
      
      /**
       * 參考最初版本的netty AbstractReferenceCountedByteBuf實(shí)現(xiàn)(4.16.Final)
       * 高版本的netty進(jìn)行了性能上的優(yōu)化,但是細(xì)節(jié)太多,太復(fù)雜。我們這個(gè)netty學(xué)習(xí)的demo更關(guān)注netty整體的結(jié)構(gòu),細(xì)節(jié)上的優(yōu)化先放過(guò)
       * 具體原理可以參考大佬的博客:http://www.rzrgm.cn/binlovetech/p/18369244
       * */
      public abstract class MyAbstractReferenceCountedByteBuf extends MyAbstractByteBuf{
      
          /**
           * 原子更新refCnt字段
           * */
          private static final AtomicIntegerFieldUpdater<MyAbstractReferenceCountedByteBuf> refCntUpdater =
                  AtomicIntegerFieldUpdater.newUpdater(MyAbstractReferenceCountedByteBuf.class, "refCnt");
      
          /**
           * 被引用的次數(shù)
           *
           * 主要用于實(shí)現(xiàn)ReferenceCounted接口相關(guān)的邏輯
           * */
          private volatile int refCnt;
      
          protected MyAbstractReferenceCountedByteBuf(int maxCapacity) {
              super(maxCapacity);
              // 被創(chuàng)建就說(shuō)明被引用了,被引用數(shù)初始化為1
              refCntUpdater.set(this, 1);
          }
      
          @Override
          public int refCnt() {
              return this.refCnt;
          }
      
          protected final void setRefCnt(int refCnt) {
              refCntUpdater.set(this, refCnt);
          }
      
          public MyByteBuf retain() {
              return this.retain0(1);
          }
      
          public MyByteBuf retain(int increment) {
              if(increment <= 0){
                  throw new IllegalArgumentException("increment must > 0");
              }
      
              return this.retain0(increment);
          }
      
          private MyByteBuf retain0(int increment) {
              int refCnt;
              int nextCnt;
              do {
                  refCnt = this.refCnt;
                  // 先算出更新后預(yù)期的值,用于cas
                  nextCnt = refCnt + increment;
                  if (nextCnt <= increment) {
                      // 參數(shù)有問(wèn)題
                      throw new IllegalArgumentException("illegal retain refCnt=" + refCnt + ", increment=" + increment);
                  }
                  // cas原子更新,如果compareAndSet返回false,則說(shuō)明出現(xiàn)了并發(fā)更新
                  // doWhile循環(huán)重新計(jì)算過(guò),直到更新成功
              } while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));
      
              return this;
          }
      
          public boolean release() {
              return this.release0(1);
          }
      
          public boolean release(int decrement) {
              if(decrement <= 0){
                  throw new IllegalArgumentException("decrement must > 0");
              }
      
              return this.release0(decrement);
          }
      
          private boolean release0(int decrement) {
              int refCnt;
              do {
                  refCnt = this.refCnt;
                  if (refCnt < decrement) {
                      // 參數(shù)有問(wèn)題,減的太多了
                      throw new IllegalArgumentException("illegal retain refCnt=" + refCnt + ", decrement=" + decrement);
                  }
              } while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
      
              if (refCnt == decrement) {
                  // refCnt減為0了,釋放該byteBuf(具體釋放方式由子類處理)
                  this.deallocate();
                  // 返回true,說(shuō)明當(dāng)前release后,成功釋放了
                  return true;
              } else {
                  // 返回false,說(shuō)明當(dāng)前release后還有別的地方仍然在引用該ByteBuf
                  return false;
              }
          }
      
          protected abstract void deallocate();
      }
      

      總結(jié)

      • 在本篇博客中,先對(duì)jdk的Buffer容器體系進(jìn)行了介紹,并基于Buffer容器的一些缺點(diǎn)引出了Netty的ByteBuf容器。ByteBuf容器在jdk的Buffer容器基礎(chǔ)上,做了非常多的拓展以改進(jìn)Buffer容器的缺點(diǎn)。
      • 限于個(gè)人水平,博客中對(duì)jdk的Buffer和Netty的ByteBuf容器的工作原理介紹都是點(diǎn)到即止,僅僅分析了一些最基礎(chǔ)和核心的點(diǎn)。要想更好的理解其底層原理,還是需要讀者仔細(xì)的閱讀資料和源碼才行。
      • lab6中實(shí)現(xiàn)的非池化ByteBuf機(jī)制雖然非常簡(jiǎn)單,但為后續(xù)迭代中真正核心且復(fù)雜的PooledByteBuf即池化內(nèi)存管理的實(shí)現(xiàn)打下了基礎(chǔ)。后續(xù)的lab7-lab9中,MyNetty將會(huì)參考netty逐步的實(shí)現(xiàn)一個(gè)簡(jiǎn)化版的池化內(nèi)存管理體系,幫助讀者更好的理解netty。

      博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab6_bytebuf 分支),內(nèi)容如有錯(cuò)誤,還請(qǐng)多多指教。

      posted on 2025-08-08 21:33  小熊餐館  閱讀(183)  評(píng)論(0)    收藏  舉報(bào)

      主站蜘蛛池模板: 无码日韩人妻精品久久| 成人午夜免费无码视频在线观看 | 少妇又紧又色又爽又刺激视频| 泸西县| 91老肥熟女九色老女人| 久久精品蜜芽亚洲国产AV| 日韩丝袜欧美人妻制服| 欧洲无码一区二区三区在线观看| 亚洲精品日韩中文字幕| 亚洲午夜亚洲精品国产成人| 色就色中文字幕在线视频| 亚洲人成小说网站色在线| 久久精品麻豆日日躁夜夜躁| 成人午夜电影福利免费| 嫖妓丰满肥熟妇在线精品| 激情国产一区二区三区四区| 免费看的日韩精品黄色片| 欧美高清精品一区二区| 日本欧美大码a在线观看| 国产99在线 | 欧美| 精品国产福利一区二区在线 | 亚洲成av人无码免费观看| 深夜精品免费在线观看| 国产色a在线观看| 亚洲人成网线在线播放VA| 景宁| 在线观看成人av天堂不卡| 色狠狠色噜噜AV一区| 大余县| 天天看片视频免费观看| 亚洲成av人片在线观看www| 国产精品自在线拍国产手机版| 欧美亚洲h在线一区二区| 国产真实乱人偷精品人妻| 天津市| 一本加勒比hezyo无码人妻| 国产清纯在线一区二区| 泰宁县| 少妇人妻偷人精品免费| 西西人体www大胆高清| 7878成人国产在线观看|