共享模型之管程 3:park、多把锁与活跃性
# 1. Park & Unpark
# 1.1 基本使用
它们都是 LockSupport 类中的方法:
LockSupport.park()
:暂停当前线程LockSupport.unpark(暂停线程对象)
:恢复某个线程的运行
# 1.1.1 先 park 再 unpark
使用时可以先 park 再 unpark:
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
2
3
4
5
6
7
8
9
10
11
12
输出:
18:42:52.585 c.TestParkUnpark [t1] - start...
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...
2
3
4
# 1.1.2 先 unpark 再 park
也可以先 unpark 再 park:
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
2
3
4
5
6
7
8
9
10
11
12
输出:
18:43:50.765 c.TestParkUnpark [t1] - start...
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...
2
3
4
这种可以先 unpark 再 park 也能运行的方式比较特别,之后会看到它的底层原理,这里先记住这个结论就好。
# 1.1.3 特点
与 Object 的 wait & notify 相比:
- wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
# 1.2 原理之 park & unpark
每个线程都有自己的一个(C代码实现的) Parker 对象,由三部分组成 _counter
, _cond
和 _mutex
。
打个比喻,线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _cond
就好比背包中的帐篷。_counter
就好比背包中的备用干粮(0 为耗尽,1 为充足):
- 调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
- 调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留,继续前进
- 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮,也就是多次 unpark 后只会让紧跟着的一次 park 失效
按照这个原理,我们看一下调用 park & unpark 时发生的情况:
# 1.2.1 先 park 再 unpark
- 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 0,这时,获得_mutex
互斥锁 - 线程进入
_cond
条件变量阻塞 - 设置
_counter = 0
# 1.2.2 先 unpark 再 park
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为 1 - 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 1,这时线程无需阻塞,继续运行 - 设置
_counter
为 0
# 2. 重新理解线程状态转换
# 2.1 概览图
IDEA 在 debug 时所使用的术语可能会有点不一致。
# 2.2 各种转换情况
# 1)NEW --> RUNNABLE
假设有线程 thread t
,NEW 状态表示只创建了一个 Java 的线程对象,还没有与 OS 的线程关联起来。
当调用 t.start()
时,由 NEW --> RUNNABLE。
# 2)RUNNABLE <---> WAITING
t 线程用 synchronized(obj)
获取了对象锁后
- 调用 obj.wait() 方法时,t 线程从 RUNNABLE --> WAITING
- 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
- 竞争锁成功,t 线程从 WAITING --> RUNNABLE
- 竞争锁失败,t 线程从 WAITING --> BLOCKED
public class TestWaitNotify {
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...."); // 断点
}
},"t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...."); // 断点
}
},"t2").start();
sleep(0.5);
log.debug("唤醒 obj 上其它线程");
synchronized (obj) {
obj.notifyAll(); // 唤醒obj上所有等待线程 断点
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 3)RUNNABLE <--> WAITING
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING。
注意不是 t 去 WAITING,而是调用
t.join()
的线程会在 t 线程对象的监视器上等待。
t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE。
# 4)RUNNABLE <--> WAITING
- 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING。
- 调用 LockSupport.unpark(目标线程) 或调用了线程的 interrupt() ,会让目标线程从 WAITING -->RUNNABLE
# 5)RUNNABLE <--> TIMED_WAITING
t 线程用 synchronized(obj)
获取了对象锁后
- 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
- t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
- 竞争锁成功,t 线程从TIMED_WAITING --> RUNNABLE
- 竞争锁失败,t 线程从TIMED_WAITING --> BLOCKED
# 6)RUNNABLE <--> TIMED_WAITING
- 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
- 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING --> RUNNABLE
# 7)RUNNABLE <--> TIMED_WAITING
- 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
- 当前线程等待时间超过了 n 毫秒,当前线程从TIMED_WAITING --> RUNNABLE
# 8)RUNNABLE <--> TIMED_WAITING
- 当前线程调用
LockSupport.parkNanos(long nanos)
或LockSupport.parkUntil(long millis)
时,当前线 程从 RUNNABLE --> TIMED_WAITING - 调用
LockSupport.unpark(目标线程)
或调用了线程的interrupt()
,或是等待超时,会让目标线程从 TIMED_WAITING--> RUNNABLE
# 9)RUNNABLE <--> BLOCKED
- t 线程用
synchronized(obj)
获取对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED - 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然BLOCKED
# 10)RUNNABLE --> TERMINATED
当前线程所有代码运行完毕,进入 TERMINATED。
至此,线程这六种状态的转变就讲解完了。
# 3. 多把锁 & 活跃性
# 3.1 多把不相干的锁
假如一间大屋子有两个功能:睡觉、学习,互不相干。 现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低。
解决方法就是——准备多个房间(多个对象锁)。
例如:
class BigRoom {
public void sleep() {
synchronized (this) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (this) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
上面这个代码中,study 和 sleeping 共用一个 this
作为锁,比较慢,可以将一个 big room 拆成多个 room,改进代码如下:
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
将锁的粒度细分:
- 好处,是可以增强并发度
- 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁
# 3.2 死锁
一个线程需要同时获取多把锁,这时就容易发生死锁:
- t1 线程已获得 A 锁,接下来想要 B 锁
- t2 线程已获得 B 锁,接下来想要 A 锁
这样就发生了死锁。
定位死锁的方法:可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁。
# 3.3 活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- 这两个线程分别单独运行都没问题,但如果一块运行的话,都在互相改变对方的结束运行条件,这导致最终谁也结束不了。
活锁的解决办法就是,让这两个的执行时间交错开来。
# 3.4 饥饿
饥饿:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及饥饿问题。
下面举一个“饥饿”的例子:在五人哲学家就餐问题中,五个哲学家围着坐,只有五根筷子,每位哲学家左右手边各有一根筷子,如果筷子被身边的人拿着,自己就得等待。
每个哲学家先拿起左边筷子再拿右边筷子,然后才能吃饭,实现代码如下:
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
@Override
public void run() {
while (true) {
// 获得左手筷子
synchronized (left) {
// 获得右手筷子
synchronized (right) {
// 吃饭
eat();
}
// 放下右手筷子
}
// 放下左手筷子
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
如果就餐过程如下:
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
2
3
4
5
6
7
8
9
10
11
运行后就会发现,一会就死锁了,因为每个人同时拿起左边的筷子,这个局面就死了。一种解决办法是顺序加锁,比如有 A 和 B 两把锁,要求所有线程必须先拿 A 锁才能再拿 B 锁,从而可以防止死锁的出现。把顺序加锁用到哲学家就餐问题中,就餐过程的代码如下:
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
// new Philosopher("阿基米德", c5, c1).start();
new Philosopher("阿基米德", c1, c5).start(); //线程饥饿
2
3
4
5
6
- 将
阿基米德
获取筷子的顺序变了一下,要求必须先获得小号的筷子,才能获得大号的筷子,从而实现了顺序加锁来防止死锁。
运行后你会发现,阿基米德很少能 eat
,发生了“饥饿”现象。解决饥饿问题的方法就是 ReentrantLock。