迄今为止,高性能网络编程都绕不开Reactor模式。很多著名的服务器软件或者中间件都是基于Reactor模式实现的。例如 Nginx、Netty、Redis。Doug Lea 在 《Scalable IO in Java》 中对Reactor模式的定义。Reactor模式由Reactor线程、Handlers处理器两大角色组成,两 大角色的职责分别如下:
- Reactor 线程:负责响应IO事件,并且分发到 Handlers处理器。
- Handlers 处理器:非阻塞的执行业务处理逻辑。
从上面的Reactor模式定义中看不出这种模式有什么神奇的地方。 当然,从简单到复杂,Reactor模式也有很多版本,前面的定义仅仅是 最为简单的一个版本。如果需要彻底了解Reactor模式,还得从最原始 的OIO编程开始讲起。
Reactor 之前高性能服务的实现
在Java的OIO编程中,原始的网络服务器程序一般使用一个while 循环不断地监听端口是否有新的连接。如果有,就调用一个处理函数 来完成传输处理。
1 2 3 4
| while(true){ socket = accept(); handle(socket) ; }
|
这种方法的最大问题是:如果前一个网络连接的handle(socket) 没有处理完,那么后面的新连接无法被服务端接收,于是后面的请求 就会被阻塞,导致服务器的吞吐量太低。这对于服务器来说是一个严重的问题。为了解决这个严重的连接阻塞问题,出现了一个极为经典的模 式:Connection Per Thread(一个线程处理一个连接)模式。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class Demo {
class ConnectionPerThread implements Runnable { @Override public void run() { try { ServerSocket serverSocket = new ServerSocket(9000); while (!Thread.interrupted()) { Socket socket = serverSocket.accept(); Handler handler = new Handler(socket); new Thread(handler).start(); } } catch (IOException ignored) { } } }
static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; }
@Override public void run() { byte[] buffer = new byte[1024]; try (InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream()) { int len; while ((len = in.read(buffer)) != -1) { out.write(buffer, 0, len); out.flush(); } } catch (IOException ex) { ex.printStackTrace(); } finally { try { socket.close(); } catch (IOException ignored) {} } } } }
|
以上示例代码中,对于每一个新的网络连接都分配给一个线程。 每个线程都独自处理自己负责的socket连接的输入和输出。当然,服 务器的监听线程也是独立的,任何socket连接的输入和输出处理都不 会阻塞到后面新socket连接的监听和建立,这样服务器的吞吐量就得 到了提升。早期版本的Tomcat服务器就是这样实现的。
Connection Per Thread模式(一个线程处理一个连接)的优点是 解决了前面的新连接被严重阻塞的问题,在一定程度上较大地提高了 服务器的吞吐量。缺点是对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统将无法承受。而且,线程的反复创建、销毁、切换也需要代价。因此,在高并发的应用场景 下,多线程OIO的缺陷是致命的。
总结起来,Reactor模式中IO事件的处理流程大致分为4步,具体如下:
- 第1步:通道注册。IO事件源于通道(Channel),IO是和通道 (对应于底层连接而言)强相关的。一个IO事件一定属于某个通道。 如果要查询通道的事件,首先就要将通道注册到选择器。
- 第2步:查询事件。在Reactor模式中,一个线程会负责一个反应 器(或者SubReactor子反应器),不断地轮询,查询选择器中的IO事件(选择键)。
- 第3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定 关系的Handler业务处理器。
- 第4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。
以上4步就是整个Reactor模式的IO处理器流程。其中,第1步和第 2步其实是Java NIO的功能,Reactor模式仅仅是利用了Java NIO的优势而已。
单线程Reactor模式
什么是单线程版本的 Reactor 模式呢?简单地说,Reactor 和 Handlers 处于一个线程中执行。这是最简单的Reactor 模型。在 dispatch 方法中,我们根本不判断是 isAcceptable 还是 isReadable。因为我们在注册时,就把对应的对象塞进了 SelectionKey。Acceptor 对象绑定在 ServerSocketChannel 上,Handler 对象绑定在 SocketChannel 上。 这体现了面向对象的分发思想。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;
public class SingleThreadReactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocket;
public SingleThreadReactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); }
@Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) { dispatch(it.next()); } selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } }
void dispatch(SelectionKey k) { Runnable r = (Runnable) k.attachment(); if (r != null) { r.run(); } }
class Acceptor implements Runnable { public void run() { try { SocketChannel socket = serverSocket.accept(); if (socket != null) { System.out.println("收到新连接: " + socket.getRemoteAddress()); new Handler(selector, socket); } } catch (IOException ex) { ex.printStackTrace(); } } }
class Handler implements Runnable { private final SocketChannel socket; private final SelectionKey sk; private final ByteBuffer input = ByteBuffer.allocate(1024);
Handler(Selector sel, SocketChannel c) throws IOException { socket = c; socket.configureBlocking(false); sk = socket.register(sel, SelectionKey.OP_READ); sk.attach(this); sel.wakeup(); }
public void run() { try { if (sk.isReadable()) { read(); } } catch (IOException ex) { ex.printStackTrace(); } }
void read() throws IOException { int num = socket.read(input); if (num == -1) { System.out.println("客户端断开连接"); sk.cancel(); socket.close(); return; }
input.flip(); byte[] data = new byte[input.remaining()]; input.get(data); System.out.println("收到消息: " + new String(data));
input.rewind(); socket.write(input); input.clear(); } }
public static void main(String[] args) throws IOException { new Thread(new SingleThreadReactor(9000)).start(); } }
|
单线程Reactor 的优点是没有线程切换开销,没有锁竞争,实现简单。适用于业务处理极其迅速的情况(如早期 Redis 的纯内存操作)。缺点是无法发挥多核 CPU 优势。一旦某个业务逻辑稍微耗时,系统吞吐量就会断崖式下跌。所以实际上在高性能服务器应用场景中,单线程 Reactor 模式实际使用的很少。
多线程Reactor模式
多线程Reactor的演进分为两个方面:
- 升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。
- 升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。
这份代码通过引入多个 Selector 和 ThreadPool,彻底解决了单线程 Reactor 的瓶颈:
- MainReactor:专门负责 “接客”(OP_ACCEPT)。它独立运行在一个线程中,保证了无论业务多忙,新连接都能秒级接入。
- SubReactor:专门负责 “服务”(OP_READ/WRITE)。它将 I/O 事件轮询与接入逻辑解耦,提升了并发吞吐量。
- Worker Thread Pool:将最耗时的业务逻辑(asyncRun)从 IO 线程中剥离。即使业务计算需要耗时 100ms,也不会阻塞 SubReactor 去监听其他 Socket 的读写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadEchoServerReactor { private final ServerSocketChannel serverSocket; private final Selector mainSelector; private final SubReactor[] subReactors; private final AtomicInteger next = new AtomicInteger(0);
public MultiThreadEchoServerReactor(int port) throws IOException { mainSelector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false); serverSocket.bind(new InetSocketAddress(port));
SelectionKey sk = serverSocket.register(mainSelector, SelectionKey.OP_ACCEPT); sk.attach(new AcceptorHandler());
int coreNum = Runtime.getRuntime().availableProcessors(); subReactors = new SubReactor[coreNum]; for (int i = 0; i < coreNum; i++) { subReactors[i] = new SubReactor(); new Thread(subReactors[i], "SubReactor-Thread-" + i).start(); }
System.out.println("Reactor 服务器已启动,监听端口:" + port + ",子反应器数:" + coreNum); }
public void startService() { try { while (!Thread.interrupted()) { mainSelector.select(); Set<SelectionKey> keys = mainSelector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); Runnable handler = (Runnable) sk.attachment(); if (handler != null) handler.run(); } } } catch (IOException e) { e.printStackTrace(); } }
class AcceptorHandler implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel != null) { System.out.println("MainReactor: 接收到新连接 " + channel.getRemoteAddress()); int index = (next.getAndIncrement() & 0x7FFFFFFF) % subReactors.length; subReactors[index].register(channel); } } catch (IOException e) { e.printStackTrace(); } } }
class SubReactor implements Runnable { private final Selector selector;
public SubReactor() throws IOException { this.selector = Selector.open(); }
public void register(SocketChannel sc) throws IOException { sc.configureBlocking(false); selector.wakeup(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); sk.attach(new MultiThreadEchoHandler(selector, sc)); }
@Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); Runnable handler = (Runnable) sk.attachment(); if (handler != null) handler.run(); } } } catch (IOException e) { e.printStackTrace(); } } }
static class MultiThreadEchoHandler implements Runnable { private final SocketChannel channel; private final SelectionKey sk; private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECEIVING = 0, SENDING = 1; int state = RECEIVING;
private static final ExecutorService pool = Executors.newFixedThreadPool(8);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); sk = channel.register(selector, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); }
@Override public void run() { synchronized (this) { try { if (state == RECEIVING) { handleRead(); } else if (state == SENDING) { handleWrite(); } } catch (IOException e) { close(); } } }
private void handleRead() throws IOException { int length = channel.read(byteBuffer); if (length > 0) { byteBuffer.flip(); byte[] dataCopy = new byte[byteBuffer.remaining()]; byteBuffer.get(dataCopy); byteBuffer.clear();
pool.submit(() -> processBusiness(dataCopy)); } else if (length == -1) { close(); } }
private void processBusiness(byte[] data) { String content = new String(data, StandardCharsets.UTF_8); System.out.println("线程池处理中: " + content.trim());
synchronized (this) { byteBuffer.put(data); state = SENDING; sk.interestOps(SelectionKey.OP_WRITE); }
sk.selector().wakeup(); }
private void handleWrite() throws IOException { byteBuffer.flip(); channel.write(byteBuffer); if (!byteBuffer.hasRemaining()) { byteBuffer.clear(); state = RECEIVING; sk.interestOps(SelectionKey.OP_READ); } else { byteBuffer.compact(); } } private void close() { try { System.out.println("连接关闭: " + channel.getRemoteAddress()); sk.cancel(); channel.close(); } catch (IOException ignored) {} } }
public static void main(String[] args) throws IOException { new MultiThreadEchoServerReactor(9000).startService(); } }
|
可以使用 nc 进行测试:
在 Netty 等成熟框架中,线程安全并不是通过给 Handler 加锁来解决的,而是通过以下两种策略:
方案 A:IO 读写必须在 IO 线程完成 (推荐)
- SubReactor 线程(IO 线程):负责 read()。读完后,把读到的字节副本(不再是共享的 Buffer)交给线程池。
- 线程池(业务线程):只负责计算。算完后,把结果交给一个队列,或者直接唤醒 OP_WRITE。
- 好处:永远只有一个线程在操作 SocketChannel 的读写,根本不需要 synchronized。
方案 B:无锁串行化
确保同一个 SocketChannel 的所有任务始终提交给线程池中的同一个线程执行。
- 实现:通过 channel.hashCode() % threadPoolSize 定向分发。
- 好处:同一个连接的请求在线程池里是排队执行的,天然避开了并发冲突。
标题:
Java NIO - 高性能服务器的 Reactor 设计模式