Java高性能网络编程--Reactor模型

  1. 传统网络编程
  2. 单线程版本
  3. 多线程版本
  4. 主从Reator多线程版本
  5. Reactor模型在Netty中的应用
  6. 参考资料

Reactor模型是基于事件驱动的模型,是高性能网络编程中非常重要概念,常用于解决多核服务器下的如何处理海量I/O问题。Java中大名鼎鼎的Netty网络编程框架的线程模型正是基于Reactor模型。

本文主要基于Doug Lea的文章Scalable IO in Java来介绍下Reactor模型。
本文所有内容均基于前人资料总结而成,如有侵权必删。

传统网络编程

初学Java网络编程的时候,我们学过使用ServerSocket以及Socket来编码客户端与服务端的网络通讯程序。常见的服务端逻辑如下:

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }

        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

上述样例代码是一个典型的TPC(Thread Per Connection)模式,这种模式有几个显而易见的问题:

  • 1.所有的I/O操作都是阻塞的,包括accept、read、write等操作都会阻塞所在线程;
  • 2.每个连接都会新建一个线程,连接结束后线程即被销毁,频繁创建线程带来较高的性能损耗;
  • 3.I/O线程和业务process线程耦合;
  • 4.服务端采用单线程accept客户端的请求,海量I/O下存在性能瓶颈。

因此,这种TPC的网络模式只适合并发请求量较小的业务情景。

为了解决以上的问题,于是有了Reactor模型。

单线程版本

先来看看最简单的单线程版本Reactor模型。如下图所示:

单线程Reactor模型.jpg

配图是Doug Lea在他的文章中的图。从图里可以看到:

  • Reactor对象负责监听事件,并将事件委派给各个I/O方法执行;
  • Reactor线程收到accept事件后,便交给acceptor实例进行处理(注意acceptor实例属于Reactor线程);
  • acceptor处理完accept事件后,会将该channel重新注册会Reactor线程中;
  • Reactor线程中继续监听这些channel的其他I/O事件,并通过dispatch方法分别处理read、decode、compute、encode、send等I/O事件。

值得注意的是,这张图比较容易混淆的地方是Acceptor并不是一个单独的线程,而是Reactor线程内部的一个实例;且以上所有步骤都是在同一个Reactor线程内完成的。

因为只有一个线程,所以这种单线程的Reactor模型,并没有充分利用多核服务器的CPU资源,性能上甚至不如上边提到的TPC。

Java一般基于NIO的来实现Reactor模型,样例如下:

class Reactor implements Runnable
{
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException
    { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //附加一个Acceptor实例
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                {
                    //Reactor负责dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //调用之前注册的callback对象
        if (r != null)
        {
            r.run();
        }
    }

    // inner class
    class Acceptor implements Runnable
    {
        public void run()
        {
            try
            {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new Handler(selector, channel);
            } catch (IOException ex)
            { /* ... */ }
        }
    }
}


class Handler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // 一般来说先注册读时间
        sk = channel.register(selector, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
        /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

        /* ... */
        return false;
    }

    void process()
    {
        /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void read() throws IOException
    {
        channel.read(input);
        if (inputIsComplete())
        {
            // 进行其他业务处理
            process();
            state = SENDING;
            // 一般来说读完数据之后,重新注册写实践到Selector中
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write事件结束后, 关闭select key
        if (outputIsComplete())
        {
            sk.cancel();
        }
    }
}

如上实现,Acceptor只是Reactor线程内部实例,相当于一个特殊的Handler。

多线程版本

单线程版本的主要问题之一是:I/O处理逻辑与Reactor耦合共用了同一个线程。于是,将I/O处理逻辑剥离出来,交由单独的线程池来运行。于是有如下设计:

多线程Reactor模型.jpg

实现上只需要变更Handler的代码,由一个全局的线程池来执行任务即可:

class Handler implements Runnable {
    // 全局公用的线程池
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interest(SelectionKey.OP_WRITE);
    }
    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}

该版本存在问题是:acceptor与Reactor共用一个线程,如果有海量的请求或者连接时候需要身份认证等耗时操作,会阻塞Reactor线程,影响了Reactor线程监控I/O事件而成为性能瓶颈。

主从Reator多线程版本

为了解决以上问题,多Reactor多线程版本,引入多个Reactor,部分Reactor线程专门负责处理请求事件,以应对海量的请求或者耗时的连接操作;而其他的Reactor线程负责监听其他I/O事件。于是有如下设计:

多Reactor多线程模型.jpg

这个版本有如下特点:

  • 单独的mainReactor线程来监听accept事件;
  • 单独的subReactor线程来监听其他I/O事件;
  • 使用单独的线程池来处理业务。
  • 实际生产中常用线程池来运行mainReactor和subReactor

现在许多高性能的网络应用或中间件都是采用这种主从多线程版本。

Reactor模型在Netty中的应用

Netty的服务端在初始化ServerBootstrap的时候可以指定parentGroup和childGroup两个线程池。这两个线程池正好对应承载mainReactor和subReactor的两个线程池。

// io.netty.bootstrap.ServerBootstrap
@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

/**
    * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
    * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
    * {@link Channel}'s.
    */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

因此通过控制parentGroup和childGroup的线程池大小,可以实现以上各个版本的reactor模型。

  • 单线程版本:
NioEventLoopGroup group = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ServerHandlerInitializer());
  • 多线程版本:
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ServerHandlerInitializer());
  • 主从多线程版本
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ServerHandlerInitializer());

参考资料


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 duval1024@gmail.com

×

喜欢就点赞,疼爱就打赏