前言
ThreadPoolExecutor是线程池的默认实现,在使用线程池的时候,如果没有特殊要求,则直接创建ThreadPoolExecutor。如果有特殊要求,则直接继承ThreadPoolExecutor,例如ScheduledThreadPoolExecutor,它是一个可以定时执行任务的线程池。
下面就通过分析ThreadPoolExecutor的源码来进一步了解线程池的原理。
源码分析
ctl
ctl意为control,是一个重要的变量,其定义如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的状态
private static final int RUNNING = -1 << COUNT_BITS; //高三位111
private static final int SHUTDOWN = 0 << COUNT_BITS; //高三位000
private static final int STOP = 1 << COUNT_BITS; //高三位001
private static final int TIDYING = 2 << COUNT_BITS; //高三位010
private static final int TERMINATED = 3 << COUNT_BITS; //高三位011
//打包和解析ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
可以看到,ctl是一个AtomicInteger对象,它是对线程池运行状态和线程池中有效线程数量进行控制的字段。ctl有32位,其中高3位表示线程池状态,低29位表示线程池中的有效线程数。
线程池共有5种状态,分别如下:
- RUNNING:线程池初始化时的状态,此时可接受新的任务,也能处理任务队列中的任务。
- SHUTDOWN:调用shutdown方法后进入该状态,此时不再接受新的任务,但还会处理完任务队列中剩余的任务。
- STOP:调用shutdownNow方法后进入该状态,此时不再接受新的任务,也不会处理任务队列中剩余的任务。
- TIDYING:此时所有任务都已终止,workerCount为零,线程池进入该状态后将运行terminate方法并进入TIDYING状态。
- TERMINATED:terminated方法执行完毕后进入该状态。
Worker
Worker是ThreadPoolExecutor中的一个内部类,它既实现了Runnable接口,又继承了AQS,所以它既是一个可执行的任务,又是线程安全的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//...
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//...
}
它有两个重要的成员变量:thread和firstTask。thread表示用来处理当前任务的线程,firstTask保存第一次传进来的任务。
构造方法的参数
ThreadPoolExecutor的构造方法提供了一系列参数来配置线程池,其构造方法如下:1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
共7个参数,各参数的含义如下:
- corePoolSize:线程池的核心线程数。默认情况下,线程池是空的,当有任务提交时才会创建线程,如果当前线程数量小于corePoolSize,则创建核心线程,核心线程会在线程池中一直存活,即使它们处于闲置状态。
- maximumPoolSize:线程池所能容纳的最大线程数,当任务队列满了但线程数未达到该值时,会创建非核心线程来执行新的任务。当任务队列满了并且线程数也达到最大值时,将执行饱和策略。
- keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。如果将allowCoreThreadTimeOut设置为true,那么该时长同样会作用于核心线程。
- unit:指定keepAliveTime参数的单位。
- workQueue:任务队列,如果当前线程数大于corePoolSzie,就将新的任务添加到此队列中,该队列的类型是阻塞队列。
- threadFactory:这是一个接口,用于创建线程,所有的线程都是通过该工厂来创建。
- handler:饱和策略。当线程池和任务队列都满了的时候,会调用handler的rejectedExecution方法,默认情况下该方法直接抛出异常。其他几个可选值为:用调用者所在线程执行任务、丢弃队列中靠前的任务以及丢弃当前任务。
execute(提交任务)
要想线程池提交一个任务,可以调用execute方法或submit方法,两者的区别是:execute方法只能提交任务而不能获得任务执行的结果,submit方法既能提交任务,也能获得任务执行的结果。submit方法只是先将Runnable封装为FutureTask,最终也是调用execute方法。所以直接分析execute方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); //获取ctl的值
//如果当前有效线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//添加Worker执行任务(即创建一个线程执行该Runnable)
if (addWorker(command, true))
return;
//添加失败时重新获取ctl的值
c = ctl.get();
}
//如果当前有效线程数大于等于核心线程数,并且当前线程池的状态为RUNNING
//则将该任务添加至任务列表中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查,如果线程池的状态不是RUNNING,就将该任务从任务队列中移除,并抛出异常,拒绝该任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果发现当前的有效线程数为0,就添加一个Worker,但该Worker不执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行到这里有两种情况:
//1. 线程池已经不处于RUNNING状态
//2. 线程池处于RUNNING状态,但任务队列已满
//这时再次执行addWorker,并将线程数限制设定为最大线程数,分两种情况:
//如果线程池处于RUNNING状态,并且有效线程数小于最大线程数,那么将会创建一个新的Worker来执行该任务
//如果线程池不处于RUNNING状态,或者有效线程数达到最大线程数,又或者添加Worker失败,将会执行饱和策略
else if (!addWorker(command, false))
reject(command);
}
execute方法中又用到了addWorker方法,看下该方法的实现:
addWorker
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
addWorker方法的执行步骤如下:
- 首先根据当前线程池的状态判断是否要执行添加Worker的操作:
- 如果线程池状态为RUNNING,继续执行
- 如果线程池状态为SHUTDOWN,当传入的firstTask为空且任务队列不为空时,继续执行;否则不再执行
- 线程状态为STOP、TIDYING、TERMINATED时,不再执行
- 如果有效线程数大于等于限制数,不再执行。否则将ctl的有效线程数加1。
- 根据传入的firstTask创建Worker对象,然后将该Worker对象添加进HashSet中保存起来,由于HashSet是非线程安全的,所以该过程需要加锁。之后在添加前还要再判断一次,如果线程池的状态是RUNNING或者线程池的状态是SHUTDOWN并且firstTask为空时,就将Worker添加进HashSet中。
- 最后还要判断Worker是否启动成功,启动成功返回true,否则就要将ctl的有效线程数减1,并且从HashSet中删除该Worker对象。
小结下execute的步骤
- 如果当前有效线程数小于核心线程数,就通过addWorker方法来执行任务,在addWorker方法中,如果线程池状态和当前线程数没问题,就会创建并启动线程来执行任务。
- 如果当前有效线程数大于等于核心线程数,线程池的状态为RUNNING,且任务队列未满的话,就将任务添加至任务列表中。添加完后要再次检查:如果线程池的状态已经不是RUNNING,就将该任务从任务队列中移除,并执行饱和策略。如果发现线程池还是RUNNING状态但线程池中没有有效线程,就通过addWorker开启一个新线程来执行任务。
- 如果线程池已经不处于RUNNING状态或者线程池处于RUNNING状态,但任务队列已满。就再次执行addWorker,并将限制数设为最大线程数,这时分两种情况:如果线程池处于RUNNING状态,并且有效线程数小于最大线程数,那么将会创建一个新的Worker来执行该任务;如果线程池不处于RUNNING状态,或者有效线程数达到最大线程数,又或者是添加Worker失败,就会执行饱和策略。
其流程图如下:
执行任务
在addWorker方法中,要执行任务时,调用以下代码:1
2
3
4if (workerAdded) {
t.start();
workerStarted = true;
}
其中,t为w.thread,即Worker中的thread变量,其创建如下:1
2
3
4Worker(Runnable firstTask) {
//...
this.thread = getThreadFactory().newThread(this);
}
getThreadFactory()默认返回Executors.defaultThreadFactory,其newThread方法如下:1
2
3
4
5
6
7
8
9
10public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
可以看到,Worker作为target创建了Thread,所以调用t.start的时候就是调用了Worker的run方法:1
2
3public void run() {
runWorker(this);
}
可以看到,真正执行任务的方法是runWorker:
runWorker
1 | final void runWorker(Worker w) { |
在runWorker方法中用到了getTask方法,该方法不断地尝试从任务队列中取出任务,实现如下:
getTask
1 | private Runnable getTask() { |
小结下执行任务的步骤
真正执行任务的入口是runWorker方法,在该方法中:
- 先执行Worker的firstTask,之后再不断地尝试从任务队列中取出任务来执行
- 执行任务前需先加锁,避免同一个任务被多个线程执行。还有判断线程池的状态,如果线程池正在停止,需中断当前线程。
- 调用Runnable的run方法,真正执行任务。执行完任务后置空task,新的task通过getTask方法从任务队列中获取。
getTask的执行步骤如下:
- 先根据线程池状态判断是否要销毁当前线程:如果线程池状态为SHUTDOWN且任务队列为空,或者线程池状态为大于等于STOP,就将当前有效线程数减一并返回null,这时当前线程因得不到任务而销毁
- 再根据有效线程数判断是否要销毁当前线程,以下两种情况要销毁线程:1. 有效线程数大于最大线程数。 2. 有效线程数大于核心线程数或核心线程数也可回收,且上一次调用任务队列的poll方法超时
- 获取任务:若当前有效线程数小于等于核心线程数并且核心线程不可回收时,就从任务队列中阻塞获取任务,如果一直获取不到任务就会一直阻塞在这里,这就保证了核心线程不会被销毁。若当前有效线程数大于核心线程数或者核心线程可回收时,就阻塞超时获取任务,如果获取任务超时且下次循环时有效线程数还是大于核心线程数,就销毁当前线程。
参考
- 从源码的角度解析线程池运行原理
- Java 线程池 ThreadPoolExecutor 源码分析
- 码仔漫画:怎么给女朋友讲明白线程池?
- 《Android 进阶之光》