Java NIO - 高性能服务器的 Reactor 设计模式

迄今为止,高性能网络编程都绕不开Reactor模式。很多著名的服务器软件或者中间件都是基于Reactor模式实现的。例如 Nginx、Netty、Redis。Doug Lea《Scalable IO in Java》 中对Reactor模式的定义。Reactor模式由Reactor线程、Handlers处理器两大角色组成,两 大角色的职责分别如下:

  • Reactor 线程:负责响应IO事件,并且分发到 Handlers处理器。
  • Handlers 处理器:非阻塞的执行业务处理逻辑。

从上面的Reactor模式定义中看不出这种模式有什么神奇的地方。 当然,从简单到复杂,Reactor模式也有很多版本,前面的定义仅仅是 最为简单的一个版本。如果需要彻底了解Reactor模式,还得从最原始 的OIO编程开始讲起。


Reactor 之前高性能服务的实现

在Java的OIO编程中,原始的网络服务器程序一般使用一个while 循环不断地监听端口是否有新的连接。如果有,就调用一个处理函数 来完成传输处理。

1
2
3
4
while(true){
socket = accept(); //阻塞,接收连接
handle(socket) ; //读取数据、业务处理、写入结果
}

这种方法的最大问题是:如果前一个网络连接的handle(socket) 没有处理完,那么后面的新连接无法被服务端接收,于是后面的请求 就会被阻塞,导致服务器的吞吐量太低。这对于服务器来说是一个严重的问题。为了解决这个严重的连接阻塞问题,出现了一个极为经典的模 式:Connection Per Thread(一个线程处理一个连接)模式。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Demo {

class ConnectionPerThread implements Runnable {
@Override
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(9000);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
// 创建新线程,专门负责一个连接的处理
Handler handler = new Handler(socket);
new Thread(handler).start();
}
} catch (IOException ignored) {
}
}
}

static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}

@Override
public void run() {
byte[] buffer = new byte[1024];
try (InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream()) {
int len;
while ((len = in.read(buffer)) != -1) {
// 业务处理(示例:原样返回)
out.write(buffer, 0, len);
out.flush(); // 强制刷出缓冲区
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try { socket.close(); } catch (IOException ignored) {}
}
}
}
}

以上示例代码中,对于每一个新的网络连接都分配给一个线程。 每个线程都独自处理自己负责的socket连接的输入和输出。当然,服 务器的监听线程也是独立的,任何socket连接的输入和输出处理都不 会阻塞到后面新socket连接的监听和建立,这样服务器的吞吐量就得 到了提升。早期版本的Tomcat服务器就是这样实现的。

Connection Per Thread模式(一个线程处理一个连接)的优点是 解决了前面的新连接被严重阻塞的问题,在一定程度上较大地提高了 服务器的吞吐量。缺点是对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统将无法承受。而且,线程的反复创建、销毁、切换也需要代价。因此,在高并发的应用场景 下,多线程OIO的缺陷是致命的。

总结起来,Reactor模式中IO事件的处理流程大致分为4步,具体如下:

  • 第1步:通道注册。IO事件源于通道(Channel),IO是和通道 (对应于底层连接而言)强相关的。一个IO事件一定属于某个通道。 如果要查询通道的事件,首先就要将通道注册到选择器。
  • 第2步:查询事件。在Reactor模式中,一个线程会负责一个反应 器(或者SubReactor子反应器),不断地轮询,查询选择器中的IO事件(选择键)。
  • 第3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定 关系的Handler业务处理器。
  • 第4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。

以上4步就是整个Reactor模式的IO处理器流程。其中,第1步和第 2步其实是Java NIO的功能,Reactor模式仅仅是利用了Java NIO的优势而已。


单线程Reactor模式

什么是单线程版本的 Reactor 模式呢?简单地说,Reactor 和 Handlers 处于一个线程中执行。这是最简单的Reactor 模型。在 dispatch 方法中,我们根本不判断是 isAcceptable 还是 isReadable。因为我们在注册时,就把对应的对象塞进了 SelectionKey。Acceptor 对象绑定在 ServerSocketChannel 上,Handler 对象绑定在 SocketChannel 上。 这体现了面向对象的分发思想。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* Reactor 模式:负责响应 IO 事件,并将事件分发(Dispatch)给对应的 Handler 处理器。
* 本类既是 Reactor 角色,也通过实现 Runnable 运行在主循环线程中。
*
* 附件(Attachment):是该模式最聪明的地方。Acceptor 挂在 Server 上,Handler 挂在 Client Socket 上。
* 当 Selector 说“有动静”时,Reactor 直接通过 key.attachment() 拿到 Runnable,不需要写一堆 if-else 去判断事件类型。
*/
public class SingleThreadReactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocket;

public SingleThreadReactor(int port) throws IOException {
// 1. 打开多路复用器 Selector
selector = Selector.open();
// 2. 打开服务器通道并绑定端口
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
// 3. 核心:必须设置为非阻塞模式,否则无法注册到 Selector
serverSocket.configureBlocking(false);
// 4. 将 ServerSocket 注册到 Selector,监听 "接收连接" (OP_ACCEPT) 事件
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 5. 关键技巧:给 SelectionKey 绑定一个附件(Acceptor),当发生 ACCEPT 事件时,dispatch 方法会直接取出这个附件并执行其 run()
sk.attach(new Acceptor());
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
// 阻塞等待,直到至少有一个注册的事件就绪
selector.select();
// 获取所有就绪事件的 SelectionKey 集合
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
// 6. 分发事件:将就绪的 Key 传给 dispatch 处理
dispatch(it.next());
}
// 处理完后必须清空已选择键集,否则下次 select 会重复触发旧事件
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}

/**
* 分发器:Reactor 模式的灵魂。
* 它不处理业务,只负责根据 Key 找到之前绑定的 Runnable 附件并运行。
*/
void dispatch(SelectionKey k) {
// 取出注册时附加的对象(Acceptor 或 Handler)并执行其 run 方法
Runnable r = (Runnable) k.attachment();
if (r != null) {
r.run();
}
}

/**
* 内部类:Acceptor 处理器
* 专门负责处理 ServerSocketChannel 的 OP_ACCEPT 事件
*/
class Acceptor implements Runnable {
public void run() {
try {
// 接收客户端连接,返回 SocketChannel
SocketChannel socket = serverSocket.accept();
if (socket != null) {
System.out.println("收到新连接: " + socket.getRemoteAddress());
// 7. 为该连接创建一个专属的 Handler
new Handler(selector, socket);
}
} catch (IOException ex) { ex.printStackTrace(); }
}
}

/**
* 内部类:Handler 处理器
* 负责处理已连接 Socket 的 READ/WRITE 业务逻辑
*/
class Handler implements Runnable {
private final SocketChannel socket;
private final SelectionKey sk;
private final ByteBuffer input = ByteBuffer.allocate(1024);

Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
// 同样需要设置为非阻塞
socket.configureBlocking(false);
// 8. 将该客户端通道注册到 Selector,监听 "可读" (OP_READ) 事件,这里将 "this" (当前 Handler 对象) 作为附件绑定到 Key 上
sk = socket.register(sel, SelectionKey.OP_READ);
sk.attach(this);
sel.wakeup(); // 唤醒 selector 立即处理新注册
}

public void run() {
try {
// 根据 Key 的状态判断触发了什么事件
if (sk.isReadable()) {
read();
}
} catch (IOException ex) { ex.printStackTrace(); }
}

void read() throws IOException {
// 从通道读取数据到缓冲区
int num = socket.read(input);
// 如果 read 返回 -1,说明对端已经优雅地关闭了连接
if (num == -1) {
System.out.println("客户端断开连接");
// 取消在 Selector 上的注册
sk.cancel();
// 关闭通道
socket.close();
return;
}

// 读取完成后,切换为读模式 (flip),准备从 Buffer 取出数据打印
input.flip();
byte[] data = new byte[input.remaining()]; ////
input.get(data); ////
System.out.println("收到消息: " + new String(data));

// 回显逻辑 (Echo):将读到的数据原样写回客户端
input.rewind(); // 重置指针,准备重复读取 Buffer 内的数据并写入 Channel
socket.write(input);
// 读写完毕,清空缓冲区,为下一次读取做准备
input.clear();
}
}

public static void main(String[] args) throws IOException {
// 启动 Reactor 线程
new Thread(new SingleThreadReactor(9000)).start();
}
}

单线程Reactor 的优点是没有线程切换开销,没有锁竞争,实现简单。适用于业务处理极其迅速的情况(如早期 Redis 的纯内存操作)。缺点是无法发挥多核 CPU 优势。一旦某个业务逻辑稍微耗时,系统吞吐量就会断崖式下跌。所以实际上在高性能服务器应用场景中,单线程 Reactor 模式实际使用的很少。


多线程Reactor模式

多线程Reactor的演进分为两个方面:

  • 升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。
  • 升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。

这份代码通过引入多个 Selector 和 ThreadPool,彻底解决了单线程 Reactor 的瓶颈:

  • MainReactor:专门负责 “接客”(OP_ACCEPT)。它独立运行在一个线程中,保证了无论业务多忙,新连接都能秒级接入。
  • SubReactor:专门负责 “服务”(OP_READ/WRITE)。它将 I/O 事件轮询与接入逻辑解耦,提升了并发吞吐量。
  • Worker Thread Pool:将最耗时的业务逻辑(asyncRun)从 IO 线程中剥离。即使业务计算需要耗时 100ms,也不会阻塞 SubReactor 去监听其他 Socket 的读写。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
*
* 主从多线程 Reactor 模型:
* MainReactor 负责 Accept
* SubReactor 负责 IO 读写
* ThreadPool 负责业务计算
*/
public class MultiThreadEchoServerReactor {
private final ServerSocketChannel serverSocket;
private final Selector mainSelector; // 主选择器:仅负责监听 OP_ACCEPT
private final SubReactor[] subReactors; // 子反应器阵列
private final AtomicInteger next = new AtomicInteger(0); // 轮询计数器

public MultiThreadEchoServerReactor(int port) throws IOException {
// 1. 初始化主选择器
mainSelector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(port));

// 2. 将主选择器注册为监听接收事件,并绑定 Acceptor 处理器
SelectionKey sk = serverSocket.register(mainSelector, SelectionKey.OP_ACCEPT);
sk.attach(new AcceptorHandler());

// 3. 初始化子反应器(建议数量等于 CPU 核心数)
int coreNum = Runtime.getRuntime().availableProcessors();
subReactors = new SubReactor[coreNum];
for (int i = 0; i < coreNum; i++) {
subReactors[i] = new SubReactor();
// 每个子反应器运行在独立的线程中
new Thread(subReactors[i], "SubReactor-Thread-" + i).start();
}

System.out.println("Reactor 服务器已启动,监听端口:" + port + ",子反应器数:" + coreNum);
}

/**
* 启动主反应器循环
*/
public void startService() {
try {
while (!Thread.interrupted()) {
mainSelector.select(); // 阻塞等待新连接
Set<SelectionKey> keys = mainSelector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
// 分发 Accept 事件
Runnable handler = (Runnable) sk.attachment();
if (handler != null) handler.run();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* Acceptor:负责接入连接并分发给 SubReactor
*/
class AcceptorHandler implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null) {
System.out.println("MainReactor: 接收到新连接 " + channel.getRemoteAddress());
// 轮询算法:选择一个子反应器进行绑定。使用 & 0x7FFFFFFF 屏蔽符号位,彻底解决 Math.abs 无法处理 MIN_VALUE 的 Bug
int index = (next.getAndIncrement() & 0x7FFFFFFF) % subReactors.length;
subReactors[index].register(channel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 子反应器:负责该通道后续所有的 IO 事件轮询
*/
class SubReactor implements Runnable {
private final Selector selector;

public SubReactor() throws IOException {
this.selector = Selector.open();
}

// 由 Acceptor 调用,将新通道注册到本子反应器的 Selector 上
public void register(SocketChannel sc) throws IOException {
sc.configureBlocking(false);
// 必须先唤醒,防止 selector.select() 阻塞导致注册失败
selector.wakeup();
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
sk.attach(new MultiThreadEchoHandler(selector, sc));
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); // 监听已分配给自己的所有客户端通道
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
Runnable handler = (Runnable) sk.attachment();
if (handler != null) handler.run();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* Handler:业务处理器,整合了读写逻辑与线程池
*/
static class MultiThreadEchoHandler implements Runnable {
private final SocketChannel channel;
private final SelectionKey sk;
// 每个 Handler 自带一个 Buffer,仅由 SubReactor 线程操作
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

static final int RECEIVING = 0, SENDING = 1;
int state = RECEIVING;

// 业务逻辑线程池:处理耗时的非 IO 操作
private static final ExecutorService pool = Executors.newFixedThreadPool(8);

MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
sk = channel.register(selector, 0); // 0 表示暂不监听任何事件
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}

@Override
public void run() {
// synchronized 保护 state 切换和 byteBuffer 操作,防止 IO 线程与业务线程冲突
synchronized (this) {
try {
if (state == RECEIVING) {
handleRead();
} else if (state == SENDING) {
handleWrite();
}
} catch (IOException e) {
close();
}
}
}

private void handleRead() throws IOException {
// 内存安全:通过 byte[] dataCopy 实现了数据隔离。线程池里的线程无论怎么折腾这个 data,都不会破坏 SubReactor 线程正在使用的 byteBuffer。
int length = channel.read(byteBuffer);
if (length > 0) {
// 1. 关键优化:在 IO 线程读完数据,拷贝出一份副本
byteBuffer.flip();
byte[] dataCopy = new byte[byteBuffer.remaining()];
byteBuffer.get(dataCopy); // 拷贝数据实现内存隔离
byteBuffer.clear();

// 2. 异步处理业务逻辑
pool.submit(() -> processBusiness(dataCopy));
} else if (length == -1) {
close();
}
}

private void processBusiness(byte[] data) { // 即使 processBusiness 处理得非常慢,也不会影响 SubReactor 线程去读取其他客户端的数据。
// 这里是纯业务逻辑(计算、数据库等),不涉及 Channel 直接读写
String content = new String(data, StandardCharsets.UTF_8);
System.out.println("线程池处理中: " + content.trim());

// 业务处理完,准备回显
synchronized (this) { // 仅保护状态切换
// 将数据重新放回 Buffer(或者放入发送队列)
byteBuffer.put(data);
state = SENDING;
// 3. 切换兴趣为写,并唤醒 Selector 执行 handleWrite
sk.interestOps(SelectionKey.OP_WRITE);
}

// 唤醒子反应器开始执行 handleWrite
sk.selector().wakeup();
}

private void handleWrite() throws IOException {
byteBuffer.flip();
channel.write(byteBuffer);
if (!byteBuffer.hasRemaining()) {
// 写完后切换回读模式
byteBuffer.clear();
state = RECEIVING;
sk.interestOps(SelectionKey.OP_READ);
} else {
// 没写完则整理 Buffer 等待下次可写事件
byteBuffer.compact();
}
}
private void close() {
try {
System.out.println("连接关闭: " + channel.getRemoteAddress());
sk.cancel();
channel.close();
} catch (IOException ignored) {}
}
}

public static void main(String[] args) throws IOException {
// 启动主反应器
new MultiThreadEchoServerReactor(9000).startService();
}
}

可以使用 nc 进行测试:

1
$ nc 127.0.0.1 9000

在 Netty 等成熟框架中,线程安全并不是通过给 Handler 加锁来解决的,而是通过以下两种策略:

方案 A:IO 读写必须在 IO 线程完成 (推荐)

  • SubReactor 线程(IO 线程):负责 read()。读完后,把读到的字节副本(不再是共享的 Buffer)交给线程池。
  • 线程池(业务线程):只负责计算。算完后,把结果交给一个队列,或者直接唤醒 OP_WRITE。
  • 好处:永远只有一个线程在操作 SocketChannel 的读写,根本不需要 synchronized。

方案 B:无锁串行化

确保同一个 SocketChannel 的所有任务始终提交给线程池中的同一个线程执行。

  • 实现:通过 channel.hashCode() % threadPoolSize 定向分发。
  • 好处:同一个连接的请求在线程池里是排队执行的,天然避开了并发冲突。