线程池

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
线程的生命周期的维护是非常麻烦的事情,所以需要一种工具来智能的管理线程,大大减轻开发的难度和优化程序的健壮性,这就是线程池的由来。
java本身已经提供了java.util.concurrent.Executor接口的实现用于创建线程池多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

线程池包含以下四个基本部分组成:

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池,销管线程池,添加新任务。
  • 工作线程(PoolWorker):线程池中线程,在没有任务时出于等待状态,可以循环的执行任务。
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等。
  • 任务队列(TaskQueue):用于存放没有处理的任务。提供一种缓冲机制。

空间换时间的操作,事先创建多个线程,放入线程池中,省去单个线程的创建于销毁的时间,专注于处理任务。

java中线程池的相关类的结构如下:

常见线程池

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
  • 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
  • 此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
  • 创建核心线程数1,最大线程数1的线程池,队列采用LinkedBlockingQueue(默认大小Integer.MAX_VALUE);
  • LinkedBlockingQueue默认大小Integer.MAX_VALUE,大量请求堆积时,可能产生oom

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 创建固定大小的线程池。创建核心线程数nThreads,最大线程数nThreads的线程池,队列采用阻塞队列LinkedBlockingQueue(默认大小Integer.MAX_VALUE);
  • 每提交一个任务就创建一个线程,直到线程数达到nThreads。
  • 线程池的大小达到nThreads后,尽管keepAliveTime为0,因为核心线程数等于最大线程数,所以闲置线程不会被回收,线程池数量一直保持在nThreads;
  • 如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  • 适用于处理并发相对比较稳定的任务。
  • LinkedBlockingQueue默认大小Integer.MAX_VALUE,大量请求堆积时,可能产生oom

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 创建一个可缓存的线程池。核心线程数0,最大线程数Integer.MAX_VALUE的线程池,队列采用SynchronousQueue;不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间;
  • 如果线程池的大小超过了处理任务所需要的线程,60秒过后回收闲置多余线程,当任务数重新增加时,线程池又可以添加新的线程来处理任务。此线程池不会对线程池大小做限制,线程池大小基本依赖于JVM能够创建的最大线程大小。
  • 既可接受吞吐量高的并发,又能在并发小的时候减少创建线程,节省资源,可伸缩性好;
  • 最大线程数Integer.MAX_VALUE,创建大量线程时,可能会oom

newScheduledThreadPool

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
  • 创建一个大小无限的线程池。此线程池支持定时周期性执行任务。
  • 创建核心线程数corePoolSize,最大线程数Integer.MAX_VALUE的线程池,
  • 采用延迟队列DelayedWorkQueue;
  • 主要执行可延迟性的,可定时周期控制的的任务;
  • 最大线程数Integer.MAX_VALUE,创建大量线程时,可能会oom

综上所述,建议用线程池类ThreadPoolExecutor自带构造方法自己根据适合的参数创建线程池。

1
2
3
4
5
6
7
8
9
10
// myThreadPool-%d中的 ” %d ” 在String中表示整数的意思:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("myThreadPool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors()+1,
200,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

实现原理

java.util.concurrent包提供了现成的线程池的实现。

  • ExecutorService
    • 真正的线程池接口。
  • ScheduledExecutorService
    • 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
  • ThreadPoolExecutor
    • ExecutorService的默认实现。
  • ScheduledThreadPoolExecutor
    • 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

基本实现:

1
ExecutorService service = Executors.newCachedThreadPool();

常见线程池实现都是基于ThreadPoolExecutor实现的。提供4个构造器,逐层调用,最终实现在第4个构造器上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

基本变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ctl 其实可以理解为单词 control 的简写, 翻译就是“控制”;
//修改此变量的操作,是原子性的;
//ctl 变量是Integer类型的,Integer.size在大多平台上是32位,
//该变量包含了两部分信息: 线程池的运行状态 (runState,前3位) ,
//和线程池内有效线程的数量 (workerCount,后29位);
//在下面的属性方法中,ctl 简写c,runState简写rs,workerCount简写wc;
//初始化为RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//32-3=29,在大多平台上Integer.size = 32 位,之所以不直接用数字29,是为了规范性
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程池最大数,1左移29位,再减1,除前3位状态位,值全为1,
//即000 11111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//前3位 111
//接受新的任务,并且处理在阻塞队列中排队的任务
private static final int RUNNING = -1 << COUNT_BITS;
//前3位 000
//不再接受新任务,继续处理在阻塞队列排队中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//前3位 001
//不在接受新任务,不处理在阻塞队列排队中的任务,并且会中断正在运行中的任务
private static final int STOP = 1 << COUNT_BITS;
//前3位 010
//整理中状态,所有任务执行完,阻塞队列执行完,工作线程数为0,过渡到此//状态的工作线程会调用方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//前3位 011
//终态,上述讲的terminated()方法执行完成后的状态
private static final int TERMINATED = 3 << COUNT_BITS;


// 将整数 c 低 29位与0计算,得到ctl前3位的值,即线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY;
// 将整数 c 低 29位与1计算,得到ctl后29位的值,即池中线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl新值,即新的线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • RUNNING -> SHUTDOWN
    • 显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法;常见状态转换之一;
  • (RUNNING or SHUTDOWN) -> STOP
    • 显式调用 shutdownNow() 方法时候;常见状态转换之一;
  • SHUTDOWN -> TIDYING
    • 当线程池和任务队列为空的时候
  • STOP -> TIDYING
    • 当线程池为空的时候
  • TIDYING -> TERMINATED
    • 当 terminated() hook 方法执行完成时候

其他字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//线程池主锁
private final ReentrantLock mainLock = new ReentrantLock();

//存放worker的HashSet
private final HashSet<Worker> workers = new HashSet<Worker>();

//主锁等待条件队列,用于awaitTermination方法,在调用shutdown()或者//shutdownNow()方法后,其实线程池并没有真正关闭,这个方法可以设定一
//个超时时间,来检查在时间内线程池是否真正已经达到TERMINATED状态;
private final Condition termination = mainLock.newCondition();

//线程池曾经创建过的最大线程数量。通过getLargestPoolSize()方法可以知道//线程池是否满过,也就是达到了maximumPoolSize
private int largestPoolSize;

//已完成任务数
private long completedTaskCount;


//默认为false,核心线程池不会被回收,哪怕在闲置以后;
//如果设置为true,就在keepAliveTime到达超时后回收;
private volatile boolean allowCoreThreadTimeOut

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • int corePoolSize (核心线程数大小):当提交一个任务到线程池时,即便目前池中有空闲线程能够执行任务,线程池也会创建一个新线程来执行任务,直到池中线程数达到corePoolSize 数量;在这里要注意一点,线程池刚创建的时候,其中并没有创建任何线程数,而是等有任务提交后再去创建线程,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法 ,这样才会预先创建 corePoolSize 数量的线程。
  • int maximumPoolSize (线程池最大线程数):线程池允许创建的最大线程数,如果线程池队列已满,并且已创建线程数小于最大线程数maximumPoolSize,则线程池会再创建新的线程执行任务,所以在通常情况下,maximumPoolSize >= corePoolSize , 值得注意的是,如果使用了无界队列,最大线程数此参数就没意义了。
  • long keepAliveTime (线程闲置存活时间):此参数默认在当前线程数大于 corePoolSize 的情况下才会起作用, 当这些多余的线程,空闲时间达到 keepAliveTime 的时候就会终止,直至线程数目减到 corePoolSize 。不过如果调用了 allowCoreThreadTimeOut 方法,则当线程数目小于 corePoolSize 的时候也会起作用,keepAliveTime到达后闲置线程也会终止;
  • TimeUnit unit (keelAliveTime的时间单位):keelAliveTime的时间单位。
    • TimeUnit.NANOSECONDS 纳秒
    • TimeUnit.MICROSECONDS 微秒
    • TimeUnit.MILLISECONDS 毫秒
    • TimeUnit.SECONDS 秒
    • TimeUnit.MINUTES 分
    • TimeUnit.HOURS 时
    • TimeUnit.DAYS 天
  • BlockingQueue workQueue (阻塞队列):阻塞队列,用来存储等待执行的任务,这个参数也是非常重要的,在这里简单介绍一下几个阻塞队列。
    • ArrayBlockingQueue:这是一个基于数组结构的有界阻塞队列,此队列按照FIFO的先进先出规则,进和出用的是同一个可重入锁。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照FIFO排序元素,进和出分别采用两把锁,所以吞吐量通常要高于ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ThreadFactory threadFactory(创建线程池工厂):用于设置创建线程的工厂,默认是使用Executors.defaultThreadFactory(),可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给池里的线程设置有意义的名字,代码:new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
  • RejectedExecutionHandler handler(饱和策略);当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。从JDK 1.5开始, Java线程池框架提供了4种策略
    • AbortPolicy:直接丢弃并且抛出 RejectedExecutionException 异常(默认)
    • CallerRunsPolicy:这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行;
    • DiscardOldestPolicy:如果执行程序尚未关闭,则位于阻塞队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程);
    • DiscardPolicy:直接丢弃后面提交的任务并且不抛出异常。

Work对象

线程池中的线程,都被封装成了Worker对象,线程池维护的一组Worker对象,而Worker对象又继承AbstractQueuedSynchronizer类,使用AQS来实现独占锁的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 线程对象,真正在执行任务的活动线程 */
final Thread thread;
/**这里的 Runnable 是任务:在创建线程后需要执行的第一个任务,
如果这个传入对象为空,在线程启动之后,不是执行空的任务,
而是会到阻塞任务队列中自行取任务执行*/
Runnable firstTask;
/**该线程完成的任务数 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); //将state设置为-1,是为了防止在执行任务前对线程进行中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**调用外部主类的 runWorker 方法来运行线程 */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

主要方法

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取当前值
int c = ctl.get();
//如果当前线程数小于核心线程数,创建新线程执行该任务
if (workerCountOf(c) < corePoolSize) {
//第二个参数为true的时候,表示用corePoolSize
//作为线程池工作线程数大小的上限,否则使用
//maximumPoolSize作为线程池工作线程数上限;
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池处于运行状态,并且插入阻塞队列排队成功
if (isRunning(c) && workQueue.offer(command)) {
//再检查一次ctl变量
int recheck = ctl.get();
//如果线程池处于非运行状态,
//如果现在正在关闭线程池,并且从阻塞队列成功移除当前线程;
//就执行拒绝策略;
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池还是运行状态的,并且当前池中可用线程为0,
//那么需要开启新的线程,并且addWorker方法第二个参数
//为false,表示最大线程数可到maximumPoolSize;
//防止任务在阻塞队列中后,但是可运行线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果执行到这个if分支,有两种情况:
//a、线程池非RUNNING状态;
//b、或者线程池是RUNNING状态,但workerCount >= corePoolSize
//并且workQueue.offer返回false(说明阻塞队列已满),这时调用
//addWorker方法,但第二个参数传入为false,将线程池的有限线程数量
//的上限设置为maximumPoolSize;
else if (!addWorker(command, false))
//添加线程任务失败,对当前线程执行拒绝策略
reject(command);
}

addWorker

addWorker()方法,主要作用是在线程池中创建一个新的线程并执行添加进来的任务,并且firstTask是新增线程需要执行的第一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//循环直到满足true或false,作为addWorker方法返回值
for (;;) {
//获取线程池ctl变量来得到运行状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
//1、如果线程池不是SHUTDOWN或者后面的状态
//(STOP, TIDYING, 或 TERMINATED):
//则不满足if条件,那么逻辑向下执行,创建新线程接收执行任务;
// false && ? == false;

//2、如果线程池状态是SHUTDOWN状态:
//(a)、并且下列语句条件只有全满足true时,
// r s == SHUTDOWN && firstTask == null &&
// ! workQueue.isEmpty();
//即线程池目前SHUTDOWN状态,新传入的任务对象为空,
//并且阻塞任务队列不为空;
//那么方法不返回,继续向下执行,因为目前队列中还有任务,
//所以线程池继续运行,允许创建线程执行任务;
// true && ! (true && true&& !false) == false

//(b)、SHUTDOWN状态,如果传入firstTask任务对象不为空,
//不再新增任务线程,方法返回false;
// true && ! (true && false&& ? ) == true

//(c)、SHUTDOWN状态,如果传入firstTask任务对象为空,
//并且目前任务阻塞队列为空,队列都空了,所以无需新增任务线程;
//方法返回false;
// true && ! (true && true&& ! ture ) == true

if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
//获取线程数;
int wc = workerCountOf(c);
//根据core的boolean值,来决定取,
//当前线程数wc是否超过CAPACITY和corePoolSize中最小值,
//还是超过CAPACITY和maximumPoolSize中最小值;
//如果超过了,那么返回不再添加新线程执行任务;
//所以如果core传入false的话,那么运行线程池线程执行数
//增加到maximumPoolSize;
//core传入true的话,当前线程池线程数大于corePoolSize后,
//不允许再增加线程数,addWorker方法返回false;
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//尝试CAS增加workerCount线程数,
//如果添加成功则跳出最外层for循环,
//并且将workerCount加1
if (compareAndIncrementWorkerCount(c))
break retry;
//竞争添加失败,重新获取状态值,
//跳出内for循环,从外for下一次循环继续开始;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//执行到此处,说明工作线程数workerCount已经+1
// worker是否已启动
boolean workerStarted = false;
// worker是否已添加成功
boolean workerAdded = false;
Worker w = null;
try {
// 把firstTask创建成worker对象,
//worker构造方法会从ThreadFactory创建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//获取整个线程池对象的锁,
//在关闭线程池的时候,需要获取锁,
//那么在锁期间,说明线程池不会被执行关闭
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取运行状态
int rs = runStateOf(ctl.get());
//如果是SHUTDOWN之前的状态(只有RUNNING),
//或者现在是SHUTDOWN并且传入的firstTask为空;
//校验当前线程是否启动,因为workder中的线程不允许
//是启动的,已启动抛出IllegalThreadStateException
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到worker队列
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加worker队列成功后,启动这个线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//添加worker失败,addWorkerFailed方法善后工作
addWorkerFailed(w);
}
return workerStarted;
}

//addWorkerFailed方法善后工作
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//从worker队列中移除
workers.remove(w);
//将workerCount减1
decrementWorkerCount();
//尝试将线程池置为TERMINATED状态,当对线程池执行了
//非正常逻辑的操作时,都需执行tryTerminate判断是否结束线程池
tryTerminate();
} finally {
//在finally块中释放锁
mainLock.unlock();
}
}

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环调用getTask方法获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果线程池状态大于等于 STOP,线程就应中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//目前什么都没做,留给子类实现的方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//同样留给子类的方法
afterExecute(task, thrown);
}
} finally {
task = null;
//完成任务数+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//当上面的while循环中getTask方法执行完毕以后,线程队列为空了,
//统计整个线程池完成的任务个数,并从工作队列删除当前woker,
//执行善后清理工作
processWorkerExit(w, completedAbruptly);
}
}

getTask

getTask方法返回null时,在之前runWorker方法中会跳出while循环,runWorker会执行finally块中的processWorkerExit方法:
在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize,并且workQueue已满时,则可以增加工作线程数到maximumPoolSize,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,那么当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,,如果之前设置过allowCoreThreadTimeOut为true,那么运行线程池数量降至corePoolSize以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 获取线程状态
int rs = runStateOf(c);
// 如果线程池关闭或者正在关闭,那么执行workerCount减1
//方法返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// cas方法,减少线程数workerCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果之前allowCoreThreadTimeOut设置为true,
//或者当前池中运行线程大于corePoolSize,
//timed就为true;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//timedOut如果为true,说明上次循环有超时发生,
// timed && timedOut 如果为true,表示需要超时超数控制,
// 判断workerCount是否大于最大线程池数或者需超时空总控制,
//是的话workerCount减1,减1成功后返回null,
//不成功则continue继续执行
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//到 workQueue 中获取任务
//如果上面的timed为true,则用poll超时方法从
//阻塞队列中取线程对象;否则用take方法取线程对象
//阻塞队列中,take方法是阻塞的,如果队列元素为0则一直阻塞等待
//而poll方法是非阻塞的,这里给定超时时间,获取不到直接返回;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果r不为空,说明已经从阻塞队列获取到线程对象,
//否则没获取到线程对象,设置timedOut为true;
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit

runWorker()方法执行完任务后,或者执行任务过程中出现异常中断执行的时,相对应的操作方法;
并且这里会移除线程池中的线程,因为异常情况添加到池中的线程没有执行的,这里移除以后重新添加线程执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果正常执行完成,runWorker方法中completedAbruptly传的false
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//非正常执行完,线程池工作线程数减1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//完成任务数
completedTaskCount += w.completedTasks;
//从线程池中移除一个工作线程,那么这个线程就销毁了;
//这里只负责移除,在runWorker方法执行完之后,也就是Worker
//中的run方法执行完,由JVM自动回收;
workers.remove(w);
} finally {
mainLock.unlock();
}
//非正常逻辑的操作时,都需执行tryTerminate判断是否结束线程池
tryTerminate();

int c = ctl.get();
//如果是STOP之前的状态,即RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
//如果是正常结束
if (!completedAbruptly) {
//allowCoreThreadTimeOut为true,
//允许核心线程数收回,设置min为0,否则为核心线程数大小
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果线程队列中还有线程,那么至少保留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//当前线程数比min(0或corePoolSize)大,无需新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//否则应该是异常情况,重新添加一个线程执行任务;
addWorker(null, false);
}
}

submit

1
2
3
4
5
6
7
8
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
//实际上执行的是execute方法,但是这里会有Future返回值,
//后续可以通过Future调用get方法异步阻塞获取线程返回值;
execute(ftask);
return ftask;
}

shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查关闭权限,如果设置了安全管理,则看当前调用 shutdown
//方法的线程是否有关闭线程的权限
checkShutdownAccess();
//检查当前状态如果是SHUTDOWN之后的状态则返回,
//如果不是SHUTDOWN状态,设置当前状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
//这个方法有必要分析一下,往下看,从中文名称翻译来讲:
//打断空闲的工作者
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//结束线程池
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//这里为什么要去获取w线程对象的锁,之前我们在看
//addWorker()和runWorker()方法的时候,看到了对worker加
//锁,说明正在入队的线程和正在执行的线程,不会中断他们;
//这就是与下面shutdownNow()关闭线程池方法的区别所在;
if (!t.isInterrupted() && w.tryLock()) {
try {
//满足上面的条件:线程非中断状态,并且获取锁
//则中断当前线程;
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//释放当前线程锁
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
//释放线程池主锁
mainLock.unlock();
}
}

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭权限
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//结束线程池
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//这里就比较简单粗暴,直接去打断线程,并不检查当前线程是否正在
//运行或者已经添加到队列中;
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
//我们看到shutdownNow()方法的返回类型是List<Runnable>,
//这个方法就是把目前阻塞队列中的元素都移除,并且封装到List集合里,
//返回给shutdownNow的调用者
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

tryTerminate

之前的关闭线程池方法和一些方法非正常执行后,都会调用tryTerminate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//RUNNING或TIDYING或(SHUTDOWN并且队列不为空)
//这时候不会关闭线程池;
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果池中线程数量不为0,则只中断一个空闲的线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果设置线程池状态为TIDYING成功,
//则调用terminated方法(此方法内容为空,想做啥留给实现子类)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//最后设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒termination条件,在这个变量注释处提到过
//的awaitTermination方法有用到
//termination.awaitNanos(nanos)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}