基本组件:EventLoop、Channel、Future 与 Promise
# 1. EventLoop
EventLoop,事件循环对象,本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 IO 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此 EventLoop - 提供了
parent
方法来看看自己属于哪个 EventLoopGroup
- 提供了
EventLoopGroup,事件循环组,是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
# 1.1 简单示例
// 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup();
// for 遍历
for (EventExecutor eventLoop: group) {
System.out.printlin(eventLoop);
}
2
3
4
5
6
创建 NioEventLoopGroup
时可以指定其内部的线程数。
NioEventLoopGroup 可以处理 IO 事件、普通任务、定时任务,另外还有一种 DefaultEventLoopGroup 只能处理普通任务和定时任务。
// 处理普通与定时任务的示例
public class TestEventLoop {
public static void main(String[] args) {
// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过next方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());
// 通过EventLoop执行普通任务
group.next().execute(()->{
System.out.println(Thread.currentThread().getName() + " hello");
});
// 通过EventLoop执行定时任务
group.next().scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);
// 优雅地关闭
group.shutdownGracefully();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1.2 优雅关闭
优雅关闭 shutdownGracefully
方法:该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
# 1.3 演示 event loop 处理普通事件
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
EventLoop loop = group.next();
loop.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("ok");
});
log.debug("main");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
当然,event loop 也可以用于执行定时任务。
# 1.4 演示 NioEventLoop 处理 IO 事件
# 1.4.1 示例 1
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。
服务器端两个 NIO worker 工人(因为创建 NioEventLoopGroup 时指定参数 2 表示这个线程池是包含两个线程,也就是两个 EventLoop):
new ServerBootstrap()
// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan".getBytes()));
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
最后输出:
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
2
3
4
5
6
可以看到两个 EventLoop 轮流处理 channel,但 EventLoop 与 channel 之间进行了绑定,且一直负责处理该 channel 中的事件。
如上图,每个 channel 与一个 EventLoop 绑定,并交由这个 loop 处理(不同颜色的方块代表不同的 loop)。
# 1.4.2 示例 2
ServerBootstrap 的 .group()
方法不仅可以像上面的例子那样只接收一个 EventLoopGroup 作为参数,还可以接收两个 group 作为参数,第一个 group 作为 boss 只负责 accept 事件,第二个 group 作为 worker 只负责 socketChannel 上的数据读写。
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
2
3
# 1.4.3 示例 3
当 pipeline 中某个 handler 的运行耗费时间较长时,会占据 NIO 的 event loop 较久,从而影响其他的处理。
所以,当某个 handler 耗费可能较长时间时,最好不要让他占用 NIO 的线程,而是交给一个专门用于处理耗时较久的 event loop,这个 event loop 我们可以使用 DefaultEventLoopGroup,专门用来处理耗时较久的任务:
DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(normalWorkers,"myhandler",
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
在 ch.pipeline().addLast()
增加 handler 时,可以指定交由哪个 group,默认是交由 ServerBootstrap 的 NIO group,而当指定另一个 normalWorkers
group 时,这个任务则会交由指定的 group 处理。
运行后可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)
# 1.5 handler 执行中如何换人?
在刚刚的示例 3 中,中间的任务过程出现了更换 event loop 来执行的现象,也就是更换了线程来执行,这是怎么实现的呢?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
# 1.6 演示 NIOEventLoop 处理定时任务
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
2
3
4
5
6
7
# 2. Channel
- close() 可以用来关闭Channel
- closeFuture() 用来处理 Channel 的关闭,做一些善后处理
- sync 方法作用是同步等待 Channel 关闭
- 而 addListener 方法是异步等待 Channel 关闭
- pipeline() 方法用于添加处理器
- write() 方法将数据写入
- 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
- 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
- writeAndFlush() 方法将数据写入并立即发送(刷出)
# 2.1 ChannelFuture
ChannelFuture 是 Channel异步IO操作的结果。Netty中的所有IO操作都是异步的。这意味着任何IO调用都将立即返回,而不能保证所请求的IO操作在调用结束时完成。相反,将返回一个带有ChannelFuture的实例,该实例将提供有关IO操作的结果或状态的信息。
带有 Future、Promise 的类型基本都是和异步方法配套使用的,用来处理结果。
现在来看 ChannelFuture 使用的示例。
# 2.1.1 连接问题
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 该方法用于等待连接真正建立
channelFuture.sync(); // 阻塞住,直至 NIO 线程中的连接建立成功
// 获取客户端-服务器之间的Channel对象
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
System.in.read();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
如果我们去掉channelFuture.sync()
方法,会服务器无法收到 hello world。这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()
方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel()
拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端。
所以需要通过channelFuture.sync()
方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程。
下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)。
# 2.1.2 addListener 方法
通过这种方法可以在 NIO 线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作。
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 当connect方法执行完毕后,也就是连接真正建立后
// 会在NIO线程中调用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
}
});
System.in.read();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
如何在 channel 建立连接后发送消息
综上,我们可以看到两种方案:
- 方案 1:【sync 法】在主线程中使用
sync()
方法等待 NIO 线程完成连接的建立,然后获取其中的 channel 发送消息 - 方案 2:【addListener 法】对 NIO 线程的 ChannelFuture 调用
addListener()
,让 NIO 线程自己在完成连接的建立后发送消息
# 2.2 CloseFuture 来处理关闭
先看一个小需求:client 启动后不断监听用户输入,并将其通过网络发送给 server,直到用户输入 q
的时候才关闭连接,我们希望在连接完成关闭后,打印出提示信息。
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 创建EventLoopGroup,使用完毕后关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
// 创建一个 input 线程用于输入并向服务器发送
new Thread(()->{
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 关闭操作是异步的,在NIO线程中执行,所以紧跟着它的代码不一定是在 close 完成之后才执行的
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "inputThread").start();
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
// 同步等待NIO线程执行完close操作
closeFuture.sync();
// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
从 channel 获取的 CloseFuture 可以用于做一些连接关闭后的善后操作。
channel 虽然关闭了,但 NIO 的 EventLoopGroup 中还有线程没有关闭,所以在我们关闭了 chennel 并想结束程序的时候,需要优雅地把 EventLoopGroup 中的线程也关掉。
关闭 channel 时的一些小细节:
当我们要关闭channel时,可以调用channel.close()
方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作。如果我们想在 channel 真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现:
- 方法 1:【同步模式处理关闭】通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作:
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();
2
3
4
5
- 方法 2:【异步模式处理关闭】调用closeFuture.addListener方法,添加close的后续操作:
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});
2
3
4
5
6
7
8
9
要习惯于 Netty 中的这种异步的处理模式
# 2.3 为什么要异步?
Netty 为什么要用异步这种复杂的方式?简单地认为 Netty 是为了使用多线程而提高效率就片面了。
想象两个场景:
- 4 名医生来处理病人,每个病人都有一个医生处理全部流程
- 4 名医生分别处理病人来医院后的不同阶段,一个处理挂号,一个处理诊断,一个处理拿药等等,每个病人都要在 4 个医生那里走一遍流程,每个医生只处理一个阶段的任务
Netty 选择的就是类似后者的模型,这种方式并没有提高整个医院一天所能处理的患者数量,但它提高了每个医生在单位时间内所能处理的病人数量,也即是说,这种模型提高了吞吐量。
要注意的要点是(单论 Java 里的):
- 单线程没法异步提高效率,必须配合多线程、多核 CPU 才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加
- 合理进行任务拆分,也是利用异步的关键
Python 的 async 异步是在单线程内通过缩短等待 IO 的时间来提升的整体的效率
# 3. Future 与 Promise
# 3.1 概念
netty 中的 Future 与 JDK 中的 Future 同名,但是这是两个接口。
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展:
- JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- Netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
# 3.2 JDK Future
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newFixdThreadPool(2);
// 提交任务,获得Future对象
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return 50;
}
});
// 通过阻塞的方式,获得运行结果
System.out.println(future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3.3 Netty Future
Netty Future 的使用方式与 JDK 的类似,可以对比着学习。
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 获得 EventLoop 对象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 50;
}
});
// 主线程中获取结果
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
System.out.println("get " + future.get());
// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
运行结果:
main 获取结果
getNow null
get 50
nioEventLoopGroup-2-1 获取结果
getNow 50
2
3
4
5
Netty 中的 Future 对象,可以通过 EventLoop 的 sumbit() 方法得到
- 可以通过 Future 对象的 get 方法,阻塞地获取返回结果
- 也可以通过 getNow 方法,获取结果,若还没有结果,则返回 null,该方法是非阻塞的
- 还可以通过 future.addListener 方法,在 Callable 方法执行的线程中,异步获取返回结果
# 3.4 Netty Promise
Promise 相当于一个结果容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建EventLoop
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 创建Promise对象,用于存放结果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
try {
...
// 自定义线程向Promise中存放结果
promise.setSuccess(50);
} catch (Exception e) {
e.printStackTrace();
// 如果失败了,让等待的主线程也抛出异常,从而让他知道也出错了
promise.setFailure(e);
}
// 自定义线程向Promise中存放结果
promise.setSuccess(50);
}).start();
// 主线程从Promise中获取结果
System.out.println(Thread.currentThread().getName() + " " + promise.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27