notebook notebook
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)

学习笔记

啦啦啦,向太阳~
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)
  • Java开发

    • My

    • Posts

    • Java SE

    • Java 并发编程

    • JVM

    • JDBC

    • Java Web

    • Spring 与 SpringMVC

    • Spring Boot

    • Spring Cloud

    • Spring Security

    • Netty

      • 黑马 Netty

        • NIO 基础与 ByteBuffer
        • 文件编程
        • 网络编程
        • Netty 入门
        • 基本组件:EventLoop、Channel、Future 与 Promise
        • 基本组件:Handler、Pipeline 与 ByteBuf
        • 应用一:粘包与半包问题
        • 应用二:协议设计与解析
        • Netty 优化
          • 1. 拓展序列化算法
            • 1.1 抽象一个 Serializer 接口
            • 1.2 JSON 的实现
            • 1.3 修改编解码器
          • 2. 参数调优
            • 2.1 CONNECTTIMEOUTMILLIS
            • 2.2 SO_BACKLOG
            • 2.3 ulimit -n
            • 2.4 TCP_NODELAY
            • 2.5 SOSNDBUF & SORCVBUF
            • 2.6 ALLOCATOR
            • 2.7 RCVBUF_ALLOCATOR
    • MyBatis

  • Python开发

  • Golang开发

  • Git

  • 软件设计与架构

  • 大数据与分布式系统

  • 区块链

  • Nginx

  • 开发
  • Java开发
  • Netty
  • 黑马 Netty
yubin
2023-12-03
目录

Netty 优化

# 1. 拓展序列化算法

序列化和反序列化主要用在消息正文的转换上:

  • 序列化:需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化:需要将传入的正文数据还原成 Java 对象

# 1.1 抽象一个 Serializer 接口

我们到目前为止的代码都是使用的 Java 自带的序列化和反序列化机制,核心代码如下:

// 反序列化
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
1
2
3
4
5
6
7
8
9
10
11

为了支持更多的序列化算法,可以抽象出一个 Serializer 接口:

public interface Serializer {

    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);

}
1
2
3
4
5
6
7
8
9

# 1.2 JSON 的实现

下面是使用 GSON 来做的实现:

enum SerializerAlgorithm implements Serializer {

    // Json 实现(引入了 Gson 依赖)
    Json {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
        }

        @Override
        public <T> byte[] serialize(T object) {
            return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
        }
    };

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 1.3 修改编解码器

# 1.3.1 编码

// 获得序列化后的msg
// 使用指定的序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 获得序列化后的对象
byte[] bytes = values[out.getByte(5)-1].serialize(msg);
1
2
3
4
5

# 1.3.2 解码

// 获得反序列化方式
SerializerAlgorithm[] values = SerializerAlgorithm.values();
// 通过指定方式进行反序列化
// 需要通过Message的方法获得具体的消息类型
Message message = values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);
1
2
3
4
5

# 2. 参数调优

client 与 server 的配置参数

  • 客户端通过 .option() 方法来配置参数,它是给 SocketChannel 配置的参数
Bootstrap bootstrap = new Bootstrap()
    .group(group)
    .option( xxx )
    .channel(NioSocketChannel.class);
...
1
2
3
4
5
  • 服务端通过
    • .option() 是给 ServerSocketChannel 配置参数
    • .childOption() 是给 SocketChannel 配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap()
    .channel(NioServerSocketChannel.class)
    .group(boss, worker)
    .option( xxx )
    .childOption( xxx );
...
1
2
3
4
5
6

# 2.1 CONNECT_TIMEOUT_MILLIS

# 2.1.1 使用

  • 属于 SocketChannal 参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • 注意:Netty 中不要用成了 SO_TIMEOUT,它主要用在阻塞 IO,而 Netty 是非阻塞 IO

使用:

public class TestParam {
    public static void main(String[] args) {
        // SocketChannel 5s内未建立连接就抛出异常
        new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        
        // ServerSocketChannel 5s内未建立连接就抛出异常
        new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000);
        // SocketChannel 5s内未建立连接就抛出异常
        new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    }
}
1
2
3
4
5
6
7
8
9
10
11

# 2.1.2 源码分析

通过分析产生连接超时的源码,我们可以学习到 Netty 的一些设计思路。

客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢?

client 的代码:

Bootstrap bootstrap = ....;
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
1
2
3

这是主线程的行为,调用 connect 方法后,会有一个 NIO 线程来执行连接操作,connect 方法立即返回一个 future 对象,这个对象其实是一个 Promise 对象。

看一下 NIO 线程,这个线程的关键在于 AbstractNioUnsafe 类的 connect 方法:













 


















public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
    ...
        
    // Schedule connect timeout.
    // 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    // 如果超时时间大于0
    if (connectTimeoutMillis > 0) {
        // 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
        // schedule(Runnable command, long delay, TimeUnit unit)
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                // 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
                // 如果超时,则通过tryFailure方法将异常放入Promise中
                // 在主线程中抛出
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    
...
        
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

这里有一个 eventLoop,之前我们说过 NIO 线程其实就属于一个 EventLoop,它可以检测这个 loop 上的 IO 事件、加一些任务等,这里的 eventLoop().schedule() 就是在 eventLoop 上加一个定时任务,这个定时任务包含要执行的东西、时间:

  • 执行的东西:抛出 ConnectTimeoutException
  • 时间:我们配置的超时参数

当时间一到,他就执行这个任务,从而创建连接超时异常,并将这个异常通过 Promise 传给主线程。如果这个期间内连接成功,他就会取消这个任务,取消任务的代码这里就不演示了。

所以,超时的判断主要是通过 Eventloop 的 schedule 方法和 Promise 共同实现的:

  • schedule 设置了一个定时任务,延迟 connectTimeoutMillis 秒后执行该方法
  • 如果指定时间内没有建立连接,则会执行其中的任务
    • 任务负责创建 ConnectTimeoutException 异常,并将异常通过 Pormise 传给主线程并抛出

我们通过分析这里的源码,学习到了:

  • Netty 中通过 Promise 来进行线程通信
  • Netty 中可以通过 eventLoop 加一些定时任务,并在时间到达后开始执行这个任务

# 2.2 SO_BACKLOG

# 2.2.1 复习三次握手

参考下图,sync queue 表示半连接队列,accept queue 表示全连接队列:

20231203122042
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中:

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
    • sync queue - 半连接队列
      • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
    • accept queue - 全连接队列
      • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
      • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

文件中配置的这些值是系统参数,而在 netty 中我们也可以传入 backlog 参数。

在 Java NIO 中,是通过 bind() 函数传入这个参数值;在 netty 中,是需要通过 .option() 来传入这个参数。

在 Netty 中,可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置 SO_BACKLOG 的大小。

# 2.3 ulimit -n

属于操作系统的参数,用于限制一个进程能够打开的文件描述符的最大数量。

socket 这类也是被视为文件描述符。

# 2.4 TCP_NODELAY

属于 SocketChannal 参数。

表示是否开启 Nagle 算法。默认为 false,即开启 Nagle 算法。

# 2.5 SO_SNDBUF & SO_RCVBUF

表示发送缓存区和接收缓冲区,决定了滑动窗口的上限。

建议大家不要调节这两个参数,现在的 OS 都可以根据实际情况来智能调节。

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

# 2.6 ALLOCATOR

是 ByteBuf 的分配器。

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

# 2.7 RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
编辑 (opens new window)
上次更新: 2023/12/03, 06:19:28
应用二:协议设计与解析
MyBatis 概述与入门

← 应用二:协议设计与解析 MyBatis 概述与入门→

最近更新
01
Deep Reinforcement Learning
10-03
02
误删数据后怎么办
04-06
03
MySQL 一主多从
03-22
更多文章>
Theme by Vdoing | Copyright © 2021-2024 yubincloud | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×