永顺大牛写的系列教程《源码之下无秘密 ── 做最好的 Netty 源码分析教程》是目前我读过最好的netty源码分析文章。但不知道什么原因,作者在写到第三章的时候停更了。因此,我想尝试凭着个人的理解,续写后边几个章节。
写在最前
永顺前辈已经写完章节有如下:
- Netty 源码分析之 番外篇 Java NIO 的前生今世
- Netty 源码分析之 零 磨刀不误砍柴工 源码分析环境搭建
- Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
- Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
- Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)
- Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (二)
- Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)
- Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(二)
续写章节:
本文使用的netty版本为4.1.33
Future和Promise的关系
Netty内部的io.netty.util.concurrent.Future
Java原生Future
Java并发编程包下提供了Future
// 尝试取消执行
boolean cancel(boolean mayInterruptIfRunning);
// 是否已经被取消执行
boolean isCancelled();
// 是否已经执行完毕
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果或超时后返回
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
Netty对Future的扩展
原生的Future
- 增加了更加丰富的状态判断方法
// 判断是否执行成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
- 支持获取导致I/O操作异常
Throwable cause();
- 增加了监听回调有关方法,支持future完成后执行用户指定的回调方法
// 增加回调方法
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 增加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
- 增加了更丰富的阻塞等待结果返回的两类方法。其中一类是sync方法,阻塞等待结果且如果执行失败后向外抛出导致失败的异常;另外一类是await方法,仅阻塞等待结果返回,不向外抛出异常。
// 阻塞等待,且如果失败抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 阻塞等待
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
Promise
Promise
// 设置成功状态并回调
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
// 设置失败状态并回调
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();
可见,Promise
通过Promise设置I/O执行结果
以客户端连接的注册过程为例,调用链路如下:
io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()
一直跟踪到SingleThreadEventLoop中,会看到这段代码:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图我们知道,DefaultChannelPromise同时实现了Future和Promise,具有上述提到的所有方法。
然后继续将该promise传递进另外一个register方法中:
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
在该register方法中,继续将promise传递到Unsafe的register方法中,而立即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务可以拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise可以用于设置执行状态并且回调设置的方法。
我们继续往下debug可以看到:
// io.netty.channel.AbstractChannel.AbstractUnsafe.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
// 如果已经注册过,则置为失败
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// 如果线程类型不兼容,则置为失败
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
// 出现异常情况置promise为失败
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 注册之前,先将promise置为不可取消转态
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
// promise置为成功
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
// 出现异常情况置promise为失败
safeSetFailure(promise, t);
}
}
可见,底层的I/O操作成功与否都可以通过Promise设置状态,并使得外层的ChannelFuture可以感知得到I/O操作的结果。
通过ChannelFuture获取I/O执行结果
我们再来看看被返回的ChannelFuture的用途:
// io.netty.bootstrap.AbstractBootstrap.java
final ChannelFuture initAndRegister() {
//...
ChannelFuture regFuture = config().group().register(channel);
// 如果异常不为null,则意味着底层的I/O已经失败,并且promise设置了失败异常
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这里通过检查失败异常栈是否为空,可以提前检查到I/O是否失败。继续回溯,还可以看到:
// io.netty.bootstrap.AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 如果注册已经成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 如果注册尚未完成
// ...
}
}
此处,通过ChannelFuture#isDone()方法可以知道底层的注册是否完成,如果完成,则继续进行bind操作。
但是因为注册是个异步操作,如果此时注册可能还没完成,那就会进入如下逻辑:
// io.netty.bootstrap.AbstractBootstrap.java
//...
else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,而且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置并且回调,就会进入该回调方法。从而将I/O状态继续向外传递。
DefaultChannelPromise的结果传递实现原理
我们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最常用的DefaultChannelPromise为例,内部非常简单,我们主要看它的父类DefaultPromise:
// result字段的原子更新器
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// promise所在的线程
private final EventExecutor executor;
// 一个或者多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;
设置状态
以设置成功状态为例(setSuccess):
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
// 调用回调方法
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 原子修改result字段为objResult
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
// 如果有其他线程在等待该promise的结果,则唤醒他们
notifyAll();
}
}
设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。
其他设置状态方法不再赘言,基本上大同小异。
阻塞线程以等待执行结果
上文提到其他线程会阻塞等待该promise返回结果,具体实现以sync方法为例:
@Override
public Promise<V> sync() throws InterruptedException {
// 阻塞等待
await();
// 如果有异常则抛出
rethrowIfFailed();
return this;
}
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
// 如果已经完成,直接返回
return this;
}
// 可以被中断
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//检查死循环
checkDeadLock();
synchronized (this) {
while (!isDone()) {
// 递增计数器(用于记录有多少个线程在等待该promise返回结果)
incWaiters();
try {
// 阻塞等待结果
wait();
} finally {
// 递减计数器
decWaiters();
}
}
}
return this;
}
所有调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为何Netty客户端或者服务端启动的时候一般都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,如下:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 管道中添加基于换行符分割字符串的解析器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 管道中添加字符串编码解码器
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
// 管道中添加服务端处理逻辑
ch.pipeline().addLast(new MyClientEchoHandler());
}
}).connect("127.0.0.1", 9898).sync();
future.channel().closeFuture().sync();
回调机制
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
// 添加回调方法
addListener0(listener);
}
if (isDone()) {
// 如果I/O操作已经结束,直接触发回调
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
// 只有一个回调方法直接赋值
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中
((DefaultFutureListeners) listeners).add(listener);
} else {
// 如果有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
从上边可以看到,添加回调方法完成之后,会立即检查promise是否已经完成;如果promise已经完成,则马上调用回调方法。
总结
Netty的Promise和Future机制是基于Java并发包下的Future
因此,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星一般紧密相连。但我觉得这两者更像是量子纠缠里的两个电子,因为改变其中一个方的状态,另外一方能够马上感知。
至此,Promise和Future的核心原理已经分析完毕。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 duval1024@gmail.com