线程池
为什么需要线程池呢?当短时间有大量任务到来时,突然创建大量的线程是不明智的,因为 CPU 的核就那么几个,所以再多的线程对性能的提高也无益,反而会因为过多的上下文切换导致性能的下降。于是利用享元模式的思想,产生了“线程池”:
线程池其实就是创建一批线程,并能使这些线程能够重复利用。在学习 JDK 的线程池之前,我们先自己实现一个简单的线程池。架构图如上图。
# 1. 自定义线程池
# 步骤 1:定义拒绝策略 RejectPolicy
当任务队列满了的时候,如果生产者想要往里面继续放 task 时,我们可以为其定义多种行为,例如:
- 死等
- 带超时的等待
- 让调用者放弃任务执行
- 让调用者抛出异常
- 让调用者自己执行任务
为了让上面这些行为能够灵活配置,我们需要使用策略模式将这些行为抽象出来,形成 RejectPolicy:
package com.tobestronger.n8._8_1;
/**
* 任务拒绝策略
*
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
2
3
4
5
6
7
8
9
10
11
12
# 步骤 2:自定义任务队列 BlockingQueue
BlockingQueue
用于存放任务,生产者可以追加任务,消费者可以取出任务。
但注意,在队列满了的时候,生产者再想追加就要等待;同理,当队列为空时,消费者再想取走也是需要等待。
BlockingQueue 实现
package com.tobestronger.n8._8_1;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 任务队列
*
* @param <T>
*/
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量,生产者需要等待时依靠这个
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量,消费者需要等待时依靠这个
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
log.info("构造BlockingQueue");
this.capcity = capcity;
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if(queue.size() == capcity) {
log.info("队列已满,按照拒绝策略处理任务 {}",task);
rejectPolicy.reject(this, task);
} else { // 有空闲
log.debug("队列未满,加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# 步骤 3:自定义线程池 ThreadPool
这里将线程 Thread 封装为 Worker 方便管理。
ThreadPool 实现
package com.tobestronger.n8._8_1;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
/**
* 拒绝策略
*/
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
log.info("接收到任务需要执行: "+task);
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
log.info("coreSize未满");
Worker worker = new Worker(task);
log.debug("新增 worker {} 来执行任务 {}", worker, task);
workers.add(worker);
worker.start();
} else {
log.info("coreSize已经满了!!!!!,尝试先将任务放入队列 {}",task);
// taskQueue.put(task);
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
log.info("构造ThreadPool");
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
/**
* 工作线程
*/
class Worker extends Thread{
/**
* 执行任务主体
*/
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
/**
* 执行已有任务或从队列中获取一个任务执行.
* 如果都执行完了,就结束线程
*/
@Override
public void run() {
log.info("跑起来了,让我看看有没有task来做");
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("获取到任务了,正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
log.info("搞定一个任务 {},尝试获取新任务执行",task);
task = null;
}
}
synchronized (workers) {
log.debug("worker 因长时间没有可执行任务 将被释放 {}", this);
workers.remove(this);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# 步骤 4:测试
测试代码
package com.tobestronger.n8._8_1;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "c.TestCustomThreadPool")
public class TestCustomThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
3000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
// 1. 死等
// queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
log.info("当前拒绝策略: 让调用者自己执行任务,没有开新线程,直接调用的run()");
task.run();
});
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(() -> {
try {
log.info("我先睡1s");
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("我是第 {} 个任务,我马上执行完了", j);
});
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
某次执行的结果:
某次执行日志
Connected to the target VM, address: '127.0.0.1:49956', transport: 'socket' 18:25:21.216 c.ThreadPool [main] - 构造ThreadPool 18:25:21.225 c.BlockingQueue [main] - 构造BlockingQueue 18:25:21.228 c.ThreadPool [main] - 接收到任务需要执行: com.tobestronger.n8._8_1.TestCustomThreadPool
Process finished with exit code 0
# 2. ThreadPoolExecutor
接下来就开始介绍 JDK 为我们提供的线程池实现。首先是 ThreadPoolExecutor。
看上图可知,ExecutorService
是线程池最基本的接口:
ScheduledExecutorService
是它的扩展接口,增加了任务调度功能ThreadPoolExecutor
是它最重要的线程池实现类ScheduledThreadPoolService
则是具有任务调度功能的线程池实现
这里首先看一下 ThreadPoolExecutor 类。
# 2.1 线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。
状态名 | 高 3 位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING。
因为第一位是符号位,RUNNING 是负数,所以最小。
为什么要将这些信息存在一个 int 中呢?这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值:
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,这个函数用来合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
4
5
# 2.2 构造方法
构造方法决定了这个线程池所能具备的行为,来看一下:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数目 (最多保留的线程数)
int maximumPoolSize, // 最大线程数目
long keepAliveTime, // 生存时间 - 针对救急线程
TimeUnit unit, // 时间单位 - 针对救急线程
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂 - 可以为线程创建时起个好名字
RejectedExecutionHandler handler // 拒绝策略
)
2
3
4
5
6
7
8
9
这些概念可以类比第一大节中我们自定义实现的线程池。
线程池中的线程分为核心线程和救急线程,两者的数目之和就是最大线程数目。核心线程在创建后会一直存在,而救急线程有点像外包,在干完活后如果过段时间没活了就消失了。
线程池的工作方式如下:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排队,直到有空闲的线程。
- 【救急线程创建的条件】如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
- 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现:
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束线程资源,这个时间由 keepAliveTime 和 unit 来控制。
由于构造方法中的参数很多,直接使用不太方便,于是 JDK 的 Executors 类中提供一些工厂方法来创建各种用途的线程池。下面来看一下这个类。
# 2.3 Executors 类中的工厂方法
# 2.3.1 newFixedThreadPool 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也就无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
评价:适用于任务量已知,相对耗时的任务
# 2.3.2 newCachedThreadPool 带缓冲的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
特点:
- 核心线程数是 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
- 队列采用了
SynchronousQueue
实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
评价:整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。
# 2.3.3 newSingleThreadExecutor 单线程执行器
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2
3
4
5
6
使用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
❓ 和自己创建一个线程来工作的区别:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
❓ 与 Executors.newFixedThreadPool(1)
的区别:
Executors.newSingleThreadExecutor()
线程个数始终为 1,不能修改- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改- 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
# 2.4 提交任务(的几个方法)
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
submit 使用示例
@Slf4j(topic = "c.TestSubmit")
public class TestSubmit() {
public static void main(Stringp[] args) throws ExecutionException, InterruptionExecution {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future = pool.submit(new Callable<String>() {
@Overide
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}
});
log.debug("{}", future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
程序运行后可以输出 "ok"。
# 2.5 关闭线程池(的几个方法)
# 2.5.1 shutdown
调用 pool.shutdown()
即可。
线程池状态变为 SHUTDOWN:
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
它的具体实现:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 给整个线程池加锁
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.5.2 shutdownNow
线程池状态变为 STOP:
- 不会接收新任务
- 会将队列中的任务返回
List<Runnable>
- 并用 interrupt 的方式中断正在执行的任务
其实现为:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程(在 shutdown 方法中是只打断空闲的)
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 2.5.3 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
2
3
4
5
6
7
8
# 2.6 异步模式之工作线程模式
# 2.6.1 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。
# 2.6.2 饥饿
固定大小线程池会有饥饿现象,比如:
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
- 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
- 但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,并都开始等待上菜,但这时没人做饭了,出现饥饿问题
注意,这是饥饿问题,而不是死锁,因为再加个线程就能解决这个问题了。
解决方法可以是增加线程池的大小,不过这不是根本解决方案,根本的解决方法还是:不同的任务类型,采用不同的线程池。
# 2.6.3 线程池多大比较合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存
计算分为两种:
- CPU 密集型运算:通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费。
- I/O 密集型运算:CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
IO 密集型的经验公式如下:线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
- 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
- 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40
# 2.7 任务调度线程池
有时我们希望任务可以延时执行,或者反复执行,于是引入了任务调度线程池。
# 2.7.1 java.util.Timer
不过在加入任务调度线程池功能前,就可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
下面是使用 Timer 来实现延迟执行的示例:
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
log.debug("start...");
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
// 甚至如果task1出异常停止后,task2都不会执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 2.7.2 ScheduledExecutorService
只使用 Timer 容易由于异常导致问题,因此应该使用 ScheduledExecutorService。下面是改写后的:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
2
3
4
5
6
7
8
9
10
11
而且该 executor 的其中一个任务出现异常的话,也不会影响其他的任务执行。
如果想每隔 xx 秒执行一次任务,那么就需要 scheduleAtFixedRate() 方法,示例如下:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);
2
3
4
5
6
但如果任务的实际执行时间超过了设置的间隔,那么间隔时间会被『撑』到实际执行时间,这样不会出现任务的重叠执行,而是顺序依次执行。
与其相类似的另一个方法是:scheduleAtFixedDelay(),它是从上一个任务执行结束后开始算间隔时间。
# 2.7.3 评价
整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务。
# 2.8 正确处理执行任务异常
# 2.8.1 方法 1:主动捉异常
这样是由任务自己处理可能抛出的异常。
ExecutorService pool = Executors.newScheduledThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
2
3
4
5
6
7
8
9
# 2.8.2 方法 2:使用 Future
当任务执行出现异常时,future 的 get()
返回的不是任务的返回值,而是异常信息。示例如下:
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
2
3
4
5
6
7
8
# 2.9 案例:定时任务
如何让每周四 18:00:00 定时执行任务?
// 获得当前时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.000
LocalDateTime thursday =
now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if(now.compareTo(thursday) >= 0) {
thursday = thursday.plusWeeks(1);
}
// 计算时间差,即延时执行时间
long initialDelay = Duration.between(now, thursday).toMillis();
// 计算间隔时间,即 1 周的毫秒值
long oneWeek = 7 * 24 * 3600 * 1000;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println("开始时间:" + new Date());
executor.scheduleAtFixedRate(() -> {
System.out.println("执行时间:" + new Date());
}, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 3. 案例:Tomcat 的线程池应用
Tomcat 在哪里用到了线程池呢?这里主要看一下 Connector 组件所用到的线程池。
- LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
- Acceptor 是个死循环,只负责【接收新的 socket 连接】
- Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责【处理请求】
接下来看一下具体应用。
# 3.1 扩展了 ThreadPoolExecutor
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同:
- 如果总线程数达到 maximumPoolSize
- 这时不会立刻抛 RejectedExecutionException 异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
Tomcat 7.0.42 源码:
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command); // 先执行这个任务
} catch (RejectedExecutionException rx) { // 捕获抛出的拒绝执行异常
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
force 方法:
在上面的 !queue.force(...)
那一行的 force 方法代码如下(TaskQueue.java 中),它尝试将任务放入队列中:
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
is rejected
}
2
3
4
5
6
7
8
# 3.2 Connector 配置
acceptThreadCount 和 pollerThreadCount 一个线程就足够了。因为 Acceptor 大部分时间是在等待的,而 Poller 利用了多路复用技术,一个线程就可以监听多个。
# 3.3 Executor 线程配置
# 3.4 Tomcat 执行任务的流程
它创建救急队列的方式与 JDK 中的有些不太一样,它不是等任务阻塞队列满了之后才创建:
# 4. Fork/Join 线程池
# 4.1 概念
Fork/Join 线程池是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池。
# 4.2 使用
它的使用分成两步:
- 首先创建一个任务对象
- 将任务对象交由 ForkJoinPool 执行
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务:
@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);
// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
然后提交给 ForkJoinPool 来执行:
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}
2
3
4
打印结果:
[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
[ForkJoinPool-1-worker-0] - join() 3
[ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
[ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
[ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
[ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
15
2
3
4
5
6
7
8