從零開始實現簡易版Netty(一) MyNetty Reactor模式
從零開始實現簡易版Netty(一) MyNetty Reactor模式
自從18年作為一個java程序員入行以來,所接觸到的大量組件如dubbo、rocketmq、redisson等都是基于netty這一高性能網絡框架實現的。
限于個人水平,在過去很長一段時間中都只能算是netty的初級使用者;在使用基于netty的中間件時,總是因為對netty底層不夠了解而導致排查問題時效率不高。
因此,在過去的一段時間中我對netty源碼進行了一定的研究,并以博客的形式將心得分享出來,希望能幫助到同樣對netty工作原理感興趣的讀者。
非常感謝大佬bin的技術小屋,在我學習netty的過程中給了我很大的幫助。
1. MyNetty介紹
不同于大多數博客直接針對netty官方源碼進行解析的方式,本系列博客通過從零到一的實現一個簡易版的netty(即MyNetty)來幫助讀者更好的理解netty的工作原理。
相比于完整版的netty,MyNetty只實現了netty中最核心的功能點,目的是降低復雜度,避免初學者在學習netty的過程中,對netty源碼中復雜的抽象及過深的調用鏈感到畏懼。
本博客會按照以下順序,通過一個接一個的小迭代由簡單到復雜的實現MyNetty,每一個迭代都會有一篇與之對應的技術博客。
- Reactor模式
- Pipeline管道
- 高效的數據讀取
- 高效的數據寫出
- FastThreadLocal
- ByteBuf
- Normal級別的池化內存分配(伙伴算法)
- Small級別的池化內存分配(slab算法)
- 池化內存分配支持線程本地緩存(ThreadLocalCache)
- 常用的編解碼器(FixedLengthFrameDecoder/LineBasedFrameDecoder等)
MyNetty的核心邏輯主要參考自netty 4.1.80.Final版本。
2. 操作系統I/O模型與Reactor模式介紹
作為MyNetty系列的第一篇博客,按照規劃,第一個迭代中需要實現基于NIO的reactor模式。這也是netty最核心的功能,一個基于事件循環的reactor線程工作模型。
在學習的過程中,我們要盡量做到知其然且知其所以然。
因此,在介紹Reactor模式之前,先簡單介紹一下兩種常見的操作系統網絡I/O模型,只要在了解其各自的優缺點后,才能幫助我們更好的理解為什么Netty最終選擇了reactor模式。
2.1 操作系統I/O模型介紹
同步阻塞I/O(BIO)
同步阻塞IO,顧名思義,其讀寫是阻塞性的,在數據還沒有準備好時(比如客戶端還未發送新請求,或者未收到服務端響應),當前處理IO的線程是處于阻塞態的,直到數據就緒(比如接受到客戶端發送的請求,或收到服務端響應)時才會被喚醒。
由于其阻塞的特性,因此在服務端并發時,每一個新的客戶端連接都需要一個獨立的線程來承載。
| BIO | 詳情 |
|---|---|
| 優點 | 簡單易理解,同步阻塞式的線性代碼執行流符合人的直覺。因此普通的web業務后臺服務器大多是基于BIO模型開發的 |
| 缺點 | 由于客戶端連接數與服務器線程數是1:1的,而服務器由于線程上下文切換的CPU開銷和內存大小限制,難以應對大規模的并發連接(大幾千甚至幾萬),性能較差 |
BIO服務端demo
public class BIOEchoServer {
private static final ExecutorService threadPool = Executors.newCachedThreadPool();
public static void main(String[] args) throws IOException {
int port = 8080;
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("BIOServer started on port " + port);
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket.getInetAddress());
// 每個新的連接都啟用一個線程去處理
threadPool.execute(
() -> handleClientConnect(clientSocket)
);
}
}
private static void handleClientConnect(Socket clientSocket) {
try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println("Received from client: " + inputLine);
// echo message
String responseMessage = "server echo: " + inputLine;
out.println(responseMessage);
System.out.println("Sent response: " + responseMessage);
}
} catch (IOException e) {
System.out.println("Client connection closed: " + e.getMessage());
} finally {
try {
clientSocket.close();
System.out.println("clientSocket closed! " + clientSocket.getInetAddress());
} catch (IOException e) {
System.err.println("Error closing client socket: " + e.getMessage());
}
}
}
}
BIO客戶端demo
public class BIOClient {
public static void main(String[] args) throws IOException {
String hostname = "127.0.0.1";
int port = 8080;
try (Socket socket = new Socket(hostname, port);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
System.out.println("Connected to server. Type messages (type 'exit' to quit)");
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println("Server response: " + in.readLine());
}
}
}
}
I/O多路復用
I/O多路復用,顧名思義,其不同于BIO中一個線程對應一個客戶端連接的模式。I/O多路復用模型中,一個服務端線程能夠同時處理多個客戶端連接。
I/O多路復用解決了傳統BIO模型下面對海量并發時系統資源不足的問題,但同時也引入了一些新的問題。
| I/O多路復用 | 詳情 |
|---|---|
| 優點 | 性能好,吞吐量高。單個線程即可處理海量連接 |
| 缺點 | 比起BIO的阻塞模式,基于事件觸發的編程模型非常復雜。 |
IO多路復用服務端demo
public class NIOEchoServer {
public static void main(String[] args) throws IOException {
SelectorProvider selectorProvider = SelectorProvider.provider();
Selector selector = selectorProvider.openSelector();
// 服務端監聽accept事件的channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
for(;;){
try{
int keys = selector.select(60000);
if (keys == 0) {
System.out.println("server 60s未監聽到事件,繼續監聽!");
continue;
}
// processSelectedKeysPlain
Iterator<SelectionKey> selectionKeyItr = selector.selectedKeys().iterator();
while (selectionKeyItr.hasNext()) {
SelectionKey key = selectionKeyItr.next();
System.out.println("process SelectionKey=" + key.readyOps());
try {
// 拿出來后,要把集合中已經獲取到的事件移除掉,避免重復的處理
selectionKeyItr.remove();
if (key.isAcceptable()) {
// 處理accept事件(接受到來自客戶端的連接請求)
processAcceptEvent(key);
}
if (key.isReadable()) {
// 處理read事件
processReadEvent(key);
}
}catch (Exception e){
System.out.println("server event loop process an selectionKey error! " + e.getMessage());
e.printStackTrace();
key.cancel();
if(key.channel() != null){
System.out.println("has error, close channel! " + key.channel());
key.channel().close();
}
}
}
}catch (Exception e){
System.out.println("server event loop error! ");
e.getStackTrace();
}
}
}
private static void processAcceptEvent(SelectionKey key) throws IOException {
// 能收到accept事件的channel一定是ServerSocketChannel
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
// 獲得與客戶端建立的那個連接
SocketChannel socketChannel = ssChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.finishConnect();
System.out.println("socketChannel=" + socketChannel + " finishConnect!");
// 將接受到的連接注冊到同樣的selector中,并監聽read事件
socketChannel.register(key.selector(),SelectionKey.OP_READ);
}
private static void processReadEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel)key.channel();
// 簡單起見,buffer不緩存,每次讀事件來都新創建一個
// 暫時也不考慮黏包/拆包場景(Netty中靠ByteToMessageDecoder解決,后續再分析其原理),理想的認為每個消息都小于1024,且每次讀事件都只有一個消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int byteRead = socketChannel.read(readBuffer);
if(byteRead == -1){
// 簡單起見不考慮tcp半連接的情況,返回-1直接關掉連接
socketChannel.close();
}else{
// 將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作
readBuffer.flip();
// 根據緩沖區可讀字節數創建字節數組
byte[] bytes = new byte[readBuffer.remaining()];
// 將緩沖區可讀字節數組復制到新建的數組中
readBuffer.get(bytes);
String receivedStr = new String(bytes, StandardCharsets.UTF_8);
System.out.println("received message:" + receivedStr + " ,from " + socketChannel.socket().getRemoteSocketAddress());
// 讀完了,echo服務器準備回寫數據到客戶端
String echoMessage = "server echo:" + receivedStr;
ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
writeBuffer.put(echoMessage.getBytes(StandardCharsets.UTF_8));
writeBuffer.flip(); // 寫完了,flip供后續去讀取
socketChannel.write(writeBuffer);
}
}
}
IO多路復用客戶端demo
public class NIOClient {
private static volatile SocketChannel clientSocketChannel;
public static void main(String[] args) throws Exception {
SelectorProvider selectorProvider = SelectorProvider.provider();
Selector selector = selectorProvider.openSelector();
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(()->{
try {
startClient(selector,countDownLatch);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
countDownLatch.await();
System.out.println("please input message:");
while(true){
Scanner sc = new Scanner(System.in);
String msg = sc.next();
System.out.println("get input message:" + msg);
// 發送消息
ByteBuffer writeBuffer = ByteBuffer.allocate(64);
writeBuffer.put(msg.getBytes(StandardCharsets.UTF_8));
writeBuffer.flip(); // 寫完了,flip供后續去讀取
clientSocketChannel.write(writeBuffer);
}
}
private static void startClient(Selector selector, CountDownLatch countDownLatch) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
clientSocketChannel = socketChannel;
// doConnect
// Returns: true if a connection was established,
// false if this channel is in non-blocking mode and the connection operation is in progress;
if(!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080))) {
// 配置為非阻塞,會返回false,通過注冊并監聽connect事件的方式進行交互
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}
for(;;){
try {
int keys = selector.select(60000);
if (keys == 0) {
System.out.println("client 60s未監聽到事件,繼續監聽!");
continue;
}
// processSelectedKeysPlain
Iterator<SelectionKey> selectionKeyItr = selector.selectedKeys().iterator();
while (selectionKeyItr.hasNext()) {
SelectionKey key = selectionKeyItr.next();
try {
System.out.println("process SelectionKey=" + key.readyOps());
// 拿出來后,要把集合中已經獲取到的事件移除掉,避免重復的處理
selectionKeyItr.remove();
if (key.isConnectable()) {
// 處理連接相關事件
processConnectEvent(key,countDownLatch);
}
if (key.isReadable()){
processReadEvent(key);
}
if (key.isWritable()){
System.out.println("watch an write event!");
}
} catch (Exception e) {
System.out.println("client event loop process an selectionKey error! " + e.getMessage());
key.cancel();
if(key.channel() != null){
key.channel().close();
System.out.println("has error, close channel!" );
}
}
}
} catch (Exception e) {
System.out.println("client event loop error! ");
e.getStackTrace();
}
}
}
private static void processConnectEvent(SelectionKey key, CountDownLatch countDownLatch) throws IOException {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
int ops = key.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
key.interestOps(ops);
SocketChannel socketChannel = (SocketChannel) key.channel();
if(socketChannel.finishConnect()){
// 確認完成連接
System.out.println("client channel connected!");
countDownLatch.countDown();
}else{
// 連接建立失敗,程序退出
System.out.println("client channel connect failed!");
System.exit(1);
}
}
private static void processReadEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 創建ByteBuffer,并開辟一個1M的緩沖區
ByteBuffer buffer = ByteBuffer.allocate(64);
// 讀取請求碼流,返回讀取到的字節數
int readBytes = socketChannel.read(buffer);
// 讀取到字節,對字節進行編解碼
if(readBytes > 0){
// 將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作
buffer.flip();
// 根據緩沖區可讀字節數創建字節數組
byte[] bytes = new byte[buffer.remaining()];
// 將緩沖區可讀字節數組復制到新建的數組中
buffer.get(bytes);
String response = new String(bytes, StandardCharsets.UTF_8);
System.out.println("client received response message: " + response);
}
// 讀取到了EOF,關閉連接
if(readBytes < 0){
socketChannel.close();
}
}
}
上述對于操作系統I/O模型的介紹限于篇幅,點到為止。想進一步了解的讀者可以參考我之前寫的博客:談談對不同I/O模型的理解
2.2 Reactor模式
從上面的介紹中我們可以看到,I/O多路復用模型的高性能、高吞吐的特點更加適合互聯網時代海量連接的場景,所以netty自然也是基于I/O多路復用模型的。
但上述給出的I/O多路復用的demo中存在兩個很嚴重的問題,第一個問題是java中NIO的能力過于底層,在開發業務時所需要考慮的細節太多,一個簡單的、不考慮各種異常、邊界場景的echo服務器都要寫近百行的代碼。
第二個問題則是服務端單線程的I/O多路復用模型沒法很好的利用現代的多核CPU硬件,會出現處理大量連接時一核有難八核圍觀的問題。
針對第一個問題,正是netty作為java NIO的更高層次封裝而誕生的原因,我們會在后續的迭代中逐步的優化這一問題。
而第二個問題的解決方案便是本章要引出的主題,reactor模式。
I/O多路復用模型與多線程并不沖突,一個線程可以獨自處理所有連接,也可以用多個線程來均勻的分攤所有來自客戶端的連接。
在reactor模式下,接收連接與處理連接后續讀寫的任務的線程會被分離開。接受客戶端連接的邏輯較為簡單,因此一個線程(cpu核心)通常足夠處理這一任務。
相對的,處理連接建立后的讀寫操作則壓力會大的多,所以需要多個CPU核心(多個線程)來分攤壓力。
在reactor模式下,將專門用于接受連接的線程稱為Boss線程,而連接建立后處理讀寫操作的線程成為Worker線程(Boss工作壓力小,Worker工作壓力大;Boss接了單子后把活直接派給Worker)。
reactor模式示意圖

3. MyNetty reactor模式實現源碼解析
從上文IO多路復用的demo可以看到,程序最核心的邏輯便是處理selector.select獲取到的事件key集合。
當前線程會不斷地嘗試獲取到激活的事件集合,然后按順序處理,并循環往復。這一工作機制被稱為事件循環(EventLoop)。
事件被抽象為4種類型,OP_READ(可讀事件)、OP_WRITE(可寫事件)、OP_CONNECT(連接建立事件)和OP_ACCEPT(連接接受事件),而在demo中我們已經接觸到了除了OP_WRITE事件外的三種(OP_WRITE事件會在lab4高效的數據寫出中再展開介紹)。
針對事件循環,Netty中抽象出了兩個概念,EventLoopGroup和EventLoop,EventLoop對應的就是上述的無限循環處理IO事件的線程,而EventLoopGroup顧名思義便是將一組EventLoop統一管理的集合。
下面我們結合MyNetty的源碼,來進一步講解reactor模式的工作原理。
MyNetty NioServer源碼
public class MyNettyNioServer {
private static final Logger logger = LoggerFactory.getLogger(MyNettyNioServer.class);
private final InetSocketAddress endpointAddress;
private final MyNioEventLoopGroup bossGroup;
public MyNettyNioServer(InetSocketAddress endpointAddress, MyEventHandler myEventHandler,
int bossThreads, int childThreads) {
this.endpointAddress = endpointAddress;
MyNioEventLoopGroup childGroup = new MyNioEventLoopGroup(myEventHandler,childThreads);
this.bossGroup = new MyNioEventLoopGroup(myEventHandler,bossThreads,childGroup);
}
public void start() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
MyNioEventLoop myNioEventLoop = this.bossGroup.next();
myNioEventLoop.execute(()->{
try {
Selector selector = myNioEventLoop.getUnwrappedSelector();
serverSocketChannel.socket().bind(endpointAddress);
SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
// 監聽accept事件
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
logger.info("MyNioServer do start! endpointAddress={}",endpointAddress);
} catch (IOException e) {
logger.error("MyNioServer do bind error!",e);
}
});
}
}
MyNetty NioClient源碼
public class MyNettyNioClient {
private static final Logger logger = LoggerFactory.getLogger(MyNettyNioClient.class);
private final InetSocketAddress remoteAddress;
private final MyNioEventLoopGroup eventLoopGroup;
private SocketChannel socketChannel;
public MyNettyNioClient(InetSocketAddress remoteAddress, MyEventHandler myEventHandler, int nThreads) {
this.remoteAddress = remoteAddress;
this.eventLoopGroup = new MyNioEventLoopGroup(myEventHandler,nThreads);
}
public void start() throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
this.socketChannel = socketChannel;
MyNioEventLoop myNioEventLoop = this.eventLoopGroup.next();
myNioEventLoop.execute(()->{
try {
Selector selector = myNioEventLoop.getUnwrappedSelector();
// doConnect
// Returns: true if a connection was established,
// false if this channel is in non-blocking mode and the connection operation is in progress;
if(!socketChannel.connect(remoteAddress)){
SelectionKey selectionKey = socketChannel.register(selector, 0);
int clientInterestOps = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
selectionKey.interestOps(selectionKey.interestOps() | clientInterestOps);
}
// 監聽connect事件
logger.info("MyNioClient do start! remoteAddress={}",remoteAddress);
} catch (IOException e) {
logger.error("MyNioClient do connect error!",e);
}
});
}
}
MyNetty EventLoop源碼
public class MyNioEventLoop implements Executor {
private static final Logger logger = LoggerFactory.getLogger(MyNioEventLoop.class);
/**
* 原始的jdk中的selector
* */
private final Selector unwrappedSelector;
private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
private volatile Thread thread;
private final MyNioEventLoopGroup childGroup;
private final AtomicBoolean threadStartedFlag = new AtomicBoolean(false);
private MyEventHandler myEventHandler;
public MyNioEventLoop(){
this(null);
}
public MyNioEventLoop(MyNioEventLoopGroup childGroup) {
this.childGroup = childGroup;
SelectorProvider selectorProvider = SelectorProvider.provider();
try {
this.unwrappedSelector = selectorProvider.openSelector();
} catch (IOException e) {
throw new RuntimeException("open selector error!",e);
}
}
@Override
public void execute(Runnable task) {
// 將任務加入eventLoop所屬的任務隊列,事件循環中會
taskQueue.add(task);
if(this.thread != Thread.currentThread()){
// 如果執行execute方法的線程不是當前線程,可能當前eventLoop對應的thread還沒有啟動
// 嘗試啟動當前eventLoop對應的線程(cas防并發)
if(threadStartedFlag.compareAndSet(false,true)){
// 類似netty的ThreadPerTaskExecutor,啟動一個線程來執行事件循環
new Thread(()->{
// 將eventLoop的thread與新啟動的這個thread進行綁定
this.thread = Thread.currentThread();
// 執行監聽selector的事件循環
doEventLoop();
}).start();
}
}
}
public Selector getUnwrappedSelector() {
return unwrappedSelector;
}
public void setMyEventHandler(MyEventHandler myEventHandler) {
this.myEventHandler = myEventHandler;
}
private void doEventLoop(){
// 事件循環
for(;;){
try{
if(taskQueue.isEmpty()){
int keys = unwrappedSelector.select(60000);
if (keys == 0) {
logger.info("server 60s未監聽到事件,繼續監聽!");
continue;
}
}else{
// 確保任務隊列里的任務能夠被觸發
unwrappedSelector.selectNow();
}
// 簡單起見,暫不實現基于時間等元素的更為公平的執行策略
// 直接先處理io,再處理所有task(ioRatio=100)
try {
// 處理監聽到的io事件
processSelectedKeys();
}finally {
// Ensure we always run tasks.
// 處理task隊列里的任務
runAllTasks();
}
}catch (Throwable e){
logger.error("server event loop error!",e);
}
}
}
private void processSelectedKeys() throws IOException {
// processSelectedKeysPlain
Iterator<SelectionKey> selectionKeyItr = unwrappedSelector.selectedKeys().iterator();
while (selectionKeyItr.hasNext()) {
SelectionKey key = selectionKeyItr.next();
logger.info("process SelectionKey={}",key.readyOps());
try {
// 拿出來后,要把集合中已經獲取到的事件移除掉,避免重復的處理
selectionKeyItr.remove();
if (key.isConnectable()) {
// 處理客戶端連接建立相關事件
processConnectEvent(key);
}
if (key.isAcceptable()) {
// 處理服務端accept事件(接受到來自客戶端的連接請求)
processAcceptEvent(key);
}
if (key.isReadable()) {
// 處理read事件
processReadEvent(key);
}
}catch (Throwable e){
logger.error("server event loop process an selectionKey error!",e);
// 處理io事件有異常,取消掉監聽的key,并且嘗試把channel也關閉掉
key.cancel();
if(key.channel() != null){
logger.error("has error, close channel={} ",key.channel());
key.channel().close();
}
}
}
}
private void runAllTasks(){
for (;;) {
// 通過無限循環,直到把隊列里的任務全部撈出來執行掉
Runnable task = taskQueue.poll();
if (task == null) {
return;
}
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
}
private void processAcceptEvent(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = ssChannel.accept();
if(this.childGroup != null){
// boss/worker模式,boss線程只負責接受和建立連接
// 將建立的連接交給child線程組去處理后續的讀寫
MyNioEventLoop childEventLoop = childGroup.next();
childEventLoop.execute(()->{
doRegister(childEventLoop,socketChannel);
});
}else{
doRegister(this,socketChannel);
}
}
private void processConnectEvent(SelectionKey key) throws IOException {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = key.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
key.interestOps(ops);
SocketChannel socketChannel = (SocketChannel) key.channel();
if(socketChannel.finishConnect()){
// 確認完成連接
logger.info("client channel connected! socketChannel={}",socketChannel);
}else{
logger.error("client channel connect failed!");
// 連接建立失敗,連接關閉(上層catch住會關閉連接)
throw new Error();
}
}
private void processReadEvent(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel)key.channel();
// 簡單起見,buffer不緩存,每次讀事件來都新創建一個
// 暫時也不考慮黏包/拆包場景(Netty中靠ByteToMessageDecoder解決,后續再分析其原理),理想的認為每個消息都小于1024,且每次讀事件都只有一個消息
ByteBuffer readBuffer = ByteBuffer.allocate(64);
int byteRead = socketChannel.read(readBuffer);
if(byteRead == -1){
// 簡單起見不考慮tcp半連接的情況,返回-1直接關掉連接
socketChannel.close();
// 取消key的監聽
key.cancel();
}else{
// 將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作
readBuffer.flip();
// 根據緩沖區可讀字節數創建字節數組
byte[] bytes = new byte[readBuffer.remaining()];
// 將緩沖區可讀字節數組復制到新建的數組中
readBuffer.get(bytes);
if(myEventHandler != null) {
myEventHandler.fireChannelRead(socketChannel, bytes);
}
}
}
private void doRegister(SocketChannel socketChannel){
try {
// nio的非阻塞channel
socketChannel.configureBlocking(false);
socketChannel.finishConnect();
logger.info("socketChannel={} finishConnect!",socketChannel);
// 將接受到的連接注冊到selector中,并監聽read事件
socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
logger.info("socketChannel={} doRegister success!",socketChannel);
}catch (Exception e){
logger.error("register socketChannel={} error!",socketChannel,e);
try {
socketChannel.close();
} catch (IOException ex) {
logger.error("register channel close={} error!",socketChannel,ex);
}
}
}
}
MyNetty EventLoopGroup源碼
public class MyNioEventLoopGroup {
private final MyNioEventLoop[] executors;
private final int nThreads;
private final AtomicInteger atomicInteger = new AtomicInteger();
public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads) {
this(myEventHandler,nThreads,null);
}
public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads, MyNioEventLoopGroup childGroup) {
if(nThreads <= 0){
throw new IllegalArgumentException("MyNioEventLoopGroup nThreads must > 0");
}
this.nThreads = nThreads;
// 基于參數,初始化對應數量的eventLoop
executors = new MyNioEventLoop[nThreads];
for(int i=0; i<nThreads; i++){
MyNioEventLoop myNioEventLoop = new MyNioEventLoop(childGroup);
myNioEventLoop.setMyEventHandler(myEventHandler);
executors[i] = myNioEventLoop;
}
}
public MyNioEventLoop next(){
// 輪訓分攤負載
int index = atomicInteger.getAndIncrement() % nThreads;
return executors[index];
}
}
- 在Netty的服務端中,基于reactor模式設置了兩個EventLoopGroup,一個被稱為BossGroup專門用于接受新連接;而在接受到新連接后,會按照round-robin算法將接收到的新連接均勻的派發給所屬的ChildGroup中的執行器,ChildGroup管理的就是Worker線程集合。而Netty的客戶端中,則相對簡單只有一個EventLoopGroup。
- EventLoop的實現與上述demo中的事件循環處理邏輯幾乎一致,最主要的不同是EventLoop對象雖然在EventLoopGroup中會很早被創建。但其所屬的Thread線程只在第一次執行execute方法時才會啟動(cas防并發 + task隊列多寫單讀)。
- 服務端BossGroup的線程數一般為1(一個監聽端口對應一個Boss線程),而Worker線程由于I/O多路復用的原因,其數量應該與所屬機器的CPU核心數相匹配才能最大限度的吃滿硬件。在Netty中,一個ChildGroup默認的Worker線程數為可用CPU核數的兩倍。
總結
- 本篇博客中,我們介紹了兩種最常見的操作系統I/O模型,并結合MyNetty的源碼分析了reactor模式的工作原理。
- 相比Netty,MyNetty關于EventLoop的實現十分簡單,僅相當于一個極簡版的Netty NioEventLoop,既沒有抽象出各種不同子類的實現(比如EpollEventLoop等),也沒有去實現關于jdk空輪訓bug的優化等。
這么做的主要目的是希望通過揭示出最核心的邏輯讓讀者更輕松的理解netty的工作原理。相信在理解了MyNetty簡易版的實現后,在未來著手理解晦澀復雜的Netty源碼時,能夠按圖索驥,將所掌握的核心邏輯作為樹干,更好的理解相關的旁路邏輯。
博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab1_nio_reactor 分支),內容如有錯誤,還請多多指教。
浙公網安備 33010602011771號