NIO 基础与 ByteBuffer
Non-blocking IO:非阻塞 IO
# 1. 三大组件
# 1.1 Channel & Buffer
channel 是数据的传输通道,是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel。
常见的 Channel 有
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer 则用来缓冲读写数据,常见的 buffer 有
- ByteBuffer(最常用)
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
# 1.2 Selector
selector 单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途。
# 多线程版设计
如果每个 thread 来处理一个 socket client,那么会出现内存占用高、线程上下文切换成本高等问题,只适用于连接数少的场景。
# 线程池版设计
缺点:
- 在 socket 阻塞模式下,线程仅能处理一个 socket 连接,处理完后才能处理下一个
- 仅适合短连接场景
# selector 版设计
selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)
可以把 selector 想象成一个下面这些 channel 的监控器,哪个 channel 产生了事件,都会被 selector 检测到
调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理。
# 2. ByteBuffer
有一普通文本文件 data.txt,内容为:
1234567890abcd
# 2.1 使用 FileChannel 读取文件
使用 FileChannel 来读取 data.txt 的示例程序如下:
@Slf4j
public class ChannelDemo1 {
public static void main(String[] args) {
// 获取 FileChannel
try (RandomAccessFile file = new RandomAccessFile("data.txt", "rw")) {
FileChannel channel = file.getChannel();
// 准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
do {
// 从 channel 读取数据,向 buffer 写入
int len = channel.read(buffer);
log.debug("读到字节数:{}", len);
if (len == -1) { // 返回 -1 表示没有内容了
break;
}
// 打印 buffer 内容
buffer.flip(); // 切换 buffer 读模式
while(buffer.hasRemaining()) {
byte b = buffer.get();
log.debug("{}", (char) b);
}
// 切换 buffer 写模式
buffer.clear();
} while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
获取 FileChannel 对象的方式也可以这样:
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
...
}
2
3
# 2.2 ByteBuffer 正确使用姿势
- 向 buffer 写入数据,例如调用
channel.read(buffer)
- 调用
flip()
切换至读模式 - 从 buffer 读取数据,例如调用
buffer.get()
- 调用
clear()
或compact()
切换至写模式 - 重复 1~4 步骤
# 2.3 ByteBuffer 内部结构
ByteBuffer 像一个数组结构,有以下重要属性
- capacity:容量,表示能装多少数据
- position:类似于读写的指针,表示读到哪了,写到哪了
- limit:代表读写的限制,限制应该读多少 byte,应该写多少 byte
写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态:
flip 动作发生后,position 切换为读取位置,limit 切换为读取限制:
读取 4 个字节后,状态:
clear 动作发生后,状态:
compact 方法,是把未读完的部分向前压缩,然后切换至写模式:
# 2.4 ByteBuffer 调试工具
我们这里写了一个调试工具,这样当我们有了一个 ByteBuffer 实例的时候,可以调用这个调试工具的 debugAll()
来看一下这个 buffer 里面的内容。调试工具代码如下,可以直接粘贴到项目中使用:
::: detail ByteBuffer 调试工具 调试工具代码如下:
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
:::
# 2.5 ByteBuffer 常见方法
# 2.5.1 allocate 分配空间
可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法:
Bytebuffer buf = ByteBuffer.allocate(16);
其实有两种 buffer:
allocate()
:分配得到HeapByteBuffer
,属于 Java 堆内存,读写效率较低,受到 GC 影响allocateDirect()
:分配得到DirectByteBuffer
,是直接内存,读写效率高(少一次拷贝),不会受 GC 影响,但申请需要经过操作系统,因此分配效率低
# 2.5.2 向 buffer 写入数据
有两种向 buffer 写入数据的方式:
- 调用 channel 的 read 方法:
int readBytes = channel.read(buf);
- 调用 buffer 自己的 put 方法:
buf.put((byte) 127);
# 2.5.3 从 buffer 读取数据
两种方式:
- 调用 channel 的 write 方法:
int writeBytes = channel.write(buf);
- 调用 buffer 的 get 方法:
byte b = buf.get();
get 方法会让 position 读指针向后走,如果想重复读取数据:
- 可以调用
rewind
方法将 position 重新置为 0 - 或者调用
get(int i)
方法获取索引 i 的内容,它不会移动读指针
# 2.5.4 mark & reset
mark 是做一个标记,读取时记录一个 position 的位置,这样只要之后调用 reset 就可以回到 mark 的位置。
- mark 是在接下来要读的 position 上做个标记
- rewind 和 flip 都会清除 mark 位置
# 2.5.5 String 与 ByteBuffer 互转
String -> ByteBuffer:
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("hello");
ByteBuffer buffer3 = ByteBuffer.warp("hello".getBytes());
2
3
ByteBuffer -> String:
CharBuffer cb = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(cb.getClass());
System.out.println(cb.toString()); // 转为 String
2
3
# 2.5.6 Buffer 的线程安全
Buffer 是非线程安全的。
# 2.6 Scattering Reads
分散读取,有一个文本文件 3parts.txt:
onetwothree
使用如下方式读取,可以将数据填充至多个 buffer:
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
debug(a);
debug(b);
debug(c);
} catch (IOException e) {
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2.7 Gathering Writes
集中写入,可以将多个 buffer 的数据填充至 channel:
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer d = ByteBuffer.allocate(4);
ByteBuffer e = ByteBuffer.allocate(4);
channel.position(11);
d.put(new byte[]{'f', 'o', 'u', 'r'});
e.put(new byte[]{'f', 'i', 'v', 'e'});
d.flip();
e.flip();
debug(d);
debug(e);
channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.8 练习
网络上有多条数据发送给服务端,数据之间使用 \n
进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有 3 条为:
Hello,world\n
I'm zhangsan\n
How are you?\n
变成了下面的两个 byteBuffer (黏包,半包)
Hello,world\nI'm zhangsan\nHo
w are you?\n
现在要求你编写程序,将错乱的数据恢复成原始的按 \n
分隔的数据:
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 11 24
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\nhaha!\n".getBytes());
split(source);
}
private static void split(ByteBuffer source) {
source.flip();
int oldLimit = source.limit();
for (int i = 0; i < oldLimit; i++) {
if (source.get(i) == '\n') {
System.out.println(i);
ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
// 0 ~ limit
source.limit(i + 1);
target.put(source); // 从source 读,向 target 写
debugAll(target);
source.limit(oldLimit);
}
}
source.compact();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
这里只是模拟的网络编程,但在实际网络编程中,正确处理黏包、半包问题是很重要的。