notebook notebook
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)

学习笔记

啦啦啦,向太阳~
首页
  • 计算机网络
  • 计算机系统
  • 数据结构与算法
  • 计算机专业课
  • 设计模式
  • 前端 (opens new window)
  • Java 开发
  • Python 开发
  • Golang 开发
  • Git
  • 软件设计与架构
  • 大数据与分布式系统
  • 常见开发工具

    • Nginx
  • 爬虫
  • Python 数据分析
  • 数据仓库
  • 中间件

    • MySQL
    • Redis
    • Elasticsearch
    • Kafka
  • 深度学习
  • 机器学习
  • 知识图谱
  • 图神经网络
  • 应用安全
  • 渗透测试
  • Linux
  • 云原生
面试
  • 收藏
  • paper 好句
GitHub (opens new window)
  • Java开发

    • My

    • Posts

    • Java SE

    • Java 并发编程

      • 黑马

        • 概览
        • Java 线程
        • 共享模型之管程 1:synchronized 和锁的优化
        • 共享模型之管程 2:wait-notify
          • 1. wait / notify
            • 1.1 原理之 wait / notify
            • 1.2 API 介绍
            • 1.3 wait 与 sleep 的区别
            • 1.4 wait-notify 的正确姿势
          • 2. 案例:同步模式之保护性暂停
            • 2.1 定义
            • 2.2 实现
            • 2.3 使用示例
            • 2.4 带超时版的 GuardedObject
            • 2.5 join 的实现原理
            • 2.6 扩展:多任务版 GuardedObject
            • 2.7 扩展:异步模式之生产者/消费者模式
        • 共享模型之管程 3:park、多把锁与活跃性
        • 共享模型之管程 4:ReentrantLock
        • 共享模型之内存
        • 共享模型之无锁
        • 共享模型之不可变类
        • 线程池
        • AQS 原理
        • ReentrantLock 实现原理
      • 专栏:Java 并发编程实战

    • JVM

    • JDBC

    • Java Web

    • Spring 与 SpringMVC

    • Spring Boot

    • Spring Cloud

    • Spring Security

    • Netty

    • MyBatis

  • Python开发

  • Golang开发

  • Git

  • 软件设计与架构

  • 大数据与分布式系统

  • 区块链

  • Nginx

  • 开发
  • Java开发
  • Java 并发编程
  • 黑马
yubin
2023-06-24
目录

共享模型之管程 2:wait-notify

# 1. wait / notify

# 1.1 原理之 wait / notify

20230624203454

owner 线程发现条件不满足,便会调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态。

BLOCKED 和 WAITING 的线程有以下区别:

  • 都是处于阻塞状态,不占用 CPU 时间片,只是阻塞的原因不同:
    • BLOCKED 的是在等待锁
    • WAITING 是已经得到过锁,但又因为缺少其他资源而放弃锁进行等待
  • 被唤醒的方式不同
    • BLOCKED 线程会在 owner 线程释放锁时被唤醒。
    • WAITING 线程会在 owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需要进入 EntrySet 重新竞争。

# 1.2 API 介绍

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法。

final static Object obj = new Object();

public static void main(String[] args) {
    new Thread(() -> {
        synchronized (obj) {
            log.debug("执行....");
            try {
                obj.wait(); // 让线程在obj上一直等待下去
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    
    new Thread(() -> {
        synchronized (obj) {
            log.debug("执行....");
            try {
                obj.wait(); // 让线程在obj上一直等待下去
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    
    // 主线程两秒后执行
    sleep(2);
    log.debug("唤醒 obj 上其它线程");
    synchronized (obj) {
        obj.notify(); // 唤醒obj上一个线程
        // obj.notifyAll(); // 唤醒obj上所有等待线程
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

主线程 main 调用 notify 的一种结果:

20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
1
2
3
4

调用 notifyAll 的结果:

19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....
1
2
3
4
5

wait 也分成有时限和无时限的:

  • wait() 或 wait(0) 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到 notify 为止
  • wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify

# 1.3 wait 与 sleep 的区别

sleep(n) 与 wait(n) 的区别如下:

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
    • sleep 只是释放 CPU,不释放锁

两者的状态是一样的,都是 TIMED_WAITING。

# 1.4 wait-notify 的正确姿势

假如我们有一个 room,现在两个人需要竞争这个 room,并在 room 里面执行一段线程安全的代码,但这两个人中,一个人需要有烟才能干活,另一个人需要有外卖吃才能干活。有下面三个变量:

static final Object room = new Object();  // room,是个共享资源
static boolean hasCigarette = false;   // 是否有烟
static boolean hasTakeout = false;    // 是否有外卖
1
2
3

下面看看如何一步步改造成正确的实现姿势。

# step-1/例 1:sleep 会阻碍其他线程的执行

下面这个代码中,小南在发现等待烟的时候使用了 sleep 来等待,这样会阻碍其他线程对 room 的使用,实现效果不太好:






 























new Thread(() -> {
    synchronized (room) {
        log.debug("有烟没?[{}]", hasCigarette);
        if (!hasCigarette) {
            log.debug("没烟,先歇会!");
            sleep(2);
        }
        log.debug("有烟没?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("可以开始干活了");
        }
    }
}, "小南").start();

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        synchronized (room) {
            log.debug("可以开始干活了");
        }
    }, "其它人").start();
}

sleep(1);
new Thread(() -> {
    // 这里能不能加 synchronized (room)? 不能
    hasCigarette = true;
    log.debug("烟到了噢!");
}, "送烟的").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

解决方式就是将 sleep 改为 wait-notify 的方式。

# step-2/例 2:虚假唤醒问题

我们将 sleep 改为 wait,同时将具有竞争关系的两个 thread 都加了进来,他们共同使用同一个 room:







 




















 






















new Thread(() -> {
    synchronized (room) {
        log.debug("有烟没?[{}]", hasCigarette);
        if (!hasCigarette) {
            log.debug("没烟,先歇会!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("有烟没?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("可以开始干活了");
        } else {
            log.debug("没干成活...");
        }
    }
}, "小南").start();

new Thread(() -> {
    synchronized (room) {
        Thread thread = Thread.currentThread();
        log.debug("外卖送到没?[{}]", hasTakeout);
        if (!hasTakeout) {
            log.debug("没外卖,先歇会!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("外卖送到没?[{}]", hasTakeout);
        if (hasTakeout) {
            log.debug("可以开始干活了");
        } else {
            log.debug("没干成活...");
        }
    }
}, "小女").start();

sleep(1);
new Thread(() -> {
    synchronized (room) {
        hasTakeout = true;
        log.debug("外卖到了噢!");
        room.notify();
    }
}, "送外卖的").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

但问题在于 notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为虚假唤醒。

虚假唤醒的解决方法:改为 notifyAll。

# step-3/例 3:if + wait 仅有一次判断机会

当我们把上面的代码改成 notifyAll() 后,又有一个新的问题:当有了烟而 notifyAll 后,会把等待外卖的小女也给唤醒,而小女的代码中,无法在醒来后判断是否满足资源并继续 wait。

解决办法:用 while + wait,当被唤醒后发现条件还是不成立,就再次 wait。

# step-4/例 4:使用 while + wait

之前的 if + wait 的代码如下:

if (!hasCigarette) {
    log.debug("没烟,先歇会!");
    try {
        room.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8

现在改成 while + wait:

while (!hasCigarette) {
    log.debug("没烟,先歇会!");
    try {
        room.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8

这样,程序的运行就满足我们的要求了。

# 小小结

使用 while + wait 来等待资源的模板代码如下:

synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 干活
}

//另一个线程
synchronized(lock) {
    lock.notifyAll();
}
1
2
3
4
5
6
7
8
9
10
11

# 2. 案例:同步模式之保护性暂停

# 2.1 定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果。

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式
20230624215234

# 2.2 实现

class GuardedObject {
    
    private Object response; // 结果
    private final Object lock = new Object();
    
    // 获取结果
    public Object get() {
        synchronized (lock) {
            // 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
            return response; 
        }
    }
    
    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 2.3 使用示例

一个线程等待另一个线程的执行结果:

public static void main(String[] args) {
    GuardedObject guardedObject = new GuardedObject();
    
    new Thread(() -> {
        try {
            // 子线程执行下载
            List<String> response = download();
            log.debug("download complete...");
            guardedObject.complete(response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();
    
    log.debug("waiting...");
    
    // 主线程阻塞等待
    Object response = guardedObject.get();
    log.debug("get response: [{}] lines", ((List<String>) response).size());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

执行结果:

08:42:18.568 [main] c.TestGuardedObject - waiting...
08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines
1
2
3

# 2.4 带超时版的 GuardedObject

class GuardedObjectV2 {
    private Object response;
    private final Object lock = new Object();
    
    public Object get(long millis) {
        synchronized (lock) {
            // 1) 记录最初时间
            long begin = System.currentTimeMillis();
            // 2) 已经经历的时间
            long timePassed = 0;
            
            while (response == null) {
                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                
                if (waitTime <= 0) {
                    log.debug("break...");
                    break; 
                }
                
                try {
                    lock.wait(waitTime);  // 可能被虚假唤醒
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - begin;
                
                log.debug("timePassed: {}, object is null {}", 
                          timePassed, response == null);
            }
            return response; 
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

因为在 wait 时可能被虚假唤醒,所以在再次 wait 时只需要等待 总 timeout - 已经经过了的时间 passedTime。

# 2.5 join 的实现原理

join 的实现源码如下,如果你看懂了前面的示例,那这个其实思路就与“带超时版的 GuardedObject”的实现思路是类似的,也是用了 wait-notify 机制来实现的。

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 2.6 扩展:多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理:

20230624222148
  • 中间这个集合给每个 GuardedObject 一个 id 来唯一标识它。

中间解耦类的实现如下:

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

我们便可以拿着这个 Mailboxes 来实现多个人之间的发信与收信了。

# 2.7 扩展:异步模式之生产者/消费者模式

# 2.7.1 定义

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式
20230624223310

这里的消息队列只是在 Java 的线程之间进行消息传递,与 RabbitMQ 这类还是不一样的。

# 2.7.2 实现

@Getter
@AllArgsContructor
class Message {
    private int id;
    private Object message;
}

class MessageQueue {
    private LinkedList<Message> queue;  // 作为双向队列
    private int capacity;
    
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 2.7.3 使用

MessageQueue messageQueue = new MessageQueue(2);

// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
    int id = i;
    new Thread(() -> {
        try {
            log.debug("download...");
            List<String> response = Downloader.download();
            log.debug("try put message({})", id);
            messageQueue.put(new Message(id, response));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }, "生产者" + i).start();
}

// 1 个消费者线程, 处理结果
new Thread(() -> {
    while (true) {
        Message message = messageQueue.take();
        List<String> response = (List<String>) message.getMessage();
        log.debug("take message({}): [{}] lines", message.getId(), response.size());
    }
}, "消费者").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

某次运行结果:

10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
编辑 (opens new window)
上次更新: 2023/06/24, 14:48:51
共享模型之管程 1:synchronized 和锁的优化
共享模型之管程 3:park、多把锁与活跃性

← 共享模型之管程 1:synchronized 和锁的优化 共享模型之管程 3:park、多把锁与活跃性→

最近更新
01
Deep Reinforcement Learning
10-03
02
误删数据后怎么办
04-06
03
MySQL 一主多从
03-22
更多文章>
Theme by Vdoing | Copyright © 2021-2024 yubincloud | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×