共享模型之无锁
本章主要内容:
- CAS 与 volatile
- 原子整数
- 原子引用
- 原子累加器
- Unsafe
# 1. 问题提出
有一个银行账户用来取钱,如何才能在多线程并发取钱时还能正确执行呢?如下代码所示,如何才能实现 getBalance()
和 withdraw()
方法呢?
import java.util.ArrayList;
import java.util.List;
interface Account {
// 获取余额
Integer getBalance() {
...
};
// 取款
void withdraw(Integer amount) {
...
};
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end-start)/1000_000 + " ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 1.1 一种不安全的简单实现
但如下的一种简单实现是线程不安全的:
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return balance;
}
@Override
public void withdraw(Integer amount) {
balance -= amount;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 1.2 思路 1 - synchronized 锁
首先想到的是给 Account 对象加锁:
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public synchronized Integer getBalance() {
return balance;
}
@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1.3 思路 2 - 无锁(AtomicInteger)
这里的“余额”使用 AtomicInteger,它支持原子性的更新操作。这里注意一下 withdraw()
的实现。
class AccountSafe implements Account {
private AtomicInteger balance; //原子整数
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next)) {
break;
}
}
// 可以简化为下面的方法
// balance.addAndGet(-1 * amount);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 2. CAS 与 volatile
# 2.1 CAS
前面看到 AtomicInteger 内部没有使用锁来保护共享变量的线程安全,那它是如何实现的呢?
public void withdraw(Integer amount) {
while(true) {
// 需要不断尝试,直到成功为止
while (true) {
// 比如拿到了旧值 1000
int prev = balance.get();
// 在这个基础上 1000-10 = 990
int next = prev - amount;
/*
compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
- 不一致了,next 作废,返回 false 表示失败
比如,别的线程已经做了减法,当前值已经被减成了 990
那么本线程的这次 990 就作废了,进入 while 下次循环重试
- 一致,以 next 设置为新值,返回 true 表示成功
*/
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
其中的关键是 compareAndSet,它的简称就是 CAS(也有 Compare And Swap 的说法),它内部在 CPU 级别上实现了【比较-交换】的原子性。
# 2.2 volatile
我们看 AtomicInteger 的实现的话:
可以看到他其中使用 volatile 来修饰它包装的 value。
为什么它要用 volatile 来修饰呢?之前我们获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰,因为它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
所以 CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果。
# 2.3 为什么 CAS 的效率相对更高
synchronized 和 cas(无锁) 没有绝对的谁效率高,要看所处的场景。
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
- 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速... 恢复到高速运行,代价比较大
- 当然,无锁情况下,线程想要运行也需要 CPU 分给他时间片。
# 2.4 CAS 特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
# 3. 原子整数
JUC 并发包提供了:
- AtomicInteger
- AtomicBoolean
- AtomicLong
以 AtomicInteger 为例来看一下常用方法:
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 4. 原子引用
为什么需要原子引用类型?因为我们想要保护的共享数据并不一定都是基本类型的,可能是 List 或 Map 等。
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
有下面一个例子,我们让“账户余额”使用 BigDecimal 类型,现在需要将 withdraw()
方法实现为线程安全的操作:
public interface DecimalAccount {
// 获取余额
BigDecimal getBalance();
// 取款
void withdraw(BigDecimal amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(DecimalAccount account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 4.1 不安全的实现
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 4.2 安全实现 - 使用锁
可以给 withdraw()
方法加上 synchronized 来保护。
# 4.3 安全实现 - 使用 CAS AtomicReference
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
测试一下可以发现,并发情况能也能正确执行。
# 4.4 ABA 问题及解决
# 4.4.1 什么是 ABA 问题
有如下这么一个共享变量:
static AtomicReference<String> ref = new AtomicReference<>("A");
现在对其做 compareAndSet
操作,某个线程的 CAS 操作是“比较是否为 A,是的话就改成 B”,但是它仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况。因此如果希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。
# 4.4.2 AtomicStampedReference(维护版本号)
AtomicStampedReference 可以给原子引用加上版本号,用于追踪原子引用整个的变化过程,比如 A -> B -> A -> C
,通过 AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
常用 API:
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) {
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
// CAS 操作
ref.compareAndSet(prev, "C", stamp, stamp + 1)
}
2
3
4
5
6
7
8
9
10
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference。
# 4.4.3 AtomicMarkableReference(仅维护是否修改过)
用一个简单的例子来解释这个东西的使用。
假设我们有一个保洁阿姨和主人,主人负责检查垃圾袋是否满了,如果满了就换一个新垃圾袋,保洁阿姨负责倒空垃圾袋,当保洁阿姨倒空了垃圾之后,主人就不需要更换垃圾袋了。
先定义一个垃圾袋:
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后看一下示例代码:
@Slf4j
public class TestABAAtomicMarkableReference {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
ref.compareAndSet(bag, bag, true, false); // 倒空垃圾袋,但还是那个旧垃圾袋
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false); // 换一个新的垃圾袋
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
compareAndSet()
除了传入旧引用和新引用外,还要传入 expectedMark 和 newMark 来设置标记,这里标记(mark)就代表垃圾袋是否满了。
# 5. 原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
对比一下普通数组的操作与原子数组的操作:
操作 | 普通数组 | 原子数组 |
---|---|---|
初始化 | new int[10] | new AtomicIntegerArray(10) |
对某个元素自增 | array[index]++ | array.getAndIncrement(index) |
打印数组元素 | System.out.println(Arrays.toString(array)) | System.out.println(array) |
# 6. 字段原子更新器 AtomicXXXFieldUpdater
刚刚的原子数组保护的是数组内的某个元素,这里的字段原子更新器保护的是成员变量,它能保证当多个线程共同访问同一对象的同一成员属性时的安全性。
- AtomicReferenceFieldUpdater // 域字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
public class Test5 {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
Test5 test5 = new Test5();
fieldUpdater.compareAndSet(test5, 0, 10);
// 修改成功 field = 10
System.out.println(test5.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test5, 10, 20);
System.out.println(test5.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test5, 10, 30);
System.out.println(test5.field);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 7. 原子累加器
虽然 AtomicInteger 也能实现一个原子累加器,但 JDK 8 之后增加了些专门用于做累加的原子类,包括 LongAdder 和 LongAccumulator,它们的性能都比 AtomicLong 的性能要好很多。
# 7.1 以 LongAdder 为例
LongAdder 的 increment()
方法便是用来自增。
实现原理的思路:相比于 AtomicLong 的性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0]
,而 Thread-1 累加 Cell[1]...
最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
# 7.2 源码之 LongAdder
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧。
LongAdder 类有几个关键字段:
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
2
3
4
5
6
7
8
// 先跳过,原视频 LongAdder 原理 (opens new window)
# 8.Unsafe 类
取名 Unsafe 并不是说该类不安全,而是因为该类直接操作内存,比较复杂,意在告诉程序员使用该类有较大风险。
前面讲的 CAS、LockSupport 等类的底层实现都是使用了 Unsafe 来实现的。
# 8.1 概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。
下面的代码可以用于获取 Unsafe 类的对象:
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 8.2 Unsafe CAS 操作
这里借助 Unsafe 的一些 CAS 操作来实现对一个类中字段的安全修改:
import lombok.Data;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class TestUnsafeCAS {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
// Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
// theUnsafe.setAccessible(true);
// Unsafe unsafe = (Unsafe) theUnsafe.get(null);
Unsafe unsafe = UnsafeAccessor.getUnsafe();
System.out.println(unsafe);
// 1. 获取域的偏移地址
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
Teacher t = new Teacher();
System.out.println(t);
// 2. 执行 cas 操作
unsafe.compareAndSwapInt(t, idOffset, 0, 1);
unsafe.compareAndSwapObject(t, nameOffset, null, "张三");
// 3. 验证
System.out.println(t);
}
}
@Data
class Teacher {
volatile int id;
volatile String name;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
输出:
sun.misc.Unsafe@77556fd
Teacher(id=0, name=null)
Teacher(id=1, name=张三)
2
3
# 9. 小结
这一章主要学习了一些无锁的数据结构
- CAS 与 volatile
- API
- 原子整数
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
- Unsafe