1. 服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
|
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new MyNettyServerHandler() ); } } );
System.out.println("server is ready...");
ChannelFuture channelFuture = bootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
static class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client address: " + ctx.channel().remoteAddress());
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush( Unpooled.copiedBuffer("hello client! i have got your data.", CharsetUtil.UTF_8) ); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
|
2. 客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new MyNettyClientHandler() ); } } );
System.out.println("client is ready...");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } }
static class MyNettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush( Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8) ); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server address: " + ctx.channel().remoteAddress());
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from server: " + byteBuf.toString(CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
|
3. 流程
Bootstrap 和 ServerBootstrap 分别是客户端和服务器端的引导类,一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler)、绑定端口、发起连接等。
- 客户端创建一个
NioSocketChannel 作为客户端通道,去连接服务器。
- 服务端首先创建一个
NioServerSocketChannel 作为服务器端通道,每当接收一个客户端连接就产生一个 NioSocketChannel 应对该客户端。
- 使用
Channel 构建网络 IO 程序的时候,不同的协议、不同的阻塞类型和 Netty 中不同的 Channel 对应,常用的 Channel 有:
NioSocketChannel:非阻塞的 TCP 客户端 Channel(本案例的客户端使用的 Channel)
NioServerSocketChannel:非阻塞的 TCP 服务器端 Channel(本案例的服务器端使用的 Channel)
NioDatagramChannel:非阻塞的 UDP Channel
NioSctpChannel:非阻塞的 SCTP 客户端 Channel
NioSctpServerChannel:非阻塞的 SCTP 服务器端 Channel
4. 注意
- 默认情况下
BossGroup 和 WorkerGroup 都包含“2倍核心数”个线程(NioEventLoop),例如8核心就是16个线程,NioEventLoop 数量 = coreNum*2。这 16 个线程相当于MainReactor。
- 创建
BossGroup 和 WorkerGroup 的时候可以指定 NioEventLoop 数量。
NettyServerHandler的channelRead方法中的 ChannelHandlerContext ctx:
- 当前
ChannelHandlerContext ctx 是位于 ChannelHandlerContext 责任链中的一环,可以看到其 next、prev 属性
- 当前
ChannelHandlerContext ctx 包含一个 Handler
- 当前
ChannelHandlerContext ctx 包含一个 Pipeline
Pipeline 本质上是一个双向循环列表,其有 tail、head 属性
Pipeline 中包含一个 Channel,Channel 中又包含了该 Pipeline,两者互相引用
5. 组件
1. Handler
1.1. 概念
服务端中自定义的 MyNettyServerHandler 和客户端中自定义的 MyNettyClientHandler,都继承于 ChannelInboundHandlerAdapter,其又继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又实现了 ChannelHandler,因此统称为 ChannelHandler。
1.2. 作用
在当前 ChannelHandler 中处理 IO 事件,并将其传递给 ChannelPipeline 中下一个 ChannelHandler 处理,因此多个 ChannelHandler 形成一个责任链,责任链位于 ChannelPipeline 中。
数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据–>解码数据–>处理数据–>编码数据–>发送数据。其中的每个过程都用得到 ChannelHandler 责任链。
1.3. 继承关系

ChannelInboundHandler 用于处理入站 IO 事件
ChannelOutboundHandler 用于处理出站 IO 事件
ChannelInboundHandlerAdapter 用于处理入站 IO 事件
ChannelOutboundHandlerAdapter 用于处理出站 IO 事件
ChannelPipeline 提供了 ChannelHandler链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline 中的一系列 ChannelOutboundHandler 进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline 中的一系列 ChannelInboundHandler 进行处理。
2. Pipeline
2.1. 概念
每个 Netty Channel 包含了一个 ChannelPipeline(其实 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又维护了一个由 ChannelHandlerContext 构成的双向循环列表,其中的每一个 ChannelHandlerContext 都包含一个 ChannelHandler。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)
2.2. 作用

在处理入站事件的时候,入站事件及数据会从 Pipeline 中的双向链表的头 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每个 ChannelInboundHandler(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline 中的双向链表的尾 ChannelHandlerContext 流向头 ChannelHandlerContext,并依次在其中每个 ChannelOutboundHandler(例如编码 Handler)中得到处理。
3. EventLoopGroup
3.1. 概念
在基于 Netty 的 TCP Server 代码中,包含了两个 EventLoopGroup:bossGroup 和 workerGroup,EventLoopGroup 是一组 EventLoop 的抽象。其最终继承于 JUC Executor(java.util.concurrent)。
3.2. 作用
在服务端,通常 Boss EventLoopGroup 只包含一个 Boss EventLoop(单线程),该 EventLoop 维护者一个注册了 ServerSocketChannel 的 Selector 实例。该 EventLoop 不断轮询 Selector 得到 OP_ACCEPT 事件(客户端连接事件),然后将接收到的 SocketChannel 交给 Worker EventLoopGroup,Worker EventLoopGroup 会通过 next()方法选取一个 Worker EventLoop 并将这个 SocketChannel 注册到其中的 Selector 上,由这个 Worker EventLoop 负责该 SocketChannel 上后续的 IO 事件处理。整个过程如下图所示:

4. TaskQueue
4.1. 概念
在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。

4.2. 使用场景
4.2.1. 处理用户程序的自定义普通任务时
假如 channelRead 方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop 的并发度:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Thread.sleep(LONG_TIME);
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); }
|
改进方法就是借助任务队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Object finalMsg = msg;
ctx.channel().eventLoop().execute(new Runnable() { public void run() { try { Thread.sleep(LONG_TIME); } catch (InterruptedException e) { e.printStackTrace(); }
ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }); System.out.println("return right now."); }
|
4.2.2. 处理用户程序的自定义定时任务时
假如 channelRead 方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Object finalMsg = msg;
ctx.channel().eventLoop().schedule(new Runnable() { public void run() {
ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }, 5, TimeUnit.MINUTES);
System.out.println("return right now."); }
|
4.2.3. 非当前 Reactor 线程调用当前 Channel 的各种方法时
假如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel 引用,然后调用 write 方法向该用户推送消息,这时候就会将这一 write 任务放在任务队列中,write 任务最终被异步消费:
5. Future
5.1. 概念
Netty 对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture 就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,也可以通过 Netty Future 的 addListener() 添加一个回调方法来异步处理 IO 结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { if (channelFuture.isSuccess()) { System.out.println("client has connected to server!"); } else { System.out.println("connect to serverfail!"); } } });
|
5.2. Future 提供的接口

6. Promise
6.1. 概念
Promise 是可写的 Future,Future 自身并没有写操作相关的接口,Netty 通过 Promise 对 Future 进行扩展,用于设置 IO 操作的结果。
6.2. Promise 提供的接口
Promise 继承了 Future,相关的接口定义如下图所示,相比于上图 Future 的接口,它多出了一些 setXXX 方法:
