NIO基础
文中有关channel的部分使用完后未进行关闭,请自行关闭channel
# ByteBuffer
新建 Maven 项目导入依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.15</version>
</dependency>
</dependencies>
ByteBuffer的简单使用
public static void main(String[] args) {
// FileChannel
// 输入输出流
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
// 从channel读取数据,写入到buffer
while (channel.read(buffer) != -1) {
/*
切换为读模式,
limit = position; (读)截至位置为原当前位置
position = 0; 原位置从0开始(读)
*/
buffer.flip();
// 还有剩余
while (buffer.hasRemaining()) {
// 默认读取一个字节
byte b = buffer.get();
Console.log("读取到的字节:{}", (char) b);
}
// 清除缓冲区
buffer.clear();
}
} catch (IOException e) {
}
}
# 使用姿势
- 向 buffer 写入数据,比如 channel.read(buffer)
- 调用 flip() 切换至读模式
- 从 buffer 读取数据,比如调用 buffer.get()
- 调用 clear() 或 compact() 切换至写模式
- 重复1~4
# ByteBuffer 读写图示
三个重要属性
- capacity(容量)
- position(读写位置)
- limit(读写限制)
# 开始
最开始,还没有数据,读写位置从(下标)0开始,读写限制和最大容量都是最大值。
# 写入
写入数据,position随着写入数据而更新,limit和capacity都是最大值,下图模拟写入5个字节。
# 切换读取
flip 动作发生后,position变为0,从0开始读取;limit变为原position位置,限制读取最大值
# 读取
读取完5个字节
# 清除
clean 清除数据,变为最开始的状态
# 压缩
compact 把未读取完的部分向前压缩,然后切换到写模式
# ByteBuffer 常用方法
# 分配空间
/*
* class java.nio.HeapByteBuffer java堆内存,读写效率低,受到GC的影响
* class java.nio.DirectByteBuffer 直接内存(系统内存),读写效率高(少一次拷贝),不受GC的影响,但是分配时效率较低
*
* */
System.out.println(ByteBuffer.allocate(16).getClass());
System.out.println(ByteBuffer.allocateDirect(16).getClass());
ByteBuffer buffer = ByteBuffer.allocate(16);
# 向 buffer 写入数据
两种方法:
- 调用 channel 的 read 方法
int readBytes = channel.read(buffer);
- 调用 buffer 自己的 put 方法
buffer.put((byte)127)
# 从 buffer 读取数据
同样有两种方法:
- 调用 channel 的 write 方法
int writeBytes = channel.write(buffer);
- 调用 buffer 自己的 get 方法
byte b = buffer.get();
get 方法会让position
读指针先后走,如果想重复读取数据
- 可以调用
rewind
方法将position
重新设置为0
- 或者调用
get(int i)
方法获取索引i
的内容,它不会移动读指针
# mark 和 reset
mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置
注意 rewind 和 flip 都会清除 mark 位置
# 字符串与 ByteBuffer 互转
String str = "hello";
// 1、字符串转 ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put(str.getBytes(StandardCharsets.UTF_8));
// 此时还是写模式,需要手动执行buffer.flip()
// 2、Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode(str);
// 此方法会自动切换到读模式
// 3、wrap
ByteBuffer buffer3 = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
// 此方法会自动切换到读模式
// ByteBuffer转字符串
buffer.flip();
String s1 = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println(s1);
String s2 = StandardCharsets.UTF_8.decode(buffer2).toString();
System.out.println(s2);
String s3 = StandardCharsets.UTF_8.decode(buffer3).toString();
System.out.println(s3);
# Scattering Reads
分散读写,有个文本文件,假设我们已知内容
onetwothree
分散读取
try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
ByteBuffer buffer1 = ByteBuffer.allocate(3);
ByteBuffer buffer2 = ByteBuffer.allocate(3);
ByteBuffer buffer3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{buffer1, buffer2, buffer3});
buffer1.flip();
buffer2.flip();
buffer3.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer1));
System.out.println(StandardCharsets.UTF_8.decode(buffer2));
System.out.println(StandardCharsets.UTF_8.decode(buffer3));
} catch (IOException e) {
}
分散写入
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});
} catch (IOException e) {
}
成功写到words2.txt
helloworld你好
# 半包黏包解析
_/**
- 网络上有多条数据发送给服务器,数据之间用\n进行分隔
- 但是由于某种原因这些数据在接收时,被进行重新组合
- 比如原始数据有3条
- Hello,world\n
- I'm zhangsan\n
- How are you?\n
- 变成了下面的两个 byteBuffer(粘包,半包)
- Hello,world\nI'm zhangsan\nHo
- w are you?\n
- 现在要求编写程序,将错乱的数据恢复成原始的按\n分隔的数据
- **@param _**args */ 黏包:发送端将多条数据集中在一起,一次请求发送多条数据,接收端收到请求后,未能正确解析,导致数据粘黏在一起 半包:接收端的buffer太小,每次只能读取部分数据,数据就像只收到一半
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(32);
buffer.put("Hello,world\nI'm zhangsan\nHo".getBytes(StandardCharsets.UTF_8));
split(buffer);
buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
split(buffer);
}
/**
* 打印分割好的数据
* @param buffer
*/
private static void split(ByteBuffer buffer) {
// 切换为读模式
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
// 判断字符是否相等
if ('\n' == buffer.get(i)) {
// 取此条数据的长度,当前为\n的位置+1 减去 数据起始位置
int length = i + 1 - buffer.position();
ByteBuffer newBuffer = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
// 读取数据添加到新buffer
newBuffer.put(buffer.get());
}
newBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(newBuffer));
}
}
// 压缩,将上次未读取完的数据移动到buffer头
buffer.compact();
}
成功分割
Hello,world
I'm zhangsan
How are you?
# 文件编程
# FileChannel
File 只能工作在阻塞模式下
# 获取
不能直接打开FileChannel
,必须通过FileInputStream
、FileOutputStream
或者RandomAccessFile
来获取FileChannel
,它们都有getChannel
方法
- 通过
FileInputStream
获取的channel
只能读 - 通过
FileOutputStream
获取的channel
只能写 - 通过
RandomAccessFile
是否能读写根据构造的mode
决定
# 读取
会从channel
读取数据填充ByteBuffer
,返回值表示读取到多少字节,-1表示到达了文件的末尾
int readBytes = channel.read(buffer);
# 写入
正确写入姿势:
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式
while(buffer.hasRemaining()) {
channel.write(buffer);
}
在while
中调用 channel.wirte
是因为write
方法并不能保证一次性将buffer
中的内容全部写入channel
# 关闭
channel
必须关闭,不过调用FileInputStream、FileOutputStream、RandomAccessFile
的close
方法会间接的调用channel
的close
方法。
# 位置
获取当前位置
long pos = channel.position();
设置当前位置
long newPos = ...;
channel.position(newPos);
设置当前位置时,如果设置为文件的末尾
- 这时读取会返回-1
- 这时写入,会追加内容,当要注意如果
position
超过了问价末尾,再写入时会在新内容和原末尾之间存在空洞(00)
# 大小
使用size
方法获取文件大小
# 强制写入
操作系统处于性能的考虑,会将数据缓存,而不是立即写入磁盘。可以调用force(true)
方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
# Channel 间传输数据
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 效率高,底层利用操作系统的零拷贝进行优化
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
//---------------传输大于2g的数据-------------
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 效率高,底层利用操作系统的零拷贝进行优化,只能传输2g的数据
long size = from.size();
// size 需要传输的总数据
// last 当前剩余的数据(还未进行传输的数据)
for (long last = size; last > 0; ) {
// 开始位置就是总的减剩余的
// 结果:剩余的减去本次传输的
last -= from.transferTo(size - last, size, to);
}
} catch (IOException e) {
e.printStackTrace();
}
# Path
JDK7引入了Path
和Paths
类
- Path 用来表示文件路径
- Paths 是工具类,用来获取 Path 实例
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt"); // 绝对路径 d:\1.txt
Path source = Paths.get("d:/1.txt"); // 绝对路径 d:\1.txt
Path source = Paths.get("d:\\data","projects"); //d:\data\projects
.
代表当前路径..
代表上一级路径
比如:
d:
|- data
|- projects
|- a
|- b
代码
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径
输出
d:\data\projects\a\..\b
d:\data\projects\b
# Files
检查文件是否存在
Path path = Paths.get("helloworld/data/txt");
System.out.println(Files.exists(path));
创建一级目录
Path path = Paths.get("helloworld/d1");
Files.createDirectory(path);
如果目录存在会抛出异常;不能一次创建多级目录,否则会抛异常。
创建多级目录
Path path = Paths.get("helloworld/d1/d2");
Files.createDirectories(path);
拷贝文件
Path source = Paths.get("helloworld/data.txt");
Path target = Paths.get("helloworld/target.txt");
Files.copy(source,target);
如果文件已存在,会抛异常;
如果希望用 source 覆盖 target,需要用StandardCopyOption
来控制
Files.copy(source,target,StandardCopyOption.REPLACE_EXISTING);
移动文件
Path source = Paths.get("helloworld/data.txt");
Path target = Paths.get("helloworld/data.txt");
Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);
StandardCopyOption.ATOMIC_MOVE
保证文件移动的原子性
删除文件
Path target = Paths.get("helloworld/target.txt");
Files.delete(target);
如果文件不存在,会抛异常
删除目录
Path target = Paths.get("helloworld/d1");
Files.delete(target);
如果目录还有文件,会抛异常
遍历目录文件
public static void main(String[] args) throws IOException {
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(Paths.get("D:\\environment\\Java\\jdk1.8.0_291"), new SimpleFileVisitor<Path>(){
/**
* 查看目录前
* @param dir
* @param attrs
* @return
* @throws IOException
*/
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("===>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
/**
* 查看文件
* @param file
* @param attrs
* @return
* @throws IOException
*/
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println("dirCount = " + dirCount);
System.out.println("fileCount = " + fileCount);
}
查看以_ .jar _结尾的文件数量
public static void main(String[] args) throws IOException {
AtomicInteger jarCount = new AtomicInteger();
Files.walkFileTree(Paths.get("D:\\environment\\Java\\jdk1.8.0_291"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.toString().endsWith(".jar")) {
jarCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println("jarCount = " + jarCount);
}
删除多级目录
由于Files.delete
只能删除空文件夹,所以我们需要借助Files.walkFileTree
来完成删除
public static void main(String[] args) throws IOException {
Files.walkFileTree(Paths.get("C:\\Users\\starry\\Desktop\\test"), new SimpleFileVisitor<Path>() {
/**
* 查看文件
*
* @param file
* @param attrs
* @return
* @throws IOException
*/
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
/**
* 查看文件夹之后
*
* @param dir
* @param exc
* @return
* @throws IOException
*/
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
System.out.println("删除完成");
}
拷贝多级目录
public static void main(String[] args) throws IOException {
String source = "C:\\Users\\starry\\Desktop\\test";
String target = "C:\\Users\\starry\\Desktop\\test_target";
Files.walk(Paths.get(source)).forEach(path -> {
try {
// 原路径进行替换
String targetPath = path.toString().replace(source, target);
if (Files.isDirectory(path)) {
// 文件夹
Files.createDirectories(Paths.get(targetPath));
} else if (Files.isRegularFile(path)) {
// 常规文件
Files.createFile(Paths.get(targetPath));
}
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("复制完成");
}
# 网络编程
# 阻塞/非阻塞
# 阻塞
- 阻塞模式下,相关方法都会导致线程暂停
ServerSocketChannel.accept
会在没有连接建立时让线程暂停SockerChannel.read
会在没有数据可读时让线程暂停- 阻塞的表现其实就是线程暂停了,暂停期间不会占用CPU,当线程相当于闲置
- 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程的支持
- 但多线程下,有新的问题
- 32位JVM,一个线程320k,64位JVM一个线程1024k,如果连接数过多,必须导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
- 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive(不活跃),会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
代码演示
服务端
public static void main(String[] args) throws Exception {
// 接收数据
ByteBuffer buffer = ByteBuffer.allocate(16);
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// channel集合
List<SocketChannel> channelList = new ArrayList<>();
while (true) {
// 建立与客户端的连接,socketChannel用来和客户端间接通信
// 阻塞方法,线程停止运行
SocketChannel socketChannel = ssc.accept();
log.info("connected:{}",socketChannel);
channelList.add(socketChannel);
for (SocketChannel channel : channelList) {
// 接收客户端发送的数据
// 阻塞方法,线程停止运行
channel.read(buffer);
buffer.flip();
log.info("data:{}",Charset.defaultCharset().decode(buffer));
buffer.clear();
}
}
}
客户端
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
System.out.println("waiting...");
}
# 非阻塞
- 非阻塞模式下,相关方法都不会让线程暂停
ServerSocketChannel.accept
在没有连接建立时,会返回null
,继续运行SocketChannel.read
在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其他SocketChannel
的read
或者去执行ServerSocketChannel.accept
- 写数据时,线程只是等待数据写入
Channel
即可,无需等待Channel
通过网络把数据发送出去
- 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍在不断运行,白白浪费了CPU
- 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
服务端代码,客户端不变
private static void noBlocking() throws IOException {
// 接收数据
ByteBuffer buffer = ByteBuffer.allocate(16);
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 是否阻塞,默认true
ssc.configureBlocking(false);
// 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// channel集合
List<SocketChannel> channelList = new ArrayList<>();
while (true) {
// 建立与客户端的连接,socketChannel用来和客户端间接通信
// 配置了非阻塞,线程继续运行,如果没有连接,返回null
SocketChannel socketChannel = ssc.accept();
if (null != socketChannel) {
log.info("connected:{}", socketChannel);
socketChannel.configureBlocking(false);
channelList.add(socketChannel);
}
for (SocketChannel channel : channelList) {
// 接收客户端发送的数据
// 配置了非阻塞,线程继续运行,如果没有数据,返回0
if (channel.read(buffer) > 0) {
buffer.flip();
log.info("data:{}", Charset.defaultCharset().decode(buffer));
buffer.clear();
}
}
}
}
# 多路复用
单线程可以配合Selector
完成对多个Channel
可读写时间的监控,这称为多路复用
- 多路复用仅针对网络IO,普通文件IO没法利用多路复用
- 如果不用
Selector
的非阻塞模式,线程大部分时间都在做无用功,而Selector
能够保证- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入
- 限于网络传输能力,
Channel
未必时时可写,一旦Channel
可写,会触发Selector
的可写事件
- 限于网络传输能力,
# Selector
好处
- 一个线程配合
selector
就可以监控多个channel
的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功 - 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换
# 创建
Selector selector = Selector.open();
# 绑定 Channel 事件
也称为注册事件,绑定的事件Selector
才会关心
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,绑定事件);
channel
必须工作在非阻塞模式FileChannel
没有非阻塞模式,因此不能配合Selector
使用- 绑定的事件类型可以有
connect
- 客户端连接成功时触发accept
- 服务器端成功接受连接时触发read
- 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况write
- 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
# 监听 Channel 事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel
发生了事件
方法1,阻塞直到绑定事件发生
int count = selector.select();
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();
# select 何时不阻塞
- 事件发生时
- 客户端发起连接请求,会触发
accept
事件 - 客户端发送数据过来,客户端正常、异常关闭时,都会触发
read
事件,另外如果发送的数据大于buffer
缓冲区,会触发多次读取事件 channel
可写,会触发write
事件- 在 linux 下 nio bug 发生时
- 客户端发起连接请求,会触发
- 调用
selector.wakeup()
- 调用
selector.close()
selector
所在线程interrupt
# 处理 accept 事件
客户端
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
System.out.println("waiting...");
}
服务端
public static void main(String[] args) throws Exception {
// 创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 建立selector和channel的联系(注册)
// SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 发生事件才继续运行,否则阻塞
selector.select();
// 处理事件,selectedKeys包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断事件类型
if (key.isAcceptable()) {
log.info("key:{}", key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel socketChannel = channel.accept();
log.info("{}", socketChannel);
// 否则取消
//key.channel();
}
// 处理完key后,要删除
iterator.remove();
}
}
}
事件发生后不能不处理 事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,因为NIO底层使用的是水平触发
# 处理 read 事件
public static void main(String[] args) throws Exception {
// 创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 建立selector和channel的联系(注册)
// SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 发生事件才继续运行,否则阻塞
selector.select();
// 处理事件,selectedKeys包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
// 将channel添加到selector
SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int len = channel.read(buffer);
if (len == -1) {
// 客户端正常关闭
key.cancel();
channel.close();
} else {
// 输出内容
buffer.flip();
log.info("{}", Charset.defaultCharset().decode(buffer));
buffer.clear();
}
} catch (IOException e) {
// 客户端断开连接
e.printStackTrace();
key.cancel();
}
}
// 处理完key后,要删除
iterator.remove();
}
}
}
# 为何要 iter.remove()
因为select
在事件发生后,就会将相关的key
放入selectedKeys
集合,但不会在处理完后从selectedKeys
集合中移除,需要我们自己编码删除。例如
- 第一次触发了
selectionKey
上的accept
事件,没有移除selectionKey
- 第二次触发了
channelKey
上的read
事件,但这时selectedKeys
中还有上次的selectionKey
,在处理时因为没有真正的serverSocket
连上了,就会导致空指针异常
# cancel 的作用
cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
# 处理消息的边界
由于接收消息的长度是不可知的,接收端的ByteBuffer
读取到字符的一半,就会出现乱码
你�
��
- 一种思路是固定消息长度,数据包大小一样(不满进行填充),服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低,需要逐一读取判断
- TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
演示方法二
private static void split(ByteBuffer buffer) {
// 切换为读模式
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
// 判断字符是否相等
if ('\n' == buffer.get(i)) {
// 取此条数据的长度,当前为\n的位置+1 减去 数据起始位置
int length = i + 1 - buffer.position();
ByteBuffer newBuffer = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
// 读取数据添加到新buffer
newBuffer.put(buffer.get());
}
newBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(newBuffer));
}
}
// 压缩,将上次未读取完的数据移动到buffer头
buffer.compact();
}
public static void main(String[] args) throws Exception {
// 创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 建立selector和channel的联系(注册),
// SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 发生事件才继续运行,否则阻塞
selector.select();
// 处理事件,selectedKeys包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(8);
// 将channel添加到selector,将buffer作为附属绑定到对应的key
SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_READ, buffer);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
// 获取附属的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
int len = channel.read(buffer);
if (len == -1) {
// 客户端正常关闭
key.cancel();
channel.close();
} else {
// 输出内容
split(buffer);
if (buffer.position() == buffer.limit()) {
// 扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
buffer.flip();
newBuffer.put(buffer);
key.attach(newBuffer);
}
}
} catch (IOException e) {
// 客户端断开连接
e.printStackTrace();
key.cancel();
}
}
// 处理完key后,要删除
iterator.remove();
}
}
}
# ByteBuffer 大小分配
- 每个
channel
都需要记录可能被切分的消息,因为ByteBuffer
不能被多个channel
共同使用,因此需要为每个channel
维护一个独立的ByteBuffer
ByteBuffer
不能太大,比如一个ByteBuffer
1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的ByteBuffer
- 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html (opens new window)
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
# 处理 write 事件
# 一次无法写完例子
- 非阻塞模式下,无法保证把
buffer
中所有数据都写入channel
,因此需要追踪write
方法的返回值(代表实际写入字节数) - 用
selector
监听所有channel
的可写事件,每个channel
都需要一个key
来跟踪buffer
,但这样又会导致占用内存过多,就有两阶段策略- 当消息处理器第一次写入消息时,才将
channel
注册到selector
上 selector
检查channel
上的可写事件,如果所有的数据写完了,就取消channel
的注册- 如果不取消,会每次可写均会触发
write
事件
- 当消息处理器第一次写入消息时,才将
服务端
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int len = channel.write(buffer);
log.info(String.valueOf(len));
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
key.attach(buffer);
}
} else if (key.isWritable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int len = channel.write(buffer);
log.info(String.valueOf(len));
if (!buffer.hasRemaining()) {
// 写完了,取消注册写事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
// 清空附属,help buffer gc
key.attach(null);
}
}
}
iterator.remove();
}
}
客户端
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
int count = 0;
while (true) {
count += socketChannel.read(buffer);
System.out.println(count);
buffer.clear();
}
}
# write 为何要取消
只要向channel
发送数据时,socket
缓冲可写,这个事件会频繁触发,因此应当只在socket
缓冲区写不下时再关注可写事件,数据写完之后再取消关注
# 利用多线程处理
前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢? 分两组选择器
- 单线程配一个选择器,专门处理
accept
事件 - 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理
read
事件
@Slf4j
public class ThreadServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int cpus = Runtime.getRuntime().availableProcessors();
log.info(String.valueOf(cpus));
Worker[] workers = new Worker[cpus];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker(i);
}
AtomicInteger number = new AtomicInteger();
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
// 将与客户端建立的channel和read_selector进行关联
workers[number.getAndIncrement() % workers.length].register(channel);
}
iterator.remove();
}
}
}
static class Worker implements Runnable {
private Selector selector;
private volatile boolean start = false;
private int index;
public Worker(int index) {
this.index = index;
}
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
@SneakyThrows
public void register(SocketChannel socketChannel) {
if (!start) {
// 只执行一次
selector = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
// 为了保证在同一线程中执行,使用队列
tasks.add(() -> {
try {
// 添加channel和read_selector的绑定
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
// 唤醒selector.select();
selector.wakeup();
}
@SneakyThrows
@Override
public void run() {
while (true) {
selector.select();
Runnable task = tasks.poll();
if (null != task) {
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
try {
ByteBuffer buffer = ByteBuffer.allocate(128);
int len = channel.read(buffer);
if (len == -1) {
key.cancel();
channel.close();
} else {
buffer.flip();
String str = Charset.defaultCharset().decode(buffer).toString();
log.info(str);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
channel.close();
}
}
iterator.remove();
}
}
}
}
}
# 获取 CPU 核数
Runtime.getRuntime().availableProcessors()
如果工作在docker
容器下,因为容器不是物理隔离的,会拿到物理cpu
个数,而不是容器申请时的个数- 这个问题直到
jdk 10
才修复,使用 JVM 参数UseContainerSupport
配置, 默认开启
# UDP
- UDP 是无连接的,client 发送数据不会管 server 是否开启
- server 这边的
receive
方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃
服务端
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}
# NIO / BIO
# stream / channel
stream
不会自动缓冲数据,channel
会利用系统提供的发送缓冲区、接收缓冲区(更为底层)stream
仅支持阻塞API,Channel
同时支持阻塞、非阻塞API,网络Channel
可配合selector
实现多路复用- 二者均为全双工,即读写可以同时进行
# IO 模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程发送结果(至少两个线程)
当调用一次channel.read
或stream.read
后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段
# 阻塞 IO
# 非阻塞 IO
# 多路复用
# 异步 IO
# 阻塞 IO vs 多路复用
详情参考:UNIX 网络编程 - 卷 I
https://www.yuque.com/starries/notes/kgzman#77f50247 (opens new window) https://www.softprayog.in/programming/io-multiplexing-select-poll-epoll-in-linux (opens new window)
# 零拷贝
# 传统 IO 问题
传统的 IO 将一个文件通过socket
写出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
工作流程:
- Java 本身并不具备 IO 读写能力,以此
read
方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓存区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间也不会使用 CPU - 从内核态切换回用户态,将数据从内核缓冲器读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
- 调用
write
方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,CPU 会参与拷贝 - 向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
可以看到中间环节较多,Java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
# NIO 优化
通过DirectByteBuf
ByteBuffer.allocate(10)
HeapByteBuffer
使用的还是 Java 内存ByteBuffer.allocateDirect(10)
DirectByteBuffer
使用的是操作系统内存
大部分步骤与优化前相同。唯有一点:Java 可以使用DirectByteBuf
将堆外内存映射到 JVM 内存中来直接访问使用
- 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- Java 中的
DirectByteBuf
对象仅维护了此内存的虚引用,内存回收分成两步DirectByteBuf
对象被垃圾回收,将虚引用加入引用队列- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了 linux 2.1 后提供的sendFile
方法),java 中对应着两个channel
调用transferTo/transferFrom
方法拷贝数据
- Java 调用
transferTo
方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU - 数据从内核缓冲区传输到 socket 缓冲区,CPU会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux 2.4)
- Java 调用
transferTo
方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU - 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将内核缓冲区的数据写入网卡,不会使用 CPU
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 JVM 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 CPU 计算,减少 CPU 缓存伪共享
- 零拷贝适合小文件传输
# AIO
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows 系统通过 IOCP 实现了真正的异步 IO
- Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
# 文件 AIO
AsynchronousFileChannel
public static void main(String[] args) throws IOException {
Path file = Paths.get("data.txt");
StandardOpenOption option = StandardOpenOption.READ;
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, option)) {
ByteBuffer buffer = ByteBuffer.allocate(16);
log.info("begin");
// 参数一:读取文件后要写入的buffer
// 参数二:从哪个位置开始读
// 参数三:附属对象
// 参数四:回调方法
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
/**
* read success
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.info("read success ==> length:{}", result);
attachment.flip();
String str = Charset.defaultCharset().decode(attachment).toString();
log.info("result:{}", str);
}
/**
* read failed
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.error(exc.getMessage(), exc);
}
});
} catch (IOException e) {
}
log.info("end");
System.in.read();
}
输出
22:02:52.075 [main] INFO com.starry.netty.files.TestAsynchronousFileChannel - begin
22:02:52.079 [main] INFO com.starry.netty.files.TestAsynchronousFileChannel - end
22:02:52.079 [Thread-16] INFO com.starry.netty.files.TestAsynchronousFileChannel - read success ==> length:13
22:02:52.080 [Thread-16] INFO com.starry.netty.files.TestAsynchronousFileChannel - result:1234567890abc
可以看到
- 相应文件读取成功的另外一个线程 Thread-16
- 主线程并没有 IO 阻塞
默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束