從零開始實現(xiàn)簡易版Netty(二) MyNetty pipeline流水線
從零開始實現(xiàn)簡易版Netty(二) MyNetty pipeline流水線
1. Netty pipeline流水線介紹
在上一篇博客中l(wèi)ab1版本的MyNetty參考netty實現(xiàn)了一個極其精簡的reactor模型。按照計劃,lab2版本的MyNetty需要實現(xiàn)pipeline流水線,以支持不同的邏輯處理模塊的解耦。
由于本文屬于系列博客,讀者需要對之前的博客內(nèi)容有所了解才能更好地理解本文內(nèi)容。
在lab1版本中,MyNetty的EventLoop處理邏輯中,允許使用者配置一個EventHandler,并在處理read事件時調(diào)用其實現(xiàn)的自定義fireChannelRead方法。
這一機制在實現(xiàn)demo中的簡易echo服務(wù)器時是夠用的,但在實際的場景中,一個完備的網(wǎng)絡(luò)程序,業(yè)務(wù)想要處理的IO事件有很多類型,并且不希望在一個大而全的臃腫的處理器中處理所有的IO事件,而是能夠模塊化的拆分不同的處理邏輯,實現(xiàn)架構(gòu)上的靈活解耦。
因此netty提供了pipeline流水線機制,允許用戶在使用netty時能按照自己的需求,按順序組裝自己的處理器鏈條。
1.1 netty的IO事件
在實際的網(wǎng)絡(luò)環(huán)境中,有非常多不同類型的IO事件,最典型的就是讀取來自遠端的數(shù)據(jù)(read)以及向遠端寫出發(fā)送數(shù)據(jù)(write)。
netty對這些IO事件進行了抽象,并允許用戶編寫自定義的處理器監(jiān)聽或是主動觸發(fā)這些事件。
netty按照事件數(shù)據(jù)流的傳播方向?qū)O事件分成了入站(InBound)與出站(OutBound)兩大類,由遠端輸入傳播到本地應(yīng)用程序的事件被叫做入站事件,從本地應(yīng)用程序觸發(fā)向遠端傳播的事件叫出站事件。
主要的入站事件有channelRead、channelActive等,主要的出站事件有write、connect、bind等。
1.2 netty的IO事件處理器與pipeline流水線
針對InBound入站IO事件,netty抽象出了ChannelInboundHandler接口;針對OutBound出站IO事件,netty抽象出了ChannelOutboundHandler接口。
用戶可以編寫一系列繼承自對應(yīng)ChannelHandler接口的自定義處理器,將其綁定到ChannelPipeline中。每一個Channel都對應(yīng)一個ChannelPipeline,ChannelPipeline實例是獨屬于某個特定channel連接的。

2. MyNetty實現(xiàn)pipeline流水線
經(jīng)過上述對于netty的IO事件與pipeline流水線簡要介紹后,讀者對netty的流水線雖然有了一定的概念,但對具體的細節(jié)還是知之甚少。下面我們結(jié)合MyNetty的源碼,展開介紹netty的流水線機制實現(xiàn)。
2.1 MyNetty的事件處理器
/**
* 事件處理器(相當(dāng)于netty中ChannelInboundHandler和ChannelOutboundHandler合在一起)
* */
public interface MyChannelEventHandler {
// ========================= inbound入站事件 ==============================
void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception;
void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) throws Exception;
// ========================= outbound出站事件 ==============================
void close(MyChannelHandlerContext ctx) throws Exception;
void write(MyChannelHandlerContext ctx, Object msg) throws Exception;
}
- 前面說到,netty將入站與出站事件用兩個不同的ChannelEventHandler接口進行了抽象,而在MyNetty中因為最終要支持的IO事件沒有netty那么多,所以出站、入站的處理接口進行了合并。
這樣做雖然在架構(gòu)上不如netty那樣拆分開來的設(shè)計優(yōu)雅,但相對來說理解起來會更加簡單。 - 未來MyChannelEventHandler還會隨著迭代支持更多的IO事件,但這是個漸進的過程,目前l(fā)ab2中只需要支持少數(shù)幾個IO事件便能滿足需求。
2.2 MyNetty的pipeline流水線與ChannelHandler上下文
MyNetty pipeline流水線實現(xiàn)
public interface MyChannelEventInvoker {
// ========================= inbound入站事件 ==============================
void fireChannelRead(Object msg);
void fireExceptionCaught(Throwable cause);
// ========================= outbound出站事件 ==============================
void close();
void write(Object msg);
}
/**
* pipeline首先自己也是一個Invoker
*
* 包括head和tail兩個哨兵節(jié)點
* */
public class MyChannelPipeline implements MyChannelEventInvoker {
private static final Logger logger = LoggerFactory.getLogger(MyChannelPipeline.class);
private final MyNioChannel channel;
/**
* 整條pipeline上的,head和tail兩個哨兵節(jié)點
*
* inbound入站事件默認都從head節(jié)點開始向tail傳播
* outbound出站事件默認都從tail節(jié)點開始向head傳播
* */
private final MyAbstractChannelHandlerContext head;
private final MyAbstractChannelHandlerContext tail;
public MyChannelPipeline(MyNioChannel channel) {
this.channel = channel;
head = new MyChannelPipelineHeadContext(this);
tail = new MyChannelPipelineTailContext(this);
head.setNext(tail);
tail.setPrev(head);
}
@Override
public void fireChannelRead(Object msg) {
// 從head節(jié)點開始傳播讀事件(入站)
MyChannelPipelineHeadContext.invokeChannelRead(head,msg);
}
@Override
public void fireExceptionCaught(Throwable cause) {
// 異常傳播到了pipeline的末尾,打印異常信息
onUnhandledInboundException(cause);
}
@Override
public void close() {
// 出站事件,從尾節(jié)點向頭結(jié)點傳播
tail.close();
}
@Override
public void write(Object msg) {
tail.write(msg);
}
public void addFirst(MyChannelEventHandler handler){
// 非sharable的handler是否重復(fù)加入的校驗
checkMultiplicity(handler);
MyAbstractChannelHandlerContext newCtx = newContext(handler);
MyAbstractChannelHandlerContext oldFirstCtx = head.getNext();
newCtx.setPrev(head);
newCtx.setNext(oldFirstCtx);
head.setNext(newCtx);
oldFirstCtx.setPrev(newCtx);
}
public void addLast(MyChannelEventHandler handler){
// 非sharable的handler是否重復(fù)加入的校驗
checkMultiplicity(handler);
MyAbstractChannelHandlerContext newCtx = newContext(handler);
// 加入鏈表尾部節(jié)點之前
MyAbstractChannelHandlerContext oldLastCtx = tail.getPrev();
newCtx.setPrev(oldLastCtx);
newCtx.setNext(tail);
oldLastCtx.setNext(newCtx);
tail.setPrev(newCtx);
}
private static void checkMultiplicity(MyChannelEventHandler handler) {
if (handler instanceof MyChannelEventHandlerAdapter) {
MyChannelEventHandlerAdapter h = (MyChannelEventHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
// 一個handler實例不是sharable,但是被加入到了pipeline一次以上,有問題
throw new MyNettyException(
h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
}
// 第一次被引入,當(dāng)前handler實例標記為已加入
h.added = true;
}
}
public MyNioChannel getChannel() {
return channel;
}
private void onUnhandledInboundException(Throwable cause) {
logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
}
private MyAbstractChannelHandlerContext newContext(MyChannelEventHandler handler) {
return new MyDefaultChannelHandlerContext(this,handler);
}
}
- pipeline實現(xiàn)了ChannelEventInvoker接口,ChannelEventInvoker與ChannelEventHandler中對應(yīng)IO事件的方法是一一對應(yīng)的,唯一的區(qū)別在于其方法中缺失了(MyChannelHandlerContext ctx)參數(shù)。
Invoker接口用于netty內(nèi)部觸發(fā)流水線的事件傳播,而Handler接口用于用戶自定義IO事件觸發(fā)時的事件處理器。 - 同時,pipeline流水線中定義了兩個關(guān)鍵屬性,head和tail,其都是AbstractChannelHandlerContext類型的,其內(nèi)部工作原理我們在下一小節(jié)展開。
pipeline提供了addFirst和addLast兩個方法(netty中提供了非常多功能類似的方法,MyNetty簡單起見只實現(xiàn)了最常用的兩個),允許將用戶自定義的ChannelHandler掛載在pipeline中,與head、tail組成一個雙向鏈表,而入站出站事件會按照雙向鏈表中節(jié)點的順序進行傳播。 - 對于入站事件(比如fireChannelRead),事件從head節(jié)點開始,從前到后的在流水線的handler鏈表中傳播;而出站事件(比如write), 事件則從tail節(jié)點開始,從后往前的在流水線的handler鏈表中傳播。
2.3 MyNetty ChannelHandlerContext上下文實現(xiàn)
下面我們來深入講解ChannelHandlerContext上下文原理,看看一個具體的事件在pipeline的雙向鏈表中的傳播是如何實現(xiàn)的。
MyChannelHandlerContext上下文接口定義
public interface MyChannelHandlerContext extends MyChannelEventInvoker {
MyNioChannel channel();
MyChannelEventHandler handler();
MyChannelPipeline getPipeline();
MyNioEventLoop executor();
}
MyAbstractChannelHandlerContext上下文骨架類
public abstract class MyAbstractChannelHandlerContext implements MyChannelHandlerContext{
private static final Logger logger = LoggerFactory.getLogger(MyAbstractChannelHandlerContext.class);
private final MyChannelPipeline pipeline;
private final int executionMask;
/**
* 雙向鏈表前驅(qū)/后繼節(jié)點
* */
private MyAbstractChannelHandlerContext prev;
private MyAbstractChannelHandlerContext next;
public MyAbstractChannelHandlerContext(MyChannelPipeline pipeline, Class<? extends MyChannelEventHandler> handlerClass) {
this.pipeline = pipeline;
this.executionMask = MyChannelHandlerMaskManager.mask(handlerClass);
}
@Override
public MyNioChannel channel() {
return pipeline.getChannel();
}
public MyAbstractChannelHandlerContext getPrev() {
return prev;
}
public void setPrev(MyAbstractChannelHandlerContext prev) {
this.prev = prev;
}
public MyAbstractChannelHandlerContext getNext() {
return next;
}
public void setNext(MyAbstractChannelHandlerContext next) {
this.next = next;
}
@Override
public MyNioEventLoop executor() {
return this.pipeline.getChannel().getMyNioEventLoop();
}
@Override
public void fireChannelRead(Object msg) {
// 找到當(dāng)前鏈條下最近的一個支持channelRead方法的MyAbstractChannelHandlerContext(inbound事件,從前往后找)
MyAbstractChannelHandlerContext nextHandlerContext = findContextInbound(MyChannelHandlerMaskManager.MASK_CHANNEL_READ);
// 調(diào)用找到的那個ChannelHandlerContext其handler的channelRead方法
MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
if(myNioEventLoop.inEventLoop()){
invokeChannelRead(nextHandlerContext,msg);
}else{
// 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
myNioEventLoop.execute(()->{
invokeChannelRead(nextHandlerContext,msg);
});
}
}
@Override
public void fireExceptionCaught(Throwable cause) {
// 找到當(dāng)前鏈條下最近的一個支持exceptionCaught方法的MyAbstractChannelHandlerContext(inbound事件,從前往后找)
MyAbstractChannelHandlerContext nextHandlerContext = findContextInbound(MyChannelHandlerMaskManager.MASK_EXCEPTION_CAUGHT);
// 調(diào)用找到的那個ChannelHandlerContext其handler的exceptionCaught方法
MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
if(myNioEventLoop.inEventLoop()){
invokeExceptionCaught(nextHandlerContext,cause);
}else{
// 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
myNioEventLoop.execute(()->{
invokeExceptionCaught(nextHandlerContext,cause);
});
}
}
@Override
public void close() {
// 找到當(dāng)前鏈條下最近的一個支持close方法的MyAbstractChannelHandlerContext(outbound事件,從后往前找)
MyAbstractChannelHandlerContext nextHandlerContext = findContextOutbound(MyChannelHandlerMaskManager.MASK_CLOSE);
MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
if(myNioEventLoop.inEventLoop()){
doClose(nextHandlerContext);
}else{
// 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
myNioEventLoop.execute(()->{
doClose(nextHandlerContext);
});
}
}
private void doClose(MyAbstractChannelHandlerContext nextHandlerContext){
try {
nextHandlerContext.handler().close(nextHandlerContext);
} catch (Throwable t) {
logger.error("{} do close error!",nextHandlerContext,t);
}
}
@Override
public void write(Object msg) {
// 找到當(dāng)前鏈條下最近的一個支持write方法的MyAbstractChannelHandlerContext(outbound事件,從后往前找)
MyAbstractChannelHandlerContext nextHandlerContext = findContextOutbound(MyChannelHandlerMaskManager.MASK_WRITE);
MyNioEventLoop myNioEventLoop = nextHandlerContext.executor();
if(myNioEventLoop.inEventLoop()){
doWrite(nextHandlerContext,msg);
}else{
// 防并發(fā),每個針對channel的操作都由自己的eventLoop線程去執(zhí)行
myNioEventLoop.execute(()->{
doWrite(nextHandlerContext,msg);
});
}
}
private void doWrite(MyAbstractChannelHandlerContext nextHandlerContext, Object msg) {
try {
nextHandlerContext.handler().write(nextHandlerContext,msg);
} catch (Throwable t) {
logger.error("{} do write error!",nextHandlerContext,t);
}
}
@Override
public MyChannelPipeline getPipeline() {
return pipeline;
}
public static void invokeChannelRead(MyAbstractChannelHandlerContext next, Object msg) {
try {
next.handler().channelRead(next, msg);
}catch (Throwable t){
// 處理拋出的異常
next.invokeExceptionCaught(t);
}
}
public static void invokeExceptionCaught(MyAbstractChannelHandlerContext next, Throwable cause) {
next.invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
try {
this.handler().exceptionCaught(this, cause);
} catch (Throwable error) {
// 如果捕獲異常的handler依然拋出了異常,則打印debug日志
logger.error(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
}
}
private MyAbstractChannelHandlerContext findContextInbound(int mask) {
MyAbstractChannelHandlerContext ctx = this;
do {
// inbound事件,從前往后找
ctx = ctx.next;
} while (needSkipContext(ctx, mask));
return ctx;
}
private MyAbstractChannelHandlerContext findContextOutbound(int mask) {
MyAbstractChannelHandlerContext ctx = this;
do {
// outbound事件,從后往前找
ctx = ctx.prev;
} while (needSkipContext(ctx, mask));
return ctx;
}
private static boolean needSkipContext(MyAbstractChannelHandlerContext ctx, int mask) {
// 如果與運算后為0,說明不支持對應(yīng)掩碼的操作,需要跳過
return (ctx.executionMask & (mask)) == 0;
}
}
MyChannelPipelineHeadContext pipeline哨兵頭結(jié)點
/**
* pipeline的head哨兵節(jié)點
* */
public class MyChannelPipelineHeadContext extends MyAbstractChannelHandlerContext implements MyChannelEventHandler {
public MyChannelPipelineHeadContext(MyChannelPipeline pipeline) {
super(pipeline,MyChannelPipelineHeadContext.class);
}
@Override
public void channelRead(MyChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
@Override
public void close(MyChannelHandlerContext ctx) throws Exception {
// 調(diào)用jdk原生的channel方法,關(guān)閉掉連接
ctx.getPipeline().getChannel().getJavaChannel().close();
}
@Override
public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
// 往外寫的操作,一定是socketChannel
SocketChannel socketChannel = (SocketChannel) ctx.getPipeline().getChannel().getJavaChannel();
if(msg instanceof ByteBuffer){
socketChannel.write((ByteBuffer) msg);
}else{
// msg走到head節(jié)點的時候,必須是ByteBuffer類型
throw new Error();
}
}
@Override
public MyChannelEventHandler handler() {
return this;
}
}
MyChannelPipelineHeadContext pipeline哨兵尾結(jié)點
/**
* pipeline的tail哨兵節(jié)點
* */
public class MyChannelPipelineTailContext extends MyAbstractChannelHandlerContext implements MyChannelEventHandler {
private static final Logger logger = LoggerFactory.getLogger(MyChannelPipelineTailContext.class);
public MyChannelPipelineTailContext(MyChannelPipeline pipeline) {
super(pipeline, MyChannelPipelineTailContext.class);
}
@Override
public void channelRead(MyChannelHandlerContext ctx, Object msg) {
// 如果channelRead事件傳播到了tail節(jié)點,說明用戶自定義的handler沒有處理好,但問題不大,打日志警告下
onUnhandledInboundMessage(ctx,msg);
}
@Override
public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) {
// 如果exceptionCaught事件傳播到了tail節(jié)點,說明用戶自定義的handler沒有處理好,但問題不大,打日志警告下
onUnhandledInboundException(cause);
}
@Override
public void close(MyChannelHandlerContext ctx) throws Exception {
// do nothing
logger.info("close op, tail context do nothing");
}
@Override
public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
// do nothing
logger.info("write op, tail context do nothing");
}
@Override
public MyChannelEventHandler handler() {
return this;
}
private void onUnhandledInboundException(Throwable cause) {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
}
private void onUnhandledInboundMessage(MyChannelHandlerContext ctx, Object msg) {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.getPipeline(), ctx.channel());
}
}
- AbstractChannelHandlerContext作為ChannelHandlerContext子類的基礎(chǔ)骨架,是理解Netty中IO事件傳播機制的重中之重。AbstractChannelHandlerContext做為ChannelPipeline的實際節(jié)點,其擁有prev和next兩個屬性,用于關(guān)聯(lián)鏈表中的前驅(qū)和后繼。
- 在觸發(fā)IO事件時,AbstractChannelHandlerContext會按照一定的規(guī)則(具體原理在下一節(jié)展開)找到下一個需要處理當(dāng)前類型IO事件的事件處理器(findContextInbound與findContextOutBound方法)。
- 在找到后會先判斷當(dāng)前線程與目標MyAbstractChannelHandlerContext的執(zhí)行器線程是否相同(inEventLoop),如果是則直接觸發(fā)對應(yīng)handler的回調(diào)方法;如果不是則將當(dāng)前事件包裝成一個任務(wù)交給next節(jié)點的executor執(zhí)行。
這樣設(shè)計的主要原因是netty作為一個高性能網(wǎng)絡(luò)框架,是非常忌諱使用同步鎖的。EventLoop線程是按照引入taskQueue隊列多寫單讀的方式消費IO事件以及相關(guān)任務(wù)的,這樣可以避免處理IO事件時防止不同線程間并發(fā)而大量加鎖。 - 舉個例子,一個聊天服務(wù)器,用戶a通過連接A發(fā)送了一條消息給服務(wù)端,而服務(wù)端需要通過連接b將消息同步給用戶b,連接a和連接b屬于不同的EventLoop線程。
連接a所在的EventLoop在接受到讀事件后,需要往連接b寫出數(shù)據(jù),此時不能直接由連接a的線程執(zhí)行channel的寫出操作(inEventLoop為false),而必須通過execute方法寫入taskQueue交給管理連接b的EventLoop線程,讓它異步的處理。
試想如果能允許別的EventLoop線程來回調(diào)觸發(fā)不屬于它的channel的IO事件,那么所有的ChannelHandler都必須考慮多線程并發(fā)的問題而被迫引入同步機制,導(dǎo)致性能大幅降低。 - netty中可以在ChannelHandler中主動的觸發(fā)一些IO事件,比如write寫出事件。如果是使用ChannelHandlerContext.write寫出,則傳播的起點是當(dāng)前Handler節(jié)點;而如果是ChannelHandlerContext.channel.write的方式寫出,其底層就是調(diào)用的是pipeline.write,其傳播的起點則是tail哨兵節(jié)點。
結(jié)合MyNetty中上述pipeline相關(guān)的代碼,相信讀者應(yīng)該能更好的理解netty中的這一傳播機制。
2.4 ChannelHandler mask掩碼過濾機制
通常情況,用戶自定義的IO事件處理器一般都是各司其職的,不會對每一種IO事件都感興趣。比如最經(jīng)典的編解碼handler,一般來說encode編碼處理器只關(guān)心寫出到遠端的出站事件,而decode解碼處理器只關(guān)心讀取到數(shù)據(jù)的入站事件。
但編碼、解碼處理器都是位于pipeline的同一個鏈表中的,因此IO事件理論上會在鏈表中的所有處理器中傳播。同時由于netty允許ChannelHandler在內(nèi)部自行決定是否將事件往下一個handler節(jié)點傳播,因此如果不引入特別的機制,則意味著用戶自定義的每一個ChannelHandler都必須實現(xiàn)所有的接口方法,并在內(nèi)部添加模版代碼來確保事件能夠繼續(xù)在pipeline中傳播(比如都必須實現(xiàn)fireChannelRead方法,并且都調(diào)用ctx.fireChannelRead方法讓事件能向后傳播)。
netty中為ChannelHandler定義了非常多的IO事件接口,如果每個ChannelHandler都必須實現(xiàn)所有的IO事件接口,netty的用戶在實現(xiàn)自定義處理器時會非常痛苦,同時在高并發(fā)下不必要的方法調(diào)用也會對性能有所影響。
為了解決上述問題,netty提供了Skip機制,允許用戶在編寫自定義處理器時僅關(guān)心自己感興趣的IO事件,而其它事件在進行傳播時能自動的跳過當(dāng)前handler節(jié)點在pipeline中繼續(xù)傳播。
在2.3的AbstractChannelHandlerContext實現(xiàn)中,可以發(fā)現(xiàn)事件傳播的過程中關(guān)鍵的兩個方法(findContextInbound/findContextOutbound)都是基于needSkipContext方法來實現(xiàn)的。
needSkipContext方法中基于AbstractChannelHandlerContext中的一個屬性executionMask來決定是否需要跳過某個ChannelHandler。
下面我們結(jié)合MyNetty的源碼來看看這個executionMask屬性是如何被計算得出,又是如何基于該掩碼進行handler過濾的。
/**
* 計算并緩存每一個類型的handler所需要處理方法掩碼的管理器
*
* 參考自netty的ChannelHandlerMask類
* */
public class MyChannelHandlerMaskManager {
private static final Logger logger = LoggerFactory.getLogger(MyChannelHandlerMaskManager.class);
public static final int MASK_EXCEPTION_CAUGHT = 1;
// ==================== inbound ==========================
public static final int MASK_CHANNEL_REGISTERED = 1 << 1;
public static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
public static final int MASK_CHANNEL_ACTIVE = 1 << 3;
public static final int MASK_CHANNEL_INACTIVE = 1 << 4;
public static final int MASK_CHANNEL_READ = 1 << 5;
public static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
// static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
// static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
// ===================== outbound =========================
public static final int MASK_BIND = 1 << 9;
public static final int MASK_CONNECT = 1 << 10;
// static final int MASK_DISCONNECT = 1 << 11;
public static final int MASK_CLOSE = 1 << 12;
// static final int MASK_DEREGISTER = 1 << 13;
public static final int MASK_READ = 1 << 14;
public static final int MASK_WRITE = 1 << 15;
public static final int MASK_FLUSH = 1 << 16;
private static final ThreadLocal<Map<Class<? extends MyChannelEventHandler>, Integer>> MASKS =
ThreadLocal.withInitial(() -> new WeakHashMap<>(32));
public static int mask(Class<? extends MyChannelEventHandler> clazz) {
// 對于非共享的handler,會隨著channel的創(chuàng)建而被大量創(chuàng)建
// 為了避免反復(fù)的計算同樣類型handler的mask掩碼而引入緩存,優(yōu)先從緩存中獲得對應(yīng)處理器類的掩碼
Map<Class<? extends MyChannelEventHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 緩存中不存在,計算出對應(yīng)類型的掩碼值
mask = calculateChannelHandlerMask(clazz);
cache.put(clazz, mask);
}
return mask;
}
private static int calculateChannelHandlerMask(Class<? extends MyChannelEventHandler> handlerType) {
int mask = 0;
// MyChannelEventHandler中的方法一一對應(yīng),如果支持就通過掩碼的或運算將對應(yīng)的bit位設(shè)置為1
if(!needSkip(handlerType,"channelRead", MyChannelHandlerContext.class,Object.class)){
mask |= MASK_CHANNEL_READ;
}
if(!needSkip(handlerType,"exceptionCaught", MyChannelHandlerContext.class,Throwable.class)){
mask |= MASK_EXCEPTION_CAUGHT;
}
if(!needSkip(handlerType,"close", MyChannelHandlerContext.class)){
mask |= MASK_CLOSE;
}
if(!needSkip(handlerType,"write", MyChannelHandlerContext.class,Object.class)){
mask |= MASK_WRITE;
}
return mask;
}
private static boolean needSkip(Class<?> handlerType, String methodName, Class<?>... paramTypes) {
try {
Method method = handlerType.getMethod(methodName, paramTypes);
// 如果有skip注解,說明需要跳過
return method.isAnnotationPresent(Skip.class);
} catch (NoSuchMethodException e) {
// 沒有這個方法,就不需要設(shè)置掩碼
return false;
}
}
}
/**
* 用于簡化用戶自定義的handler的適配器
*
* 由于所有支持的方法都加上了@Skip注解,子類只需要重寫想要關(guān)注的方法即可,其它未重寫的方法將會在事件傳播時被跳過
* */
public class MyChannelEventHandlerAdapter implements MyChannelEventHandler{
/**
* 當(dāng)前是否已經(jīng)被加入sharable緩存
* */
public volatile boolean added;
@Skip
@Override
public void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void exceptionCaught(MyChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Skip
@Override
public void close(MyChannelHandlerContext ctx) throws Exception {
ctx.close();
}
@Skip
@Override
public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
private static ConcurrentHashMap<Class<?>, Boolean> isSharableCacheMap = new ConcurrentHashMap<>();
public boolean isSharable() {
/**
* MyNetty中直接用全局的ConcurrentHashMap來緩存handler類是否是sharable可共享的,實現(xiàn)起來很簡單
* 而netty中利用FastThreadLocal做了優(yōu)化,避免了不同線程之間的鎖爭搶
* 高并發(fā)下每分每秒都會創(chuàng)建大量的鏈接以及所屬的Handler,優(yōu)化后性能會有很大提升
*
* See <a >#2289</a>.
*/
Class<?> clazz = getClass();
Boolean sharable = isSharableCacheMap.computeIfAbsent(
clazz, k -> clazz.isAnnotationPresent(Sharable.class));
return sharable;
}
}
- 在ChannelHandler被加入到pipeline時,會被包裝成AbstractChannelHandlerContext節(jié)點加入鏈表。在AbstractChannelHandlerContext的構(gòu)造方法中,計算出對應(yīng)ChannelHandler的掩碼。
- ChannelHandler中的每個IO事件的方法都對應(yīng)mask掩碼的一個bit位,bit位為1則代表對該IO事件感興趣,為0則代表不感興趣需要跳過。在IO事件傳播時,通過對應(yīng)掩碼進行與操作快速的判斷是否需要跳過該節(jié)點。
- 具體每一位的掩碼值是通過方法上是否含有@Skip注解來判斷的,帶上了該注解就表示對當(dāng)前IO事件不感興趣,傳播時需要跳過該ChannelHandler。
- 掩碼的計算引入了map緩存,相同類型的ChannelHandler實例的掩碼不需要重復(fù)計算,在創(chuàng)建大量連接時,其對應(yīng)pipeline中的AbstractChannelHandlerContext實例也會被大量創(chuàng)建,使用緩存能很好的提高性能。
- Netty為入站,出站的ChannelHandler分別提供了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter兩個適配器,其方法中默認都帶上了@Skip注解。
在實際開發(fā)時,用戶可以選擇令自己的自定義ChannelHandler繼承對應(yīng)的Adapter,重寫感興趣的IO事件的方法。重寫后的方法不會帶@Skip注解,會在IO事件傳播時觸發(fā)其自定義的方法邏輯。
2.5 @Sharable防共享檢測原理簡單介紹
netty還提供了防共享檢測機制,用來避免用戶錯誤的使用共享ChannelHandler。
- 正常情況下,每個ChannelPipeline中對應(yīng)的ChannelEventHandler實例都是互相獨立的,但在一些場景下使用共享的ChannelHandler能帶來更好的性能。對于一些無狀態(tài)的,或者架構(gòu)上就是全局唯一的handler(比如dubbo中維護業(yè)務(wù)線程池的Handler),令其在不同的Channel中共享是一個好的選擇。
- netty會在ChannelHandler加入到pipeline時對其進行檢查,如果存在一個ChannelHandler實例被不止一次的注冊到netty中,netty會認為其被錯誤的注冊。因為默認情況下,一個ChannelHandler實例不能同時被注冊到一個以上的channel中,否則其將出現(xiàn)并發(fā)問題,netty會拋出異常來警告用戶。
而只有當(dāng)用戶在對應(yīng)的ChannelHandler上顯式標記上@Sharable注解,明確了其就是可以共享,已經(jīng)考慮過并發(fā)的可能性時,才能在重復(fù)注冊時通過校驗。 - 從個人的經(jīng)歷來說,我在初次使用netty時曾對@Sharable注解的功能有過誤解。第一感覺是在構(gòu)造流水線時,被打上了@Sharable注解的Handler會類似spring的單例模式一樣,即使重復(fù)注冊也會被netty自動的弄成全局唯一。
但在了解了其工作原理后發(fā)現(xiàn)是反過來的,@Sharable更多的是起到一個檢查的作用,避免用戶錯誤的重復(fù)注冊并發(fā)不安全的ChannelHandler。
2.6 EventLoop改造接入pipeline流水線
目前l(fā)ab2版本的EventLoop還比較簡單,只是在處理讀事件的時候從原來的直接調(diào)用EventHandler的fireChannelRead方法,改造成了調(diào)用pipeline的fireChannelRead方法,令讀事件在整個ChannelHandler流水線中傳播。
private void processReadEvent(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel)key.channel();
// 目前所有的attachment都是MyNioChannel
MyNioSocketChannel myNioChannel = (MyNioSocketChannel) key.attachment();
// 簡單起見,buffer不緩存,每次讀事件來都新創(chuàng)建一個
// 暫時也不考慮黏包/拆包場景(Netty中靠ByteToMessageDecoder解決,后續(xù)再分析其原理),理想的認為每個消息都小于1024,且每次讀事件都只有一個消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int byteRead = socketChannel.read(readBuffer);
logger.info("processReadEvent byteRead={}",byteRead);
if(byteRead == -1){
// 簡單起見不考慮tcp半連接的情況,返回-1直接關(guān)掉連接
socketChannel.close();
}else{
// 將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作
readBuffer.flip();
// 根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
byte[] bytes = new byte[readBuffer.remaining()];
// 將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
readBuffer.get(bytes);
if(myNioChannel != null) {
// 觸發(fā)pipeline的讀事件
myNioChannel.getChannelPipeline().fireChannelRead(bytes);
}else{
logger.error("processReadEvent attachment myNioChannel is null!");
}
}
}
3.MyNettyBootstrap與新版本Echo服務(wù)器demo實現(xiàn)
在實現(xiàn)了pipeline流水線功能后,配置自定義事件處理器的方式也要有所改變,MyNetty參考netty實現(xiàn)了一個簡單的Client/Server的Bootstrap。
其中構(gòu)建pipeline的方式與netty有所不同,netty中使用了一個特殊的ChannelInboundHandler,即ChannelInitializer。ChannelInitializer會在連接被注冊時觸發(fā)initChannel方法,執(zhí)行用戶自定義的組裝pipeline的邏輯,然后再將這個特殊的Handler從鏈表中remove掉以完成最終channel鏈表的構(gòu)建。
而MyNetty簡單起見,并沒有支持用戶自定義channel的惰性創(chuàng)建,也不支持在運行時動態(tài)的增加或刪除pipeline中鏈表中的handler(所以沒有那些handler狀態(tài)的臨界值判斷),而是直接設(shè)計了一個MyChannelPipelineSupplier接口,在MyNIOChannel被創(chuàng)建時,也一并創(chuàng)建pipeline中的handler鏈表。
服務(wù)端Bootstrap
public class MyNioServerBootstrap {
private static final Logger logger = LoggerFactory.getLogger(MyNioServerBootstrap.class);
private final InetSocketAddress endpointAddress;
private final MyNioEventLoopGroup bossGroup;
public MyNioServerBootstrap(InetSocketAddress endpointAddress,
MyChannelPipelineSupplier childChannelPipelineSupplier,
int bossThreads, int childThreads) {
this.endpointAddress = endpointAddress;
MyNioEventLoopGroup childGroup = new MyNioEventLoopGroup(childChannelPipelineSupplier,childThreads);
this.bossGroup = new MyNioEventLoopGroup(childChannelPipelineSupplier, bossThreads, childGroup);
}
public void start() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
MyNioEventLoop myNioEventLoop = this.bossGroup.next();
myNioEventLoop.execute(()->{
try {
Selector selector = myNioEventLoop.getUnwrappedSelector();
serverSocketChannel.socket().bind(endpointAddress);
SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
// 監(jiān)聽accept事件
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
logger.info("MyNioServer do start! endpointAddress={}",endpointAddress);
} catch (IOException e) {
logger.error("MyNioServer do bind error!",e);
}
});
}
}
客戶端Bootstrap
public class MyNioClientBootstrap {
private static final Logger logger = LoggerFactory.getLogger(MyNioClientBootstrap.class);
private final InetSocketAddress remoteAddress;
private final MyNioEventLoopGroup eventLoopGroup;
private MyNioSocketChannel myNioSocketChannel;
private final MyChannelPipelineSupplier myChannelPipelineSupplier;
public MyNioClientBootstrap(InetSocketAddress remoteAddress, MyChannelPipelineSupplier myChannelPipelineSupplier) {
this.remoteAddress = remoteAddress;
this.eventLoopGroup = new MyNioEventLoopGroup(myChannelPipelineSupplier, 1);
this.myChannelPipelineSupplier = myChannelPipelineSupplier;
}
public void start() throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
MyNioEventLoop myNioEventLoop = this.eventLoopGroup.next();
myNioEventLoop.execute(()->{
try {
Selector selector = myNioEventLoop.getUnwrappedSelector();
myNioSocketChannel = new MyNioSocketChannel(selector,socketChannel,myChannelPipelineSupplier);
myNioEventLoop.register(myNioSocketChannel);
// doConnect
// Returns: true if a connection was established,
// false if this channel is in non-blocking mode and the connection operation is in progress;
if(!socketChannel.connect(remoteAddress)){
// 簡單起見也監(jiān)聽READ事件,相當(dāng)于netty中開啟了autoRead
int clientInterestOps = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
myNioSocketChannel.getSelectionKey().interestOps(clientInterestOps);
// 監(jiān)聽connect事件
logger.info("MyNioClient do start! remoteAddress={}",remoteAddress);
}else{
logger.info("MyNioClient do start connect error! remoteAddress={}",remoteAddress);
// connect操作直接失敗,關(guān)閉連接
socketChannel.close();
}
} catch (IOException e) {
logger.error("MyNioClient do connect error!",e);
}
});
}
}
原來的Echo服務(wù)端/客戶端demo也對邏輯進行了拆分,將業(yè)務(wù)邏輯和編解碼邏輯拆分成了不同的ChannelHandler。
Echo服務(wù)器與客戶端編解碼處理器實現(xiàn)
public class EchoMessageEncoder extends MyChannelEventHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(EchoMessageEncoder.class);
@Override
public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
// 寫事件從tail向head傳播,msg一定是string類型
String message = (String) msg;
ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
writeBuffer.put(message.getBytes(StandardCharsets.UTF_8));
writeBuffer.flip();
logger.info("EchoMessageEncoder message to byteBuffer, " +
"message={}, writeBuffer={}",message,writeBuffer);
ctx.write(writeBuffer);
}
}
public class EchoMessageDecoder extends MyChannelEventHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(EchoMessageDecoder.class);
@Override
public void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception {
// 讀事件從head向tail傳播,msg一定是string類型
String receivedStr = new String((byte[]) msg, StandardCharsets.UTF_8);
logger.info("EchoMessageDecoder byteBuffer to message, " +
"msg={}, receivedStr={}",msg,receivedStr);
// 當(dāng)前版本,不考慮黏包拆包等各種問題,decoder只負責(zé)將byte轉(zhuǎn)為string
ctx.fireChannelRead(receivedStr);
}
}
Echo服務(wù)器與客戶端demo
public class ServerDemo {
public static void main(String[] args) throws IOException {
MyNioServerBootstrap myNioServerBootstrap = new MyNioServerBootstrap(
new InetSocketAddress(8080),
// 先簡單一點,只支持childEventGroup自定義配置pipeline
new MyChannelPipelineSupplier() {
@Override
public MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {
MyChannelPipeline myChannelPipeline = new MyChannelPipeline(myNioChannel);
// 注冊自定義的EchoServerEventHandler
myChannelPipeline.addLast(new EchoMessageEncoder());
myChannelPipeline.addLast(new EchoMessageDecoder());
myChannelPipeline.addLast(new EchoServerEventHandler());
return myChannelPipeline;
}
},1,5);
myNioServerBootstrap.start();
}
}
public class ClientDemo {
public static void main(String[] args) throws IOException {
MyNioClientBootstrap myNioClientBootstrap = new MyNioClientBootstrap(new InetSocketAddress(8080),new MyChannelPipelineSupplier() {
@Override
public MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {
MyChannelPipeline myChannelPipeline = new MyChannelPipeline(myNioChannel);
// 注冊自定義的EchoClientEventHandler
myChannelPipeline.addLast(new EchoMessageEncoder());
myChannelPipeline.addLast(new EchoMessageDecoder());
myChannelPipeline.addLast(new EchoClientEventHandler());
return myChannelPipeline;
}
});
myNioClientBootstrap.start();
}
}
總結(jié)
- 在lab2中,MyNetty實現(xiàn)了pipeline流水線機制,允許用戶構(gòu)造自定義處理器鏈條,進行功能的解耦。同時也提供了一個Bootstrap腳手架幫助用戶更快捷的實現(xiàn)自己的網(wǎng)絡(luò)應(yīng)用程序。
相信在了解了MyNetty的簡易版本流水線功能實現(xiàn)后,能幫助讀者更好的理解netty中更加復(fù)雜的pipeline工作原理。 - 目前為止,受限于MyNetty現(xiàn)版本的簡陋功能,我們的Echo服務(wù)應(yīng)用程序還非常原始,大量極端場景下的臨界條件都沒有處理。比如分配的ByteBuffer是固定大小,無法動態(tài)擴容,接受的消息體過大就會出錯;發(fā)送和接受的消息也存在黏包、拆包的問題,等等。千里之行始于足下,在后續(xù)的lab中,MyNetty會逐步的完善上述提到的問題。
- 在迭代MyNetty的過程中,讀者也將能夠體會到Netty的強大之處。因為在普通使用者無法直接感知的地方,netty底層處理了大量的邊界情況,這才使得普通開發(fā)者能夠基于netty高效的構(gòu)建起一個健壯的網(wǎng)絡(luò)應(yīng)用程序。
博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab2_pipeline_handle 分支),內(nèi)容如有錯誤,還請多多指教。
浙公網(wǎng)安備 33010602011771號