1. 服务端
1 | /** |
2. 客户端
1 | /** |
3. 流程
Bootstrap
和ServerBootstrap
分别是客户端和服务器端的引导类,一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler
)、绑定端口、发起连接等。- 客户端创建一个
NioSocketChannel
作为客户端通道,去连接服务器。 - 服务端首先创建一个
NioServerSocketChannel
作为服务器端通道,每当接收一个客户端连接就产生一个NioSocketChannel
应对该客户端。 - 使用
Channel
构建网络 IO 程序的时候,不同的协议、不同的阻塞类型和 Netty 中不同的Channel
对应,常用的Channel
有:NioSocketChannel
:非阻塞的 TCP 客户端 Channel(本案例的客户端使用的 Channel)NioServerSocketChannel
:非阻塞的 TCP 服务器端 Channel(本案例的服务器端使用的 Channel)NioDatagramChannel
:非阻塞的 UDP ChannelNioSctpChannel
:非阻塞的 SCTP 客户端 ChannelNioSctpServerChannel
:非阻塞的 SCTP 服务器端 Channel
4. 注意
- 默认情况下
BossGroup
和WorkerGroup
都包含“2倍核心数”个线程(NioEventLoop
),例如8核心就是16个线程,NioEventLoop 数量 = coreNum*2
。这 16 个线程相当于MainReactor
。 - 创建
BossGroup
和WorkerGroup
的时候可以指定NioEventLoop
数量。 NettyServerHandler
的channelRead
方法中的ChannelHandlerContext ctx
:- 当前
ChannelHandlerContext ctx
是位于ChannelHandlerContext
责任链中的一环,可以看到其next、prev
属性 - 当前
ChannelHandlerContext ctx
包含一个Handler
- 当前
ChannelHandlerContext ctx
包含一个Pipeline
Pipeline
本质上是一个双向循环列表,其有tail、head
属性Pipeline
中包含一个Channel
,Channel
中又包含了该Pipeline
,两者互相引用
- 当前
5. 组件
1. Handler
1.1. 概念
服务端中自定义的 MyNettyServerHandler
和客户端中自定义的 MyNettyClientHandler
,都继承于 ChannelInboundHandlerAdapter
,其又继承于 ChannelHandlerAdapter
,ChannelHandlerAdapter
又实现了 ChannelHandler
,因此统称为 ChannelHandler
。
1.2. 作用
在当前 ChannelHandler
中处理 IO 事件,并将其传递给 ChannelPipeline
中下一个 ChannelHandler
处理,因此多个 ChannelHandler
形成一个责任链,责任链位于 ChannelPipeline
中。
数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据–>解码数据–>处理数据–>编码数据–>发送数据。其中的每个过程都用得到 ChannelHandler
责任链。
1.3. 继承关系
ChannelInboundHandler
用于处理入站 IO 事件ChannelOutboundHandler
用于处理出站 IO 事件ChannelInboundHandlerAdapter
用于处理入站 IO 事件ChannelOutboundHandlerAdapter
用于处理出站 IO 事件
ChannelPipeline
提供了 ChannelHandler
链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline
中的一系列 ChannelOutboundHandler
进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline
中的一系列 ChannelInboundHandler
进行处理。
2. Pipeline
2.1. 概念
每个 Netty Channel
包含了一个 ChannelPipeline
(其实 Channel
和 ChannelPipeline
互相引用),而 ChannelPipeline
又维护了一个由 ChannelHandlerContext
构成的双向循环列表,其中的每一个 ChannelHandlerContext
都包含一个 ChannelHandler
。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)
2.2. 作用
在处理入站事件的时候,入站事件及数据会从 Pipeline
中的双向链表的头 ChannelHandlerContext
流向尾 ChannelHandlerContext
,并依次在其中每个 ChannelInboundHandler
(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline
中的双向链表的尾 ChannelHandlerContext
流向头 ChannelHandlerContext
,并依次在其中每个 ChannelOutboundHandler
(例如编码 Handler)中得到处理。
3. EventLoopGroup
3.1. 概念
在基于 Netty 的 TCP Server
代码中,包含了两个 EventLoopGroup
:bossGroup
和 workerGroup
,EventLoopGroup
是一组 EventLoop
的抽象。其最终继承于 JUC Executor(java.util.concurrent)
。
3.2. 作用
在服务端,通常 Boss EventLoopGroup
只包含一个 Boss EventLoop
(单线程),该 EventLoop
维护者一个注册了 ServerSocketChannel
的 Selector
实例。该 EventLoop
不断轮询 Selector
得到 OP_ACCEPT 事件
(客户端连接事件),然后将接收到的 SocketChannel
交给 Worker EventLoopGroup
,Worker EventLoopGroup
会通过 next()
方法选取一个 Worker EventLoop
并将这个 SocketChannel
注册到其中的 Selector
上,由这个 Worker EventLoop
负责该 SocketChannel
上后续的 IO 事件处理。整个过程如下图所示:
4. TaskQueue
4.1. 概念
在 Netty 的每一个 NioEventLoop
中都有一个 TaskQueue
,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector
监听到的 IO 事件。
4.2. 使用场景
4.2.1. 处理用户程序的自定义普通任务时
假如 channelRead
方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop
的并发度:
1 | /** |
改进方法就是借助任务队列:
1 | /** |
4.2.2. 处理用户程序的自定义定时任务时
假如 channelRead
方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:
1 | /** |
4.2.3. 非当前 Reactor 线程调用当前 Channel 的各种方法时
假如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel
引用,然后调用 write
方法向该用户推送消息,这时候就会将这一 write
任务放在任务队列中,write
任务最终被异步消费:
5. Future
5.1. 概念
Netty 对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture
就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,也可以通过 Netty Future 的 addListener()
添加一个回调方法来异步处理 IO 结果:
1 | // 启动客户端去连接服务器端,由于 bootstrap.connect()是一个异步操作,因此用.sync()等待这个异步操作完成 |
5.2. Future
提供的接口
6. Promise
6.1. 概念
Promise
是可写的 Future
,Future
自身并没有写操作相关的接口,Netty 通过 Promise
对 Future
进行扩展,用于设置 IO 操作的结果。
6.2. Promise
提供的接口
Promise
继承了 Future
,相关的接口定义如下图所示,相比于上图 Future
的接口,它多出了一些 setXXX
方法: