從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(六) MyNetty ByteBuf實(shí)現(xiàn)
從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(六) MyNetty ByteBuf實(shí)現(xiàn)
1. jdk Buffer介紹
在上一篇博客中,lab5版本的MyNetty中實(shí)現(xiàn)了FastThreadLocal,為后續(xù)實(shí)現(xiàn)池化內(nèi)存分配功能打下了基礎(chǔ)。池化內(nèi)存分配是netty中非常核心也非常復(fù)雜的一個(gè)功能,沒(méi)法在一次迭代中完整的實(shí)現(xiàn),MyNetty打算分為4個(gè)迭代逐步的將其實(shí)現(xiàn)。按照計(jì)劃,本篇博客中,lab6版本的MyNetty需要實(shí)現(xiàn)一個(gè)非常基礎(chǔ)的,非池化的ByteBuf作為后續(xù)迭代的基礎(chǔ)。
由于本文屬于系列博客,讀者需要對(duì)之前的博客內(nèi)容有所了解才能更好地理解本文內(nèi)容。
- lab1版本博客:從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(一) MyNetty Reactor模式
- lab2版本博客:從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(二) MyNetty pipeline流水線
- lab3版本博客:從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(三) MyNetty 高效的數(shù)據(jù)讀取實(shí)現(xiàn)
- lab4版本博客:從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(四) MyNetty 高效的數(shù)據(jù)寫(xiě)出實(shí)現(xiàn)
- lab5版本博客:從零開(kāi)始實(shí)現(xiàn)簡(jiǎn)易版Netty(五) MyNetty FastThreadLocal實(shí)現(xiàn)
在前面的實(shí)驗(yàn)中,MyNetty中用來(lái)承載消息的容器一直是java中nio包下的ByteBuffer。與FastThreadLocal類似,Netty同樣不滿足于jdk自帶的ByteBuffer,而是基于ByteBuffer實(shí)現(xiàn)了性能更好,功能更強(qiáng)大的ByteBuf容器。
但在學(xué)習(xí)Netty的ByteBuf容器之前,我們還是需要先了解jdk中的ByteBuffer工作原理。只有在理解了jdk原生的ByteBuffer的實(shí)現(xiàn)原理和優(yōu)缺點(diǎn)后,我們才能更好的理解Netty中的ByteBuf和它的優(yōu)勢(shì)。
jdk中的Buffer是一個(gè)巨大的多維層次體系,按照所存儲(chǔ)的數(shù)據(jù)類型可以分為byte、int、short等,按照底層承載數(shù)據(jù)的內(nèi)存區(qū)域的不同可以分為基于堆內(nèi)存(heap)的和基于堆外內(nèi)存(direct),按照是否僅可讀可以分為普通可讀可寫(xiě)的buffer和只讀buffer。
Buffer按照類的繼承關(guān)系將這幾個(gè)維度以多層的子類繼承關(guān)系組織起來(lái),其中java.nio.Buffer是最頂層的抽象類。
第二層按照所存儲(chǔ)的數(shù)據(jù)類型可以分為ByteBuffer、IntBuffer和ShortBuffer等直接子類;按照底層承載數(shù)據(jù)的內(nèi)存區(qū)域可以進(jìn)一步分為HeapByteBuffer、DirectByteBuffer、HeapIntBuffer等;更進(jìn)一步的對(duì)于只讀的Buffer容器又有HeapByteBufferR、DirectByteBufferR、HeapIntBufferR等子類實(shí)現(xiàn)(結(jié)尾的R是ReadOnly的意思)。
Buffer主要子類示意圖

計(jì)算機(jī)中數(shù)據(jù)底層基本都是以Byte字節(jié)維度存儲(chǔ)的,而Int、Short、Double等數(shù)據(jù)類型都是基于字節(jié)的,因此整個(gè)Buffer體系中最核心的就是ByteBuffer。
限于篇幅,本文中將只重點(diǎn)分析最重要的ByteBuffer以及其直接子類HeapByteBuffer的工作原理,相信在理解了整個(gè)Buffer體系中最核心的機(jī)制后,讀者能對(duì)jdk的ByteBuffer體系能有一個(gè)大致的理解,有利于后續(xù)理解Netty的ByteBuf容器。
Buffer類
首先我們先看看最頂層的Buffer類,在Buffer類中共定義了四個(gè)非常關(guān)鍵的int類型的屬性,分別是capacity、limit、position和mark。
- capacity屬性代表著當(dāng)前Buffer容器的可容納的最大總元素個(gè)數(shù),不能為負(fù)數(shù)并且不可變。capacity=10,對(duì)于ByteBuffer代表著最多能存放10個(gè)字節(jié)的數(shù)據(jù);而對(duì)于IntBuffer則代表著最多能存放是10個(gè)整型的數(shù)據(jù)。可以簡(jiǎn)單的將Buffer邏輯上視為有著一個(gè)長(zhǎng)度為capacity的底層數(shù)組,數(shù)組的類型與當(dāng)前Buffer的類型一致(實(shí)際也可能是用Byte數(shù)組模擬Int數(shù)組)。
- jdk中的Buffer分為兩種模式,一種是寫(xiě)模式,一種是讀模式。寫(xiě)模式下只能往Buffer中put寫(xiě)入數(shù)據(jù),不能讀取數(shù)據(jù);讀模式下只能從Buffer中g(shù)et讀出數(shù)據(jù),不能寫(xiě)入數(shù)據(jù)。
Buffer在剛被創(chuàng)建初始化時(shí),是空的,屬于寫(xiě)模式。在寫(xiě)入數(shù)據(jù)后,可以通過(guò)flip方法將Buffer容器切換為讀模式(具體的原理下面會(huì)分析)。 - position屬性在寫(xiě)模式下代表著當(dāng)前的寫(xiě)指針,下一次寫(xiě)入的數(shù)據(jù)將會(huì)被寫(xiě)入底層數(shù)組中index=position的位置;而在讀模式下,則代表著當(dāng)前的讀指針,即下一次讀取的數(shù)據(jù)將會(huì)是底層數(shù)組中index=position的位置。
- limit屬性用于校驗(yàn)以防止越界,在寫(xiě)模式下代表著第一個(gè)不可寫(xiě)的位置,默認(rèn)情況下寫(xiě)模式中l(wèi)imit的值等于capacity。
而在讀模式下,代表著第一個(gè)不可讀的位置。舉個(gè)例子,一個(gè)capacity=10的ByteBuffer,在寫(xiě)入了8個(gè)字節(jié)后,limit=8;通過(guò)flip方法轉(zhuǎn)為讀模式后,limit就由寫(xiě)模式下的10,轉(zhuǎn)變?yōu)榱?,代表著最多只能讀取8個(gè)字節(jié)。因?yàn)樵偻笞x,就相當(dāng)于數(shù)組下標(biāo)越界了,讀取了從未寫(xiě)過(guò)的數(shù)據(jù)。 - mark屬性用于臨時(shí)記錄一下當(dāng)前position的位置,比如處理拆包黏包問(wèn)題時(shí)可以嘗試著向后讀取更多的數(shù)據(jù),但未獲取到完整包時(shí)可以通過(guò)mark指針退回到初始的位置,等待完整的包來(lái)臨。
- 四個(gè)指針屬性之間存在著嚴(yán)格的大小關(guān)系,即mark <= position <= limit <= capacity。
capacity用于控制整個(gè)Buffer的大小,任意的讀寫(xiě)都不可越過(guò)capacity的限制。
limit用于規(guī)范不可讀寫(xiě)的位置,防止越界,其不會(huì)超過(guò)capacity;其無(wú)論是在寫(xiě)模式還是讀模式下作為position的邊界值,都一定大于或等于position。
position用于記錄當(dāng)前的相對(duì)讀寫(xiě)位置,其無(wú)法超過(guò)limit。
mark用于記錄當(dāng)前position的位置,其不會(huì)超過(guò)position。當(dāng)limit或者position因?yàn)槟承┰蚨s小,導(dǎo)致其小于當(dāng)前的mark時(shí),mark會(huì)被廢棄被還原為初始值-1。
- Buffer類中還提供了三個(gè)用于切換容器形態(tài)的重要方法,分別是flip、rewind和clear方法。
- flip方法用于在寫(xiě)入完成后,令Buffer轉(zhuǎn)換到讀模式。flip操作會(huì)將position歸零,而limit被設(shè)置為之前寫(xiě)模式下的position以避免讀取越界。
- rewind方法一般用于在讀取數(shù)據(jù)后,重置讀指針以方便重復(fù)的讀取。rewind操作中只會(huì)簡(jiǎn)單的將position歸零,但不修改limit的值。
- clear方法用于清空Buffer容器,重新寫(xiě)入新的數(shù)據(jù)。clear操作會(huì)將position設(shè)置為0,而limit設(shè)置為等于capacity。設(shè)置完成后的指針位置與Buffer剛被創(chuàng)建時(shí)一樣。
Buffer結(jié)構(gòu)圖

/**
* 基本copy自jdk的Buffer類,但做了簡(jiǎn)化
* */
public abstract class MyBuffer {
/**
* buffer的capacity代表著總?cè)萘看笮∠拗疲荒転樨?fù)數(shù)并且不可變
* */
private int capacity;
/**
* buffer的limit標(biāo)識(shí)著第一個(gè)不可讀或者寫(xiě)的index位置,不能為負(fù)數(shù)并且不能大于capacity
* */
private int limit;
/**
* buffer的position標(biāo)識(shí)著下一個(gè)讀或者寫(xiě)的元素的index位置,不能為負(fù)數(shù)并且不能大于limit
* */
private int position = 0;
/**
* buffer的mark是用于reset方法(重置)被調(diào)用時(shí),將position的值重試為mark對(duì)應(yīng)下標(biāo).
* mark并不總是被定義,但當(dāng)它被定義時(shí),它不會(huì)為負(fù)數(shù),并且不會(huì)超過(guò)position
* 如果mark被定義了,則它將會(huì)在position或者limit被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
* 如果mark沒(méi)有被定義,則在調(diào)用reset方法時(shí)將會(huì)拋出InvalidMarkException
* */
private int mark = -1;
// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
/**
* 有兩種Buffer,分別基于堆內(nèi)內(nèi)存和堆外內(nèi)存
* 堆外內(nèi)存中,這個(gè)屬性標(biāo)示堆外內(nèi)存具體的起始地址, MyNetty中暫時(shí)用不到
* */
long address;
MyBuffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0) {
// The capacity of a buffer is never negative
throw new IllegalArgumentException("Negative capacity: " + cap);
}
this.capacity = cap;
setLimit(lim);
setPosition(pos);
if (mark >= 0) {
// The mark is not always defined, but when it is defined it is never negative and is never greater than the position.
if (mark > pos) {
throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")");
}
this.mark = mark;
}
}
public int getCapacity() {
return capacity;
}
public int getPosition() {
return position;
}
public int getLimit() {
return limit;
}
public int getMark() {
return mark;
}
final void discardMark() {
mark = -1;
}
/**
* 是否是只讀的
* */
public abstract boolean isReadOnly();
public final MyBuffer setLimit(int newLimit) {
if ((newLimit > capacity) || (newLimit < 0)) {
throw new IllegalArgumentException();
}
// 給limit賦值
limit = newLimit;
if (position > newLimit) {
// 確保position不能大于limit
position = newLimit;
}
if (mark > newLimit) {
// 如果mark被定義了,則它將會(huì)在position被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
mark = -1;
}
return this;
}
public final MyBuffer setPosition(int newPosition) {
if ((newPosition > limit) || (newPosition < 0)) {
throw new IllegalArgumentException("invalid newPosition=" + newPosition + " limit=" + limit);
}
if (mark > newPosition) {
// 如果mark被定義了,則它將會(huì)在limit被調(diào)整為小于mark時(shí)被廢棄(變成未定義的狀態(tài))
mark = -1;
}
// 給position賦值
position = newPosition;
return this;
}
public final MyBuffer mark() {
// 將mark記錄為當(dāng)前position的位置
this.mark = this.position;
return this;
}
/**
* buffer的mark是用于reset方法(重置)被調(diào)用時(shí),將position的值重試為mark對(duì)應(yīng)下標(biāo).
* */
public final MyBuffer reset() {
int m = mark;
if (m < 0) {
// 如果mark沒(méi)有被定義,則在調(diào)用reset方法時(shí)將會(huì)拋出InvalidMarkException
throw new InvalidMarkException();
}
position = m;
return this;
}
/**
* 令buffer準(zhǔn)備好作為一個(gè)新的序列用于channel讀操作或者說(shuō)讓channel將數(shù)據(jù)put進(jìn)來(lái)
* 設(shè)置limit為capacity并且將position設(shè)置為0
*
* 注意:clear并沒(méi)有真正的將buffer里的數(shù)據(jù)完全清零,而僅僅是通過(guò)修改關(guān)鍵屬性的方式邏輯進(jìn)行了邏輯上的clear,這樣性能更好
* */
public final MyBuffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
/**
* 令buffer準(zhǔn)備好作為一個(gè)新的序列用于channel寫(xiě)操作或者說(shuō)讓channel將寫(xiě)進(jìn)去的數(shù)據(jù)get走:
* 設(shè)置limit為當(dāng)前的position的值,并且將position設(shè)置為0
* */
public final MyBuffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
/**
* 讓一個(gè)buffer準(zhǔn)備好重新讀取數(shù)據(jù),即在limit不變的情況下,將position設(shè)置為0
* 因?yàn)樽x取操作會(huì)不斷地推進(jìn)position的位置,重置position為0,相當(dāng)于允許讀取重頭讀(類似磁帶進(jìn)行了倒帶,即rewind)
* */
public final MyBuffer rewind() {
position = 0;
mark = -1;
return this;
}
/**
* 返回當(dāng)前buffer還剩余可用的元素個(gè)數(shù)(即limit-position)
* */
public final int remaining() {
int rem = limit - position;
return Math.max(rem, 0);
}
/**
* 是否還有剩余可用的元素個(gè)數(shù)
* */
public final boolean hasRemaining() {
return position < limit;
}
static void checkBounds(int off, int len, int size) {
if ((off | len | (off + len) | (size - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
}
}
final int nextGetIndex() {
int p = position;
if (p >= limit) {
throw new BufferUnderflowException();
}
position = p + 1;
return p;
}
final int nextGetIndex(int nb) {
int p = position;
if (limit - p < nb){
throw new BufferUnderflowException();
}
position = p + nb;
return p;
}
final int nextPutIndex() {
int p = position;
if (p >= limit) {
// 相比nextGetIndex,拋出的異常不同
throw new BufferOverflowException();
}
position = p + 1;
return p;
}
final int nextPutIndex(int nb) {
int p = position;
if (limit - p < nb) {
throw new BufferOverflowException();
}
position = p + nb;
return p;
}
final int checkIndex(int i) {
if ((i < 0) || (i >= limit)) {
throw new IndexOutOfBoundsException();
}
return i;
}
final int checkIndex(int i, int nb) {
if ((i < 0) || (nb > limit - i)) {
throw new IndexOutOfBoundsException();
}
return i;
}
}
ByteBuffer類
- ByteBuffer類顧名思義,是承載Byte類型的數(shù)據(jù)的Buffer容器。前面提到Buffer承載底層數(shù)組的內(nèi)存分為兩種類型,一種是位于jvm堆內(nèi)存的HeapXXXBuffer,一種是位于堆外的直接內(nèi)存的DirectXXXBuffer。
HeapByteBuffer中底層數(shù)組直接就是成員變量byte[] hb。而DirectByteBuffer中的底層數(shù)組位于堆外,數(shù)組的內(nèi)存起始地址由父類ByteBuffer中的address維護(hù),訪問(wèn)時(shí)通過(guò)Unsafe方法address與實(shí)際的偏移量來(lái)訪問(wèn)對(duì)應(yīng)位置的數(shù)據(jù)。
在本篇博客中,限于篇幅只介紹基于堆內(nèi)存的ByteBuffer實(shí)現(xiàn)HeapByteBuffer,因?yàn)樽鳛锽uffer其基本的工作原理上HeapByteBuffer與DirectByteBuffer并沒(méi)有特別大的不同,都是在底層數(shù)組上讀寫(xiě)對(duì)應(yīng)的數(shù)據(jù)。 - ByteBuffer提供了基于各種數(shù)據(jù)類型的讀(get)與寫(xiě)(put)方法,其中分為相對(duì)操作與絕對(duì)操作之分,絕對(duì)操作與相對(duì)操作的區(qū)別在于多了一個(gè)index參數(shù),比如相對(duì)操作get()和絕對(duì)操作get(int index)。
絕對(duì)操作很好理解,就是在底層數(shù)組指定的index位置讀取或者寫(xiě)入對(duì)應(yīng)的數(shù)據(jù)。而相對(duì)操作其實(shí)也需要一個(gè)index指針,而這個(gè)index指針就是前面介紹過(guò)的位于Buffer內(nèi)部的position指針。相對(duì)操作在進(jìn)行了讀或者寫(xiě)操作后,會(huì)增加或者說(shuō)推進(jìn)position的值。
絕大多數(shù)情況下,我們都是使用相對(duì)操作來(lái)讀寫(xiě)操作Buffer的,因?yàn)榭梢宰孊uffer內(nèi)部自行維護(hù)讀或者寫(xiě)指針,比較方便。 - ByteBuffer因?yàn)槠浯鎯?chǔ)的是最基礎(chǔ)的Byte類型,因此其很容易拓展轉(zhuǎn)換為其它數(shù)據(jù)類型的Buffer容器。
ByteBuffer除了提供byte類型的讀寫(xiě)方法外,也提供了getInt、putInt、getShort、putLong等等其它數(shù)據(jù)類型讀寫(xiě)的方法。其底層本質(zhì)上是將1或N個(gè)字節(jié)看做一個(gè)完整的特定數(shù)據(jù)類型進(jìn)行寫(xiě)入或讀取。
比如通過(guò)相對(duì)寫(xiě)操作putInt將一個(gè)int類型的數(shù)據(jù)寫(xiě)入Buffer等價(jià)于一次寫(xiě)入了4個(gè)byte,position自增4。讀取操作則是從特定位置開(kāi)始,將包含自身在內(nèi)的共四個(gè)byte視作一個(gè)int類型返回。
同時(shí)ByteBuffer也提供了諸如asCharBuffer、asIntBuffer等等將自身轉(zhuǎn)換成邏輯上等價(jià)的其它類型Buffer的方法方便使用。
Buffer容器中,特別是關(guān)于堆外內(nèi)存的使用還有很多細(xì)節(jié)值得研究,但這不是MyNetty系列博客的重點(diǎn),感興趣的讀者可以自行閱讀jdk的對(duì)應(yīng)源碼或者相關(guān)資料,限于個(gè)人能力這里就不再展開(kāi)了。
大端法與小端法
- 前面我們提到,Buffer對(duì)于int或者long等多個(gè)byte字節(jié)構(gòu)成的數(shù)據(jù),是通過(guò)將多個(gè)字節(jié)合并在一起視為整體來(lái)實(shí)現(xiàn)的。而實(shí)際上這里面存在一個(gè)問(wèn)題,即從前到后的哪個(gè)字節(jié)代表高位,哪個(gè)字節(jié)代表低位。如果網(wǎng)絡(luò)傳輸?shù)葓?chǎng)景下兩邊的表示方式不一致,則讀取到的字節(jié)流可能會(huì)被錯(cuò)誤的解析。
- 內(nèi)存地址的示意圖一般是從左到右,從低到高排列的。作為普通人來(lái)說(shuō),看數(shù)字時(shí)也習(xí)慣了高位在左(前),低位在右(后)的模式(1024,千位在前,個(gè)位在后)。這種低位內(nèi)存的字節(jié)代表高位,高位內(nèi)存的字節(jié)代表低位的表示方法叫做大端法。與之相對(duì)的,低位內(nèi)存代表低位,高位內(nèi)存代表高位的表示方法則被叫做小端法。
- 讀者可能會(huì)有疑問(wèn),既然大端法符合人們的直觀理解,為什么不讓所有的軟硬件系統(tǒng)都統(tǒng)一使用大端法存儲(chǔ)數(shù)據(jù)呢?免得還要互相之間各種約定,避免轉(zhuǎn)換錯(cuò)誤。
這是因?yàn)樾《朔ㄔ谧鲆恍?qiáng)制類型轉(zhuǎn)換等基礎(chǔ)的底層操作時(shí),硬件性能更好。舉個(gè)例子,有一個(gè)占4字節(jié)的int數(shù)據(jù),想要強(qiáng)制轉(zhuǎn)換成一個(gè)short類型。對(duì)于小端法而言,只需要從起始位置開(kāi)始尋址找到兩個(gè)字節(jié)返回即可,因?yàn)榈臀蛔止?jié)代表低位,強(qiáng)轉(zhuǎn)時(shí)忽略高位即可。而大端法必須基于起始位置四個(gè)字節(jié)中的后兩個(gè)字節(jié)才能轉(zhuǎn)換成功。
在性能重于一切的底層硬件或操作系統(tǒng)層面,這種效率上的差異不能完全的忽略。所以時(shí)至今日,依然有很多的硬件和操作系統(tǒng)使用小端法維護(hù)int等類型的數(shù)據(jù)。 - 因此,jdk的ByteBuffer中支持指定以小端法或者大端法來(lái)存儲(chǔ)數(shù)據(jù),這樣可以在網(wǎng)絡(luò)傳輸?shù)刃枰獙?nèi)存數(shù)據(jù)編碼為字節(jié)流的場(chǎng)景下,讓用戶能自由的轉(zhuǎn)換為約定好的,統(tǒng)一的表示方式。
/**
* 基本copy自jdk的ByteBuffer類,但做了簡(jiǎn)化
* */
public abstract class MyByteBuffer extends MyBuffer {
// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
MyByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
/**
* 創(chuàng)建一個(gè)指定了capacity的堆內(nèi)ByteBuffer
* */
public static MyByteBuffer allocate(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException();
}
// 簡(jiǎn)單起見(jiàn),只支持基于堆內(nèi)存的HeapByteBuffer
return new MyHeapByteBuffer(capacity, capacity);
}
/**
* 相對(duì)讀操作
* */
public abstract byte get();
/**
* 絕對(duì)讀操作
* */
public abstract byte get(int index);
/**
* 相對(duì)寫(xiě)操作
* */
public abstract MyByteBuffer put(byte b);
public final MyByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
public MyByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length);
if (length > remaining()) {
throw new BufferOverflowException();
}
int end = offset + length;
for (int i = offset; i < end; i++) {
// 循環(huán)put寫(xiě)入整個(gè)字節(jié)數(shù)組
this.put(src[i]);
}
return this;
}
/**
* 絕對(duì)寫(xiě)操作
* */
public abstract MyByteBuffer put(int index, byte b);
/**
* 相對(duì)的批量讀操作
* 將當(dāng)前buffer的length個(gè)字節(jié),寫(xiě)入指定dst數(shù)組。寫(xiě)入的起始下標(biāo)位置是offset
* */
public MyByteBuffer get(byte[] dst, int offset, int length) {
checkBounds(offset, length, dst.length);
if (length > remaining()) {
// 所要讀取的字節(jié)數(shù)不能超過(guò)當(dāng)前buffer總的剩余可讀取數(shù)量
throw new BufferUnderflowException();
}
int end = offset + length;
for (int i = offset; i < end; i++) {
dst[i] = get();
}
return this;
}
/**
* 相對(duì)的批量讀操作
* 將當(dāng)前buffer中的數(shù)據(jù)寫(xiě)入指定dst數(shù)組。寫(xiě)入的起始下標(biāo)是0,length為dst的總長(zhǎng)度
* */
public MyByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}
/**
* 壓縮操作(也算寫(xiě)操作)
* */
public abstract MyByteBuffer compact();
/**
* 默認(rèn)是大端
* */
boolean bigEndian = true;
public final ByteOrder order() {
return bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
}
public final MyByteBuffer order(ByteOrder bo) {
bigEndian = (bo == ByteOrder.BIG_ENDIAN);
return this;
}
// 簡(jiǎn)單起見(jiàn)省略掉別的數(shù)據(jù)類型,僅支持int類型
public abstract int getInt();
public abstract int getInt(int index);
public abstract MyByteBuffer putInt(int value);
public abstract MyByteBuffer putInt(int index, int value);
abstract byte _get(int i); // package-private
abstract void _put(int i, byte b); // package-private
}
/**
* 基本copy自jdk的HeapByteBuffer,但做了簡(jiǎn)化
* */
public class MyHeapByteBuffer extends MyByteBuffer{
MyHeapByteBuffer(int cap, int lim) {
super(-1, 0, lim, cap, new byte[cap], 0);
}
protected int ix(int i) {
return i + offset;
}
@Override
public byte get() {
// 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
int nextGetIndex = nextGetIndex();
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(nextGetIndex);
return hb[finallyIndex];
}
@Override
public byte get(int index) {
// 檢查index的合法性,必須是0 <= index < limit,避免越界
checkIndex(index);
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(index);
return hb[finallyIndex];
}
@Override
public MyByteBuffer put(byte b) {
// 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
int nextPutIndex = nextPutIndex();
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(nextPutIndex);
// 將b放入對(duì)應(yīng)的index處
hb[finallyIndex] = b;
return this;
}
@Override
public MyByteBuffer put(int index, byte b) {
// 檢查index的合法性,必須是0 <= index < limit,避免越界
checkIndex(index);
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(index);
// 將b放入對(duì)應(yīng)的index處
hb[finallyIndex] = b;
return this;
}
@Override
byte _get(int i) {
return hb[i];
}
@Override
void _put(int i, byte b) {
hb[i] = b;
}
@Override
public MyByteBuffer compact() {
// 把底層數(shù)組中(position+offset)到(limit+offset)之間的內(nèi)容整體往前挪,挪到數(shù)組起始處(0+offset), 實(shí)際內(nèi)容的長(zhǎng)度=remaining
System.arraycopy(hb, ix(getPosition()), hb, ix(0), remaining());
// 要理解下面position和limit的變化,需要意識(shí)到compact是一次從讀模式切換到寫(xiě)模式的操作(之前讀完了,就把剩下的還沒(méi)有讀完的壓縮整理一下)
// 壓縮后實(shí)際內(nèi)容的長(zhǎng)度=remaining,所以壓縮完后position,也就是后續(xù)開(kāi)始寫(xiě)的位置就是從remaining開(kāi)始
setPosition(remaining());
// 寫(xiě)模式下,limit當(dāng)然就變成了capacity了
setLimit(getCapacity());
// 壓縮后,mark沒(méi)意義了,就直接丟棄掉
discardMark();
return this;
}
@Override
public int getInt() {
// 相比get方法獲得1個(gè)字節(jié),getInt一次要讀取4個(gè)字節(jié),所以position一次性推進(jìn)4字節(jié)
int nextGetIndex = nextGetIndex(4);
int finallyIndex = ix(nextGetIndex);
// 從指定的index處讀取4個(gè)字節(jié),構(gòu)造成1個(gè)int類型返回(基于bigEndian,決定如何解析這4個(gè)字節(jié)(大端還是小端))
return BitsUtil.getInt(this, finallyIndex, bigEndian);
}
@Override
public int getInt(int index) {
// 檢查index的合法性,必須是0 <= index < limit-4,避免越界
checkIndex(index,4);
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(index);
// 從指定的index處讀取4個(gè)字節(jié),構(gòu)造成1個(gè)int類型返回(基于bigEndian,決定如何解析這4個(gè)字節(jié)(大端還是小端))
return BitsUtil.getInt(this, finallyIndex, bigEndian);
}
@Override
public MyByteBuffer putInt(int value) {
// 獲得position的位置(相對(duì)操作,有副作用,會(huì)推進(jìn)position)
// 相比put方法寫(xiě)入1個(gè)字節(jié),putInt一次要讀取4個(gè)字節(jié),所以position一次性推進(jìn)4字節(jié)
int nextPutIndex = nextPutIndex(4);
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(nextPutIndex);
// 向指定的index處寫(xiě)入大小為4個(gè)字節(jié)的1個(gè)int數(shù)據(jù)(基于bigEndian,決定寫(xiě)入字節(jié)的順序(大端還是小端))
BitsUtil.putInt(this, finallyIndex, value, bigEndian);
return this;
}
@Override
public MyByteBuffer putInt(int index, int value) {
// 檢查index的合法性,必須是0 <= index < limit-4,避免越界
checkIndex(index,4);
// 加上offset偏移量獲得最終的index值
int finallyIndex = ix(index);
// 向指定的index處寫(xiě)入大小為4個(gè)字節(jié)的1個(gè)int數(shù)據(jù)(基于bigEndian,決定寫(xiě)入字節(jié)的順序(大端還是小端))
BitsUtil.putInt(this, finallyIndex, value, bigEndian);
return this;
}
@Override
public boolean isReadOnly() {
// 可讀/可寫(xiě)
return false;
}
}
jdk中Buffer的體系是一個(gè)非常強(qiáng)大而又復(fù)雜的體系,上面介紹的關(guān)于Buffer體系的內(nèi)容只是其中比較核心的部分內(nèi)容。除此之外還有很多關(guān)于Buffer內(nèi)存映射、零拷貝、切片視圖、堆外內(nèi)存回收等等更多的內(nèi)容限于篇幅在本文中完全沒(méi)有涉及。
但對(duì)于幫助讀者理解為什么Netty要在jdk的ByteBuffer基礎(chǔ)上再封裝一個(gè)ByteBuf,個(gè)人認(rèn)為這些內(nèi)容已經(jīng)足夠了。
2. Netty ByteBuf介紹
前面一節(jié)中對(duì)jdk的Buffer容器的核心工作原理進(jìn)行了介紹。Buffer容器是一個(gè)設(shè)計(jì)精巧又功能強(qiáng)大的工具,但其還是存在著以下明顯缺點(diǎn):
- 由于存在讀寫(xiě)兩種模式,在使用時(shí)需要時(shí)刻關(guān)注當(dāng)前的模式,并通過(guò)flip、rewind、compact、clear等方法轉(zhuǎn)換模式,一旦搞錯(cuò)模式就會(huì)釀成大錯(cuò)。在面對(duì)較為復(fù)雜的頻繁切換模式的場(chǎng)景時(shí),開(kāi)發(fā)者的心智負(fù)擔(dān)會(huì)很重。
- 不支持自動(dòng)的擴(kuò)容。Buffer容器通過(guò)一個(gè)底層數(shù)組來(lái)存儲(chǔ)元素,其自身與數(shù)組一樣不支持動(dòng)態(tài)的擴(kuò)容,一旦在創(chuàng)建時(shí)指定了capacity,后續(xù)無(wú)法再擴(kuò)大。
不支持?jǐn)U容的Buffer,要么在創(chuàng)建時(shí)預(yù)設(shè)一個(gè)非常大的capacity,要么就需要在容量不足時(shí)手動(dòng)的將Buffer中的數(shù)據(jù)轉(zhuǎn)移到新的更大空間的Buffer中。前者會(huì)浪費(fèi)內(nèi)存,后者則非常麻煩且性能不高。 - jdk的DirectBuffer中的堆外底層數(shù)組的內(nèi)存回收基于PhantomReference,在gc時(shí)觸發(fā)回收動(dòng)作。因此在內(nèi)存回收上存在一定的延時(shí)性,在需要大量創(chuàng)建并銷毀Buffer容器的場(chǎng)景下,性能較差。
- 沒(méi)有支持池化復(fù)用的機(jī)制。每個(gè)Buffer在需要時(shí)都需要臨時(shí)的分配內(nèi)存空間,并在不需要時(shí)進(jìn)行釋放。在Buffer被大量使用的場(chǎng)景下,反復(fù)的創(chuàng)建和銷毀基于Buffer會(huì)對(duì)GC造成很大壓力,而對(duì)于基于堆外內(nèi)存的DirectBuffer來(lái)說(shuō)由于堆外內(nèi)存回收機(jī)制的延遲也會(huì)對(duì)堆外內(nèi)存的使用帶來(lái)不小的壓力。
當(dāng)然,缺點(diǎn)都是比較出來(lái)的,相比起Netty中更強(qiáng)大的ByteBuf,jdk中Buffer的缺點(diǎn)遠(yuǎn)不止此。相信讀者在理解了Netty中ByteBuf體系后,會(huì)加深對(duì)其的理解。
ByteBuf層次體系
與jdk的Buffer體系類似,Netty中的ByteBuf同樣是通過(guò)不同層次子類的組合來(lái)實(shí)現(xiàn)不同屬性的各種ByteBuf。
- Netty作為一個(gè)網(wǎng)絡(luò)框架,其只專注于最通用的Byte類型元素的容器存儲(chǔ),所以底層的容器類直接就是ByteBuf,沒(méi)有IntBuf、ShortBuf這些子類。
- Netty支持基于引用計(jì)數(shù)的自動(dòng)容器回收機(jī)制,可以在容器不再被使用時(shí),即時(shí)的將容器所占用的內(nèi)存回收掉,所以抽象出了AbstractReferenceCountedByteBuf類。
- Netty支持池化容器,因此設(shè)計(jì)了PooledByteBuf子類。
- Netty同樣支持堆內(nèi)和堆外兩種不同內(nèi)存類型,因此更進(jìn)一步的抽象出了PooledHeapByteBuf、PooledDirectByteBuf、UnpooledHeapByteBuf和UnpooledDirectByteBuf這四個(gè)核心子類。
- 除此之外,Netty還提供了注入繞過(guò)數(shù)組越界檢查的基于Unsafe的PooledUnsafeHeapByteBuf、PooledUnsafeDirectByteBuf;也提供了基于切片,邏輯視圖的零拷貝的CompositeByteBuf,以及各種功能強(qiáng)大,用處各異的子類實(shí)現(xiàn)。
ByteBuf主要子類示意圖

下面我們來(lái)看看netty是如何優(yōu)化上述jdk的Buffer容器的缺點(diǎn)的。
允許同時(shí)進(jìn)行讀和寫(xiě)
- 首先針對(duì)Buffer中讀寫(xiě)模式無(wú)法共存,需要時(shí)刻注意當(dāng)前模式的問(wèn)題。Netty中的ByteBuf設(shè)計(jì)中引入了讀寫(xiě)兩個(gè)指針(AbstractByteBuf中的readerIndex和writerIndex),而非只有一個(gè)position指針。
ByteBuf同樣支持相對(duì)讀寫(xiě)(比如readByte、writeByte)與參數(shù)中指定index下標(biāo)位置的絕對(duì)讀寫(xiě)(比如getByte、setByte)。在相對(duì)讀操作會(huì)自動(dòng)的推進(jìn)讀指針readerIndex,而在相對(duì)寫(xiě)操作中則會(huì)自動(dòng)推進(jìn)寫(xiě)指針writerIndex。
因?yàn)橥瑫r(shí)維護(hù)了讀寫(xiě)指針的原因,netty的ByteBuf可以同時(shí)的進(jìn)行讀和寫(xiě),而不用關(guān)心當(dāng)前是屬于什么模式,也無(wú)需使用flip等方法切換形態(tài)。 - 與ByteBuffer類似,netty的ByteBuf內(nèi)部的指針屬性同樣有一個(gè)嚴(yán)格的大小關(guān)系,用于防止寫(xiě)操作超過(guò)容量,也防止讀操作讀取到未實(shí)際寫(xiě)入的非法區(qū)域。即readerIndex <= writerIndex <= capacity <= maxCapacity。
支持自動(dòng)擴(kuò)容
- maxCapacity代表最大容量,與ByteBuffer中capacity類似,是ByteBuf的最大容量限制。但與ByteBuffer不同的是,netty的ByteBuf中的maxCapacity不等于其底層數(shù)組實(shí)際的容量,很多情況下只是起到一個(gè)閾值的作用。
- capacity才代表ByteBuf實(shí)際底層數(shù)組的大小(capacity方法),在寫(xiě)入數(shù)據(jù)時(shí),會(huì)檢查當(dāng)前的capacity是否足以滿足寫(xiě)入的要求。如果發(fā)現(xiàn)capacity不足時(shí),會(huì)觸發(fā)自動(dòng)擴(kuò)容。
自動(dòng)擴(kuò)容的capacity無(wú)論如何不能擴(kuò)容到超過(guò)maxCapacity,如果超過(guò)maxCapacity依然無(wú)法放入新寫(xiě)入的數(shù)據(jù)則會(huì)報(bào)錯(cuò)(IndexOutOfBoundsException)。
未超過(guò)maxCapacity時(shí),但寫(xiě)入的數(shù)據(jù)量超過(guò)當(dāng)前底層數(shù)組容量時(shí)則會(huì)進(jìn)行擴(kuò)容。擴(kuò)容時(shí),如果當(dāng)前底層數(shù)組的大小低于閾值(4M)時(shí),則會(huì)較為激進(jìn)的2倍數(shù)擴(kuò)容,以減少未來(lái)可能繼續(xù)擴(kuò)容的次數(shù);當(dāng)超過(guò)閾值時(shí),則以較為保守的方式進(jìn)行擴(kuò)容,以盡量的節(jié)約內(nèi)存。(ByteBufAllocator.calculateNewCapacity方法) - 通過(guò)maxCapacity和capacity兩個(gè)不同容量屬性的設(shè)計(jì),ByteBuf在能控制最大內(nèi)存使用量的前提下,能夠不一口氣申請(qǐng)一個(gè)極大的數(shù)組,而是按需的使用內(nèi)存。
ByteBuf結(jié)構(gòu)圖

支持基于引用計(jì)數(shù)的手動(dòng)內(nèi)存釋放
- Netty的ByteBuf能夠基于引用計(jì)數(shù)機(jī)制來(lái)實(shí)現(xiàn)手動(dòng)的內(nèi)存資源釋放(AbstractReferenceCountedByteBuf)。在ByteBuf容器被創(chuàng)建時(shí),其被引用數(shù)被初始化為1。當(dāng)使用者認(rèn)為不需要再使用容器時(shí),就將被引用數(shù)自減1(release方法)。當(dāng)容器的被引用數(shù)被設(shè)置為0時(shí),則會(huì)觸發(fā)ByteBuf容器的銷毀流程,釋放掉底層數(shù)組所占用的內(nèi)存空間。
當(dāng)ByteBuf容器需要交給其它線程會(huì)處理時(shí),需要通過(guò)retain方法增加被引用數(shù),避免其因?yàn)槠渌褂谜遰elease而被提前銷毀。 - 因?yàn)锽yteBuf本身不會(huì)互相引用而出現(xiàn)循環(huán)依賴,所以引用計(jì)數(shù)的內(nèi)存管理機(jī)制是非常高效的。比起依賴jvm的gc機(jī)制,在確定不再使用ByteBuf時(shí)主動(dòng)的釋放,雖然略微的增加了開(kāi)發(fā)者的心智負(fù)擔(dān),但卻可以大幅的減輕gc的壓力。
Netty作為一個(gè)高性能網(wǎng)絡(luò)框架,實(shí)際工作中都是通過(guò)ByteBuf容器來(lái)進(jìn)行通信,往往會(huì)在短時(shí)間內(nèi)大量創(chuàng)建并銷毀ByteBuf。如果完全依靠gc周期性的回收,那么會(huì)給系統(tǒng)帶來(lái)巨大的壓力。
基于引用計(jì)數(shù)的內(nèi)存管理能夠主動(dòng)和實(shí)時(shí)的進(jìn)行內(nèi)存回收,將內(nèi)存回收的動(dòng)作較為均勻的分?jǐn)偟矫恳粋€(gè)時(shí)間段內(nèi),大大增強(qiáng)了系統(tǒng)的穩(wěn)定性。 - 其實(shí)就像需要手動(dòng)進(jìn)行對(duì)象回收的語(yǔ)言(比如C語(yǔ)言)在內(nèi)存回收上比自動(dòng)垃圾回收的語(yǔ)言(java)性能通常更好一樣,自動(dòng)的gc雖然解放了開(kāi)發(fā)者的心智負(fù)擔(dān),但比起精細(xì)的手工管理、實(shí)時(shí)的釋放,其在性能上還是略遜一籌。
支持ByteBuf容器的池化存儲(chǔ)
- 池化的ByteBuf容器在創(chuàng)建時(shí),底層數(shù)組所需要的內(nèi)存在絕大多數(shù)情況下都能從已經(jīng)預(yù)先申請(qǐng)好的內(nèi)存區(qū)域中獲得,實(shí)際使用中僅需要進(jìn)行一些標(biāo)記即可,無(wú)需jvm或者操作系統(tǒng)進(jìn)行真實(shí)的內(nèi)存分配。而在ByteBuf容器不再使用而被釋放時(shí),也僅僅需要修改一些針對(duì)內(nèi)存區(qū)域控制權(quán)的即可,不需要進(jìn)行實(shí)際的內(nèi)存回收操作。
池化的容器機(jī)制對(duì)gc非常友好,與平常接觸到的各種連接池、對(duì)象池一樣,通過(guò)避免大量初始化與銷毀的開(kāi)銷,極大的提高了使用特定對(duì)象的吞吐量。 - ByteBuf容器池化存儲(chǔ)相關(guān)的工作原理比較復(fù)雜,我們?cè)贛yNetty后續(xù)的迭代中會(huì)逐一實(shí)現(xiàn)并在博客中介紹其工作原理。本期關(guān)于ByteBuf的介紹僅限于Unpooled非池化的實(shí)現(xiàn)。
public abstract class MyAbstractByteBuf extends MyByteBuf {
// 。。。 僅保留核心邏輯
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
/**
* 是否在編輯讀/寫(xiě)指針的時(shí)候進(jìn)行邊界校驗(yàn)
* 默認(rèn)為true,設(shè)置為false可以不進(jìn)行校驗(yàn)從而略微的提高性能,但可能出現(xiàn)內(nèi)存越界的問(wèn)題
*/
private static final boolean checkBounds = SystemPropertyUtil.getBoolean("my.netty.check.bounds",true);
protected MyAbstractByteBuf(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity must > 0");
}
this.maxCapacity = maxCapacity;
}
@Override
public int maxCapacity() {
return maxCapacity;
}
protected final void maxCapacity(int maxCapacity) {
this.maxCapacity = maxCapacity;
}
@Override
public int readerIndex() {
return readerIndex;
}
@Override
public MyByteBuf readerIndex(int readerIndex) {
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
this.readerIndex = readerIndex;
return this;
}
@Override
public int writerIndex() {
return writerIndex;
}
@Override
public MyByteBuf writerIndex(int writerIndex) {
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
this.writerIndex = writerIndex;
return this;
}
@Override
public MyByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public MyByteBuf markWriterIndex() {
markedWriterIndex = writerIndex;
return this;
}
@Override
public byte getByte(int index) {
checkIndex(index,1);
return _getByte(index);
}
@Override
public byte readByte() {
// 檢查是否可以讀1個(gè)字節(jié)
checkReadableBytes0(1);
int i = readerIndex;
byte b = _getByte(i);
// 和jdk的實(shí)現(xiàn)一樣,在getByte的基礎(chǔ)上,推進(jìn)讀指針
readerIndex = i + 1;
return b;
}
protected abstract byte _getByte(int index);
@Override
public MyByteBuf setByte(int index, int value) {
checkIndex(index,1);
_setByte(index, value);
return this;
}
protected abstract void _setByte(int index, int value);
public abstract MyByteBufAllocator alloc();
@Override
public MyByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
@Override
public MyByteBuf writeBytes(byte[] src) {
return writeBytes(src, 0, src.length);
}
@Override
public MyByteBuf readBytes(byte[] dst) {
readBytes(dst, 0, dst.length);
return this;
}
@Override
public boolean isReadable() {
return writerIndex > readerIndex;
}
}
/**
* 參考自netty的UnpooledHeapByteBuf,在其基礎(chǔ)上做了簡(jiǎn)化(只實(shí)現(xiàn)了最基礎(chǔ)的一些功能以作參考)
* */
public class MyUnPooledHeapByteBuf extends MyAbstractReferenceCountedByteBuf{
private final MyByteBufAllocator alloc;
private ByteBuffer tmpNioBuf;
public static final byte[] EMPTY_BYTES = {};
private byte[] array;
public MyUnPooledHeapByteBuf(MyByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
this.array = new byte[initialCapacity];
setIndex(0, 0);
}
@Override
public int capacity() {
// heapByteBuf的capacity就是內(nèi)部數(shù)組的長(zhǎng)度
return array.length;
}
@Override
public MyByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
byte[] oldArray = array;
int oldCapacity = oldArray.length;
if (newCapacity == oldCapacity) {
// 特殊情況,如果與之前的容量一樣則無(wú)事發(fā)生
return this;
}
int bytesToCopy;
if (newCapacity > oldCapacity) {
// 如果新的capacity比之前的大,那么就將原來(lái)內(nèi)部數(shù)組中的內(nèi)容整個(gè)copy到新數(shù)組中
bytesToCopy = oldCapacity;
} else {
// 如果新的capacity比之前的小,那么可能需要截?cái)嘀暗臄?shù)組內(nèi)容
if (writerIndex() > newCapacity) {
// 寫(xiě)指針大于newCapacity,確定需要截?cái)? this.readerIndex = Math.min(readerIndex(), newCapacity);
this.writerIndex = newCapacity;
}
bytesToCopy = newCapacity;
}
// 將原始內(nèi)部數(shù)組中的內(nèi)容copy到新數(shù)組中
byte[] newArray = new byte[newCapacity];
System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
this.array = newArray;
return this;
}
@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
return readBytes;
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
return getBytes(index, out, length, false);
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
} catch (ClosedChannelException ignored) {
return -1;
}
}
@Override
public byte[] array() {
return this.array;
}
@Override
public int arrayOffset() {
// 非Pool的,沒(méi)有偏移量
return 0;
}
@Override
public MyByteBuf getBytes(int index, MyByteBuf dst, int dstIndex, int length) {
// 帶上dst的偏移量
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
return this;
}
@Override
public MyByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
@Override
public MyByteBuf setBytes(int index, MyByteBuf src, int srcIndex, int length) {
// 帶上src的偏移量
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
return this;
}
@Override
public MyByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
@Override
protected byte _getByte(int index) {
return array[index];
}
@Override
protected void _setByte(int index, int value) {
this.array[index] = (byte) value;
}
@Override
public MyByteBufAllocator alloc() {
return alloc;
}
@Override
protected int _getInt(int index) {
return BitsUtil.getInt(this.array,index);
}
@Override
protected int _getIntLE(int index) {
return BitsUtil.getIntLE(this.array,index);
}
@Override
protected void deallocate() {
// heapByteBuf的回收很簡(jiǎn)單,就是清空內(nèi)部數(shù)組,等待gc回收掉原來(lái)的數(shù)組對(duì)象即可
array = EMPTY_BYTES;
}
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
}
private ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
}
return tmpNioBuf;
}
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = ByteBuffer.wrap(array);
}
return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}
}
/**
* 參考最初版本的netty AbstractReferenceCountedByteBuf實(shí)現(xiàn)(4.16.Final)
* 高版本的netty進(jìn)行了性能上的優(yōu)化,但是細(xì)節(jié)太多,太復(fù)雜。我們這個(gè)netty學(xué)習(xí)的demo更關(guān)注netty整體的結(jié)構(gòu),細(xì)節(jié)上的優(yōu)化先放過(guò)
* 具體原理可以參考大佬的博客:http://www.rzrgm.cn/binlovetech/p/18369244
* */
public abstract class MyAbstractReferenceCountedByteBuf extends MyAbstractByteBuf{
/**
* 原子更新refCnt字段
* */
private static final AtomicIntegerFieldUpdater<MyAbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(MyAbstractReferenceCountedByteBuf.class, "refCnt");
/**
* 被引用的次數(shù)
*
* 主要用于實(shí)現(xiàn)ReferenceCounted接口相關(guān)的邏輯
* */
private volatile int refCnt;
protected MyAbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
// 被創(chuàng)建就說(shuō)明被引用了,被引用數(shù)初始化為1
refCntUpdater.set(this, 1);
}
@Override
public int refCnt() {
return this.refCnt;
}
protected final void setRefCnt(int refCnt) {
refCntUpdater.set(this, refCnt);
}
public MyByteBuf retain() {
return this.retain0(1);
}
public MyByteBuf retain(int increment) {
if(increment <= 0){
throw new IllegalArgumentException("increment must > 0");
}
return this.retain0(increment);
}
private MyByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
// 先算出更新后預(yù)期的值,用于cas
nextCnt = refCnt + increment;
if (nextCnt <= increment) {
// 參數(shù)有問(wèn)題
throw new IllegalArgumentException("illegal retain refCnt=" + refCnt + ", increment=" + increment);
}
// cas原子更新,如果compareAndSet返回false,則說(shuō)明出現(xiàn)了并發(fā)更新
// doWhile循環(huán)重新計(jì)算過(guò),直到更新成功
} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));
return this;
}
public boolean release() {
return this.release0(1);
}
public boolean release(int decrement) {
if(decrement <= 0){
throw new IllegalArgumentException("decrement must > 0");
}
return this.release0(decrement);
}
private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if (refCnt < decrement) {
// 參數(shù)有問(wèn)題,減的太多了
throw new IllegalArgumentException("illegal retain refCnt=" + refCnt + ", decrement=" + decrement);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
if (refCnt == decrement) {
// refCnt減為0了,釋放該byteBuf(具體釋放方式由子類處理)
this.deallocate();
// 返回true,說(shuō)明當(dāng)前release后,成功釋放了
return true;
} else {
// 返回false,說(shuō)明當(dāng)前release后還有別的地方仍然在引用該ByteBuf
return false;
}
}
protected abstract void deallocate();
}
總結(jié)
- 在本篇博客中,先對(duì)jdk的Buffer容器體系進(jìn)行了介紹,并基于Buffer容器的一些缺點(diǎn)引出了Netty的ByteBuf容器。ByteBuf容器在jdk的Buffer容器基礎(chǔ)上,做了非常多的拓展以改進(jìn)Buffer容器的缺點(diǎn)。
- 限于個(gè)人水平,博客中對(duì)jdk的Buffer和Netty的ByteBuf容器的工作原理介紹都是點(diǎn)到即止,僅僅分析了一些最基礎(chǔ)和核心的點(diǎn)。要想更好的理解其底層原理,還是需要讀者仔細(xì)的閱讀資料和源碼才行。
- lab6中實(shí)現(xiàn)的非池化ByteBuf機(jī)制雖然非常簡(jiǎn)單,但為后續(xù)迭代中真正核心且復(fù)雜的PooledByteBuf即池化內(nèi)存管理的實(shí)現(xiàn)打下了基礎(chǔ)。后續(xù)的lab7-lab9中,MyNetty將會(huì)參考netty逐步的實(shí)現(xiàn)一個(gè)簡(jiǎn)化版的池化內(nèi)存管理體系,幫助讀者更好的理解netty。
博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab6_bytebuf 分支),內(nèi)容如有錯(cuò)誤,還請(qǐng)多多指教。
浙公網(wǎng)安備 33010602011771號(hào)