Netty进阶
# 粘包与半包
《Netty权威指南》
TCP粘包/拆包 TCP是个”流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,它们是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据 TCP 缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
TCP粘包/拆包问题 假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。 (1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包; (2)服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包; (3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包; (4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。 如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包
TCP粘包/拆包发生的原因 问题产生的原因有三个,分别如下。 (1)应用程序write写入的字节大小大于套接口发送缓冲区大小; (2)进行MSS大小的TCP分段; (3)以太网帧的payload大于MTU进行IP分片。 粘包问题的解决策略 由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。 (1)消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格; (2)在包尾增加回车换行符进行分割,例如FTP协议; (3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度; (4)更复杂的应用层协议。
# 粘包现象
服务端代码
public class HelloWorldServer {
static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
客户端代码希望发送 10 个消息,每个消息是 16 字节
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
Random r = new Random();
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
服务器端的某次输出,可以看到一次就接收了 160 个字节,而非分 10 次接收
08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5] binding...
08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5, L:/0:0:0:0:0:0:0:0:8080] bound...
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] REGISTERED
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] ACTIVE
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177]
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ COMPLETE
# 半包现象
客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);
为现象明显,服务端修改一下接收缓冲区,其它代码不变
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
服务器端的某次输出,可以看到接收的消息被分为两节,第一次 20 字节,第二次 140 字节
08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84] binding...
08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84, L:/0:0:0:0:0:0:0:0:8080] bound...
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] REGISTERED
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] ACTIVE
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221]
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 20B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 |.... |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 140B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000030| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000040| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000050| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000060| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000070| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000080| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |............ |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
注意
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)
影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍SO_RCVBUF
,SO_SNDBUF
这两个选项就是来设置TCP连接的两个buffer尺寸的。 每个TCP socket在内核中都有一个发送缓冲区和一个接收缓冲区,TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的buffer以及此buffer的填充状态。
# 现象分析
粘包
- 现象,发送 abc def,接收 abcdef
- 原因
- 应用层:接收方
ByteBuf
设置太大(Netty 默认 1024) - 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
- 应用层:接收方
半包
- 现象,发送 abcdef,接收 abc def
- 原因
- 应用层:接收方
ByteBuf
小于实际发送数据量 - 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
- 应用层:接收方
本质是因为 TCP 是流式协议,消息无边界
滑动窗口
在进行数据传输时,如果传输的数据比较大,就需要拆分为多个数据包进行发送。TCP 协议需要对数据进行确认后,才可以发送下一个数据包,这样一来,就会在等待确认应答包环节浪费时间。
为了避免这种情况,TCP引入了窗口概念。窗口大小指的是不需要等待确认应答包而可以继续发送数据包的最大值。
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
http://c.biancheng.net/view/6427.html (opens new window)
MSS 限制
- 链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如
- 以太网的 MTU 是 1500
- FDDI(光纤分布式数据接口)的 MTU 是 4352
- 本地回环地址的 MTU 是 65535 - 本地测试不走网卡
- MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数
- ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
- TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
- MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS
Nagle 算法
- 即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由
- 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
- 如果 SO_SNDBUF 的数据达到 MSS,则需要发送
- 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
- 如果 TCP_NODELAY = true,则需要发送
- 已发送的数据都收到 ack 时,则需要发送
- 上述条件不满足,但发生超时(一般为 200ms)则需要发送
- 除上述情况,延迟发送
# 解决方案
- 短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
- 每一条消息采用固定长度,缺点浪费空间
- 每一条消息采用分隔符,例如
\n
,缺点需要转义 - 每一条消息分为
head
和body
,head
中包含body
的长度
# 短链接
以解决粘包为例
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
// 分 10 次发送
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("conneted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 发完即关
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的
调整接收缓冲区比客户端小,测试出现半包
// 调整系统的接收缓冲区(滑动窗口)
//serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
// 调整 netty 的接收缓冲区(byteBuf)
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(10, 10, 10));
# 固定长度
让所有数据包长度固定(假设长度为 8 字节),服务器端加入
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
客户端测试代码,注意, 采用这种方法后,客户端什么时候 flush 都可以
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("192.168.0.103", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
缺点是,数据包的大小不好把握
- 长度定的太大,浪费
- 长度定的太小,对某些数据包又显得不够
# 固定分隔符
服务端加入,默认以\n
或\r\n
作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端在每条消息之后,加入\n
分隔符
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
for (int j = 1; j <= r.nextInt(16)+1; j++) {
buffer.writeByte((byte) c);
}
buffer.writeByte(10);
c++;
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("192.168.0.103", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误
# 预设长度
在发送消息前,先约定用定长字节表示接下来数据的长度
public class TestLengthFieldBasedFrameDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
/*
* maxFrameLength – 帧的最大长度。如果帧的长度大于这个值,就会抛出 TooLongFrameException。
* lengthFieldOffset – 长度字段的偏移量
* lengthFieldLength – 长度字段的长度
* lengthAdjustment – 添加到长度字段值的补偿值
* initialBytesToStrip – 从解码帧中剥离的第一个字节数
* */
new LengthFieldBasedFrameDecoder(1024, 0, 4, 4, 0),
new LoggingHandler(LogLevel.DEBUG)
);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
write(buffer,"hello, world",1);
write(buffer,"hello,world",2);
channel.writeInbound(buffer);
}
private static void write(ByteBuf buf, String content,int version) {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
int length = bytes.length;
buf.writeInt(length);
buf.writeInt(version);
buf.writeBytes(bytes);
}
}
# 协议设计
# redis 客户端
第一,客户端与服务端之间的通信协议是在TCP协议之上构建的。
第二,Redis制定了RESP(REdis Serialization Protocol,Redis序列化协议)实现客户端与服务端的正常交互,这种协议简单高效,既能够被机器解析,又容易被人类识别。例如客户端发送一条set name starry
命令给服务端,按照RESP的标准,客户端需要将其封装为如下格式(每行用\r\n
分隔):
*3
$3
set
$4
name
$6
starry
客户端发送数据格式
- 第一个字节是
*
代表数组。 - 第一个字节是
$
代表块字符串。 - 第一个字节是
+
代表简单字符串类型。 - 第一个字节是
-
代表错误类型。 - 第一个字节是
:
代表整型。
发送数据为3组(*3), 第一个数据是字符串类型,长度为3($3), 第一个数据是字符串类型,长度为4($4), 第一个数据是字符串类型,长度为6($6)。
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
// 换行 \r\n
byte[] LINE = new byte[]{13, 10};
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 建立连接后调用此方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
set(ctx);
get(ctx);
}
/**
* 模拟 redis set 命令
* @param ctx
*/
private void set(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$6".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("starry".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
/**
* 模拟 redis get 命令
* @param ctx
*/
private void get(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*2".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("get".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
/**
* 打印 redis 服务器返回的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 6379).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭资源
group.shutdownGracefully();
}
}
输入
+OK
$6
starry
# Http 协议
Netty 封装了 Http 协议的消息编解码类
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec {}
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// http相关handler,编解码
ch.pipeline().addLast(new HttpServerCodec());
// 只关注 HttpRequest 相关的请求
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
log.info(msg.uri());
// 返回内容
byte[] content = "<p>hello,world</p>".getBytes(StandardCharsets.UTF_8);
// 封装返回 msg
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// header 需要设置相应内容长度 content_length
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.length);
// 写入内容
response.content().writeBytes(content);
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(),e);
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
浏览器访问 http://localhost:8080/test (opens new window)
15:28:15.892 [nioEventLoopGroup-3-2] INFO com.starry.netty.protocol.TestHttp - /test
15:28:16.119 [nioEventLoopGroup-3-2] INFO com.starry.netty.protocol.TestHttp - /favicon.ico
# 自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊... 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
# 编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
/**
* 魔数
*/
private static final byte[] MAGIC_NUMBER = new byte[]{1, 2, 3, 6};
/**
* 版本号
*/
private static final byte VERSION = 1;
/**
* 出站前 将 Message 编码为 ByteBuf
*
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 4 字节,自定义 魔数
out.writeBytes(MAGIC_NUMBER);
// 1 字节,版本号
out.writeByte(VERSION);
// 1 字节,序列化算法 0 JDK、1 JSON
out.writeByte(0);
// 1 字节,指令类型
out.writeByte(msg.getMessageType());
// 4 字节,请求序号
out.writeInt(msg.getSequenceId());
// 1 字节,对齐填充,使固定长度为2的倍数
out.writeByte(0xff);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(outputStream);
stream.writeObject(msg);
byte[] bytes = outputStream.toByteArray();
// 4 字节,正文长度
out.writeInt(bytes.length);
// 消息正文
out.writeBytes(bytes);
}
/**
* 入站后 将 ByteBuf 编码为 Message
*
* @param ctx
* @param in
* @param out
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte[] magicNumber = new byte[MAGIC_NUMBER.length];
in.readBytes(magicNumber, 0, MAGIC_NUMBER.length);
if (!Arrays.equals(MAGIC_NUMBER, magicNumber)) {
throw new RuntimeException();
}
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) inputStream.readObject();
log.info("{}|{}|{}|{}|{}|{}", magicNumber, version, serializerType, messageType, sequenceId, length);
log.info("{}", message);
// 放入 list 供下个 handler 处理
out.add(message);
}
}
测试
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
new LoggingHandler(LogLevel.DEBUG)
,new MessageCodec()
);
Message message = new LoginRequestMessage("starry", "123acb");
// encode
channel.writeOutbound(message);
// decode
// 先将 Message 转为 ByteBuf,以便入站测试转换
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null,message,buffer);
channel.writeInbound(buffer);
}
# @Sharable
带有 @Sharable
注解的类,标识这个类可以在多个 channel 中被共享,而不用每次都 new。
- 当 handler 不保存状态时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承
ByteToMessageCodec
或CombinedChannelDuplexHandler
父类,他们的构造方法对@Sharable
有限制 - 如果能确保编解码器不会保存状态,可以继承
MessageToMessageCodec
父类
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf,Message> {
/**
* 魔数
*/
private static final byte[] MAGIC_NUMBER = new byte[]{1, 2, 3, 6};
/**
* 版本号
*/
private static final byte VERSION = 1;
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
// 4 字节,自定义 魔数
buf.writeBytes(MAGIC_NUMBER);
// 1 字节,版本号
buf.writeByte(VERSION);
// 1 字节,序列化算法 0 JDK、1 JSON
buf.writeByte(0);
// 1 字节,指令类型
buf.writeByte(msg.getMessageType());
// 4 字节,请求序号
buf.writeInt(msg.getSequenceId());
// 1 字节,对齐填充,使固定长度为2的倍数
buf.writeByte(0xff);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(outputStream);
stream.writeObject(msg);
byte[] bytes = outputStream.toByteArray();
// 4 字节,正文长度
buf.writeInt(bytes.length);
// 消息正文
buf.writeBytes(bytes);
out.add(buf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] magicNumber = new byte[MAGIC_NUMBER.length];
msg.readBytes(magicNumber, 0, MAGIC_NUMBER.length);
if (!Arrays.equals(MAGIC_NUMBER, magicNumber)) {
throw new RuntimeException();
}
byte version = msg.readByte();
byte serializerType = msg.readByte();
byte messageType = msg.readByte();
int sequenceId = msg.readInt();
msg.readByte();
int length = msg.readInt();
byte[] bytes = new byte[length];
msg.readBytes(bytes, 0, length);
ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) inputStream.readObject();
log.info("{}|{}|{}|{}|{}|{}", magicNumber, version, serializerType, messageType, sequenceId, length);
log.info("{}", message);
// 放入 list 供下个 handler 处理
out.add(message);
}
}
# 聊天室案例
# 初始化
# 登录
server
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodecSharable);
ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
boolean login = UserServiceFactory.getUserService().login(msg.getUsername(), msg.getPassword());
Message message;
if (login) {
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码错误");
}
ctx.writeAndFlush(message);
}
});
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
client
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodecSharable);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
// 构建消息对象
LoginRequestMessage requestMessage = new LoginRequestMessage(username, password);
ctx.writeAndFlush(requestMessage);
System.out.println("等待后续操作...");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}, "system in").start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
线程间通信 可以使用 CountDownLatch 和 AtomicBoolean 原子类
package com.starry.netty.client;
import com.starry.netty.message.*;
import com.starry.netty.protocol.MessageCodecSharable;
import com.starry.netty.protocol.ProtocolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author starry
* @version 1.0
* @date 2022/2/16 19:39
* @Description
*/
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicBoolean isSuccess = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodecSharable);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
if (responseMessage.isSuccess()) {
// 登录成功
isSuccess.set(true);
}
// 减一
countDownLatch.countDown();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
// 构建消息对象
LoginRequestMessage requestMessage = new LoginRequestMessage(username, password);
ctx.writeAndFlush(requestMessage);
System.out.println("等待后续操作...");
try {
// 阻塞,直到收到服务器回应
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!isSuccess.get()) {
// 登录失败
ctx.channel().close();
return;
}
while (true) {
// 登录成功
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
String[] s = command.split(" ");
switch (s[0]) {
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
# 单聊
服务器端将 handler 独立出来 登录 handler
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
Message message;
if (login) {
// 间用户和对应的 channel 进行绑定
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码错误");
}
ctx.writeAndFlush(message);
}
}
单聊 handler
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
// 接收人
String to = msg.getTo();
// 发送人
String from = msg.getFrom();
// 消息内容
String content = msg.getContent();
// 通过用户名获取对应的 channel
Channel channel = SessionFactory.getSession().getChannel(to);
if (channel == null) {
// 返回给发送人
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户名不存在或者不在线"));
} else {
// 发送给接收人
channel.writeAndFlush(new ChatResponseMessage(from, content));
// 返回给发送人
ctx.writeAndFlush(new ChatResponseMessage(true, "发送成功"));
}
}
}
# 群聊
创建群聊
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if (group != null) {
ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已存在"));
} else {
// 通知被拉入的人员
List<Channel> channelList = groupSession.getMembersChannel(groupName);
channelList.parallelStream().forEach(channel -> channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName)));
ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
}
}
}
发送群聊消息
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
GroupSessionFactory.getGroupSession().getMembersChannel(msg.getGroupName()).parallelStream().forEach(channel -> channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent())));
}
}
# 退出
@ChannelHandler.Sharable
@Slf4j
public class QuitHandler extends ChannelInboundHandlerAdapter {
/**
* 断开连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.info("{} 已断开连接",ctx.channel());
}
/**
* 出现异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.info("{} 异常断开,异常:{}",ctx.channel(),cause.getMessage());
}
}
# 空闲检测
# 连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
服务器端解决
- 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
// IdleStateHandler 用来判断是不是 读空闲时间过长 或者 写空闲时间过长
// 5s 没有收到 channel 的数据,会触发一个事件 IdleState#READER_IDLE
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// ChannelDuplexHandler 可以同时作为入站和出站的处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
/**
* 用来触发特殊事件
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 读空闲事件
if (idleStateEvent.state() == IdleState.READER_IDLE) {
log.info("已经 5s 没有读到数据了");
ctx.channel().close();
}
}
});
客户端定时心跳
- 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 触发 写空闲事件
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("3s 没有写数据了,发送一个心跳包");
ctx.channel().writeAndFlush(new PingMessage());
}
}
});
# ChatServer
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
GroupCreateRequestMessageHandler groupCreateRequestMessageHandler = new GroupCreateRequestMessageHandler();
GroupChatRequestMessageHandler groupChatRequestMessageHandler = new GroupChatRequestMessageHandler();
QuitHandler quitHandler = new QuitHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// IdleStateHandler 用来判断是不是 读空闲时间过长 或者 写空闲时间过长
// 5s 没有收到 channel 的数据,会触发一个事件 IdleState#READER_IDLE
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// ChannelDuplexHandler 可以同时作为入站和出站的处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
/**
* 用来触发特殊事件
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 读空闲事件
if (idleStateEvent.state() == IdleState.READER_IDLE) {
log.info("已经 5s 没有读到数据了");
ctx.channel().close();
}
}
});
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodecSharable);
ch.pipeline().addLast(loginRequestMessageHandler);
ch.pipeline().addLast(chatRequestMessageHandler);
ch.pipeline().addLast(groupCreateRequestMessageHandler);
ch.pipeline().addLast(groupChatRequestMessageHandler);
ch.pipeline().addLast(quitHandler);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
# ChatClient
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicBoolean isSuccess = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 触发 写空闲事件
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("3s 没有写数据了,发送一个心跳包");
ctx.channel().writeAndFlush(new PingMessage());
}
}
});
ch.pipeline().addLast(new ProtocolFrameDecoder());
//ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodecSharable);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("{}", msg);
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
if (responseMessage.isSuccess()) {
// 登录成功
isSuccess.set(true);
}
// 减一
countDownLatch.countDown();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
// 构建消息对象
LoginRequestMessage requestMessage = new LoginRequestMessage(username, password);
ctx.writeAndFlush(requestMessage);
System.out.println("等待后续操作...");
try {
// 阻塞,直到收到服务器回应
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!isSuccess.get()) {
// 登录失败
ctx.channel().close();
return;
}
while (true) {
// 登录成功
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
String[] s = command.split(" ");
switch (s[0]) {
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
// 加入自己
set.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}