JAVA的并发编程包(java.util.concurrent)中的源码大量用到CAS操作。
CAS操作使得JAVA可以低成本地实现并发操作。
本文将深入探讨CAS操作的实现原理。
什么是CAS操作
CAS(Compare and Swap),也就是比较并替换,简单说就是有三个值:当前内存值(current value)、内存期望值(expected value)、更新值(updated value),当且仅当当前内存值等于内存期望值的时候,将内存值修改为更新值,并且返回true,否则不作任何操作直接返回false。需要注意的事,该操作是原子操作。
例如,AtomicInteger里的compareAndSet方法
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
Unsafe类
JAVA的CAS操作底层都是调用了Unsafe类内部的native方法。这些方法通过JNI调用,直接调用了底层的C语言接口,使得JAVA可以直接操作内存空间。
我们注意到前文提到的unsafe.compareAndSwapInt方法的入参中,有一个静态字段valueOffset。这个字段表示的是value字段在内存相对偏移量,在初始化的时候获取。如下:
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
所有的unsafe的native方法,追踪到hotspot源码,都在hotspot/src/share/vm/prims/unsafe.cpp中实现。
获取字段的字节偏移量
先来看unsafe.objectFieldOffset,我对C++不是特别熟悉,但从语义上其实可以看出,这个方法是根据字段类型获取其相对偏移字节数。
// hotspot/src/share/vm/prims/unsafe.cpp
inline jlong field_offset_from_byte_offset(jlong byte_offset) {
return byte_offset;
}
jint find_field_offset(jobject field, int must_be_static, TRAPS) {
if (field == NULL) {
THROW_0(vmSymbols::java_lang_NullPointerException());
}
oop reflected = JNIHandles::resolve_non_null(field);
oop mirror = java_lang_reflect_Field::clazz(reflected);
Klass* k = java_lang_Class::as_Klass(mirror);
int slot = java_lang_reflect_Field::slot(reflected);
int modifiers = java_lang_reflect_Field::modifiers(reflected);
if (must_be_static >= 0) {
int really_is_static = ((modifiers & JVM_ACC_STATIC) != 0);
if (must_be_static != really_is_static) {
THROW_0(vmSymbols::java_lang_IllegalArgumentException());
}
}
int offset = InstanceKlass::cast(k)->field_offset(slot);
return field_offset_from_byte_offset(offset);
}
UNSAFE_ENTRY(jlong, Unsafe_ObjectFieldOffset(JNIEnv *env, jobject unsafe, jobject field))
UnsafeWrapper("Unsafe_ObjectFieldOffset");
return find_field_offset(field, 0, THREAD);
UNSAFE_END
原子操作的实现
然后接着来看compareAndSwapInt的native实现,先是利用上文获取的字节偏移量offset,获取需要进行CAS操作的对象所在的地址。这部分功能在index_oop_from_field_offset_long中实现,此处不作展开。此外可以看到,
该方法接着通过 Atomic::cmpxchg 来实现比较和替换操作,其中参数x是即将更新的值,参数e是原内存的值。如下:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
最关键的原子操作Atomic::cmpxchg。这个方法在不同的操作系统中实现不一样,这里以linux x86为例:
// hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp
// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
LOCK_IF_MP 表示如果是多处理器,则添加一个lock指令作为前缀,反之,就省略lock前缀(单处理器会不需要lock前缀提供的内存屏障效果)。这里的lock前缀就是使用了处理器的总线锁(最新的处理器都使用缓存锁代替总线锁来提高性能)。从而,实现了原子地交换更新值。
CAS操作的几个问题
ABA问题
CAS操作只有在检查值等于期望值的时候,才会发生替换,但是如果期间发生了两次替换,使得值从A–>B–>A,虽然检查值依然是A,但是期间其实发生了变化。因此,CAS无法检测到ABA问题。
解决办法就是给每个值附带上版本号,每次更新数据都要递增版本号。
因此,JDK1.5中引入了AtomicStampedReference类。如它的compareAndSet方法,需要指定期望版本号expectedStamp,只有在版本号对得上的情况下,才会发生值替换。如下:
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
自旋耗时问题
CAS操作一般是在循环里不断重试,直到成功才中断循环。在并发量高的情况下,这种自旋会增加循环次数,从而增加耗时。比如CountDownLatch类里有如下代码:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 不断重试,直到更新成功才中断循环
return nextc == 0;
}
}
单个变量问题
上述提到的CAS操作只适合单个变量。如果要同时对多个变量进行CAS操作,就无能为力了。为此JDK1.5中有AtomicReference类,这个类支持对一个复杂对象的原子操作,底层同样适用了unsafe。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 duval1024@gmail.com