Netty 基础-总览(第一章)

基础

1. Java 中的网络 IO 模型

1.1. BIO:同步的、阻塞式 IO

在这种模型中,服务器上一个线程处理一次连接,即客户端每发起一个请求,服务端都要开启一个线程专门处理该请求。
构建 C/S 系统的 Java 编程组件是 ServerSocketSocket

1.2. NIO:同步的、非阻塞式 IO

在这种模型中,服务器上一个线程处理多个连接,即多个客户端请求都会被注册到多路复用器(Selector)上,多路复用器会轮训这些连接,轮训到连接上有 IO 活动就进行处理。这种模式也叫做 Reactor 模式。
构建 C/S 系统的 Java 编程组件是 ChannelBufferSelector

1.3. AIO:异步非阻塞式 IO

在这种模型中,由操作系统完成与客户端之间的read/write,之后再由操作系统主动通知服务器线程去处理后面的工作,在这个过程中服务器线程不必同步等待 read/write 完成。这种模式也叫做 Proactor 模式。

2. 网路 IO 术语

  • 阻塞:如果线程调用 read/write 过程,但 read/write 过程没有就绪或没有完成,则调用 read/write 过程的线程会一直等待,这个过程叫做阻塞式读写。
  • 非阻塞:如果线程调用 read/write 过程,但 read/write 过程没有就绪或没有完成,调用 read/write 过程的线程并不会一直等待,而是去处理其他工作,等到 read/write 过程就绪或完成后再回来处理,这个过程叫做非阻塞式读写。
  • 异步:read/write 过程托管给操作系统来完成,完成后操作系统会通知(通过回调或者事件)应用网络 IO 程序(其中的线程)来进行后续的处理。
  • 同步:read/write 过程由网络 IO 程序(其中的线程)来完成。

NIO 组件

1. 缓冲区(Buffer

缓冲区(Buffer)本质上是一个可读可写的内存块,可以理解成一个容器对象,Channel 读写文件或者网络都要经由 Buffer

1.1. 常用子类

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • DoubleBuffer
  • FloatBuffer
  • MappedByteBuffer

2. 通道(Channel

通道(Channel)是双向的,可读可写。

2.1. 常用子类

  • FileChannel:用于文件读写
  • DatagramChannel:用于 UDP 数据包收发
  • ServerSocketChannel:用于服务端 TCP 数据包收发
  • SocketChannel:用于客户端 TCP 数据包收发

3. 选择器(Selector

选择器(Selector)是实现 IO 多路复用的关键,多个 Channel 注册到某个 Selector 上,当 Channel 上有事件发生时,Selector就会取得事件然后调用线程去处理事件。

3.1. 常用子类

Selector 是一个抽象类,常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class Selector implements Closeable {
......

/**
* 得到一个选择器对象
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
......

/**
* 返回所有发生事件的 Channel 对应的 SelectionKey 的集合,通过
* SelectionKey 可以找到对应的 Channel
*/
public abstract Set<SelectionKey> selectedKeys();
......

/**
* 返回所有 Channel 对应的 SelectionKey 的集合,通过 SelectionKey
* 可以找到对应的 Channel
*/
public abstract Set<SelectionKey> keys();
......

/**
* 监控所有注册的 Channel,当其中的 Channel 有 IO 操作可以进行时,
* 将这些 Channel 对应的 SelectionKey 找到。参数用于设置超时时间
*/
public abstract int select(long timeout) throws IOException;

/**
* 无超时时间的 select 过程,一直等待,直到发现有 Channel 可以进行
* IO 操作
*/
public abstract int select() throws IOException;

/**
* 立即返回的 select 过程
*/
public abstract int selectNow() throws IOException;
......

/**
* 唤醒 Selector,对无超时时间的 select 过程起作用,终止其等待
*/
public abstract Selector wakeup();
......
}

服务端的工作流程

  1. 当客户端发起连接时,会通过 ServerSocketChannel 创建对应的 SocketChannel
  2. 调用 SocketChannel 的注册方法将 SocketChannel 注册到 Selector 上,注册方法返回一个 SelectionKey,该 SelectionKey 会被放入 Selector 内部的 SelectionKey 集合中。该 SelectionKeySelector 关联(即通过 SelectionKey 可以找到对应的 Selector),也和 SocketChannel 关联(即通过 SelectionKey 可以找到对应的 SocketChannel)。
  3. Selector 会调用 select()/select(timeout)/selectNow()方法对内部的 SelectionKey 集合关联的 SocketChannel 集合进行监听,找到有事件发生的 SocketChannel 对应的 SelectionKey
  4. 通过 SelectionKey 找到有事件发生的 SocketChannel,完成数据处理。

相关源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* SocketChannel 继承 AbstractSelectableChannel
*/
public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel,
ScatteringByteChannel,
GatheringByteChannel,
NetworkChannel
{
......
}

public abstract class AbstractSelectableChannel
extends SelectableChannel
{
......
/**
* AbstractSelectableChannel 中包含注册方法,SocketChannel 实例
* 借助该注册方法注册到 Selector 实例上去,该方法返回 SelectionKey
*/
public final SelectionKey register(
// 指明注册到哪个 Selector 实例
Selector sel,
// ops 是事件代码,告诉 Selector 应该关注该通道的什么事件
int ops,
// 附加信息 attachment
Object att) throws ClosedChannelException {
......
}
......
}

public abstract class SelectionKey {
......

/**
* 获取该 SelectionKey 对应的 Channel
*/
public abstract SelectableChannel channel();

/**
* 获取该 SelectionKey 对应的 Selector
*/
public abstract Selector selector();
......

/**
* 事件代码,上面的 ops 参数取这里的值
*/
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
......

/**
* 检查该 SelectionKey 对应的 Channel 是否可读
*/
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}

/**
* 检查该 SelectionKey 对应的 Channel 是否可写
*/
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}

/**
* 检查该 SelectionKey 对应的 Channel 是否已经建立起 socket 连接
*/
public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}

/**
* 检查该 SelectionKey 对应的 Channel 是否准备好接受一个新的 socket 连接
*/
public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}

/**
* 添加附件(例如 Buffer)
*/
public final Object attach(Object ob) {
return attachmentUpdater.getAndSet(this, ob);
}

/**
* 获取附件
*/
public final Object attachment() {
return attachment;
}
......
}

Linux 下零拷贝技术

1. 直接 IO 技术

直接IO

内核缓冲区是 Linux 系统的 Page Cahe。整个过程有四次数据拷贝,读进来两次,写回去又两次:磁盘–>内核缓冲区–>Socket 缓冲区–>网络。

直接 IO 过程使用的 Linux 系统 API 为:

1
2
ssize_t read(int filedes, void *buf, size_t nbytes);
ssize_t write(int filedes, void *buf, size_t nbytes);

2. 内存映射文件技术

内存映射文件

整个过程有三次数据拷贝,不再经过应用程序内存,直接在内核空间中从内核缓冲区拷贝到 Socket 缓冲区。

内存映射文件过程使用的 Linux 系统 API 为:

1
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);

3. 零拷贝技术

零拷贝技术

内核缓冲区到 Socket 缓冲区之间并没有做数据的拷贝,只是一个地址的映射。底层的网卡驱动程序要读取数据并发送到网络上的时候,看似读取的是 Socket 的缓冲区中的数据,其实直接读的是内核缓冲区中的数据。内存中数据拷贝的次数为 0。

零拷贝过程使用的 Linux 系统 API 为:

1
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

JDK中提供的是:

1
FileChannel.transderTo(long position, long count, WritableByteChannel target);

具体可参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
String fileName = "test.zip";

// 得到一个文件 channel
FileChannel fileChannel = new FileInputStream(fileName).getChannel();

// 使用零拷贝 IO 技术发送
long transferSize = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("file transfer done, size: " + transferSize);
fileChannel.close();
}