Netty 源码分析之 四 Promise 与 Future: 双子星的秘密

永顺大牛写的系列教程《源码之下无秘密 ── 做最好的 Netty 源码分析教程》是目前我读过最好的netty源码分析文章。但不知道什么原因,作者在写到第三章的时候停更了。因此,我想尝试凭着个人的理解,续写后边几个章节。

写在最前

永顺前辈已经写完章节有如下:

续写章节:

本文使用的netty版本为4.1.33

Future和Promise的关系

Netty内部的io.netty.util.concurrent.Future 继承自java.util.concurrent.Future,而Promise是前者的一个特殊实现。
Future和Promise类图.png

Java原生Future

Java并发编程包下提供了Future接口。Future在异步编程中表示该异步操作的结果,通过Future的内部方法可以实现状态检查、取消执行、获取执行结果等操作。内部的方法如下:

    // 尝试取消执行
    boolean cancel(boolean mayInterruptIfRunning);
    // 是否已经被取消执行
    boolean isCancelled();
    // 是否已经执行完毕
    boolean isDone();
    // 阻塞获取执行结果
    V get() throws InterruptedException, ExecutionException;
    // 阻塞获取执行结果或超时后返回
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Netty对Future的扩展

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

  • 增加了更加丰富的状态判断方法
    // 判断是否执行成功
    boolean isSuccess();
    // 判断是否可以取消执行
    boolean isCancellable();
  • 支持获取导致I/O操作异常
    Throwable cause();
  • 增加了监听回调有关方法,支持future完成后执行用户指定的回调方法
    // 增加回调方法
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 增加多个回调方法
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 删除回调方法
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 删除多个回调方法
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  • 增加了更丰富的阻塞等待结果返回的两类方法。其中一类是sync方法,阻塞等待结果且如果执行失败后向外抛出导致失败的异常;另外一类是await方法,仅阻塞等待结果返回,不向外抛出异常。
    // 阻塞等待,且如果失败抛出异常
    Future<V> sync() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> syncUninterruptibly();

    // 阻塞等待
    Future<V> await() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> awaitUninterruptibly();

Promise

Promise接口继续继承了Future,并增加若干个设置状态并回调的方法:

    // 设置成功状态并回调
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    // 设置失败状态并回调
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 设置为不可取消状态
    boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。

通过Promise设置I/O执行结果

以客户端连接的注册过程为例,调用链路如下:

io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()

一直跟踪到SingleThreadEventLoop中,会看到这段代码:

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图我们知道,DefaultChannelPromise同时实现了Future和Promise,具有上述提到的所有方法。

然后继续将该promise传递进另外一个register方法中:

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

在该register方法中,继续将promise传递到Unsafe的register方法中,而立即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务可以拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise可以用于设置执行状态并且回调设置的方法。

我们继续往下debug可以看到:

 // io.netty.channel.AbstractChannel.AbstractUnsafe.java
 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                // 如果已经注册过,则置为失败
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 如果线程类型不兼容,则置为失败
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    // 出现异常情况置promise为失败
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 注册之前,先将promise置为不可取消转态
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();
                // promise置为成功
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
             
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                // 出现异常情况置promise为失败
                safeSetFailure(promise, t);
            }
        }

可见,底层的I/O操作成功与否都可以通过Promise设置状态,并使得外层的ChannelFuture可以感知得到I/O操作的结果。

通过ChannelFuture获取I/O执行结果

我们再来看看被返回的ChannelFuture的用途:

// io.netty.bootstrap.AbstractBootstrap.java

    final ChannelFuture initAndRegister() {
        //...
        ChannelFuture regFuture = config().group().register(channel);
        // 如果异常不为null,则意味着底层的I/O已经失败,并且promise设置了失败异常
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

这里通过检查失败异常栈是否为空,可以提前检查到I/O是否失败。继续回溯,还可以看到:

// io.netty.bootstrap.AbstractBootstrap.java

 private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 如果注册已经成功
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
       // 如果注册尚未完成
       // ...
    }
}

此处,通过ChannelFuture#isDone()方法可以知道底层的注册是否完成,如果完成,则继续进行bind操作。

但是因为注册是个异步操作,如果此时注册可能还没完成,那就会进入如下逻辑:

// io.netty.bootstrap.AbstractBootstrap.java

//...
else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,而且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置并且回调,就会进入该回调方法。从而将I/O状态继续向外传递。

DefaultChannelPromise的结果传递实现原理

我们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最常用的DefaultChannelPromise为例,内部非常简单,我们主要看它的父类DefaultPromise:

    // result字段的原子更新器
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // 缓存执行结果的字段
    private volatile Object result;
    // promise所在的线程
    private final EventExecutor executor;
    // 一个或者多个回调方法
    private Object listeners;
    // 阻塞线程数量计数器  
    private short waiters;

设置状态

以设置成功状态为例(setSuccess):


    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            // 调用回调方法
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        // 原子修改result字段为objResult
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private synchronized void checkNotifyWaiters() {
        if (waiters > 0) {
            // 如果有其他线程在等待该promise的结果,则唤醒他们
            notifyAll();
        }
    }

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

其他设置状态方法不再赘言,基本上大同小异。

阻塞线程以等待执行结果

上文提到其他线程会阻塞等待该promise返回结果,具体实现以sync方法为例:

    @Override
    public Promise<V> sync() throws InterruptedException {
        // 阻塞等待
        await();
        // 如果有异常则抛出
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            // 如果已经完成,直接返回
            return this;
        }
        // 可以被中断
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        //检查死循环
        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                // 递增计数器(用于记录有多少个线程在等待该promise返回结果)
                incWaiters();
                try {
                    // 阻塞等待结果
                    wait();
                } finally {
                    // 递减计数器
                    decWaiters();
                }
            }
        }
        return this;
    }

所有调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为何Netty客户端或者服务端启动的时候一般都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,如下:

    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 管道中添加基于换行符分割字符串的解析器
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    // 管道中添加字符串编码解码器
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                    // 管道中添加服务端处理逻辑
                    ch.pipeline().addLast(new MyClientEchoHandler());
                }
            }).connect("127.0.0.1", 9898).sync();

    future.channel().closeFuture().sync();

回调机制

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            // 添加回调方法
            addListener0(listener);
        }

        if (isDone()) {
            // 如果I/O操作已经结束,直接触发回调
            notifyListeners();
        }

        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            // 只有一个回调方法直接赋值
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            // 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // 如果有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

从上边可以看到,添加回调方法完成之后,会立即检查promise是否已经完成;如果promise已经完成,则马上调用回调方法。

总结

Netty的Promise和Future机制是基于Java并发包下的Future开发的。其中Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

因此,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星一般紧密相连。但我觉得这两者更像是量子纠缠里的两个电子,因为改变其中一个方的状态,另外一方能够马上感知。

至此,Promise和Future的核心原理已经分析完毕。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 duval1024@gmail.com

×

喜欢就点赞,疼爱就打赏