Netty-NIO基礎
一. NIO 基礎
non-blocking io 非阻塞 IO
1. 三大組件
1.1 Channel & Buffer
channel 有一點類似于 stream,它就是讀寫數據的雙向通道,可以從 channel 將數據讀入 buffer,也可以將 buffer 的數據寫入 channel,而之前的 stream 要么是輸入,要么是輸出,channel 比 stream 更為底層
常見的 Channel 有
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer 則用來緩沖讀寫數據,常見的 buffer 有
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
1.2 Selector
selector 單從字面意思不好理解,需要結合服務器的設計演化來理解它的用途
多線程版設計
?? 多線程版缺點
- 內存占用高
- 線程上下文切換成本高
- 只適合連接數少的場景
線程池版設計
?? 線程池版缺點
- 阻塞模式下,線程僅能處理一個 socket 連接
- 僅適合短連接場景
selector 版設計
selector 的作用就是配合一個線程來管理多個 channel,獲取這些 channel 上發生的事件,這些 channel 工作在非阻塞模式下,不會讓線程吊死在一個 channel 上。適合連接數特別多,但流量低的場景(low traffic)
調用 selector 的 select() 會阻塞直到 channel 發生了讀寫就緒事件,這些事件發生,select 方法就會返回這些事件交給 thread 來處理
2. ByteBuffer
有一普通文本文件 data.txt,內容為
1234567890abcd
使用 FileChannel 來讀取文件內容
@Slf4j
public class ChannelDemo1 {
public static void main(String[] args) {
try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
do {
// 向 buffer 寫入
int len = channel.read(buffer);
log.debug("讀到字節數:{}", len);
if (len == -1) {
break;
}
// 切換 buffer 讀模式
buffer.flip();
while(buffer.hasRemaining()) {
log.debug("{}", (char)buffer.get());
}
// 切換 buffer 寫模式
buffer.clear();
} while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
輸出
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節數:10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節數:4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節數:-1
2.1 ByteBuffer 正確使用姿勢
- 向 buffer 寫入數據,例如調用 channel.read(buffer)
- 調用 flip() 切換至讀模式
- 從 buffer 讀取數據,例如調用 buffer.get()
- 調用 clear() 或 compact() 切換至寫模式
- 重復 1~4 步驟
2.2 ByteBuffer 結構
ByteBuffer 有以下重要屬性
- capacity
- position
- limit
一開始

寫模式下,position 是寫入位置,limit 等于容量,下圖表示寫入了 4 個字節后的狀態

flip 動作發生后,position 切換為讀取位置,limit 切換為讀取限制

讀取 4 個字節后,狀態

clear 動作發生后,狀態

compact 方法,是把未讀完的部分向前壓縮,然后切換至寫模式

?? 調試工具類
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有內容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可讀取內容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
2.3 ByteBuffer 常見方法
分配空間
可以使用 allocate 方法為 ByteBuffer 分配空間,其它 buffer 類也有該方法
Bytebuffer buf = ByteBuffer.allocate(16);
向 buffer 寫入數據
有兩種辦法
- 調用 channel 的 read 方法
- 調用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
和
buf.put((byte)127);
從 buffer 讀取數據
同樣有兩種辦法
- 調用 channel 的 write 方法
- 調用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
和
byte b = buf.get();
get 方法會讓 position 讀指針向后走,如果想重復讀取數據
- 可以調用 rewind 方法將 position 重新置為 0
- 或者調用 get(int i) 方法獲取索引 i 的內容,它不會移動讀指針
mark 和 reset
mark 是在讀取時,做一個標記,即使 position 改變,只要調用 reset 就能回到 mark 的位置
注意
rewind 和 flip 都會清除 mark 位置
字符串與 ByteBuffer 互轉
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");
debug(buffer1);
debug(buffer2);
CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
class java.nio.HeapCharBuffer
你好
?? Buffer 的線程安全
Buffer 是非線程安全的
2.4 Scattering Reads
分散讀取,有一個文本文件 3parts.txt
onetwothree
使用如下方式讀取,可以將數據填充至多個 buffer
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
debug(a);
debug(b);
debug(c);
} catch (IOException e) {
e.printStackTrace();
}
結果
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+
2.5 Gathering Writes
使用如下方式寫入,可以將多個 buffer 的數據填充至 channel
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer d = ByteBuffer.allocate(4);
ByteBuffer e = ByteBuffer.allocate(4);
channel.position(11);
d.put(new byte[]{'f', 'o', 'u', 'r'});
e.put(new byte[]{'f', 'i', 'v', 'e'});
d.flip();
e.flip();
debug(d);
debug(e);
channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
e.printStackTrace();
}
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 6f 75 72 |four |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 69 76 65 |five |
+--------+-------------------------------------------------+----------------+
文件內容
onetwothreefourfive
2.6 練習
網絡上有多條數據發送給服務端,數據之間使用 \n 進行分隔
但由于某種原因這些數據在接收時,被進行了重新組合,例如原始數據有3條為
- Hello,world\n
- I'm zhangsan\n
- How are you?\n
變成了下面的兩個 byteBuffer (黏包,半包)
- Hello,world\nI'm zhangsan\nHo
- w are you?\n
現在要求你編寫程序,將錯亂的數據恢復成原始的按 \n 分隔的數據
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 11 24
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\nhaha!\n".getBytes());
split(source);
}
private static void split(ByteBuffer source) {
source.flip();
int oldLimit = source.limit();
for (int i = 0; i < oldLimit; i++) {
if (source.get(i) == '\n') {
System.out.println(i);
ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
// 0 ~ limit
source.limit(i + 1);
target.put(source); // 從source 讀,向 target 寫
debugAll(target);
source.limit(oldLimit);
}
}
source.compact();
}
3. 文件編程
3.1 FileChannel
?? FileChannel 工作模式
FileChannel 只能工作在阻塞模式下
獲取
不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法
- 通過 FileInputStream 獲取的 channel 只能讀
- 通過 FileOutputStream 獲取的 channel 只能寫
- 通過 RandomAccessFile 是否能讀寫根據構造 RandomAccessFile 時的讀寫模式決定
讀取
會從 channel 讀取數據填充 ByteBuffer,返回值表示讀到了多少字節,-1 表示到達了文件的末尾
int readBytes = channel.read(buffer);
寫入
寫入的正確姿勢如下, SocketChannel
ByteBuffer buffer = ...;
buffer.put(...); // 存入數據
buffer.flip(); // 切換讀模式
while(buffer.hasRemaining()) {
channel.write(buffer);
}
在 while 中調用 channel.write 是因為 write 方法并不能保證一次將 buffer 中的內容全部寫入 channel
關閉
channel 必須關閉,不過調用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會間接地調用 channel 的 close 方法
位置
獲取當前位置
long pos = channel.position();
設置當前位置
long newPos = ...;
channel.position(newPos);
設置當前位置時,如果設置為文件的末尾
- 這時讀取會返回 -1
- 這時寫入,會追加內容,但要注意如果 position 超過了文件末尾,再寫入時在新內容和原末尾之間會有空洞(00)
大小
使用 size 方法獲取文件的大小
強制寫入
操作系統出于性能的考慮,會將數據緩存,不是立刻寫入磁盤??梢哉{用 force(true) 方法將文件內容和元數據(文件的權限等信息)立刻寫入磁盤
3.2 兩個 Channel 傳輸數據
String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();
FileChannel to = new FileOutputStream(TO).getChannel();
) {
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用時:" + (end - start) / 1000_000.0);
輸出
transferTo 用時:8.2011
超過 2g 大小的文件傳輸
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 效率高,底層會利用操作系統的零拷貝進行優化
long size = from.size();
// left 變量代表還剩余多少字節
for (long left = size; left > 0; ) {
System.out.println("position:" + (size - left) + " left:" + left);
left -= from.transferTo((size - left), left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
實際傳輸一個超大文件
position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219
3.3 Path
jdk7 引入了 Path 和 Paths 類
- Path 用來表示文件路徑
- Paths 是工具類,用來獲取 Path 實例
Path source = Paths.get("1.txt"); // 相對路徑 使用 user.dir 環境變量來定位 1.txt
Path source = Paths.get("d:\\1.txt"); // 絕對路徑 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt"); // 絕對路徑 同樣代表了 d:\1.txt
Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
.代表了當前路徑..代表了上一級路徑
例如目錄結構如下
d:
|- data
|- projects
|- a
|- b
代碼
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正?;窂?
會輸出
d:\data\projects\a\..\b
d:\data\projects\b
3.4 Files
檢查文件是否存在
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));
創建一級目錄
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
- 如果目錄已存在,會拋異常 FileAlreadyExistsException
- 不能一次創建多級目錄,否則會拋異常 NoSuchFileException
創建多級目錄用
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);
拷貝文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
Files.copy(source, target);
- 如果文件已存在,會拋異常 FileAlreadyExistsException
如果希望用 source 覆蓋掉 target,需要用 StandardCopyOption 來控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
移動文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
- StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性
刪除文件
Path target = Paths.get("helloword/target.txt");
Files.delete(target);
- 如果文件不存在,會拋異常 NoSuchFileException
刪除目錄
Path target = Paths.get("helloword/d1");
Files.delete(target);
- 如果目錄還有內容,會拋異常 DirectoryNotEmptyException
遍歷目錄文件
public static void main(String[] args) throws IOException {
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
System.out.println(dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println(dirCount); // 133
System.out.println(fileCount); // 1479
}
統計 jar 的數目
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
if (file.toFile().getName().endsWith(".jar")) {
fileCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println(fileCount); // 724
刪除多級目錄
Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
?? 刪除很危險
刪除是危險操作,確保要遞歸刪除的文件夾沒有重要內容
拷貝多級目錄
long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";
Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
// 是目錄
if (Files.isDirectory(path)) {
Files.createDirectory(Paths.get(targetName));
}
// 是普通文件
else if (Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
4. 網絡編程
4.1 非阻塞 vs 阻塞
阻塞
- 阻塞模式下,相關方法都會導致線程暫停
- ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停
- SocketChannel.read 會在沒有數據可讀時讓線程暫停
- 阻塞的表現其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當于閑置
- 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
- 但多線程下,有新的問題,體現在以下方面
- 32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數過多,必然導致 OOM,并且線程太多,反而會因為頻繁上下文切換導致性能降低
- 可以采用線程池技術來減少線程數和線程上下文切換,但治標不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接
服務器端
// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創建了服務器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 綁定監聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客戶端發送的數據
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,線程停止運行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
非阻塞
- 非阻塞模式下,相關方法都會不會讓線程暫停
- 在 ServerSocketChannel.accept 在沒有連接建立時,會返回 null,繼續運行
- SocketChannel.read 在沒有數據可讀時,會返回 0,但線程不必阻塞,可以去執行其它 SocketChannel 的 read 或是去執行 ServerSocketChannel.accept
- 寫數據時,線程只是等待數據寫入 Channel 即可,無需等 Channel 通過網絡把數據發送出去
- 但非阻塞模式下,即使沒有連接建立,和可讀數據,線程仍然在不斷運行,白白浪費了 cpu
- 數據復制過程中,線程實際還是阻塞的(AIO 改進的地方)
服務器端,客戶端代碼不變
// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創建了服務器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續運行,如果沒有連接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客戶端發送的數據
int read = channel.read(buffer);// 非阻塞,線程仍然會繼續運行,如果沒有讀到數據,read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
多路復用
單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監控,這稱之為多路復用
- 多路復用僅針對網絡 IO、普通文件 IO 沒法利用多路復用
- 如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證
- 有可連接事件時才去連接
- 有可讀事件才去讀取
- 有可寫事件才去寫入
- 限于網絡傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發 Selector 的可寫事件
4.2 Selector
好處
- 一個線程配合 selector 就可以監控多個 channel 的事件,事件發生線程才去處理。避免非阻塞模式下所做無用功
- 讓這個線程能夠被充分利用
- 節約了線程的數量
- 減少了線程上下文切換
創建
Selector selector = Selector.open();
綁定 Channel 事件
也稱之為注冊事件,綁定的事件 selector 才會關心
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 綁定事件);
- channel 必須工作在非阻塞模式
- FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
- 綁定的事件類型可以有
- connect - 客戶端連接成功時觸發
- accept - 服務器端成功接受連接時觸發
- read - 數據可讀入時觸發,有因為接收能力弱,數據暫不能讀入的情況
- write - 數據可寫出時觸發,有因為發送能力弱,數據暫不能寫出的情況
監聽 Channel 事件
可以通過下面三種方法來監聽是否有事件發生,方法的返回值代表有多少 channel 發生了事件
方法1,阻塞直到綁定事件發生
int count = selector.select();
方法2,阻塞直到綁定事件發生,或是超時(時間單位為 ms)
int count = selector.select(long timeout);
方法3,不會阻塞,也就是不管有沒有事件,立刻返回,自己根據返回值檢查是否有事件
int count = selector.selectNow();
?? select 何時不阻塞
- 事件發生時
- 客戶端發起連接請求,會觸發 accept 事件
- 客戶端發送數據過來,客戶端正常、異常關閉時,都會觸發 read 事件,另外如果發送的數據大于 buffer 緩沖區,會觸發多次讀取事件
- channel 可寫,會觸發 write 事件
- 在 linux 下 nio bug 發生時
- 調用 selector.wakeup()
- 調用 selector.close()
- selector 所在線程 interrupt
4.3 處理 accept 事件
客戶端代碼為
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務器端代碼為
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
?? 事件發生后能否不處理
事件發生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發,這是因為 nio 底層使用的是水平觸發
4.4 處理 read 事件
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("連接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
開啟兩個客戶端,修改一下發送文字,輸出
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 |world |
+--------+-------------------------------------------------+----------------+
?? 為何要 iter.remove()
因為 select 在事件發生后,就會將相關的 key 放入 selectedKeys 集合,但不會在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如
- 第一次觸發了 ssckey 上的 accept 事件,沒有移除 ssckey
- 第二次觸發了 sckey 上的 read 事件,但這時 selectedKeys 中還有上次的 ssckey ,在處理時因為沒有真正的 serverSocket 連上了,就會導致空指針異常
?? cancel 的作用
cancel 會取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續不會再監聽事件
?? 不處理邊界的問題
以前有同學寫過這樣的代碼,思考注釋中兩個問題,以 bio 為例,其實 nio 道理是一樣的
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss=new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 這里這么寫,有沒有問題
byte[] arr = new byte[4];
while(true) {
int read = in.read(arr);
// 這里這么寫,有沒有問題
if(read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}
客戶端
public class Client {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
max.close();
}
}
輸出
hell
owor
ld?
?好
為什么?
處理消息的邊界

- 一種思路是固定消息長度,數據包大小一樣,服務器按預定長度讀取,缺點是浪費帶寬
- 另一種思路是按分隔符拆分,缺點是效率低
- TLV 格式,即 Type 類型、Length 長度、Value 數據,類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內容過大,則影響 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
服務器端
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一條完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把這條完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 從 source 讀,向 target 寫
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}
public static void main(String[] args) throws IOException {
// 1. 創建 selector, 管理多個 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的聯系(注冊)
// SelectionKey 就是將來事件發生后,通過它可以知道事件和哪個channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只關注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 沒有事件發生,線程阻塞,有事件,線程才會恢復運行
// select 在事件未處理時,它不會阻塞, 事件發生后要么處理,要么取消,不能置之不理
selector.select();
// 4. 處理事件, selectedKeys 內部包含了所有發生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 處理key 時,要從 selectedKeys 集合中刪除,否則下次處理就會有問題
iter.remove();
log.debug("key: {}", key);
// 5. 區分事件類型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 將一個 byteBuffer 作為附件關聯到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發事件的channel
// 獲取 selectionKey 上關聯的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要擴容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因為客戶端斷開了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
}
}
}
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
ByteBuffer 大小分配
- 每個 channel 都需要記錄可能被切分的消息,因為 ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護一個獨立的 ByteBuffer
- ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內存,因此需要設計大小可變的 ByteBuffer
- 一種思路是首先分配一個較小的 buffer,例如 4k,如果發現數據不夠,再分配 8k 的 buffer,將 4k buffer 內容拷貝至 8k buffer,優點是消息連續容易處理,缺點是數據拷貝耗費性能,參考實現 http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一種思路是用多個數組組成 buffer,一個數組不夠,把多出來的內容寫入新的數組,與前面的區別是消息存儲不連續解析復雜,優點是避免了拷貝引起的性能損耗
4.5 處理 write 事件
一次無法寫完例子
- 非阻塞模式下,無法保證把 buffer 中所有數據都寫入 channel,因此需要追蹤 write 方法的返回值(代表實際寫入字節數)
- 用 selector 監聽所有 channel 的可寫事件,每個 channel 都需要一個 key 來跟蹤 buffer,但這樣又會導致占用內存過多,就有兩階段策略
- 當消息處理器第一次寫入消息時,才將 channel 注冊到 selector 上
- selector 檢查 channel 上的可寫事件,如果所有的數據寫完了,就取消 channel 的注冊
- 如果不取消,會每次可寫均會觸發 write 事件
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客戶端發送內容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示實際寫了多少字節
System.out.println("實際寫入字節:" + write);
// 4. 如果有剩余未讀字節,才需要關注寫事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有關注事件的基礎上,多關注 寫事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作為附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("實際寫入字節:" + write);
if (!buffer.hasRemaining()) { // 寫完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
客戶端
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}
?? write 為何要取消
只要向 channel 發送數據時,socket 緩沖可寫,這個事件會頻繁觸發,因此應當只在 socket 緩沖區寫不下時再關注可寫事件,數據寫完之后再取消關注
4.6 更進一步
?? 利用多線程優化
現在都是多核 cpu,設計時要充分考慮別讓 cpu 的力量被白白浪費
前面的代碼只有一個選擇器,沒有充分利用多核 cpu,如何改進呢?
分兩組選擇器
- 單線程配一個選擇器,專門處理 accept 事件
- 創建 cpu 核心數的線程,每個線程配一個選擇器,輪流處理 read 事件
public class ChannelDemo7 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}
@Slf4j
static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
AtomicInteger index = new AtomicInteger();
public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}
public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}
@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
public WorkerEventLoop(int index) {
this.index = index;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}
@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
debugAll(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
?? 如何拿到 cpu 個數
- Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因為容器不是物理隔離的,會拿到物理 cpu 個數,而不是容器申請時的個數
- 這個問題直到 jdk 10 才修復,使用 jvm 參數 UseContainerSupport 配置, 默認開啟
4.7 UDP
- UDP 是無連接的,client 發送數據不會管 server 是否開啟
- server 這邊的 receive 方法會將接收到的數據存入 byte buffer,但如果數據報文超過 buffer 大小,多出來的數據會被默默拋棄
首先啟動服務器端
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
輸出
waiting...
運行客戶端
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}
接下來服務器端輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
5. NIO vs BIO
5.1 stream vs channel
- stream 不會自動緩沖數據,channel 會利用系統提供的發送緩沖區、接收緩沖區(更為底層)
- stream 僅支持阻塞 API,channel 同時支持阻塞、非阻塞 API,網絡 channel 可配合 selector 實現多路復用
- 二者均為全雙工,即讀寫可以同時進行
5.2 IO 模型
同步阻塞、同步非阻塞、同步多路復用、異步阻塞(沒有此情況)、異步非阻塞
- 同步:線程自己去獲取結果(一個線程)
- 異步:線程自己不去獲取結果,而是由其它線程送結果(至少兩個線程)
當調用一次 channel.read 或 stream.read 后,會切換至操作系統內核態來完成真正數據讀取,而讀取又分為兩個階段,分別為:
-
等待數據階段
-
復制數據階段
![]()
-
阻塞 IO
![]()
-
非阻塞 IO
![]()
-
多路復用
![]()
-
信號驅動
-
異步 IO
![]()
-
阻塞 IO vs 多路復用
![]()
![]()
?? 參考
UNIX 網絡編程 - 卷 I
5.3 零拷貝
傳統 IO 問題
傳統的 IO 將一個文件通過 socket 寫出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
內部工作流程是這樣的:

-
java 本身并不具備 IO 讀寫能力,因此 read 方法調用后,要從 java 程序的用戶態切換至內核態,去調用操作系統(Kernel)的讀能力,將數據讀入內核緩沖區。這期間用戶線程阻塞,操作系統使用 DMA(Direct Memory Access)來實現文件讀,其間也不會使用 cpu
DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO
-
從內核態切換回用戶態,將數據從內核緩沖區讀入用戶緩沖區(即 byte[] buf),這期間 cpu 會參與拷貝,無法利用 DMA
-
調用 write 方法,這時將數據從用戶緩沖區(byte[] buf)寫入 socket 緩沖區,cpu 會參與拷貝
-
接下來要向網卡寫數據,這項能力 java 又不具備,因此又得從用戶態切換至內核態,調用操作系統的寫能力,使用 DMA 將 socket 緩沖區的數據寫入網卡,不會使用 cpu
可以看到中間環節較多,java 的 IO 實際不是物理設備級別的讀寫,而是緩存的復制,底層的真正讀寫是操作系統來完成的
- 用戶態與內核態的切換發生了 3 次,這個操作比較重量級
- 數據拷貝了共 4 次
NIO 優化
通過 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的還是 java 內存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系統內存

大部分步驟與優化前相同,不再贅述。唯有一點:java 可以使用 DirectByteBuf 將堆外內存映射到 jvm 內存中來直接訪問使用
- 這塊內存不受 jvm 垃圾回收的影響,因此內存地址固定,有助于 IO 讀寫
- java 中的 DirectByteBuf 對象僅維護了此內存的虛引用,內存回收分成兩步
- DirectByteBuf 對象被垃圾回收,將虛引用加入引用隊列
- 通過專門線程訪問引用隊列,根據虛引用釋放堆外內存
- 減少了一次數據拷貝,用戶態與內核態的切換次數沒有減少
進一步優化(底層采用了 linux 2.1 后提供的 sendFile 方法),java 中對應著兩個 channel 調用 transferTo/transferFrom 方法拷貝數據

- java 調用 transferTo 方法后,要從 java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 cpu
- 數據從內核緩沖區傳輸到 socket 緩沖區,cpu 會參與拷貝
- 最后使用 DMA 將 socket 緩沖區的數據寫入網卡,不會使用 cpu
可以看到
- 只發生了一次用戶態與內核態的切換
- 數據拷貝了 3 次
進一步優化(linux 2.4)

- java 調用 transferTo 方法后,要從 java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 cpu
- 只會將一些 offset 和 length 信息拷入 socket 緩沖區,幾乎無消耗
- 使用 DMA 將 內核緩沖區的數據寫入網卡,不會使用 cpu
整個過程僅只發生了一次用戶態與內核態的切換,數據拷貝了 2 次。所謂的【零拷貝】,并不是真正無拷貝,而是在不會拷貝重復數據到 jvm 內存中,零拷貝的優點有
- 更少的用戶態與內核態的切換
- 不利用 cpu 計算,減少 cpu 緩存偽共享
- 零拷貝適合小文件傳輸
5.3 AIO
AIO 用來解決數據復制階段的阻塞問題
- 同步意味著,在進行讀寫操作時,線程需要等待結果,還是相當于閑置
- 異步意味著,在進行讀寫操作時,線程不必等待結果,而是將來由操作系統來通過回調方式由另外的線程來獲得結果
異步模型需要底層操作系統(Kernel)提供支持
- Windows 系統通過 IOCP 實現了真正的異步 IO
- Linux 系統異步 IO 在 2.6 版本引入,但其底層實現還是用多路復用模擬了異步 IO,性能沒有優勢
文件 AIO
先來看看 AsynchronousFileChannel
@Slf4j
public class AioDemo1 {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});
} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}
輸出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+
可以看到
- 響應文件讀取成功的是另一個線程 Thread-5
- 主線程并沒有 IO 操作阻塞
?? 守護線程
默認文件 AIO 使用的線程都是守護線程,所以最后要執行 System.in.read() 以避免守護線程意外結束
網絡 AIO
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 處理完第一個 read 時,需要再次調用 read 方法來處理下一個 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作為附件的 buffer 還有內容,需要再次 write 寫出剩余內容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 讀事件由 ReadHandler 處理
sc.read(buffer, buffer, new ReadHandler(sc));
// 寫事件由 WriteHandler 處理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 處理完第一個 accpet 時,需要再次調用 accept 方法來處理下一個 accept 事件
ssc.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}








浙公網安備 33010602011771號