notes notes
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
  • 并发编程

    • 并发编程
    • 多线程
    • 高级篇
  • 设计模式

    • 设计模式
  • 网络编程

    • Netty

      • NIO基础
      • Netty入门
        • 概述
        • Hello World
          • 服务端
          • 客户端
          • 执行流程
        • 组件
          • EventLoop
          • 处理普通任务/定时任务
          • 优雅关闭
          • 处理 IO 事件
          • handler 执行中如何切换 EventLoop
          • Channel
          • ChannelFuture
          • CloseFuture
          • Future & Promise
          • Handler & Pipeline
          • ByteBuf
          • 创建
          • 直接内存 vs 堆内存
          • 池化 vs 非池化
          • 组成
          • 写入
          • 扩容
          • 读取
          • retain & release
          • slice
          • duplicate
          • copy
          • CompositeByteBuf
          • Unpooled
          • ByteBuf 优势
      • Netty进阶
      • 优化与源码
  • 源码篇

    • 环境搭建
    • Spring
  • 云原生

    • Kubernetes
    • Helm
  • ElasticSearch

    • ElasticSearch
  • Java 虚拟机

    • 深入拆解 Java 虚拟机
    • JVM与GC调优
  • MQ

    • RabbitMQ

      • RabbitMQ笔记
      • RabbitMQ集群搭建文档
  • Redis

    • Redis进阶
  • ShardingSphere

    • Sharding-JDBC
  • SpringCloud

    • SpringCloud
  • ZooKeeper

    • ZooKeeper
  • 学习笔记
  • 网络编程
  • Netty
starry
2023-08-03
目录

Netty入门

# 概述

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

# Hello World

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world
  • 服务器仅接收,不返回

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

# 服务端

    public static void main(String[] args) {
        // 启动器,负责组装 netty 各个组件,启动服务器
        new ServerBootstrap()
                // group 组,简单理解为 线程池+selector
                .group(new NioEventLoopGroup())
                // 选择服务器的 ServerSocketChannel 实现,linux、mac 等等,这里选择通用的 nio
                .channel(NioServerSocketChannel.class)
                // 添加子处理器给 SocketChannel 处理事件,能添加多个
                .childHandler(
                        // channel 代表和客户端进行数据读写的通道
                        // Initializer 初始化,负责添加其他的handler,在连接建立后会被调用
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                                // 添加具体的 handler
                                // 将 ByteBuf 转为 String
                                nioSocketChannel.pipeline().addLast(new StringDecoder());
                                // 自定义的 handler
                                nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                    // 读事件
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                        System.out.println(msg);

                                    }
                                });
                            }
                        })
                // 绑定监听端口
                .bind(8080);
    }

# 客户端

    public static void main(String[] args) throws InterruptedException {
        // 客户端启动器
        new Bootstrap()
                // selector
                .group(new NioEventLoopGroup())
                // channel 实现
                .channel(NioSocketChannel.class)
                // channel 的事件处理
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 添加 handler 将 String 转为 ByteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接地址
                .connect("localhost", 8080)
                // 同步阻塞,直到成功连接到服务器
                .sync()
                // 获取与服务器建立的 channel
                .channel()
                // 向 channel 写数据
                .writeAndFlush("hello world");
    }

# 执行流程

组装所需的各个组件 添加事件处理器 selector 事件通知对应 handler 进行事件处理

  • 如果是 accept,netty 会自动帮我们建立连接,并执行 initChannel 方法
  • 如果是 read、write 会调用 initChannel 时添加的 handler 进行处理

举个栗子

  • 把 channel 理解为数据的通道
  • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • 把 handler 理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成...)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler 分 Inbound 和 Outbound 两类
  • 把 eventLoop 理解为处理数据的工人
    • 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

# 组件

# EventLoop

事件循环对象 EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。 它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看看自己属于哪个EventLoopGroup

事件循环组 EventLoopGroup是一组EventLoop,Channel 一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个 Channel 上的 io 事件都由此EventLoop来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

# 处理普通任务/定时任务

    public static void main(String[] args) {
        // 可处理 IO 事件,普通任务,定时任务
        // 未指定就查看环境变量是否配置了io.netty.eventLoopThreads,否则就是 系统线程数*2,如果都没有就是1
        EventLoopGroup group = new NioEventLoopGroup();
        // 可处理 普通任务,定时任务
        //EventLoopGroup group = new DefaultEventLoop();

        // 获取下一个事件循环对象
        for (EventExecutor eventExecutor : group) {
            System.out.println(eventExecutor);
        }

        // 执行普通任务
        group.next().execute(() -> {
            try {
                Thread.sleep(1000);
                log.info("task");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.info("schedule");
        }, 0, 1, TimeUnit.SECONDS);

        log.info("main");

        // 关闭,并非立即关闭
        group.shutdownGracefully();
    }

# 优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

# 处理 IO 事件

服务端

    public static void main(String[] args) {
        new ServerBootstrap()
                // 2个 EventLoopGroup
                // 参数一的只处理 ServerSocketChannel 的 accept 事件
                // 参数二的只处理 SocketChannel 的 read、write事件
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info(byteBuf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }

启动多个客户端发送消息

20:22:48.492 [nioEventLoopGroup-3-1] INFO com.starry.netty.eventloop.EventLoopServer - 111
20:22:58.221 [nioEventLoopGroup-3-2] INFO com.starry.netty.eventloop.EventLoopServer - 222
20:23:08.557 [nioEventLoopGroup-3-1] INFO com.starry.netty.eventloop.EventLoopServer - 333

可以看到两个 EventLoop 轮流处理 channel,并且 EventLoop 与 channel 之间进行了绑定

添加额外 EventLoopGroup

    public static void main(String[] args) {
        // 新增一个 EventLoopGroup 用来处理事件
        DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
        new ServerBootstrap()
                // 2个 EventLoopGroup
                // 参数一的只处理 ServerSocketChannel 的 accept 事件
                // 参数二的只处理 SocketChannel 的 read、write事件
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("handler1",new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf byteBuf = (ByteBuf) msg;
                                        log.info(byteBuf.toString(StandardCharsets.UTF_8));
                                        // 将消息传递到下一个 handler
                                        ctx.fireChannelRead(msg);
                                    }
                                })
                                // 假设事件处理时间较长,可以使用额外 EventLoopGroup 来处理
                                .addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf byteBuf = (ByteBuf) msg;
                                        log.info(byteBuf.toString(StandardCharsets.UTF_8));
                                    }
                                })
                        ;
                    }
                })
                .bind(8080);
    }

输出

20:54:48.028 [nioEventLoopGroup-4-1] INFO com.starry.netty.eventloop.EventLoopServer - 1
20:54:48.029 [defaultEventLoopGroup-2-1] INFO com.starry.netty.eventloop.EventLoopServer - 1
20:55:18.741 [nioEventLoopGroup-4-2] INFO com.starry.netty.eventloop.EventLoopServer - 2
20:55:18.741 [defaultEventLoopGroup-2-2] INFO com.starry.netty.eventloop.EventLoopServer - 2
20:56:27.911 [nioEventLoopGroup-4-2] INFO com.starry.netty.eventloop.EventLoopServer - 2
20:56:27.912 [defaultEventLoopGroup-2-2] INFO com.starry.netty.eventloop.EventLoopServer - 2
20:57:06.961 [nioEventLoopGroup-4-1] INFO com.starry.netty.eventloop.EventLoopServer - 3
20:57:06.961 [defaultEventLoopGroup-2-1] INFO com.starry.netty.eventloop.EventLoopServer - 3

# handler 执行中如何切换 EventLoop

关键代码io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 获取下一个 EventExecutor(EventLoop)
        EventExecutor executor = next.executor();
        // 下一个 EventExecutor 所在线程是否和当前线程相等	return thread == this.thread;
        if (executor.inEventLoop()) {
            // 相等就直接调用下一个 handler
            next.invokeChannelRead(m);
        } else {
            // 不相等就新建一个线程去调用下一个 handler
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

# Channel

channel 的常用方法

  • close()可以用来关闭 channel
  • closeFuture()用来处理 channel 的关闭
    • sync方法作用是同步等待 channel 关闭
    • addListener方法是异步等待 channel 关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

# ChannelFuture

    public static void main(String[] args) throws InterruptedException, IOException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 异步非阻塞,main 线程发起调用,NioEventLoopGroup 真正执行
                .connect("localhost", 8080);

        Channel channel = channelFuture.channel();
        log.info("{}",channel);
        // 阻塞,直到成功连接服务器
        channelFuture.sync();
        channel = channelFuture.channel();
        log.info("{}",channel);


    }

打印结果

21:42:08.279 [main] INFO com.starry.netty.channel.TestChannel - [id: 0xda02875a]
21:42:08.290 [main] INFO com.starry.netty.channel.TestChannel - [id: 0xda02875a, L:/127.0.0.1:59148 - R:localhost/127.0.0.1:8080]

main 线程执行 第一次还未建立连接,只返回 id 第二次成功建立连接,带有 local 和 remote 信息

还可以使用回调方法

    public static void main(String[] args) throws InterruptedException, IOException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 异步非阻塞,main 线程发起调用,NioEventLoopGroup 真正执行
                .connect("localhost", 8080);

        // 异步回调
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                log.info("{}", channel);
                channel.writeAndFlush("hello");
            }
        });


        //Channel channel = channelFuture.channel();
        //log.info("{}",channel);
        //// 阻塞,直到成功连接服务器
        //channelFuture.sync();
        //channel = channelFuture.channel();
        //log.info("{}",channel);


    }

输出

21:48:54.710 [nioEventLoopGroup-2-1] INFO com.starry.netty.channel.TestChannel - [id: 0xdeccec20, L:/127.0.0.1:59352 - R:localhost/127.0.0.1:8080]

nioEventLoopGroup 执行,并不是 main 线程

# CloseFuture

关闭channel并执行后续操作

同步阻塞

    public static void main(String[] args) throws InterruptedException, IOException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 日志 handle
                        nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("localhost", 8080);

        Channel channel = channelFuture.sync().channel();

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    // 如果输入 q 就关闭 channel
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();


        ChannelFuture closeFuture = channel.closeFuture();
        // 同步阻塞,直到 channel 被关闭
        closeFuture.sync();
        log.info("do something...");
        // 关闭 eventLoopGroup
        eventLoopGroup.shutdownGracefully();


    }

输出

q
22:10:10.693 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x42023d3a, L:/127.0.0.1:59940 - R:localhost/127.0.0.1:8080] CLOSE
22:10:10.693 [main] INFO com.starry.netty.channel.TestCloseFuture - do something...

nioEventLoopGroup 关闭 channel main 处理善后工作

异步回调

    public static void main(String[] args) throws InterruptedException, IOException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 日志 handle
                        nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("localhost", 8080);

        Channel channel = channelFuture.sync().channel();

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    // 如果输入 q 就关闭 channel
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();


        ChannelFuture closeFuture = channel.closeFuture();
        //// 同步阻塞,直到 channel 被关闭
        //closeFuture.sync();
        //log.info("do something...");
        //// 关闭 eventLoopGroup
        //eventLoopGroup.shutdownGracefully();


        // 关闭异步回调
        closeFuture.addListener((ChannelFutureListener) future -> {
            log.info("do something...");
            // 关闭 eventLoopGroup
            eventLoopGroup.shutdownGracefully();
        });


    }

输出

q
22:15:01.092 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x24f724cd, L:/127.0.0.1:60131 - R:localhost/127.0.0.1:8080] CLOSE
22:15:01.093 [nioEventLoopGroup-2-1] INFO com.starry.netty.channel.TestCloseFuture - do something...

nioEventLoopGroup 进行后续处理,并不是 main 线程

# Future & Promise

在异步处理时,经常用到这两个接口 首先要说明 netty 中的Future与 jdk 中的Future同名,但是是两个接口,netty 的Future继承自 jdk 的Future,而Promise又对netty Future进行了扩展

  • jdk Future 只能同步等待任务结束(成功或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等到任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

jdk Future

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        // 提交任务
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("calculate...");
                Thread.sleep(1000);
                return 50;
            }
        });

        // 获取结果
        log.info("main thread waiting...");
        log.info("result:{}",future.get());

        executor.shutdown();
    }

netty Future

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        EventLoop executors = group.next();

        Future<Integer> future = executors.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("calculate...");
                Thread.sleep(1000);
                return 50;
            }
        });

        log.info("main thread waiting...");
        // 同步
        //log.info("result:{}",future.get());

        // 异步
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.info("result:{}",future.getNow());
                group.shutdownGracefully();
            }
        });


    }

netty Promise

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        // 准备 EventLoop 对象
        EventLoop executors = group.next();
        // 创建 promise 结果容器
        Promise<Integer> promise = executors.newPromise();

        new Thread(() -> {
            // 任意线程执行计算,计算完向 promise 填充结果
            log.info("calculate...");
            try {
                //int i = 1 / 0;
                Thread.sleep(1000);
                promise.setSuccess(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
                promise.setFailure(e);
            }
        }).start();

        // 获取结果
        log.info("main thread waiting...");
        log.info("result:{}",promise.get());

        group.shutdownGracefully();


    }

# Handler & Pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工

打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

服务端

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(1);
                    ctx.fireChannelRead(msg); // 1
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(2);
                    ctx.fireChannelRead(msg); // 2
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(3);
                    ctx.channel().write(msg); // 3
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(4);
                    ctx.write(msg, promise); // 4
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(5);
                    ctx.write(msg, promise); // 5
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(6);
                    ctx.write(msg, promise); // 6
                }
            });
        }
    })
    .bind(8080);

客户端

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .addListener((ChannelFutureListener) future -> {
        future.channel().writeAndFlush("hello,world");
    });

服务端打印

1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而ChannelOutboundHandlerAdapter是按照addLast的逆序执行的。ChannelPipeline的实现是一个 ChannelHandlerContext(包装了ChannelHandler) 组成的双向链表

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的ctx.channel().write(msg)会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise)的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是触发出站处理器的执行
    • ctx.channel().write(msg)从尾部开始查找出站处理器
    • ctx.write(msg)是从当前节点找上一个出站处理器
    • 3 处的ctx.channel().write(msg)如果改为ctx.write(msg)仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的ctx.write(msg, promise)如果改为ctx.channel().write(msg)会打印 1 2 3 6 6 6... 因为ctx.channel().write()是从尾部开始查找,结果又是节点6 自己

EmbeddedChannel 对 handler 快速测试

    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站操作
        //channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes(StandardCharsets.UTF_8)));


    }

# ByteBuf

是对字节数据的封装

# 创建

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);

上面代码创建了一个默认的ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10

输出

read index:0 write index:0 capacity:10

其中 log 方法参考如下

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

# 直接内存 vs 堆内存

可以使用下面的代码来创建池化基于堆的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

# 池化 vs 非池化

池化的最大意义在于可以重用ByteBuf,优点有

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

# 组成

ByteBuf 由四部分组成 最开始读写指针都在 0 位置

# 写入

方法列表,省略一些不重要的方法

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是Big Endian

先写入 4 个字节

buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

结果是

read index:0 write index:4 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

再写入一个 int 整数,也是 4 个字节

buffer.writeInt(5);
log(buffer);

结果是

read index:0 write index:8 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

# 扩容

再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容

buffer.writeInt(6);
log(buffer);

扩容规则是

  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后capacity是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后capacity是 210=1024(29=512 已经不够了)
  • 扩容不能超过max capacity会报错

结果是

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06             |............    |
+--------+-------------------------------------------------+----------------+

# 读取

例如读了 4 次,每次一个字节

System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

1
2
3
4
read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

如果需要重复读取 int 整数 5,怎么办?

可以在read前先做个标记mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

结果

5
read index:8 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06                                     |....            |
+--------+-------------------------------------------------+----------------+

这时要重复读取的话,重置到标记位置reset

buffer.resetReaderIndex();
log(buffer);

这时

read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

还有种办法是采用get开头的一系列方法,这些方法不会改变read index

# retain & release

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

回收内存的源码实现,请关注下面方法的不同实现

protected abstract void deallocate()

Netty 这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

  • 每个ByteBuf对象的初始计数为 1
  • 调用release方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用retain方法计数加 1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果直接在当前finally中release了,就失去了传递性(当然,如果在这个ChannelHandler内这个ByteBuf已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

  • 起点,对于 NIO 实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站ByteBuf处理原则
    • 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release
    • 将原始ByteBuf转换为其它类型的 Java 对象,这时ByteBuf就没用了,必须release
    • 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release
    • 注意各种异常,如果ByteBuf没有成功传递到下一个ChannelHandler,必须release
    • 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)
  • 出站ByteBuf处理原则
    • 出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext flush后release
  • 异常处理原则
    • 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true

TailContext释放未处理消息逻辑

// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

具体代码

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

# slice

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的read,write指针 例,原始 ByteBuf 进行一些初始操作

ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的read index到write index之间的内容进行切片,切片后的max capacity被固定为这个区间的大小,因此不能追加write

ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果原始 ByteBuf 再次读操作(又读了一个字节)

origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04                                           |..              |
+--------+-------------------------------------------------+----------------+

这时的 slice 不受影响,因为它有独立的读写指针

System.out.println(ByteBufUtil.prettyHexDump(slice));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果 slice 的内容发生了更改

slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05                                        |...             |
+--------+-------------------------------------------------+----------------+

这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存

System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05                                           |..              |
+--------+-------------------------------------------------+----------------+

# duplicate

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有max capacity的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的

# copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

# CompositeByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝

有两个 ByteBuf 如下

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+

现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?

方法1:

ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

结果

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

这种方法好不好?回答是不太好,因为进行了数据的内存复制操作

方法2:

CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);

结果是一样的

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

CompositeByteBuf是一个组合的ByteBuf,它内部维护了一个Component数组,每个Component管理一个ByteBuf,记录了这个ByteBuf相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

# Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

这里仅介绍其跟【零拷贝】相关的wrappedBuffer方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

也可以用来包装普通字节数组,底层也不会有拷贝操作

ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));

输出

class io.netty.buffer.CompositeByteBuf
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06                               |......          |
+--------+-------------------------------------------------+----------------+

# ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf
上次更新: 2024/03/03, 08:36:37
NIO基础
Netty进阶

← NIO基础 Netty进阶→

Theme by Vdoing | Copyright © 2023-2024 Starry | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式