notebook notebook
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)

学习笔记

啦啦啦,向太阳~
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)
  • Java开发

    • My

    • Posts

    • Java SE

    • Java 并发编程

    • JVM

    • JDBC

    • Java Web

    • Spring 与 SpringMVC

    • Spring Boot

    • Spring Cloud

    • Spring Security

    • Netty

      • 黑马 Netty

        • NIO 基础与 ByteBuffer
        • 文件编程
        • 网络编程
        • Netty 入门
        • 基本组件:EventLoop、Channel、Future 与 Promise
          • 1. EventLoop
            • 1.1 简单示例
            • 1.2 优雅关闭
            • 1.3 演示 event loop 处理普通事件
            • 1.4 演示 NioEventLoop 处理 IO 事件
            • 1.5 handler 执行中如何换人?
            • 1.6 演示 NIOEventLoop 处理定时任务
          • 2. Channel
            • 2.1 ChannelFuture
            • 2.2 CloseFuture 来处理关闭
            • 2.3 为什么要异步?
          • 3. Future 与 Promise
            • 3.1 概念
            • 3.2 JDK Future
            • 3.3 Netty Future
            • 3.4 Netty Promise
        • 基本组件:Handler、Pipeline 与 ByteBuf
        • 应用一:粘包与半包问题
        • 应用二:协议设计与解析
        • Netty 优化
    • MyBatis

  • Python开发

  • Golang开发

  • Git

  • 软件设计与架构

  • 大数据与分布式系统

  • 区块链

  • Nginx

  • 开发
  • Java开发
  • Netty
  • 黑马 Netty
yubin
2023-10-02
目录

基本组件:EventLoop、Channel、Future 与 Promise

# 1. EventLoop

EventLoop,事件循环对象,本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 IO 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup,事件循环组,是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

# 1.1 简单示例

// 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup();
// for 遍历
for (EventExecutor eventLoop: group) {
    System.out.printlin(eventLoop);
}
1
2
3
4
5
6

创建 NioEventLoopGroup 时可以指定其内部的线程数。

NioEventLoopGroup 可以处理 IO 事件、普通任务、定时任务,另外还有一种 DefaultEventLoopGroup 只能处理普通任务和定时任务。

// 处理普通与定时任务的示例
public class TestEventLoop {
    public static void main(String[] args) {
        // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过next方法可以获得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通过EventLoop执行普通任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通过EventLoop执行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地关闭
        group.shutdownGracefully();
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 1.2 优雅关闭

优雅关闭 shutdownGracefully 方法:该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

# 1.3 演示 event loop 处理普通事件

public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();

    EventLoop loop = group.next();

    loop.submit(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("ok");
    });

    log.debug("main");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

当然,event loop 也可以用于执行定时任务。

# 1.4 演示 NioEventLoop 处理 IO 事件

# 1.4.1 示例 1

Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。

服务器端两个 NIO worker 工人(因为创建 NioEventLoopGroup 时指定参数 2 表示这个线程池是包含两个线程,也就是两个 EventLoop):

new ServerBootstrap()
    // 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan".getBytes()));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

最后输出:

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu     
1
2
3
4
5
6

可以看到两个 EventLoop 轮流处理 channel,但 EventLoop 与 channel 之间进行了绑定,且一直负责处理该 channel 中的事件。

20231002184706

如上图,每个 channel 与一个 EventLoop 绑定,并交由这个 loop 处理(不同颜色的方块代表不同的 loop)。

# 1.4.2 示例 2

ServerBootstrap 的 .group() 方法不仅可以像上面的例子那样只接收一个 EventLoopGroup 作为参数,还可以接收两个 group 作为参数,第一个 group 作为 boss 只负责 accept 事件,第二个 group 作为 worker 只负责 socketChannel 上的数据读写。

new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    ...
1
2
3

# 1.4.3 示例 3

当 pipeline 中某个 handler 的运行耗费时间较长时,会占据 NIO 的 event loop 较久,从而影响其他的处理。

所以,当某个 handler 耗费可能较长时间时,最好不要让他占用 NIO 的线程,而是交给一个专门用于处理耗时较久的 event loop,这个 event loop 我们可以使用 DefaultEventLoopGroup,专门用来处理耗时较久的任务:

 






















DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch)  {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            ch.pipeline().addLast(normalWorkers,"myhandler",
              new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

在 ch.pipeline().addLast() 增加 handler 时,可以指定交由哪个 group,默认是交由 ServerBootstrap 的 NIO group,而当指定另一个 normalWorkers group 时,这个任务则会交由指定的 group 处理。

运行后可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)

20231002192821

# 1.5 handler 执行中如何换人?

在刚刚的示例 3 中,中间的任务过程出现了更换 event loop 来执行的现象,也就是更换了线程来执行,这是怎么实现的呢?

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();
    
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

# 1.6 演示 NIOEventLoop 处理定时任务

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
1
2
3
4
5
6
7

# 2. Channel

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭,做一些善后处理
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

# 2.1 ChannelFuture

ChannelFuture 是 Channel异步IO操作的结果。Netty中的所有IO操作都是异步的。这意味着任何IO调用都将立即返回,而不能保证所请求的IO操作在调用结束时完成。相反,将返回一个带有ChannelFuture的实例,该实例将提供有关IO操作的结果或状态的信息。

带有 Future、Promise 的类型基本都是和异步方法配套使用的,用来处理结果。

现在来看 ChannelFuture 使用的示例。

# 2.1.1 连接问题

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
                // NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 该方法用于等待连接真正建立
        channelFuture.sync();  // 阻塞住,直至 NIO 线程中的连接建立成功
        
        // 获取客户端-服务器之间的Channel对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

如果我们去掉channelFuture.sync()方法,会服务器无法收到 hello world。这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端。

所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程。

下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)。

# 2.1.2 addListener 方法

通过这种方法可以在 NIO 线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作。


















 










public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
                // NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 当connect方法执行完毕后,也就是连接真正建立后
        // 会在NIO线程中调用operationComplete方法
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
        System.in.read();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

如何在 channel 建立连接后发送消息

综上,我们可以看到两种方案:

  • 方案 1:【sync 法】在主线程中使用 sync() 方法等待 NIO 线程完成连接的建立,然后获取其中的 channel 发送消息
  • 方案 2:【addListener 法】对 NIO 线程的 ChannelFuture 调用 addListener(),让 NIO 线程自己在完成连接的建立后发送消息

# 2.2 CloseFuture 来处理关闭

先看一个小需求:client 启动后不断监听用户输入,并将其通过网络发送给 server,直到用户输入 q 的时候才关闭连接,我们希望在连接完成关闭后,打印出提示信息。

public class ReadClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建EventLoopGroup,使用完毕后关闭
        NioEventLoopGroup group = new NioEventLoopGroup();
        
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();

        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);

        // 创建一个 input 线程用于输入并向服务器发送
        new Thread(()->{
            while (true) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 关闭操作是异步的,在NIO线程中执行,所以紧跟着它的代码不一定是在 close 完成之后才执行的
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();

        // 获得closeFuture对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");
        
        // 同步等待NIO线程执行完close操作
        closeFuture.sync();
        
        // 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
        System.out.println("关闭之后执行一些额外操作...");
        
        // 关闭EventLoopGroup
        group.shutdownGracefully();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

从 channel 获取的 CloseFuture 可以用于做一些连接关闭后的善后操作。

channel 虽然关闭了,但 NIO 的 EventLoopGroup 中还有线程没有关闭,所以在我们关闭了 chennel 并想结束程序的时候,需要优雅地把 EventLoopGroup 中的线程也关掉。

关闭 channel 时的一些小细节:

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作。如果我们想在 channel 真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现:

  • 方法 1:【同步模式处理关闭】通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作:
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待NIO线程执行完close操作
closeFuture.sync();
1
2
3
4
5
  • 方法 2:【异步模式处理关闭】调用closeFuture.addListener方法,添加close的后续操作:
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 等待channel关闭后才执行的操作
        System.out.println("关闭之后执行一些额外操作...");
        // 关闭EventLoopGroup
        group.shutdownGracefully();
    }
});
1
2
3
4
5
6
7
8
9

要习惯于 Netty 中的这种异步的处理模式

# 2.3 为什么要异步?

Netty 为什么要用异步这种复杂的方式?简单地认为 Netty 是为了使用多线程而提高效率就片面了。

想象两个场景:

  • 4 名医生来处理病人,每个病人都有一个医生处理全部流程
  • 4 名医生分别处理病人来医院后的不同阶段,一个处理挂号,一个处理诊断,一个处理拿药等等,每个病人都要在 4 个医生那里走一遍流程,每个医生只处理一个阶段的任务

Netty 选择的就是类似后者的模型,这种方式并没有提高整个医院一天所能处理的患者数量,但它提高了每个医生在单位时间内所能处理的病人数量,也即是说,这种模型提高了吞吐量。

要注意的要点是(单论 Java 里的):

  • 单线程没法异步提高效率,必须配合多线程、多核 CPU 才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

Python 的 async 异步是在单线程内通过缩短等待 IO 的时间来提升的整体的效率

# 3. Future 与 Promise

# 3.1 概念

netty 中的 Future 与 JDK 中的 Future 同名,但是这是两个接口。

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展:

  • JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • Netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

# 3.2 JDK Future

public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建线程池
        ExecutorService executor = Executors.newFixdThreadPool(2);

        // 提交任务,获得Future对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通过阻塞的方式,获得运行结果
        System.out.println(future.get());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 3.3 Netty Future

Netty Future 的使用方式与 JDK 的类似,可以对比着学习。

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 获得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });

        // 主线程中获取结果
        System.out.println(Thread.currentThread().getName() + " 获取结果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());

        // NIO线程中异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取结果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

运行结果:

main 获取结果
getNow null
get 50
nioEventLoopGroup-2-1 获取结果
getNow 50
1
2
3
4
5

Netty 中的 Future 对象,可以通过 EventLoop 的 sumbit() 方法得到

  • 可以通过 Future 对象的 get 方法,阻塞地获取返回结果
  • 也可以通过 getNow 方法,获取结果,若还没有结果,则返回 null,该方法是非阻塞的
  • 还可以通过 future.addListener 方法,在 Callable 方法执行的线程中,异步获取返回结果

# 3.4 Netty Promise

Promise 相当于一个结果容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果














 

 












public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 创建Promise对象,用于存放结果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            try {
                ...
                // 自定义线程向Promise中存放结果
                promise.setSuccess(50);
            } catch (Exception e) {
                e.printStackTrace();
                // 如果失败了,让等待的主线程也抛出异常,从而让他知道也出错了
                promise.setFailure(e);
            }
            // 自定义线程向Promise中存放结果
            promise.setSuccess(50);
        }).start();

        // 主线程从Promise中获取结果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
编辑 (opens new window)
上次更新: 2023/11/12, 11:03:40
Netty 入门
基本组件:Handler、Pipeline 与 ByteBuf

← Netty 入门 基本组件:Handler、Pipeline 与 ByteBuf→

最近更新
01
Deep Reinforcement Learning
10-03
02
误删数据后怎么办
04-06
03
MySQL 一主多从
03-22
更多文章>
Theme by Vdoing | Copyright © 2021-2024 yubincloud | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×