源码地址及更新日志

https://delaunay.coding.net/p/netty-demo/d/netty-demo/git

实战:Netty 实现客户端登录

登录流程

登录流程

  1. 客户端会构建一个登录请求对象,然后通过编码把请求对象编码为 ByteBuf,写到服务端。
  2. 服务端接受到 ByteBuf 之后,首先通过解码把 ByteBuf 解码为登录请求响应,然后进行校验。
  3. 服务端校验通过之后,构造一个登录响应对象,依然经过编码,然后再写回到客户端。
  4. 客户端接收到服务端的之后,解码 ByteBuf,拿到登录响应响应,判断是否登陆成功

客户端发送登录请求

客户端处理登录请求

ClientHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端开始登陆");

// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("delaunay");
loginRequestPacket.setPassword("pwd");

// 编码
ByteBuf byteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);

// 写数据
ctx.channel().writeAndFlush(byteBuf);
}
}

在编码的环节,我们把 PacketCodeC变成单例模式,然后把ByteBuf分配器抽取出一个参数,这里第一个实参ctx.alloc()获取的就是与当前连接相关的ByteBuf分配器,建议这样来使用。

服务端处理登录请求

ServerHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;

// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);

// 判断是否是登录请求数据包
if (packet instanceof LoginRequestPacket) {
LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;

// 登录校验
if (valid(loginRequestPacket)) {
// 校验成功
} else {
// 校验失败
}
}
}

private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}

拿到 ByteBuf 之后,首先要做的事情就是解码,解码出 java 数据包对象,然后判断如果是登录请求数据包 LoginRequestPacket,就进行登录逻辑的处理,这里,我们假设所有的登录都是成功的,valid() 方法返回 true。 服务端校验通过之后,接下来就需要向客户端发送登录响应,我们继续编写服务端的逻辑。

服务端发送登录响应

服务端处理登录响应

ServerHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(new Date() + ": 客户端开始登录……");
// 为什么 Netty 不直接把这个参数 msg 的类型定义为 ByteBuf ?我们在后续的小节会分析到。
ByteBuf requestByteBuf = (ByteBuf)msg;

// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);

// 判断是否是登录请求数据包
if (packet instanceof LoginRequestPacket){
// 登录流程
LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;

LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(packet.getVersion());

// 登录校验
if (valid(loginRequestPacket)) {
// 校验成功
loginResponsePacket.setSuccess(true);

System.out.println(new Date() + ": " + loginRequestPacket.getUsername() + " 登录成功!");
} else {
//校验失败
loginResponsePacket.setSuccess(false);
loginResponsePacket.setReason("账号密码校验失败");

System.out.println(new Date() + ": 登录失败!");
}

// 登录响应
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}

private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}
}

我们构造一个登录响应包 LoginResponsePacket,然后在校验成功和失败的时候分别设置标志位,接下来,调用编码器把 Java 对象编码成 ByteBuf,调用 writeAndFlush() 写到客户端。
构建登录响应包时LoginResponsePacket,需要在Command类中加入LOGIN_RESPONSE指令信息,同时在PacketCodeC中的初始化模块中添加进去。

客户端处理登录响应

ClientHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf)msg;
// System.out.println("登录响应信息解码");

// 解码
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

// 校验
if (packet instanceof LoginResponsePacket){
// System.out.println("进入校验");
LoginResponsePacket loginResponsePacket = (LoginResponsePacket)packet;

if (loginResponsePacket.isSuccess()){
System.out.println(new Date() + ": 客户端登录成功!");
} else {
System.out.println(new Date() + ": 客户端登录失败,原因: " + loginResponsePacket.getReason());
}
} else {
System.out.println(packet);
}
}

总结

梳理了一下客户端登录的基本流程,然后结合上一小节的编解码逻辑,使用 Netty 实现了完整的客户端登录流程

实战:实现客户端与服务端收发消息

收发消息对象

把客户端发送至服务端的消息对象定义为 MessageRequestPacket
指令为 MESSAGE_REQUEST = 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ublic class MessageRequestPacket extends Packet {

private String message;

@Override
public Byte getCommand() {
return MESSAGE_REQUEST;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}

把服务端发送至客户端的消息对象定义为 MessageResponsePacket
指令为 MESSAGE_RESPONSE = 4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MessageResponsePacket extends Packet {

private String message;

@Override
public Byte getCommand() {
return MESSAGE_RESPONSE;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}

判断客户端登录状态

通过 channel.attr(xxx).set(xx) 的方式,可以给客户端连接,也就是 Channel 绑定属性,因此可以在登录成功之后,给 Channel 绑定一个登录成功的标志位,然后判断是否登录成功的时候取出这个标志位就可以了
首先定义一下是否登录成功的标志位

1
2
3
4
public interface Attributes {
// 定义一下是否登录成功的标志位
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
}

然后在客户端登录逻辑中添加标志位 LoginUtil.markAsLogin(ctx.channel());

添加及判断标志位的方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LoginUtil {
/**
* 设置登录标志位
* @param channel
*/
public static void markAsLogin(Channel channel){
channel.attr(Attributes.LOGIN).set(true);
}

/**
* 判断是否有标志位
* @param channel
* @return
*/
public static boolean hasLogin(Channel channel){
Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);

return loginAttr.get() != null;
}
}

控制台输入消息并发送

在客户端连接上服务端之后启动控制台线程,从控制台获取消息,然后发送至服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
Channel channel = ((ChannelFuture) future).channel();
// 连接成功之后,启动控制台线程
startConsoleThread(channel);
}
// ...
});
}

private static void startChannelThread(Channel channel) {
new Thread(() ->{
while (!Thread.interrupted()){
if (LoginUtil.hasLogin(channel)){
System.out.println("输入消息发送至服务端: ");

// 读取输入消息
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();

// 创建消息对象
MessageRequestPacket messageRequestPacket = new MessageRequestPacket();
messageRequestPacket.setMessage(line);

// 编码
ByteBuf byteBuf = PacketCodeC.INSTANCE.encode(channel.alloc(), messageRequestPacket);

// 传到服务端
channel.writeAndFlush(byteBuf);
}
}
}).start();
}

调用 startConsoleThread() 开始启动控制台线程,然后在控制台线程中,判断只要当前 channel 是登录状态,就允许控制台输入消息。

服务端收发消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 判断是否是登录请求数据包
if (packet instanceof LoginRequestPacket){
// 登录逻辑
} else if (packet instanceof MessageRequestPacket){
MessageRequestPacket messageRequestPacket = (MessageRequestPacket)packet;
System.out.println(new Date() + ": 客户端收到消息: " + messageRequestPacket.getMessage());

MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setVersion(packet.getVersion());
messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】消息已收到");

// 响应
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);

}

客户端收消息处理

1
2
3
4
5
6
7
8
9
10
11
12
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

if (packet instanceof LoginResponsePacket) {
// 登录逻辑...
} else if (packet instanceof MessageResponsePacket) {
MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
}
}

总结

  1. 定义了收发消息的 Java 对象进行消息的收发。
  2. 学到了 channel 的 attr() 的实际用法:可以通过给 channel 绑定属性来设置某些状态,获取某些状态,不需要额外的 map 来维持。
  3. 学习了如何在控制台获取消息并且发送至服务端。
  4. 实现了服务端回消息,客户端响应的逻辑

pipeline 与 channelHandler

Netty 里面一大核心组件: PipelineChannelHandler
大致处理流程:
处理流程
我们把这三类逻辑都写在一个类里面,客户端写在 ClientHandler,服务端写在 ServerHandler,如果要做功能的扩展(比如,我们要校验 magic number,或者其他特殊逻辑),只能在一个类里面去修改, 这个类就会变得越来越臃肿。

另外,我们注意到,每次发指令数据包都要手动调用编码器编码成 ByteBuf,对于这类场景的编码优化,我们能想到的办法自然是模块化处理,不同的逻辑放置到单独的类来处理,最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

Netty 中的 pipelinechannelHandler 正是用来解决这个问题的:它通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除 ,Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler

pipeline 与 channelHandler 的构成

pipeline与channelHandler的构成
无论是从服务端来看,还是客户端来看,在 Netty 整个框架里面,一条连接对应着一个 Channel,这条 Channel 所有的处理逻辑都在一个叫做 ChannelPipeline 的对象里面,ChannelPipeline 是一个双向链表结构,他和 Channel 之间是一对一的关系。

ChannelPipeline 里面每个节点都是一个ChannelHandlerContext对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler

channelHandler 的分类

channelHandler的分类
可以看到 ChannelHandler 有两大子接口:

第一个子接口是 ChannelInboundHandler,从字面意思也可以猜到,他是处理读数据的逻辑,比如,我们在一端读到一段数据,首先要解析这段数据,然后对这些数据做一系列逻辑处理,最终把响应写到对端, 在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理,它的一个最重要的方法就是 channelRead()。读者可以将 ChannelInboundHandler 的逻辑处理过程与 TCP 的七层协议的解析联系起来,收到的数据一层层从物理层上升到我们的应用层。

第二个子接口 ChannelOutBoundHandler 是处理写数据的逻辑,它是定义我们一端在组装完响应之后,把数据写到对端的逻辑,比如,我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端,它里面最核心的一个方法就是 write(),读者可以将 ChannelOutBoundHandler 的逻辑处理过程与 TCP 的七层协议的封装过程联系起来,我们在应用层组装响应之后,通过层层协议的封装,直到最底层的物理层。

这两个子接口分别有对应的默认实现,ChannelInboundHandlerAdapter,和 ChanneloutBoundHandlerAdapter,它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler。

ChannelInboundHandler 的事件传播

关于 ChannelInboundHandler ,我们拿 channelRead() 为例子,来体验一下 inbound 事件的传播。

我们在服务端的 pipeline 添加三个 ChannelInboundHandler

1
2
3
4
5
6
7
8
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});

每个 inBoundHandler 都继承自 ChannelInboundHandlerAdapter,然后实现了 channelRead() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
super.channelRead(ctx, msg);
}
}

channelRead() 方法里面,我们打印当前 handler 的信息,然后调用父类的 channelRead() 方法,而这里父类的channelRead()方法会自动调用到下一个 inBoundHandlerchannelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler,我们例子中传递的对象都是同一个 msg。

我们通过 addLast() 方法来为pipeline添加 inBoundHandler,当然,除了这个方法还有其他的方法,感兴趣的同学可以自行浏览一下 pipeline 的 api ,这里我们添加的顺序为 A -> B -> C,最后,控制台的输出 也为 A -> B -> C。

inBoundHandler 的执行顺序与我们通过 addLast() 方法 添加的顺序保持一致,接下来,我们再来看一下 outBoundHandler 的事件传播。

ChannelOutboundHandler 的事件传播

关于 ChanneloutBoundHandler ,我们拿 write() 为例子,来体验一下 outbound 事件的传播。

我们继续在服务端的 pipeline 添加三个 ChanneloutBoundHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// inBound,处理读数据的逻辑链
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

// outBound,处理写数据的逻辑链
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});

每个 outBoundHandler 都继承自 ChanneloutBoundHandlerAdapter,然后实现了 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerC: " + msg);
super.write(ctx, msg, promise);
}
}

write() 方法里面,我们打印当前 handler 的信息,然后调用父类的 write() 方法,而这里父类的write()方法会自动调用到下一个 outBoundHandlerwrite() 方法,并且会把当前 outBoundHandler 里处理完毕的对象传递到下一个 outBoundHandler

我们通过 addLast() 方法 添加 outBoundHandler 的顺序为 A -> B -> C,最后,控制台的输出为 C -> B -> A。

pipeline 的结构

pipeline 的结构
不管我们定义的是哪种类型的 handler, 最终它们都是以双向链表的方式连接,这里实际链表的节点是 ChannelHandlerContext,这里为了让结构清晰突出,可以直接把节点看作 ChannelHandlerContext

pipeline 的执行顺序

pipeline 的执行顺序

虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的,inBoundHandler 的事件通常只会传播到下一个 inBoundHandleroutBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰。

总结

  1. 通过我们前面编写客户端服务端处理逻辑,引出了 pipelinechannelHandler 的概念。
  2. channelHandler 分为inBoundoutBound 两种类型的接口,分别是处理数据读与数据写的逻辑,可与 tcp 协议栈联系起来。
  3. 两种类型的 handler 均有相应的默认实现,默认会把事件传递到下一个,这里的传递事件其实说白了就是把本 handler 的处理结果传递到下一个 handler 继续处理。
    inBoundHandler的执行顺序与我们实际的添加顺序相同,而 outBoundHandler 则相反。

实战:构建客户端与服务端 pipeline

ChannelInboundHandlerAdapter 与 ChannelOutboundHandlerAdapter

首先是 ChannelInboundHandlerAdapter ,这个适配器主要用于实现其接口 ChannelInboundHandler 的所有方法,这样我们在编写自己的 handler 的时候就不需要实现 handler 里面的每一种方法,而只需要实现我们所关心的方法,默认情况下,对于 ChannelInboundHandlerAdapter,我们比较关心的是他的如下方法

1
2
3
4
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

他的作用就是接收上一个 handler 的输出,这里的 msg 就是上一个 handler 的输出。默认情况下 adapter 会通过 fireChannelRead() 方法直接把上一个 handler 的输出结果传递到下一个 handler。

ChannelInboundHandlerAdapter 类似的类是 ChannelOutboundHandlerAdapter,它的核心方法如下

1
2
3
4
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

默认情况下,这个 adapter 也会把对象传递到下一个 outBound 节点,它的传播顺序与 inboundHandler 相反。

往 pipeline 添加的第一个 handler 中的 channelRead 方法中,msg 对象其实就是 ByteBuf。服务端在接受到数据之后,应该首先要做的第一步逻辑就是把这个 ByteBuf 进行解码,然后把解码后的结果传递到下一个 handler,像这样

1
2
3
4
5
6
7
8
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf requestByteBuf = (ByteBuf) msg;
// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
// 解码后的对象传递到下一个 handler 处理
ctx.fireChannelRead(packet)
}

解码之前,了解一下另外一个特殊的 handler

ByteToMessageDecoder

通常情况下,无论是在客户端还是服务端,当收到数据之后,首先要做的事情就是把二进制数据转换到我们的一个 Java 对象,所以 Netty 很贴心地写了一个父类,来专门做这个事情

1
2
3
4
5
6
7
public class PacketDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
out.add(PacketCodeC.INSTANCE.decode(in));
}
}

当我们继承了 ByteToMessageDecoder 这个类之后,我们只需要实现一下 decode() 方法,这里的 in 大家可以看到,传递进来的时候就已经是 ByteBuf 类型,所以我们不再需要强转,第三个参数是 List 类型,我们通过往这个 List 里面添加解码后的结果对象,就可以自动实现结果往下一个 handler 进行传递,这样,我们就实现了解码的逻辑 handler。

另外,值得注意的一点,对于 Netty 里面的 ByteBuf,我们使用 4.1.6.Final 版本,默认情况下用的是堆外内存,在 ByteBuf 这一小节中我们提到,堆外内存我们需要自行释放,在我们前面小节的解码的例子中,其实我们已经漏掉了这个操作,这一点是非常致命的,随着程序运行越来越久,内存泄露的问题就慢慢暴露出来了, 而这里我们使用 ByteToMessageDecoder,Netty 会自动进行内存的释放,我们不用操心太多的内存管理方面的逻辑。

SimpleChannelInboundHandler

通过 if else 逻辑进行逻辑的处理,当我们要处理的指令越来越多的时候,代码会显得越来越臃肿,我们可以通过给 pipeline 添加多个 handler(ChannelInboundHandlerAdapter的子类) 来解决过多的 if else 问题,如下
XXXHandler.java

1
2
3
4
5
if (packet instanceof XXXPacket) {
// ...处理
} else {
ctx.fireChannelRead(packet);
}

这样一个好处就是,每次添加一个指令处理器,逻辑处理的框架都是一致的
这里我们编写指令处理 handler 的时候,依然编写了一段我们其实可以不用关心的 if else 判断,然后还要手动传递无法处理的对象 (XXXPacket) 至下一个指令处理器,这也是一段重复度极高的代码,因此,Netty 基于这种考虑抽象出了一个 SimpleChannelInboundHandler 对象,类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可。
LoginRequestHandler.java

1
2
3
4
5
6
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
// 登录逻辑
}
}

SimpleChannelInboundHandler 从字面意思也可以看到,使用它非常简单,我们在继承这个类的时候,给他传递一个泛型参数,然后在 channelRead0() 方法里面,我们不用再通过 if 逻辑来判断当前对象是否是本 handler 可以处理的对象,也不用强转,不用往下传递本 handler 处理不了的对象,这一切都已经交给父类 SimpleChannelInboundHandler 来实现了,我们只需要专注于我们要处理的业务逻辑即可。

MessageToByteEncoder

我们处理每一种指令完成之后的逻辑是类似的,都需要进行编码,然后调用 writeAndFlush() 将数据写到客户端,这个编码的过程其实也是重复的逻辑,而且在编码的过程中,我们还需要手动去创建一个 ByteBuf。
Netty 提供了一个特殊的 channelHandler 来专门处理编码逻辑,我们不需要每一次将响应写到对端的时候调用一次编码逻辑进行编码,也不需要自行创建 ByteBuf,这个类叫做 MessageToByteEncoder,从字面意思也可以看出,它的功能就是将对象转换到二进制数据。

1
2
3
4
5
6
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
PacketCodeC.INSTANCE.encode(out, packet);
}
}

PacketEncoder 继承自 MessageToByteEncoder,泛型参数 Packet 表示这个类的作用是实现 Packet 类型对象到二进制的转换。
这里我们只需要实现 encode() 方法,我们注意到,在这个方法里面,第二个参数是 Java 对象,而第三个参数是 ByteBuf 对象,我们在这个方法里面要做的事情就是把 Java 对象里面的字段写到 ByteBuf,我们不再需要自行去分配 ByteBuf,因此,大家注意到,PacketCodeC 的 encode() 方法的定义也改了,下面是更改前后的对比
PacketCodeC.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 更改前的定义
public ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = byteBufAllocator.ioBuffer();
// 2. 序列化 java 对象

// 3. 实际编码过程

return byteBuf;
}
// 更改后的定义
public void encode(ByteBuf byteBuf, Packet packet) {
// 1. 序列化 java 对象

// 2. 实际编码过程
}

PacketCodeC 不再需要手动创建对象,不再需要再把创建完的 ByteBuf 进行返回。当我们向 pipeline 中添加了这个编码器之后,我们在指令处理完毕之后就只需要 writeAndFlush java 对象即可,像这样

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
ctx.channel().writeAndFlush(login(loginRequestPacket));
}
}

public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageRequestPacket) {
ctx.channel().writeAndFlush(receiveMessage(messageRequestPacket));
}
}

构建客户端与服务端 pipeline

客户端与服务端pipline
客户端

1
2
3
4
5
6
7
8
9
10
bootstrap
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
服务端
1
2
3
4
5
6
7
8
9
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

总结

  1. 基于 ByteToMessageDecoder,我们可以实现自定义解码,而不用关心 ByteBuf 的强转和 解码结果的传递。
  2. 基于 SimpleChannelInboundHandler,我们可以实现每一种指令的处理,不再需要强转,不再有冗长乏味的 if else 逻辑,不需要手动传递对象。
  3. 基于 MessageToByteEncoder,我们可以实现自定义编码,而不用关心 ByteBuf 的创建,不用每次向对端写 Java 对象都进行一次编码

实战:拆包粘包理论与解决方案

拆包粘包例子

FirstClientHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
// 这个方法会在客户端连接建立成功之后被调用
public void channelActive(ChannelHandlerContext ctx){
System.out.println(new Date() + ":客户端写出数据");

for (int i = 0; i < 500; i++){
// 1.获取数据
ByteBuf buffer = getByBuf(ctx);

// 2.写数据
ctx.channel().writeAndFlush(buffer);
}

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println(new Date() + ":客户端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
}

private ByteBuf getByBuf(ChannelHandlerContext ctx) {
// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();

// 2. 准备数据,指定字符串的字符集为 utf-8
byte[] bytes = "你好,我是欢,欢迎来到我的博客!".getBytes(Charset.forName("utf-8"));

// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);

return buffer;
}
}

客户端在连接建立成功之后,使用一个 for 循环,不断向服务端写一串数据,服务端将数据打印出来,结果如下:
打印结果

从服务端的控制台输出可以看出,存在三种类型的输出

  1. 一种是正常的字符串输出。
  2. 一种是多个字符串“粘”在了一起,我们定义这种 ByteBuf 为粘包。
  3. 一种是一个字符串被“拆”开,形成一个破碎的包,我们定义这种 ByteBuf 为半包。

粘包半包现象的原因

尽管我们在应用层面使用了 Netty,但是对于操作系统来说,只认 TCP 协议,尽管我们的应用层是按照 ByteBuf 为 单位来发送数据,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。

拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开,举个栗子,发送端将三个数据包粘成两个 TCP 数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新组装成三个数据包。

拆包的原理

在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包

  1. 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
  2. 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
    如果我们自己实现拆包,这个过程将会非常麻烦,我们的每一种自定义协议,都需要自己实现,还需要考虑各种异常,而 Netty 自带的一些开箱即用的拆包器已经完全满足我们的需求了,下面我们来介绍一下 Netty 有哪些自带的拆包器。

Netty 自带的拆包器

  1. 固定长度的拆包器 FixedLengthFrameDecoder
    如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。

  2. 行拆包器 LineBasedFrameDecoder
    从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。

  3. 分隔符拆包器 DelimiterBasedFrameDecoder
    DelimiterBasedFrameDecoder 是行拆包器的通用版本,只不过我们可以自定义分隔符。

  4. 基于长度域拆包器 LengthFieldBasedFrameDecoder
    最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。

如何使用 LengthFieldBasedFrameDecoder

我们的自定义协议如下:
自定义协议
关于拆包,我们只需要关注

  1. 在我们的自定义协议中,我们的长度域在整个数据包的哪个地方,专业术语来说就是长度域相对整个数据包的偏移量是多少,这里显然是 4+1+1+1=7。
  2. 另外需要关注的就是,我们长度域的长度是多少,这里显然是 4。 有了长度域偏移量和长度域的长度,我们就可以构造一个拆包器。
1
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);

其中,第一个参数指的是数据包的最大长度,第二个参数指的是长度域的偏移量,第三个参数指的是长度域的长度,这样一个拆包器写好之后,只需要在 pipeline 的最前面加上这个拆包器。
这样,在后续 PacketDecoder 进行 decode 操作的时候,ByteBuf 就是一个完整的自定义协议数据包。

拒绝非本协议连接

我们设计魔数的原因是为了尽早屏蔽非本协议的客户端,通常在第一个 handler 处理这段逻辑。我们接下来的做法是每个客户端发过来的数据包都做一次快速判断,判断当前发来的数据包是否是满足我的自定义协议, 我们只需要继承自 LengthFieldBasedFrameDecoderdecode() 方法,然后在 decode 之前判断前四个字节是否是等于我们定义的魔数 0x12345678

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Spliter extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_OFFSET = 7;
private static final int LENGTH_FIELD_LENGTH = 4;

public Spliter() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 屏蔽非本协议的客户端
if (in.getInt(in.readerIndex()) != PacketCodeC.MAGIC_NUMBER) {
ctx.channel().close();
return null;
}

return super.decode(ctx, in);
}
}

然后替换一下

1
2
3
//ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
// 替换为
ch.pipeline().addLast(new Spliter());

服务端和客户端的 pipeline 结构

pipeline结构

总结

  1. 拆包器的作用就是根据我们的自定义协议,把数据拼装成一个个符合我们自定义数据包大小的 ByteBuf,然后送到我们的自定义协议解码器去解码。
  2. Netty 自带的拆包器包括基于固定长度的拆包器,基于换行符和自定义分隔符的拆包器,还有另外一种最重要的基于长度域的拆包器。通常 Netty 自带的拆包器已完全满足我们的需求,无需重复造轮子。
  3. 基于 Netty 自带的拆包器,我们可以在拆包之前判断当前连上来的客户端是否是支持自定义协议的客户端,如果不支持,可尽早关闭,节省资源。

channelHandler 的生命周期

ChannelHandler 的生命周期详解

添加一个自定义 ChannelHandler 来测试一下各个回调方法的执行顺序。
基于 ChannelInboundHandlerAdapter,自定义了一个 handler: LifeCyCleTestHandler
LifeCyCleTestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class LifeCyCleTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被添加:handlerAdded()");
super.handlerAdded(ctx);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()");
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 准备就绪:channelActive()");
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channel 有数据可读:channelRead()");
super.channelRead(ctx, msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 某次数据读完:channelReadComplete()");
super.channelReadComplete(ctx);

}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 被关闭:channelInactive()");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()");
super.channelUnregistered(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被移除:handlerRemoved()");
super.handlerRemoved(ctx);
}
}

每个方法被调用的时候都会打印一段文字,然后把这个事件继续往下传播。最后,我们把这个 handler 添加到我们在上小节构建的 pipeline 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
.group(bossGroup, workerGroup) // 配置上述两个线程组,定型线程模型
.channel(NioServerSocketChannel.class) // 指定我们服务端的 IO 模型为NIO
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 定义后续每条连接的数据读写,业务处理逻辑
protected void initChannel(NioSocketChannel ch) {
// 获取服务端侧关于这条连接的逻辑处理链 pipeline,然后添加一个逻辑处理器,负责读取客户端发来的数据
ch.pipeline().addLast(new LifeCyCleTestHandler());
ch.pipeline().addLast(new Spliter());
// ch.pipeline().addLast(new FirstServerHandler());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

}
});

运行 NettyServer.java,然后再运行 NettyClient.java,这个时候,Server 端 控制台的输出为
控制台输出
可以看到,ChannelHandler 回调方法的执行顺序为

handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()

每个回调方法的含义:

  1. handlerAdded() :指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,表示在当前的 channel 中,已经成功添加了一个 handler 处理器。
  2. channelRegistered():这个回调方法,表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系,类似我们在Netty 是什么?这小节中 BIO 编程中,accept 到新的连接,然后创建一个线程来处理这条连接的读写,只不过 Netty 里面是使用了线程池的方式,只需要从线程池里面去抓一个线程绑定在这个 channel 上即可,这里的 NIO 线程通常指的是 NioEventLoop,不理解没关系,后面我们还会讲到。
  3. channelActive():当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法。
  4. channelRead():客户端向服务端发来数据,每次都会回调此方法,表示有数据可读。
  5. channelReadComplete():服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕。

再把客户端关闭,这个时候对于服务端来说,其实就是 channel 被关闭
控制台输出

ChannelHandler 回调方法的执行顺序为
channelInactive() -> channelUnregistered() -> handlerRemoved()

这里的回调方法的执行顺序是新连接建立时候的逆操作,下面解释一下每个方法的含义

  1. channelInactive(): 表面这条连接已经被关闭了,这条连接在 TCP 层面已经不再是 ESTABLISH 状态了
  2. channelUnregistered(): 既然连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了,这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
  3. handlerRemoved():最后,我们给这条连接上添加的所有的业务逻辑处理器都给移除掉。

最后,用一幅图来标识 ChannelHandler 的生命周期
ChannelHandler生命周期

ChannelHandler 生命周期各回调方法用法举例

ChannelInitializer 的实现原理

在给新连接定义 handler 的时候,其实只是通过 childHandler() 方法给新连接设置了一个 handler,这个 handler 就是 ChannelInitializer,而在 ChannelInitializerinitChannel() 方法里面,我们通过拿到 channel 对应的 pipeline,然后往里面塞 handler。

这里的 ChannelInitializer 其实就利用了 Netty 的 handler 生命周期中 channelRegistered() 与 handlerAdded() 两个特性,我们简单翻一翻 ChannelInitializer 这个类的源代码:
ChannelInitializer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected abstract void initChannel(C ch) throws Exception;

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// ...
initChannel(ctx);
// ...
}

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// ...
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
// ...
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
initChannel((C) ctx.channel());
// ...
return true;
}
return false;
}
  1. ChannelInitializer 定义了一个抽象的方法 initChannel(),这个抽象方法由我们自行实现,我们在服务端启动的流程里面的实现逻辑就是往 pipeline 里面塞我们的 handler 链
  2. handlerAdded()channelRegistered() 方法,都会尝试去调用 initChannel() 方法,initChannel() 使用 putIfAbsent() 来防止 initChannel() 被调用多次
  3. 如果你 debug 了 ChannelInitializer 的上述两个方法,你会发现,在 handlerAdded() 方法被调用的时候,channel 其实已经和某个线程绑定上了,所以,就我们的应用程序来说,这里的 channelRegistered() 其实是多余的,那为什么这里还要尝试调用一次呢?我猜测应该是担心我们自己写了个类继承自 ChannelInitializer,然后覆盖掉了 handlerAdded() 方法,这样即使覆盖掉,在 channelRegistered() 方法里面还有机会再调一次 initChannel(),把我们自定义的 handler 都添加到 pipeline 中去。

handlerAdded() 与 handlerRemoved()

这两个方法通常可以用在一些资源的申请和释放

channelActive() 与 channelInActive()

  1. 对我们的应用程序来说,这两个方法表明的含义是 TCP 连接的建立与释放,通常我们在这两个回调里面统计单机的连接数,channelActive() 被调用,连接数加一,channelInActive() 被调用,连接数减一
  2. 另外,我们也可以在 channelActive() 方法中,实现对客户端连接 ip 黑白名单的过滤,具体这里就不展开了

channelRead()

我们在前面小节讲拆包粘包原理,服务端根据自定义协议来进行拆包,其实就是在这个方法里面,每次读到一定的数据,都会累加到一个容器里面,然后判断是否能够拆出来一个完整的数据包,如果够的话就拆了之后,往下进行传递

channelReadComplete()

每次向客户端写数据的时候,都通过 writeAndFlush() 的方法写并刷新到底层,其实这种方式不是特别高效,我们可以在之前调用 writeAndFlush() 的地方都调用 write() 方法,然后在这个方面里面调用 ctx.channel().flush() 方法,相当于一个批量刷新的机制,当然,如果你对性能要求没那么高,writeAndFlush() 足矣。

总结

  1. 我们详细剖析了 ChannelHandler(主要是ChannelInBoundHandler)的各个回调方法,连接的建立和关闭,执行回调方法有个逆向的过程
  2. 每一种回调方法都有他各自的用法,但是有的时候某些回调方法的使用边界有些模糊,恰当地使用回调方法来处理不同的逻辑,可以使你的应用程序更为优雅。