1. 服务端
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new MyNettyServerHandler() ); } } );
System.out.println("server is ready...");
ChannelFuture channelFuture = bootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
static class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client address: " + ctx.channel().remoteAddress());
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush( Unpooled.copiedBuffer("hello client! i have got your data.", CharsetUtil.UTF_8) ); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
2. 客户端
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new MyNettyClientHandler() ); } } );
System.out.println("client is ready...");
ChannelFuture channelFuture = bootstrap.connect("", 8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } }
static class MyNettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush( Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8) ); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server address: " + ctx.channel().remoteAddress());
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from server: " + byteBuf.toString(CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
3. 流程
和 ServerBootstrap
分别是客户端和服务器端的引导类,一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler
- 客户端创建一个
- 服务端首先创建一个
作为服务器端通道,每当接收一个客户端连接就产生一个 NioSocketChannel
- 使用
构建网络 IO 程序的时候,不同的协议、不同的阻塞类型和 Netty 中不同的 Channel
对应,常用的 Channel
:非阻塞的 TCP 客户端 Channel(本案例的客户端使用的 Channel)
:非阻塞的 TCP 服务器端 Channel(本案例的服务器端使用的 Channel)
:非阻塞的 UDP Channel
:非阻塞的 SCTP 客户端 Channel
:非阻塞的 SCTP 服务器端 Channel
4. 注意
- 默认情况下
和 WorkerGroup
),例如8核心就是16个线程,NioEventLoop 数量 = coreNum*2
。这 16 个线程相当于MainReactor
- 创建
和 WorkerGroup
的时候可以指定 NioEventLoop
方法中的 ChannelHandlerContext ctx
- 当前
ChannelHandlerContext ctx
是位于 ChannelHandlerContext
责任链中的一环,可以看到其 next、prev
- 当前
ChannelHandlerContext ctx
包含一个 Handler
- 当前
ChannelHandlerContext ctx
包含一个 Pipeline
本质上是一个双向循环列表,其有 tail、head
中包含一个 Channel
中又包含了该 Pipeline
5. 组件
1. Handler
1.1. 概念
服务端中自定义的 MyNettyServerHandler
和客户端中自定义的 MyNettyClientHandler
,都继承于 ChannelInboundHandlerAdapter
,其又继承于 ChannelHandlerAdapter
又实现了 ChannelHandler
,因此统称为 ChannelHandler
1.2. 作用
在当前 ChannelHandler
中处理 IO 事件,并将其传递给 ChannelPipeline
中下一个 ChannelHandler
处理,因此多个 ChannelHandler
形成一个责任链,责任链位于 ChannelPipeline
数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据–>解码数据–>处理数据–>编码数据–>发送数据。其中的每个过程都用得到 ChannelHandler
1.3. 继承关系

用于处理入站 IO 事件
用于处理出站 IO 事件
用于处理入站 IO 事件
用于处理出站 IO 事件
提供了 ChannelHandler
链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline
中的一系列 ChannelOutboundHandler
进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline
中的一系列 ChannelInboundHandler
2. Pipeline
2.1. 概念
每个 Netty Channel
包含了一个 ChannelPipeline
(其实 Channel
和 ChannelPipeline
互相引用),而 ChannelPipeline
又维护了一个由 ChannelHandlerContext
构成的双向循环列表,其中的每一个 ChannelHandlerContext
都包含一个 ChannelHandler
。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)
2.2. 作用

在处理入站事件的时候,入站事件及数据会从 Pipeline
中的双向链表的头 ChannelHandlerContext
流向尾 ChannelHandlerContext
,并依次在其中每个 ChannelInboundHandler
(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline
中的双向链表的尾 ChannelHandlerContext
流向头 ChannelHandlerContext
,并依次在其中每个 ChannelOutboundHandler
(例如编码 Handler)中得到处理。
3. EventLoopGroup
3.1. 概念
在基于 Netty 的 TCP Server
代码中,包含了两个 EventLoopGroup
和 workerGroup
是一组 EventLoop
的抽象。其最终继承于 JUC Executor(java.util.concurrent)
3.2. 作用
在服务端,通常 Boss EventLoopGroup
只包含一个 Boss EventLoop
(单线程),该 EventLoop
维护者一个注册了 ServerSocketChannel
的 Selector
实例。该 EventLoop
不断轮询 Selector
(客户端连接事件),然后将接收到的 SocketChannel
交给 Worker EventLoopGroup
,Worker EventLoopGroup
会通过 next()
方法选取一个 Worker EventLoop
并将这个 SocketChannel
注册到其中的 Selector
上,由这个 Worker EventLoop
负责该 SocketChannel
上后续的 IO 事件处理。整个过程如下图所示:

4. TaskQueue
4.1. 概念
在 Netty 的每一个 NioEventLoop
中都有一个 TaskQueue
,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector
监听到的 IO 事件。

4.2. 使用场景
4.2.1. 处理用户程序的自定义普通任务时
假如 channelRead
方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Thread.sleep(LONG_TIME);
ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Object finalMsg = msg;
ctx.channel().eventLoop().execute(new Runnable() { public void run() { try { Thread.sleep(LONG_TIME); } catch (InterruptedException e) { e.printStackTrace(); }
ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }); System.out.println("return right now."); }
4.2.2. 处理用户程序的自定义定时任务时
假如 channelRead
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Object finalMsg = msg;
ctx.channel().eventLoop().schedule(new Runnable() { public void run() {
ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }, 5, TimeUnit.MINUTES);
System.out.println("return right now."); }
4.2.3. 非当前 Reactor 线程调用当前 Channel 的各种方法时
假如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel
引用,然后调用 write
方法向该用户推送消息,这时候就会将这一 write
5. Future
5.1. 概念
Netty 对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture
就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,也可以通过 Netty Future 的 addListener()
添加一个回调方法来异步处理 IO 结果:
| final ChannelFuture channelFuture = bootstrap.connect("", 8080).sync();
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { if (channelFuture.isSuccess()) { System.out.println("client has connected to server!"); } else { System.out.println("connect to serverfail!"); } } });
5.2. Future

6. Promise
6.1. 概念
是可写的 Future
自身并没有写操作相关的接口,Netty 通过 Promise
对 Future
进行扩展,用于设置 IO 操作的结果。
6.2. Promise
继承了 Future
,相关的接口定义如下图所示,相比于上图 Future
的接口,它多出了一些 setXXX
