Netty 优化
# 1. 拓展序列化算法
序列化和反序列化主要用在消息正文的转换上:
- 序列化:需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化:需要将传入的正文数据还原成 Java 对象
# 1.1 抽象一个 Serializer 接口
我们到目前为止的代码都是使用的 Java 自带的序列化和反序列化机制,核心代码如下:
// 反序列化
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);
// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
2
3
4
5
6
7
8
9
10
11
为了支持更多的序列化算法,可以抽象出一个 Serializer 接口:
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
}
2
3
4
5
6
7
8
9
# 1.2 JSON 的实现
下面是使用 GSON 来做的实现:
enum SerializerAlgorithm implements Serializer {
// Json 实现(引入了 Gson 依赖)
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
}
@Override
public <T> byte[] serialize(T object) {
return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
}
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1.3 修改编解码器
# 1.3.1 编码
// 获得序列化后的msg
// 使用指定的序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 获得序列化后的对象
byte[] bytes = values[out.getByte(5)-1].serialize(msg);
2
3
4
5
# 1.3.2 解码
// 获得反序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 通过指定方式进行反序列化
// 需要通过Message的方法获得具体的消息类型
Message message = values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);
2
3
4
5
# 2. 参数调优
client 与 server 的配置参数
- 客户端通过
.option()
方法来配置参数,它是给 SocketChannel 配置的参数
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option( xxx )
.channel(NioSocketChannel.class);
...
2
3
4
5
- 服务端通过
.option()
是给 ServerSocketChannel 配置参数.childOption()
是给 SocketChannel 配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(boss, worker)
.option( xxx )
.childOption( xxx );
...
2
3
4
5
6
# 2.1 CONNECT_TIMEOUT_MILLIS
# 2.1.1 使用
- 属于 SocketChannal 参数
- 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
- 注意:Netty 中不要用成了 SO_TIMEOUT,它主要用在阻塞 IO,而 Netty 是非阻塞 IO
使用:
public class TestParam {
public static void main(String[] args) {
// SocketChannel 5s内未建立连接就抛出异常
new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// ServerSocketChannel 5s内未建立连接就抛出异常
new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000);
// SocketChannel 5s内未建立连接就抛出异常
new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
}
}
2
3
4
5
6
7
8
9
10
11
# 2.1.2 源码分析
通过分析产生连接超时的源码,我们可以学习到 Netty 的一些设计思路。
客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢?
client 的代码:
Bootstrap bootstrap = ....;
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
2
3
这是主线程的行为,调用 connect 方法后,会有一个 NIO 线程来执行连接操作,connect 方法立即返回一个 future 对象,这个对象其实是一个 Promise 对象。
看一下 NIO 线程,这个线程的关键在于 AbstractNioUnsafe 类的 connect 方法:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
// Schedule connect timeout.
// 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 如果超时时间大于0
if (connectTimeoutMillis > 0) {
// 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
// schedule(Runnable command, long delay, TimeUnit unit)
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
// 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
// 如果超时,则通过tryFailure方法将异常放入Promise中
// 在主线程中抛出
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
这里有一个 eventLoop,之前我们说过 NIO 线程其实就属于一个 EventLoop,它可以检测这个 loop 上的 IO 事件、加一些任务等,这里的 eventLoop().schedule()
就是在 eventLoop 上加一个定时任务,这个定时任务包含要执行的东西、时间:
- 执行的东西:抛出 ConnectTimeoutException
- 时间:我们配置的超时参数
当时间一到,他就执行这个任务,从而创建连接超时异常,并将这个异常通过 Promise 传给主线程。如果这个期间内连接成功,他就会取消这个任务,取消任务的代码这里就不演示了。
所以,超时的判断主要是通过 Eventloop 的 schedule 方法和 Promise 共同实现的:
- schedule 设置了一个定时任务,延迟
connectTimeoutMillis
秒后执行该方法 - 如果指定时间内没有建立连接,则会执行其中的任务
- 任务负责创建
ConnectTimeoutException
异常,并将异常通过 Pormise 传给主线程并抛出
- 任务负责创建
我们通过分析这里的源码,学习到了:
- Netty 中通过 Promise 来进行线程通信
- Netty 中可以通过 eventLoop 加一些定时任务,并在时间到达后开始执行这个任务
# 2.2 SO_BACKLOG
# 2.2.1 复习三次握手
参考下图,sync queue 表示半连接队列,accept queue 表示全连接队列:
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中:
- 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
- sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
- accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
- sync queue - 半连接队列
文件中配置的这些值是系统参数,而在 netty 中我们也可以传入 backlog 参数。
在 Java NIO 中,是通过
bind()
函数传入这个参数值;在 netty 中,是需要通过.option()
来传入这个参数。
在 Netty 中,可以通过 option(ChannelOption.SO_BACKLOG, 值)
来设置 SO_BACKLOG 的大小。
# 2.3 ulimit -n
属于操作系统的参数,用于限制一个进程能够打开的文件描述符的最大数量。
socket 这类也是被视为文件描述符。
# 2.4 TCP_NODELAY
属于 SocketChannal 参数。
表示是否开启 Nagle 算法。默认为 false,即开启 Nagle 算法。
# 2.5 SO_SNDBUF & SO_RCVBUF
表示发送缓存区和接收缓冲区,决定了滑动窗口的上限。
建议大家不要调节这两个参数,现在的 OS 都可以根据实际情况来智能调节。
- SO_SNDBUF 属于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
# 2.6 ALLOCATOR
是 ByteBuf 的分配器。
- 属于 SocketChannal 参数
- 用来分配 ByteBuf, ctx.alloc()
# 2.7 RCVBUF_ALLOCATOR
- 属于 SocketChannal 参数
- 控制 netty 接收缓冲区大小
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定