Reactor模型是基于事件驱动的模型,是高性能网络编程中非常重要概念,常用于解决多核服务器下的如何处理海量I/O问题。Java中大名鼎鼎的Netty网络编程框架的线程模型正是基于Reactor模型。
本文主要基于Doug Lea的文章Scalable IO in Java来介绍下Reactor模型。
本文所有内容均基于前人资料总结而成,如有侵权必删。
传统网络编程
初学Java网络编程的时候,我们学过使用ServerSocket以及Socket来编码客户端与服务端的网络通讯程序。常见的服务端逻辑如下:
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
上述样例代码是一个典型的TPC(Thread Per Connection)模式,这种模式有几个显而易见的问题:
- 1.所有的I/O操作都是阻塞的,包括accept、read、write等操作都会阻塞所在线程;
- 2.每个连接都会新建一个线程,连接结束后线程即被销毁,频繁创建线程带来较高的性能损耗;
- 3.I/O线程和业务process线程耦合;
- 4.服务端采用单线程accept客户端的请求,海量I/O下存在性能瓶颈。
因此,这种TPC的网络模式只适合并发请求量较小的业务情景。
为了解决以上的问题,于是有了Reactor模型。
单线程版本
先来看看最简单的单线程版本Reactor模型。如下图所示:
配图是Doug Lea在他的文章中的图。从图里可以看到:
- Reactor对象负责监听事件,并将事件委派给各个I/O方法执行;
- Reactor线程收到accept事件后,便交给acceptor实例进行处理(注意acceptor实例属于Reactor线程);
- acceptor处理完accept事件后,会将该channel重新注册会Reactor线程中;
- Reactor线程中继续监听这些channel的其他I/O事件,并通过dispatch方法分别处理read、decode、compute、encode、send等I/O事件。
值得注意的是,这张图比较容易混淆的地方是Acceptor并不是一个单独的线程,而是Reactor线程内部的一个实例;且以上所有步骤都是在同一个Reactor线程内完成的。
因为只有一个线程,所以这种单线程的Reactor模型,并没有充分利用多核服务器的CPU资源,性能上甚至不如上边提到的TPC。
Java一般基于NIO的来实现Reactor模型,样例如下:
class Reactor implements Runnable
{
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException
{ //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//附加一个Acceptor实例
sk.attach(new Acceptor());
}
public void run()
{
try
{
while (!Thread.interrupted())
{
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
{
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex)
{ /* ... */ }
}
void dispatch(SelectionKey k)
{
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null)
{
r.run();
}
}
// inner class
class Acceptor implements Runnable
{
public void run()
{
try
{
SocketChannel channel = serverSocket.accept();
if (channel != null)
new Handler(selector, channel);
} catch (IOException ex)
{ /* ... */ }
}
}
}
class Handler implements Runnable
{
final SocketChannel channel;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector selector, SocketChannel c) throws IOException
{
channel = c;
c.configureBlocking(false);
// 一般来说先注册读时间
sk = channel.register(selector, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete()
{
/* ... */
return false;
}
boolean outputIsComplete()
{
/* ... */
return false;
}
void process()
{
/* ... */
return;
}
public void run()
{
try
{
if (state == READING)
{
read();
}
else if (state == SENDING)
{
send();
}
} catch (IOException ex)
{ /* ... */ }
}
void read() throws IOException
{
channel.read(input);
if (inputIsComplete())
{
// 进行其他业务处理
process();
state = SENDING;
// 一般来说读完数据之后,重新注册写实践到Selector中
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException
{
channel.write(output);
//write事件结束后, 关闭select key
if (outputIsComplete())
{
sk.cancel();
}
}
}
如上实现,Acceptor只是Reactor线程内部实例,相当于一个特殊的Handler。
多线程版本
单线程版本的主要问题之一是:I/O处理逻辑与Reactor耦合共用了同一个线程。于是,将I/O处理逻辑剥离出来,交由单独的线程池来运行。于是有如下设计:
实现上只需要变更Handler的代码,由一个全局的线程池来执行任务即可:
class Handler implements Runnable {
// 全局公用的线程池
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}
该版本存在问题是:acceptor与Reactor共用一个线程,如果有海量的请求或者连接时候需要身份认证等耗时操作,会阻塞Reactor线程,影响了Reactor线程监控I/O事件而成为性能瓶颈。
主从Reator多线程版本
为了解决以上问题,多Reactor多线程版本,引入多个Reactor,部分Reactor线程专门负责处理请求事件,以应对海量的请求或者耗时的连接操作;而其他的Reactor线程负责监听其他I/O事件。于是有如下设计:
这个版本有如下特点:
- 单独的mainReactor线程来监听accept事件;
- 单独的subReactor线程来监听其他I/O事件;
- 使用单独的线程池来处理业务。
- 实际生产中常用线程池来运行mainReactor和subReactor
现在许多高性能的网络应用或中间件都是采用这种主从多线程版本。
Reactor模型在Netty中的应用
Netty的服务端在初始化ServerBootstrap的时候可以指定parentGroup和childGroup两个线程池。这两个线程池正好对应承载mainReactor和subReactor的两个线程池。
// io.netty.bootstrap.ServerBootstrap
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
因此通过控制parentGroup和childGroup的线程池大小,可以实现以上各个版本的reactor模型。
- 单线程版本:
NioEventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
- 多线程版本:
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
- 主从多线程版本
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
参考资料
- Scalable IO in Java Doug Lea大牛写的文章,简明扼要,网上的资料基本都是从这文章衍生出来的。
- Reactor模式 这篇文章讲得比较清晰,但是画的图相对容易理解,但是不完全吻合Doug Lea在文章所描述的意思。
- Netty源码分析 目前网上最好的Netty源码分析
- Netty 系列之 Netty 线程模型 这文章线程模型总结的比较细致
- 单服务器高性能模式:Reactor与Proactor 这文章介绍了Reactor模型以及Proactor模型
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 duval1024@gmail.com