线程池 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。 线程的生命周期的维护是非常麻烦的事情,所以需要一种工具来智能的管理线程,大大减轻开发的难度和优化程序的健壮性,这就是线程池的由来。 java本身已经提供了java.util.concurrent.Executor接口的实现用于创建线程池多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
线程池包含以下四个基本部分组成:
线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池,销管线程池,添加新任务。
工作线程(PoolWorker):线程池中线程,在没有任务时出于等待状态,可以循环的执行任务。
任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等。
任务队列(TaskQueue):用于存放没有处理的任务。提供一种缓冲机制。
空间换时间的操作,事先创建多个线程,放入线程池中,省去单个线程的创建于销毁的时间,专注于处理任务。
java中线程池的相关类的结构如下:
常见线程池 newSingleThreadExecutor 1 2 3 4 5 6 public static ExecutorService new SingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0 L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
创建核心线程数1,最大线程数1的线程池,队列采用LinkedBlockingQueue(默认大小Integer.MAX_VALUE);
LinkedBlockingQueue默认大小Integer.MAX_VALUE,大量请求堆积时,可能产生oom
newFixedThreadPool 1 2 3 4 5 public static ExecutorService new FixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0 L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
创建固定大小的线程池。创建核心线程数nThreads,最大线程数nThreads的线程池,队列采用阻塞队列LinkedBlockingQueue(默认大小Integer.MAX_VALUE);
每提交一个任务就创建一个线程,直到线程数达到nThreads。
线程池的大小达到nThreads后,尽管keepAliveTime为0,因为核心线程数等于最大线程数,所以闲置线程不会被回收,线程池数量一直保持在nThreads;
如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
适用于处理并发相对比较稳定的任务。
LinkedBlockingQueue默认大小Integer.MAX_VALUE,大量请求堆积时,可能产生oom
newCachedThreadPool 1 2 3 4 5 public static ExecutorService new CachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60 L, TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
创建一个可缓存的线程池。核心线程数0,最大线程数Integer.MAX_VALUE的线程池,队列采用SynchronousQueue;不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间;
如果线程池的大小超过了处理任务所需要的线程,60秒过后回收闲置多余线程,当任务数重新增加时,线程池又可以添加新的线程来处理任务。此线程池不会对线程池大小做限制,线程池大小基本依赖于JVM能够创建的最大线程大小。
既可接受吞吐量高的并发,又能在并发小的时候减少创建线程,节省资源,可伸缩性好;
最大线程数Integer.MAX_VALUE,创建大量线程时,可能会oom
newScheduledThreadPool 1 2 3 4 5 6 7 public static ScheduledExecutorService new ScheduledThreadPool(int corePoolSize ) { return new ScheduledThreadPoolExecutor(corePoolSize ) ; } public ScheduledThreadPoolExecutor(int corePoolSize ) { super(corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue() ); }
创建一个大小无限的线程池。此线程池支持定时周期性执行任务。
创建核心线程数corePoolSize,最大线程数Integer.MAX_VALUE的线程池,
采用延迟队列DelayedWorkQueue;
主要执行可延迟性的,可定时周期控制的的任务;
最大线程数Integer.MAX_VALUE,创建大量线程时,可能会oom
综上所述,建议用线程池类ThreadPoolExecutor自带构造方法自己根据适合的参数创建线程池。
1 2 3 4 5 6 7 8 9 10 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("myThreadPool-%d" ) .build() ; ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime () .availableProcessors() +1 , 200 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100 ), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy() );
实现原理 java.util.concurrent包提供了现成的线程池的实现。
ExecutorService
ScheduledExecutorService
能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ThreadPoolExecutor
ScheduledThreadPoolExecutor
继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
基本实现:
1 ExecutorService service = Executors.newCachedThreadPool();
常见线程池实现都是基于ThreadPoolExecutor实现的。提供4个构造器,逐层调用,最终实现在第4个构造器上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler ); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler ); ... }
基本变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;
状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int runStateOf (int c) { return c & ~CAPACITY; private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
RUNNING -> SHUTDOWN
显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法;常见状态转换之一;
(RUNNING or SHUTDOWN) -> STOP
显式调用 shutdownNow() 方法时候;常见状态转换之一;
SHUTDOWN -> TIDYING
STOP -> TIDYING
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候
其他字段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile boolean allowCoreThreadTimeOut
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler ; }
int corePoolSize (核心线程数大小):当提交一个任务到线程池时,即便目前池中有空闲线程能够执行任务,线程池也会创建一个新线程来执行任务,直到池中线程数达到corePoolSize 数量;在这里要注意一点,线程池刚创建的时候,其中并没有创建任何线程数,而是等有任务提交后再去创建线程,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法 ,这样才会预先创建 corePoolSize 数量的线程。
int maximumPoolSize (线程池最大线程数):线程池允许创建的最大线程数,如果线程池队列已满,并且已创建线程数小于最大线程数maximumPoolSize,则线程池会再创建新的线程执行任务,所以在通常情况下,maximumPoolSize >= corePoolSize , 值得注意的是,如果使用了无界队列,最大线程数此参数就没意义了。
long keepAliveTime (线程闲置存活时间):此参数默认在当前线程数大于 corePoolSize 的情况下才会起作用, 当这些多余的线程,空闲时间达到 keepAliveTime 的时候就会终止,直至线程数目减到 corePoolSize 。不过如果调用了 allowCoreThreadTimeOut 方法,则当线程数目小于 corePoolSize 的时候也会起作用,keepAliveTime到达后闲置线程也会终止;
TimeUnit unit (keelAliveTime的时间单位):keelAliveTime的时间单位。
TimeUnit.NANOSECONDS 纳秒
TimeUnit.MICROSECONDS 微秒
TimeUnit.MILLISECONDS 毫秒
TimeUnit.SECONDS 秒
TimeUnit.MINUTES 分
TimeUnit.HOURS 时
TimeUnit.DAYS 天
BlockingQueue workQueue (阻塞队列):阻塞队列,用来存储等待执行的任务,这个参数也是非常重要的,在这里简单介绍一下几个阻塞队列。
ArrayBlockingQueue:这是一个基于数组结构的有界阻塞队列,此队列按照FIFO的先进先出规则,进和出用的是同一个可重入锁。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照FIFO排序元素,进和出分别采用两把锁,所以吞吐量通常要高于ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
ThreadFactory threadFactory(创建线程池工厂):用于设置创建线程的工厂,默认是使用Executors.defaultThreadFactory(),可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给池里的线程设置有意义的名字,代码:new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
RejectedExecutionHandler handler(饱和策略);当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。从JDK 1.5开始, Java线程池框架提供了4种策略
AbortPolicy:直接丢弃并且抛出 RejectedExecutionException 异常(默认)
CallerRunsPolicy:这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行;
DiscardOldestPolicy:如果执行程序尚未关闭,则位于阻塞队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程);
DiscardPolicy:直接丢弃后面提交的任务并且不抛出异常。
Work对象 线程池中的线程,都被封装成了Worker对象,线程池维护的一组Worker对象,而Worker对象又继承AbstractQueuedSynchronizer类,使用AQS来实现独占锁的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833 L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run() { runWorker(this ); } protected bool ean isHeldExclusively() { return getState() != 0 ; } protected bool ean tryAcquire(int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected bool ean tryRelease(int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock() { acquire(1 ); } public bool ean tryLock() { return tryAcquire(1 ); } public void unlock() { release(1 ); } public bool ean isLocked() { return isHeldExclusively(); } void int erruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.int errupt(); } catch (SecurityException ignore) { } } } }
主要方法 execute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void execute(Runnable command) { if (command == null) throw new NullPointerException() ; int c = ctl.get() ; if (workerCountOf(c ) < corePoolSize) { if (addWorker(command , true ) ) return; c = ctl.get() ; } if (isRunning(c ) && workQueue.offer(command)) { int recheck = ctl.get() ; if (! isRunning(recheck ) && remove(command)) reject(command); else if (workerCountOf(recheck ) == 0 ) addWorker(null , false ) ; } else if (!addWorker(command , false ) ) reject(command); }
addWorker addWorker()方法,主要作用是在线程池中创建一个新的线程并执行添加进来的任务,并且firstTask是新增线程需要执行的第一个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 循环直到满足true或false,作为addWorker方法返回值 for (;;) { // 获取线程池ctl变量来得到运行状态 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1 、如果线程池不是SHUTDOWN或者后面的状态 // (STOP, TIDYING, 或 TERMINATED): // 则不满足if 条件,那么逻辑向下执行,创建新线程接收执行任务; // false && ? == false; // 2 、如果线程池状态是SHUTDOWN状态: // (a)、并且下列语句条件只有全满足true时, // r s == SHUTDOWN && firstTask == null && // ! workQueue.isEmpty(); // 即线程池目前SHUTDOWN状态,新传入的任务对象为空, // 并且阻塞任务队列不为空; // 那么方法不返回,继续向下执行,因为目前队列中还有任务, // 所以线程池继续运行,允许创建线程执行任务; // true && ! (true && true&& !false) == false // (b)、SHUTDOWN状态,如果传入firstTask任务对象不为空, // 不再新增任务线程,方法返回false; // true && ! (true && false&& ? ) == true // (c)、SHUTDOWN状态,如果传入firstTask任务对象为空, // 并且目前任务阻塞队列为空,队列都空了,所以无需新增任务线程; // 方法返回false; // true && ! (true && true&& ! ture ) == true if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程数; int wc = workerCountOf(c); // 根据core的boolean值,来决定取, // 当前线程数wc是否超过CAPACITY和corePoolSize中最小值, // 还是超过CAPACITY和maximumPoolSize中最小值; // 如果超过了,那么返回不再添加新线程执行任务; // 所以如果core传入false的话,那么运行线程池线程执行数 // 增加到maximumPoolSize; // core传入true的话,当前线程池线程数大于corePoolSize后, // 不允许再增加线程数,addWorker方法返回false; if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试CAS增加workerCount线程数, // 如果添加成功则跳出最外层for 循环, // 并且将workerCount加1 if (compareAndIncrementWorkerCount(c)) break retry; // 竞争添加失败,重新获取状态值, // 跳出内for 循环,从外for 下一次循环继续开始; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 执行到此处,说明工作线程数workerCount已经+1 // worker是否已启动 boolean workerStarted = false; // worker是否已添加成功 boolean workerAdded = false; Worker w = null; try { // 把firstTask创建成worker对象, // worker构造方法会从ThreadFactory创建一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 获取整个线程池对象的锁, // 在关闭线程池的时候,需要获取锁, // 那么在锁期间,说明线程池不会被执行关闭 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取运行状态 int rs = runStateOf(ctl.get()); // 如果是SHUTDOWN之前的状态(只有RUNNING), // 或者现在是SHUTDOWN并且传入的firstTask为空; // 校验当前线程是否启动,因为workder中的线程不允许 // 是启动的,已启动抛出IllegalThreadStateException if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加到worker队列 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加worker队列成功后,启动这个线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 添加worker失败,addWorkerFailed方法善后工作 addWorkerFailed(w); } return workerStarted; } // addWorkerFailed方法善后工作private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) // 从worker队列中移除 workers.remove(w); // 将workerCount减1 decrementWorkerCount(); // 尝试将线程池置为TERMINATED状态,当对线程池执行了 // 非正常逻辑的操作时,都需执行tryTerminate判断是否结束线程池 tryTerminate(); } finally { // 在finally块中释放锁 mainLock.unlock(); } }
runWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final void runWorker(Worker w ) { Thread wt = Thread . currentThread() ; Runnable task = w.firstTask; w.firstTask = null; w.unlock() ; boolean completedAbruptly = true ; try { while (task != null || (task = getTask() ) != null) { w.lock() ; if ((runStateAtLeast(ctl .get () , STOP) || (Thread .interrupted() && runStateAtLeast(ctl .get () , STOP ))) && !wt.isInterrupted() ) wt.interrupt(); try { / / 目前什么都没做,留给子类实现的方法 beforeExecute(wt , task ) ; Throwable thrown = null; try { / / 开始执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x ) ; } finally { / / 同样留给子类的方法 afterExecute(task , thrown ) ; } } finally { task = null; / / 完成任务数+1 w.completedTasks ++ ; w.unlock(); } } completedAbruptly = false ; } finally { / / 当上面的while 循环中getTask 方法执行完毕以后,线程队列为空了, / / 统计整个线程池完成的任务个数,并从工作队列删除当前woker, / / 执行善后清理工作 processWorkerExit(w , completedAbruptly ) ; } }
getTask getTask方法返回null时,在之前runWorker方法中会跳出while循环,runWorker会执行finally块中的processWorkerExit方法: 在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize,并且workQueue已满时,则可以增加工作线程数到maximumPoolSize,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,那么当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,,如果之前设置过allowCoreThreadTimeOut为true,那么运行线程池数量降至corePoolSize以下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 获取线程状态 int rs = runStateOf(c); // 如果线程池关闭或者正在关闭,那么执行workerCount减1 // 方法返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // cas方法,减少线程数workerCount decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 如果之前allowCoreThreadTimeOut设置为true, // 或者当前池中运行线程大于corePoolSize, // timed就为true; boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // timedOut如果为true,说明上次循环有超时发生, // timed && timedOut 如果为true,表示需要超时超数控制, // 判断workerCount是否大于最大线程池数或者需超时空总控制, // 是的话workerCount减1 ,减1 成功后返回null, // 不成功则continue 继续执行 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue ; } // 到 workQueue 中获取任务 // 如果上面的timed为true,则用poll超时方法从 // 阻塞队列中取线程对象;否则用take方法取线程对象 // 阻塞队列中,take方法是阻塞的,如果队列元素为0 则一直阻塞等待 // 而poll方法是非阻塞的,这里给定超时时间,获取不到直接返回; try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果r不为空,说明已经从阻塞队列获取到线程对象, // 否则没获取到线程对象,设置timedOut为true; if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
processWorkerExit runWorker()方法执行完任务后,或者执行任务过程中出现异常中断执行的时,相对应的操作方法; 并且这里会移除线程池中的线程,因为异常情况添加到池中的线程没有执行的,这里移除以后重新添加线程执行;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove (w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get (); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min ) return ; } addWorker(null, false ); } }
submit 1 2 3 4 5 6 7 8 public <T> Future<T> submit(Callable<T> task ) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task ); execute(ftask); return ftask; }
shutdown 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers () { interruptIdleWorkers(false ); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
shutdownNow 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public List<Runnable> shutdownNow ( ) { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock (); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers ( ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock (); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private List<Runnable> drainQueue ( ) { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0 ])) { if (q.remove (r)) taskList.add (r); } } return taskList; }
tryTerminate 之前的关闭线程池方法和一些方法非正常执行后,都会调用tryTerminate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 final void try Terminate() { for (;;) { int c = ctl.get() ; if (isRunning(c ) || runStateAtLeast(c , TIDYING) || (runStateOf(c ) == SHUTDOWN && ! workQueue .isEmpty() )) return; / / 如果池中线程数量不为0,则只中断一个空闲的线程 if (workerCountOf(c ) != 0) { / / Eligible to terminate interruptIdleWorkers(ONLY_ONE) ; return; } final ReentrantLock mainLock = this.mainLock ; mainLock .lock(); try { / / 如果设置线程池状态为TIDYING 成功, / / 则调用terminated方法(此方法内容为空,想做啥留给实现子类) if (ctl.compareAndSet(c , ctlOf (TIDYING, 0) )) { try { terminated(); } finally { / / 最后设置线程池状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0) ); / / 唤醒termination条件,在这个变量注释处提到过 / / 的awaitTermination 方法有用到 / / termination.awaitNanos(nanos ) termination.signalAll() ; } return; } } finally { mainLock .unlock(); } / / else retry on failed CAS } }