notes notes
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
  • 并发编程

    • 并发编程
    • 多线程
    • 高级篇
  • 设计模式

    • 设计模式
  • 网络编程

    • Netty

      • NIO基础
        • ByteBuffer
          • 使用姿势
          • ByteBuffer 读写图示
          • 开始
          • 写入
          • 切换读取
          • 读取
          • 清除
          • 压缩
          • ByteBuffer 常用方法
          • 分配空间
          • 向 buffer 写入数据
          • 从 buffer 读取数据
          • mark 和 reset
          • 字符串与 ByteBuffer 互转
          • Scattering Reads
          • 半包黏包解析
        • 文件编程
          • FileChannel
          • 获取
          • 读取
          • 写入
          • 关闭
          • 位置
          • 大小
          • 强制写入
          • Channel 间传输数据
          • Path
          • Files
        • 网络编程
          • 阻塞/非阻塞
          • 阻塞
          • 非阻塞
          • 多路复用
          • Selector
          • 创建
          • 绑定 Channel 事件
          • 监听 Channel 事件
          • select 何时不阻塞
          • 处理 accept 事件
          • 处理 read 事件
          • 为何要 iter.remove()
          • cancel 的作用
          • 处理消息的边界
          • ByteBuffer 大小分配
          • 处理 write 事件
          • 一次无法写完例子
          • write 为何要取消
          • 利用多线程处理
          • 获取 CPU 核数
          • UDP
        • NIO / BIO
          • stream / channel
          • IO 模型
          • 阻塞 IO
          • 非阻塞 IO
          • 多路复用
          • 异步 IO
          • 阻塞 IO vs 多路复用
          • 零拷贝
          • 传统 IO 问题
          • NIO 优化
          • AIO
          • 文件 AIO
      • Netty入门
      • Netty进阶
      • 优化与源码
  • 源码篇

    • 环境搭建
    • Spring
  • 云原生

    • Kubernetes
    • Helm
  • ElasticSearch

    • ElasticSearch
  • Java 虚拟机

    • 深入拆解 Java 虚拟机
    • JVM与GC调优
  • MQ

    • RabbitMQ

      • RabbitMQ笔记
      • RabbitMQ集群搭建文档
  • Redis

    • Redis进阶
  • ShardingSphere

    • Sharding-JDBC
  • SpringCloud

    • SpringCloud
  • ZooKeeper

    • ZooKeeper
  • 学习笔记
  • 网络编程
  • Netty
starry
2023-08-03
目录

NIO基础

文中有关channel的部分使用完后未进行关闭,请自行关闭channel

# ByteBuffer

新建 Maven 项目导入依赖

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.15</version>
        </dependency>
    </dependencies>

ByteBuffer的简单使用

public static void main(String[] args) {
        // FileChannel
        // 输入输出流
        try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
            // 缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);

            // 从channel读取数据,写入到buffer
            while (channel.read(buffer) != -1) {
                /*
                切换为读模式,
                limit = position; (读)截至位置为原当前位置
                position = 0;     原位置从0开始(读)
                */
                buffer.flip();
                // 还有剩余
                while (buffer.hasRemaining()) {
                    // 默认读取一个字节
                    byte b = buffer.get();
                    Console.log("读取到的字节:{}", (char) b);
                }
                // 清除缓冲区
                buffer.clear();
            }

        } catch (IOException e) {
        }
    }

# 使用姿势

  1. 向 buffer 写入数据,比如 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,比如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复1~4

# ByteBuffer 读写图示

三个重要属性

  • capacity(容量)
  • position(读写位置)
  • limit(读写限制)

# 开始

最开始,还没有数据,读写位置从(下标)0开始,读写限制和最大容量都是最大值。 image.png

# 写入

写入数据,position随着写入数据而更新,limit和capacity都是最大值,下图模拟写入5个字节。 image.png

# 切换读取

flip 动作发生后,position变为0,从0开始读取;limit变为原position位置,限制读取最大值 image.png

# 读取

读取完5个字节 image.png

# 清除

clean 清除数据,变为最开始的状态 image.png

# 压缩

compact 把未读取完的部分向前压缩,然后切换到写模式 image.png

# ByteBuffer 常用方法

# 分配空间

        /*
        * class java.nio.HeapByteBuffer     java堆内存,读写效率低,受到GC的影响
        * class java.nio.DirectByteBuffer   直接内存(系统内存),读写效率高(少一次拷贝),不受GC的影响,但是分配时效率较低
        *
        * */
        System.out.println(ByteBuffer.allocate(16).getClass());
        System.out.println(ByteBuffer.allocateDirect(16).getClass());


        ByteBuffer buffer = ByteBuffer.allocate(16);

# 向 buffer 写入数据

两种方法:

  • 调用 channel 的 read 方法
int readBytes = channel.read(buffer);
  • 调用 buffer 自己的 put 方法
buffer.put((byte)127)

# 从 buffer 读取数据

同样有两种方法:

  • 调用 channel 的 write 方法
int writeBytes = channel.write(buffer);
  • 调用 buffer 自己的 get 方法
byte b = buffer.get();

get 方法会让position读指针先后走,如果想重复读取数据

  • 可以调用rewind方法将position重新设置为0
  • 或者调用get(int i)方法获取索引i的内容,它不会移动读指针

# mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

注意 rewind 和 flip 都会清除 mark 位置

# 字符串与 ByteBuffer 互转

        String str = "hello";

        // 1、字符串转 ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(str.getBytes(StandardCharsets.UTF_8));
        // 此时还是写模式,需要手动执行buffer.flip()

        // 2、Charset
        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode(str);
        // 此方法会自动切换到读模式

        // 3、wrap
        ByteBuffer buffer3 = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
        // 此方法会自动切换到读模式


        // ByteBuffer转字符串
        buffer.flip();
        String s1 = StandardCharsets.UTF_8.decode(buffer).toString();
        System.out.println(s1);

        String s2 = StandardCharsets.UTF_8.decode(buffer2).toString();
        System.out.println(s2);

        String s3 = StandardCharsets.UTF_8.decode(buffer3).toString();
        System.out.println(s3);

# Scattering Reads

分散读写,有个文本文件,假设我们已知内容

onetwothree

分散读取

try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
    ByteBuffer buffer1 = ByteBuffer.allocate(3);
    ByteBuffer buffer2 = ByteBuffer.allocate(3);
    ByteBuffer buffer3 = ByteBuffer.allocate(5);

    channel.read(new ByteBuffer[]{buffer1, buffer2, buffer3});
    buffer1.flip();
    buffer2.flip();
    buffer3.flip();

    System.out.println(StandardCharsets.UTF_8.decode(buffer1));
    System.out.println(StandardCharsets.UTF_8.decode(buffer2));
    System.out.println(StandardCharsets.UTF_8.decode(buffer3));

} catch (IOException e) {
}

分散写入

        ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("world");
        ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("你好");

        try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
            channel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});
        } catch (IOException e) {
        }

成功写到words2.txt

helloworld你好

# 半包黏包解析

_/**

  • 网络上有多条数据发送给服务器,数据之间用\n进行分隔
  • 但是由于某种原因这些数据在接收时,被进行重新组合
  • 比如原始数据有3条
  • Hello,world\n
  • I'm zhangsan\n
  • How are you?\n
  • 变成了下面的两个 byteBuffer(粘包,半包)
  • Hello,world\nI'm zhangsan\nHo
  • w are you?\n
  • 现在要求编写程序,将错乱的数据恢复成原始的按\n分隔的数据
  • **@param _**args */ 黏包:发送端将多条数据集中在一起,一次请求发送多条数据,接收端收到请求后,未能正确解析,导致数据粘黏在一起 半包:接收端的buffer太小,每次只能读取部分数据,数据就像只收到一半
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        buffer.put("Hello,world\nI'm zhangsan\nHo".getBytes(StandardCharsets.UTF_8));
        split(buffer);
        buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
        split(buffer);

    }

    /**
     * 打印分割好的数据
     * @param buffer
     */
    private static void split(ByteBuffer buffer) {
        // 切换为读模式
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // 判断字符是否相等
            if ('\n' == buffer.get(i)) {
                // 取此条数据的长度,当前为\n的位置+1 减去 数据起始位置
                int length = i + 1 - buffer.position();
                ByteBuffer newBuffer = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    // 读取数据添加到新buffer
                    newBuffer.put(buffer.get());
                }
                newBuffer.flip();
                System.out.println(StandardCharsets.UTF_8.decode(newBuffer));
            }
        }
        // 压缩,将上次未读取完的数据移动到buffer头
        buffer.compact();

    }

成功分割

Hello,world

I'm zhangsan

How are you?

# 文件编程

# FileChannel

File 只能工作在阻塞模式下

# 获取

不能直接打开FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel方法

  • 通过FileInputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过RandomAccessFile是否能读写根据构造的mode决定

# 读取

会从channel读取数据填充ByteBuffer,返回值表示读取到多少字节,-1表示到达了文件的末尾

int readBytes = channel.read(buffer);

# 写入

正确写入姿势:

ByteBuffer buffer = ...;
buffer.put(...);	// 存入数据
buffer.flip();		// 切换读模式

while(buffer.hasRemaining()) {
	channel.write(buffer);
}

在while中调用 channel.wirte是因为write方法并不能保证一次性将buffer中的内容全部写入channel

# 关闭

channel必须关闭,不过调用FileInputStream、FileOutputStream、RandomAccessFile的close方法会间接的调用channel的close方法。

# 位置

获取当前位置

long pos = channel.position();

设置当前位置

long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回-1
  • 这时写入,会追加内容,当要注意如果position超过了问价末尾,再写入时会在新内容和原末尾之间存在空洞(00)

# 大小

使用size方法获取文件大小

# 强制写入

操作系统处于性能的考虑,会将数据缓存,而不是立即写入磁盘。可以调用force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

# Channel 间传输数据

        try (
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
        ) {

            // 效率高,底层利用操作系统的零拷贝进行优化
            from.transferTo(0, from.size(), to);
        } catch (IOException e) {
            e.printStackTrace();
        }

		//---------------传输大于2g的数据-------------

		try (
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
        ) {

            // 效率高,底层利用操作系统的零拷贝进行优化,只能传输2g的数据
            long size = from.size();
            // size 需要传输的总数据
            // last 当前剩余的数据(还未进行传输的数据)
            for (long last = size; last > 0; ) {
                // 开始位置就是总的减剩余的
                // 结果:剩余的减去本次传输的
                last -= from.transferTo(size - last, size, to);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

# Path

JDK7引入了Path和Paths类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
Path source = Paths.get("1.txt");		// 相对路径 使用 user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt");	// 绝对路径 d:\1.txt
Path source = Paths.get("d:/1.txt");	// 绝对路径 d:\1.txt
Path source = Paths.get("d:\\data","projects"); //d:\data\projects
  • .代表当前路径
  • ..代表上一级路径

比如:

d:
	|- data
    	|- projects
        	|- a
            |- b

代码

Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize());	// 正常化路径

输出

d:\data\projects\a\..\b
d:\data\projects\b

# Files

检查文件是否存在

Path path = Paths.get("helloworld/data/txt");
System.out.println(Files.exists(path));

创建一级目录

Path path = Paths.get("helloworld/d1");
Files.createDirectory(path);

如果目录存在会抛出异常;不能一次创建多级目录,否则会抛异常。

创建多级目录

Path path = Paths.get("helloworld/d1/d2");
Files.createDirectories(path);

拷贝文件

Path source = Paths.get("helloworld/data.txt");
Path target = Paths.get("helloworld/target.txt");

Files.copy(source,target);

如果文件已存在,会抛异常; 如果希望用 source 覆盖 target,需要用StandardCopyOption来控制

Files.copy(source,target,StandardCopyOption.REPLACE_EXISTING);

移动文件

Path source = Paths.get("helloworld/data.txt");
Path target = Paths.get("helloworld/data.txt");

Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);

StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性

删除文件

Path target = Paths.get("helloworld/target.txt");

Files.delete(target);

如果文件不存在,会抛异常

删除目录

Path target = Paths.get("helloworld/d1");

Files.delete(target);

如果目录还有文件,会抛异常

遍历目录文件

    public static void main(String[] args) throws IOException {
        AtomicInteger dirCount = new AtomicInteger();
        AtomicInteger fileCount = new AtomicInteger();
        Files.walkFileTree(Paths.get("D:\\environment\\Java\\jdk1.8.0_291"), new SimpleFileVisitor<Path>(){
            /**
             * 查看目录前
             * @param dir
             * @param attrs
             * @return
             * @throws IOException
             */
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                System.out.println("===>"+dir);
                dirCount.incrementAndGet();
                return super.preVisitDirectory(dir, attrs);
            }

            /**
             * 查看文件
             * @param file
             * @param attrs
             * @return
             * @throws IOException
             */
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                System.out.println(file);
                fileCount.incrementAndGet();
                return super.visitFile(file, attrs);
            }
        });
        System.out.println("dirCount = " + dirCount);
        System.out.println("fileCount = " + fileCount);
    }

查看以_ .jar _结尾的文件数量

    public static void main(String[] args) throws IOException {
        AtomicInteger jarCount = new AtomicInteger();

        Files.walkFileTree(Paths.get("D:\\environment\\Java\\jdk1.8.0_291"), new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                if (file.toString().endsWith(".jar")) {
                    jarCount.incrementAndGet();
                }
                return super.visitFile(file, attrs);
            }
        });
        System.out.println("jarCount = " + jarCount);
    }

删除多级目录 由于Files.delete只能删除空文件夹,所以我们需要借助Files.walkFileTree来完成删除

    public static void main(String[] args) throws IOException {
        Files.walkFileTree(Paths.get("C:\\Users\\starry\\Desktop\\test"), new SimpleFileVisitor<Path>() {
            /**
             * 查看文件
             *
             * @param file
             * @param attrs
             * @return
             * @throws IOException
             */
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return super.visitFile(file, attrs);
            }

            /**
             * 查看文件夹之后
             *
             * @param dir
             * @param exc
             * @return
             * @throws IOException
             */
            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir);
                return super.postVisitDirectory(dir, exc);
            }
        });
        System.out.println("删除完成");
    }

拷贝多级目录

    public static void main(String[] args) throws IOException {
        String source = "C:\\Users\\starry\\Desktop\\test";
        String target = "C:\\Users\\starry\\Desktop\\test_target";
        Files.walk(Paths.get(source)).forEach(path -> {
            try {
                // 原路径进行替换
                String targetPath = path.toString().replace(source, target);
                if (Files.isDirectory(path)) {
                    // 文件夹
                    Files.createDirectories(Paths.get(targetPath));
                } else if (Files.isRegularFile(path)) {
                    // 常规文件
                    Files.createFile(Paths.get(targetPath));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("复制完成");
    }

# 网络编程

# 阻塞/非阻塞

# 阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept会在没有连接建立时让线程暂停
    • SockerChannel.read会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用CPU,当线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程的支持
  • 但多线程下,有新的问题
    • 32位JVM,一个线程320k,64位JVM一个线程1024k,如果连接数过多,必须导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive(不活跃),会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

代码演示

服务端

    public static void main(String[] args) throws Exception {
        // 接收数据
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // channel集合
        List<SocketChannel> channelList = new ArrayList<>();

        while (true) {
            // 建立与客户端的连接,socketChannel用来和客户端间接通信
            // 阻塞方法,线程停止运行
            SocketChannel socketChannel = ssc.accept();
            log.info("connected:{}",socketChannel);
            channelList.add(socketChannel);
            for (SocketChannel channel : channelList) {
                // 接收客户端发送的数据
                // 阻塞方法,线程停止运行
                channel.read(buffer);
                buffer.flip();
                log.info("data:{}",Charset.defaultCharset().decode(buffer));
                buffer.clear();
            }
        }

    }

客户端

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        System.out.println("waiting...");

    }

# 非阻塞

  • 非阻塞模式下,相关方法都不会让线程暂停
    • ServerSocketChannel.accept在没有连接建立时,会返回null,继续运行
    • SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其他SocketChannel的read或者去执行ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入Channel即可,无需等待Channel通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍在不断运行,白白浪费了CPU
  • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)

服务端代码,客户端不变

    private static void noBlocking() throws IOException {
        // 接收数据
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 是否阻塞,默认true
        ssc.configureBlocking(false);
        // 绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // channel集合
        List<SocketChannel> channelList = new ArrayList<>();

        while (true) {
            // 建立与客户端的连接,socketChannel用来和客户端间接通信
            // 配置了非阻塞,线程继续运行,如果没有连接,返回null
            SocketChannel socketChannel = ssc.accept();
            if (null != socketChannel) {
                log.info("connected:{}", socketChannel);
	            socketChannel.configureBlocking(false);
                channelList.add(socketChannel);
            }
            for (SocketChannel channel : channelList) {
                // 接收客户端发送的数据
                // 配置了非阻塞,线程继续运行,如果没有数据,返回0
                if (channel.read(buffer) > 0) {
                    buffer.flip();
                    log.info("data:{}", Charset.defaultCharset().decode(buffer));
                    buffer.clear();
                }
            }
        }
    }

# 多路复用

单线程可以配合Selector完成对多个Channel可读写时间的监控,这称为多路复用

  • 多路复用仅针对网络IO,普通文件IO没法利用多路复用
  • 如果不用 Selector的非阻塞模式,线程大部分时间都在做无用功,而 Selector能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel未必时时可写,一旦 Channel可写,会触发 Selector的可写事件

# Selector

好处

  • 一个线程配合 selector就可以监控多个channel的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

# 创建

Selector selector = Selector.open();

# 绑定 Channel 事件

也称为注册事件,绑定的事件Selector才会关心

channel.configureBlocking(false);
SelectionKey key = channel.register(selector,绑定事件);
  • channel必须工作在非阻塞模式
  • FileChannel没有非阻塞模式,因此不能配合Selector使用
  • 绑定的事件类型可以有
    • connect- 客户端连接成功时触发
    • accept- 服务器端成功接受连接时触发
    • read- 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write- 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

# 监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件

方法1,阻塞直到绑定事件发生

int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

# select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read事件,另外如果发送的数据大于 buffer缓冲区,会触发多次读取事件
    • channel可写,会触发 write事件
    • 在 linux 下 nio bug 发生时
  • 调用selector.wakeup()
  • 调用selector.close()
  • selector所在线程 interrupt

# 处理 accept 事件

客户端

public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        System.out.println("waiting...");

    }

服务端

    public static void main(String[] args) throws Exception {
        // 创建selector,管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 建立selector和channel的联系(注册)
        // SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 发生事件才继续运行,否则阻塞
            selector.select();
            // 处理事件,selectedKeys包含了所有发生的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 判断事件类型
                if (key.isAcceptable()) {

                    log.info("key:{}", key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    // 必须处理
                    SocketChannel socketChannel = channel.accept();
                    log.info("{}", socketChannel);
                    // 否则取消
                    //key.channel();

                }
                // 处理完key后,要删除
                iterator.remove();

            }
        }

    }

事件发生后不能不处理 事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,因为NIO底层使用的是水平触发

# 处理 read 事件

    public static void main(String[] args) throws Exception {
        // 创建selector,管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 建立selector和channel的联系(注册)
        // SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 发生事件才继续运行,否则阻塞
            selector.select();
            // 处理事件,selectedKeys包含了所有发生的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 判断事件类型
                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    // 将channel添加到selector
                    SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
						ByteBuffer buffer = ByteBuffer.allocate(16);
                        
                        int len = channel.read(buffer);
                        if (len == -1) {
                            // 客户端正常关闭
                            key.cancel();
                            channel.close();
                        } else {
                            // 输出内容
                            buffer.flip();
                            log.info("{}", Charset.defaultCharset().decode(buffer));
                            buffer.clear();
                        }
                    } catch (IOException e) {
                        // 客户端断开连接
                        e.printStackTrace();
                        key.cancel();
                    }
                }
                // 处理完key后,要删除
                iterator.remove();

            }
        }

    }

# 为何要 iter.remove()

因为select在事件发生后,就会将相关的key放入selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码删除。例如

  • 第一次触发了selectionKey上的accept事件,没有移除selectionKey
  • 第二次触发了channelKey上的read事件,但这时selectedKeys中还有上次的selectionKey,在处理时因为没有真正的serverSocket连上了,就会导致空指针异常

# cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

# 处理消息的边界

由于接收消息的长度是不可知的,接收端的ByteBuffer读取到字符的一半,就会出现乱码

你�
��
  • 一种思路是固定消息长度,数据包大小一样(不满进行填充),服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低,需要逐一读取判断
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

演示方法二

    private static void split(ByteBuffer buffer) {
        // 切换为读模式
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // 判断字符是否相等
            if ('\n' == buffer.get(i)) {
                // 取此条数据的长度,当前为\n的位置+1 减去 数据起始位置
                int length = i + 1 - buffer.position();
                ByteBuffer newBuffer = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    // 读取数据添加到新buffer
                    newBuffer.put(buffer.get());
                }
                newBuffer.flip();
                System.out.println(StandardCharsets.UTF_8.decode(newBuffer));
            }
        }
        // 压缩,将上次未读取完的数据移动到buffer头
        buffer.compact();

    }


    public static void main(String[] args) throws Exception {
        // 创建selector,管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 建立selector和channel的联系(注册),
        // SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 发生事件才继续运行,否则阻塞
            selector.select();
            // 处理事件,selectedKeys包含了所有发生的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 判断事件类型
                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);

                    ByteBuffer buffer = ByteBuffer.allocate(8);
                    // 将channel添加到selector,将buffer作为附属绑定到对应的key
                    SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_READ, buffer);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 获取附属的buffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int len = channel.read(buffer);
                        if (len == -1) {
                            // 客户端正常关闭
                            key.cancel();
                            channel.close();
                        } else {
                            // 输出内容
                            split(buffer);
                            if (buffer.position() == buffer.limit()) {
                                // 扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
                                buffer.flip();
                                newBuffer.put(buffer);
                                key.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        // 客户端断开连接
                        e.printStackTrace();
                        key.cancel();
                    }
                }
                // 处理完key后,要删除
                iterator.remove();

            }
        }

    }

# ByteBuffer 大小分配

  • 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个ByteBuffer1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的ByteBuffer
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html (opens new window)
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

# 处理 write 事件

# 一次无法写完例子

  • 非阻塞模式下,无法保证把buffer中所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入字节数)
  • 用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将channel注册到selector上
    • selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册
    • 如果不取消,会每次可写均会触发write事件

服务端

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    SocketChannel channel = serverSocketChannel.accept();
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);

                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int len = channel.write(buffer);
                    log.info(String.valueOf(len));
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件
                        key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
                        key.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();

                    int len = channel.write(buffer);
                    log.info(String.valueOf(len));
                    if (!buffer.hasRemaining()) {
                        // 写完了,取消注册写事件
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        // 清空附属,help buffer gc
                        key.attach(null);
                    }
                }
            }
            iterator.remove();
        }
    }

客户端

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        int count = 0;
        while (true) {
            count += socketChannel.read(buffer);
            System.out.println(count);
            buffer.clear();
        }


    }

# write 为何要取消

只要向channel发送数据时,socket缓冲可写,这个事件会频繁触发,因此应当只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注

# 利用多线程处理

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢? 分两组选择器

  • 单线程配一个选择器,专门处理accept事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理read事件
@Slf4j
public class ThreadServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        int cpus = Runtime.getRuntime().availableProcessors();
        log.info(String.valueOf(cpus));
        Worker[] workers = new Worker[cpus];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker(i);
        }
        AtomicInteger number = new AtomicInteger();
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    SocketChannel channel = serverSocketChannel.accept();
                    channel.configureBlocking(false);
                    // 将与客户端建立的channel和read_selector进行关联
                    workers[number.getAndIncrement() % workers.length].register(channel);
                }

                iterator.remove();
            }
        }
    }


    static class Worker implements Runnable {

        private Selector selector;
        private volatile boolean start = false;
        private int index;

        public Worker(int index) {
            this.index = index;
        }

        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        @SneakyThrows
        public void register(SocketChannel socketChannel) {
            if (!start) {
                // 只执行一次
                selector = Selector.open();
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            // 为了保证在同一线程中执行,使用队列
            tasks.add(() -> {
                try {
                    // 添加channel和read_selector的绑定
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            // 唤醒selector.select();
            selector.wakeup();
        }

        @SneakyThrows
        @Override
        public void run() {
            while (true) {
                selector.select();
                Runnable task = tasks.poll();
                if (null != task) {
                    task.run();
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        try {
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            int len = channel.read(buffer);
                            if (len == -1) {
                                key.cancel();
                                channel.close();
                            } else {
                                buffer.flip();
                                String str = Charset.defaultCharset().decode(buffer).toString();
                                log.info(str);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                            channel.close();
                        }
                    }
                    iterator.remove();
                }
            }

        }
    }

}

# 获取 CPU 核数

  • Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理cpu个数,而不是容器申请时的个数
  • 这个问题直到jdk 10才修复,使用 JVM 参数UseContainerSupport配置, 默认开启

# UDP

  • UDP 是无连接的,client 发送数据不会管 server 是否开启
  • server 这边的receive方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

服务端

public class UdpServer {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress(9999));
            System.out.println("waiting...");
            ByteBuffer buffer = ByteBuffer.allocate(32);
            channel.receive(buffer);
            buffer.flip();
            debug(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端

public class UdpClient {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
            InetSocketAddress address = new InetSocketAddress("localhost", 9999);
            channel.send(buffer, address);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

# NIO / BIO

# stream / channel

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream仅支持阻塞API,Channel同时支持阻塞、非阻塞API,网络Channel可配合selector实现多路复用
  • 二者均为全双工,即读写可以同时进行

# IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其他线程发送结果(至少两个线程)

当调用一次channel.read或stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

# 阻塞 IO

# 非阻塞 IO

# 多路复用

# 异步 IO

# 阻塞 IO vs 多路复用

详情参考:UNIX 网络编程 - 卷 I

https://www.yuque.com/starries/notes/kgzman#77f50247 (opens new window) https://www.softprayog.in/programming/io-multiplexing-select-poll-epoll-in-linux (opens new window)

# 零拷贝

# 传统 IO 问题

传统的 IO 将一个文件通过socket写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

工作流程:

  1. Java 本身并不具备 IO 读写能力,以此read方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓存区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间也不会使用 CPU
  2. 从内核态切换回用户态,将数据从内核缓冲器读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
  3. 调用write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,CPU 会参与拷贝
  4. 向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,Java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

# NIO 优化

通过DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer使用的还是 Java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer使用的是操作系统内存

大部分步骤与优化前相同。唯有一点:Java 可以使用DirectByteBuf将堆外内存映射到 JVM 内存中来直接访问使用

  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • Java 中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的sendFile方法),java 中对应着两个channel调用transferTo/transferFrom方法拷贝数据 image.png

  1. Java 调用transferTo方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU
  2. 数据从内核缓冲区传输到 socket 缓冲区,CPU会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4) image.png

  1. Java 调用transferTo方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 JVM 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 CPU 计算,减少 CPU 缓存伪共享
  • 零拷贝适合小文件传输

# AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

# 文件 AIO

AsynchronousFileChannel

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("data.txt");
        StandardOpenOption option = StandardOpenOption.READ;
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, option)) {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.info("begin");
            // 参数一:读取文件后要写入的buffer
            // 参数二:从哪个位置开始读
            // 参数三:附属对象
            // 参数四:回调方法
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                /**
                 * read success
                 * @param result
                 * @param attachment
                 */
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    log.info("read success ==> length:{}", result);
                    attachment.flip();
                    String str = Charset.defaultCharset().decode(attachment).toString();
                    log.info("result:{}", str);

                }

                /**
                 * read failed
                 * @param exc
                 * @param attachment
                 */
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    log.error(exc.getMessage(), exc);
                }
            });
        } catch (IOException e) {
        }
        log.info("end");
        System.in.read();
    }

输出

22:02:52.075 [main] INFO com.starry.netty.files.TestAsynchronousFileChannel - begin
22:02:52.079 [main] INFO com.starry.netty.files.TestAsynchronousFileChannel - end
22:02:52.079 [Thread-16] INFO com.starry.netty.files.TestAsynchronousFileChannel - read success ==> length:13
22:02:52.080 [Thread-16] INFO com.starry.netty.files.TestAsynchronousFileChannel - result:1234567890abc

可以看到

  • 相应文件读取成功的另外一个线程 Thread-16
  • 主线程并没有 IO 阻塞

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

上次更新: 2024/03/03, 08:36:37
设计模式
Netty入门

← 设计模式 Netty入门→

Theme by Vdoing | Copyright © 2023-2024 Starry | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式