Netty 基础-TCP Server-Client 案例(第零章)

1. 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* 需要的依赖:
* <dependency>
* <groupId>io.netty</groupId>
* <artifactId>netty-all</artifactId>
* <version>4.1.52.Final</version>
* </dependency>
*/
public static void main(String[] args) throws InterruptedException {

// 创建 BossGroup 和 WorkerGroup
// 1. bossGroup 只处理连接请求
// 2. 业务处理由 workerGroup 来完成
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(bossGroup, workerGroup)
// 说明服务器端通道的实现类(便于 Netty 做反射处理)
.channel(NioServerSocketChannel.class)
// 设置等待连接的队列的容量(当客户端连接请求速率大于 NioServerSocketChannel 接收速率的时候,会使用该队列做缓冲)
// option()方法用于给服务端的 ServerSocketChannel 添加配置
.option(ChannelOption.SO_BACKLOG, 128)
// 设置连接保活
// childOption()方法用于给服务端 ServerSocketChannel 接收到的 SocketChannel 添加配置
.childOption(ChannelOption.SO_KEEPALIVE, true)
// handler()方法用于给 BossGroup 设置业务处理器
// childHandler()方法用于给 WorkerGroup 设置业务处理器
.childHandler(
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new MyNettyServerHandler()
);

// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);

System.out.println("server is ready...");

// 绑定端口,启动服务器,生成一个 channelFuture 对象,ChannelFuture 涉及到 Netty 的异步模型
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

/**
* 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
* InboundHandler 用于处理数据流入本端(服务端)的 IO 事件
* OutboundHandler 用于处理数据流出本端(服务端)的 IO 事件
*/
static class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象,可以从中取得相关联的 Pipeline、Channel、客户端地址等
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 接收客户端发来的数据
System.out.println("client address: " + ctx.channel().remoteAddress());

// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8));
}

/**
* 数据读取完毕后执行
*
* @param ctx 上下文对象
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 发送响应给客户端
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具类,copiedBuffer 方法返回的 ByteBuf 对象类似于 NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer("hello client! i have got your data.", CharsetUtil.UTF_8)
);
}

/**
* 发生异常时执行
*
* @param ctx 上下文对象
* @param cause 异常对象
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭与客户端的 Socket 连接
ctx.channel().close();
}
}

2. 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* 需要的依赖:
* <dependency>
* <groupId>io.netty</groupId>
* <artifactId>netty-all</artifactId>
* <version>4.1.52.Final</version>
* </dependency>
*/
public static void main(String[] args) throws InterruptedException {

// 客户端只需要一个事件循环组,可以看做 BossGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
// 创建客户端的启动对象
Bootstrap bootstrap = new Bootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(eventLoopGroup)
// 说明客户端通道的实现类(便于 Netty 做反射处理)
.channel(NioSocketChannel.class)
// handler()方法用于给 BossGroup 设置业务处理器
.handler(
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new MyNettyClientHandler()
);

// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);

System.out.println("client is ready...");

// 启动客户端去连接服务器端,ChannelFuture 涉及到 Netty 的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

/**
* 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
* InboundHandler 用于处理数据流入本端(客户端)的 IO 事件
* OutboundHandler 用于处理数据流出本端(客户端)的 IO 事件
*/
static class MyNettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪时执行
*
* @param ctx 上下文对象
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务器发送数据
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具类,copiedBuffer 方法返回的 ByteBuf 对象类似于 NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8)
);
}

/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象
* @param msg 服务器端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 接收服务器端发来的数据
System.out.println("server address: " + ctx.channel().remoteAddress());

// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from server: " + byteBuf.toString(CharsetUtil.UTF_8));
}

/**
* 发生异常时执行
*
* @param ctx 上下文对象
* @param cause 异常对象
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭与服务器端的 Socket 连接
ctx.channel().close();
}
}

3. 流程

  1. BootstrapServerBootstrap 分别是客户端和服务器端的引导类,一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler)、绑定端口、发起连接等。
  2. 客户端创建一个 NioSocketChannel 作为客户端通道,去连接服务器。
  3. 服务端首先创建一个 NioServerSocketChannel 作为服务器端通道,每当接收一个客户端连接就产生一个 NioSocketChannel 应对该客户端。
  4. 使用 Channel 构建网络 IO 程序的时候,不同的协议、不同的阻塞类型和 Netty 中不同的 Channel 对应,常用的 Channel 有:
    • NioSocketChannel:非阻塞的 TCP 客户端 Channel(本案例的客户端使用的 Channel)
    • NioServerSocketChannel:非阻塞的 TCP 服务器端 Channel(本案例的服务器端使用的 Channel)
    • NioDatagramChannel:非阻塞的 UDP Channel
    • NioSctpChannel:非阻塞的 SCTP 客户端 Channel
    • NioSctpServerChannel:非阻塞的 SCTP 服务器端 Channel

4. 注意

  1. 默认情况下 BossGroupWorkerGroup 都包含“2倍核心数”个线程(NioEventLoop),例如8核心就是16个线程,NioEventLoop 数量 = coreNum*2。这 16 个线程相当于MainReactor
  2. 创建 BossGroupWorkerGroup 的时候可以指定 NioEventLoop 数量。
  3. NettyServerHandlerchannelRead方法中的 ChannelHandlerContext ctx
    • 当前 ChannelHandlerContext ctx 是位于 ChannelHandlerContext 责任链中的一环,可以看到其 next、prev 属性
    • 当前 ChannelHandlerContext ctx 包含一个 Handler
    • 当前 ChannelHandlerContext ctx 包含一个 Pipeline
    • Pipeline 本质上是一个双向循环列表,其有 tail、head 属性
    • Pipeline 中包含一个 ChannelChannel 中又包含了该 Pipeline,两者互相引用

5. 组件

1. Handler

1.1. 概念

服务端中自定义的 MyNettyServerHandler 和客户端中自定义的 MyNettyClientHandler,都继承于 ChannelInboundHandlerAdapter,其又继承于 ChannelHandlerAdapterChannelHandlerAdapter 又实现了 ChannelHandler,因此统称为 ChannelHandler

1.2. 作用

在当前 ChannelHandler 中处理 IO 事件,并将其传递给 ChannelPipeline 中下一个 ChannelHandler 处理,因此多个 ChannelHandler 形成一个责任链,责任链位于 ChannelPipeline 中。

数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据–>解码数据–>处理数据–>编码数据–>发送数据。其中的每个过程都用得到 ChannelHandler 责任链。

1.3. 继承关系

netty_handler

  • ChannelInboundHandler 用于处理入站 IO 事件
  • ChannelOutboundHandler 用于处理出站 IO 事件
  • ChannelInboundHandlerAdapter 用于处理入站 IO 事件
  • ChannelOutboundHandlerAdapter 用于处理出站 IO 事件

ChannelPipeline 提供了 ChannelHandler链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline 中的一系列 ChannelOutboundHandler 进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline 中的一系列 ChannelInboundHandler 进行处理。

2. Pipeline

2.1. 概念

每个 Netty Channel 包含了一个 ChannelPipeline(其实 ChannelChannelPipeline 互相引用),而 ChannelPipeline 又维护了一个由 ChannelHandlerContext 构成的双向循环列表,其中的每一个 ChannelHandlerContext 都包含一个 ChannelHandler。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)

2.2. 作用

netty_pipeline

在处理入站事件的时候,入站事件及数据会从 Pipeline 中的双向链表的头 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每个 ChannelInboundHandler(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline 中的双向链表的尾 ChannelHandlerContext 流向头 ChannelHandlerContext,并依次在其中每个 ChannelOutboundHandler(例如编码 Handler)中得到处理。

3. EventLoopGroup

3.1. 概念

在基于 Netty 的 TCP Server 代码中,包含了两个 EventLoopGroupbossGroupworkerGroupEventLoopGroup 是一组 EventLoop 的抽象。其最终继承于 JUC Executor(java.util.concurrent)

3.2. 作用

在服务端,通常 Boss EventLoopGroup 只包含一个 Boss EventLoop(单线程),该 EventLoop 维护者一个注册了 ServerSocketChannelSelector 实例。该 EventLoop 不断轮询 Selector 得到 OP_ACCEPT 事件(客户端连接事件),然后将接收到的 SocketChannel 交给 Worker EventLoopGroupWorker EventLoopGroup 会通过 next()方法选取一个 Worker EventLoop 并将这个 SocketChannel 注册到其中的 Selector 上,由这个 Worker EventLoop 负责该 SocketChannel 上后续的 IO 事件处理。整个过程如下图所示:

netty_evenloop

4. TaskQueue

4.1. 概念

在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。

netty_taskqueue

4.2. 使用场景

4.2.1. 处理用户程序的自定义普通任务时

假如 channelRead 方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop 的并发度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 借助休眠模拟耗时操作
Thread.sleep(LONG_TIME);

ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8));
}

改进方法就是借助任务队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 假如这里的处理非常耗时,那么就需要借助任务队列异步执行

final Object finalMsg = msg;

// 通过 ctx.channel().eventLoop().execute()将耗时操作放入任务队列异步执行
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
// 借助休眠模拟耗时操作
try {
Thread.sleep(LONG_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}

ByteBuf byteBuf = (ByteBuf) finalMsg;
System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8));
}
});

// 可以继续调用 ctx.channel().eventLoop().execute() 将更多操作放入队列

System.out.println("return right now.");
}
4.2.2. 处理用户程序的自定义定时任务时

假如 channelRead 方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 当通道有数据可读时执行
*
* @param ctx 上下文对象
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

final Object finalMsg = msg;

// 通过 ctx.channel().eventLoop().schedule()将操作放入任务队列定时执行(5min 之后才进行处理)
ctx.channel().eventLoop().schedule(new Runnable() {
public void run() {

ByteBuf byteBuf = (ByteBuf) finalMsg;
System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8));
}
}, 5, TimeUnit.MINUTES);

// 可以继续调用 ctx.channel().eventLoop().schedule() 将更多操作放入队列

System.out.println("return right now.");
}
4.2.3. 非当前 Reactor 线程调用当前 Channel 的各种方法时

假如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel 引用,然后调用 write 方法向该用户推送消息,这时候就会将这一 write 任务放在任务队列中,write 任务最终被异步消费:

5. Future

5.1. 概念

Netty 对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture 就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,也可以通过 Netty Future 的 addListener() 添加一个回调方法来异步处理 IO 结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 启动客户端去连接服务器端,由于 bootstrap.connect()是一个异步操作,因此用.sync()等待这个异步操作完成
final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();

channelFuture.addListener(new ChannelFutureListener() {
/**
* 回调方法,上面的 bootstrap.connect()操作执行完之后触发
*/
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("client has connected to server!");
// TODO 其他处理
} else {
System.out.println("connect to serverfail!");
// TODO 其他处理
}
}
});

5.2. Future 提供的接口

netty_future

6. Promise

6.1. 概念

Promise 是可写的 FutureFuture 自身并没有写操作相关的接口,Netty 通过 PromiseFuture 进行扩展,用于设置 IO 操作的结果。

6.2. Promise 提供的接口

Promise 继承了 Future,相关的接口定义如下图所示,相比于上图 Future 的接口,它多出了一些 setXXX 方法:
netty_promise