本篇内容主要讲解“java线程池源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java线程池源码分析”吧!
当提交一个任务时,如果当前线程数小于corePoolSize,就会创建一个线程。即使其他有可用的空闲线程。
用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列:
1.ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2.LinkedBlockingQueue 一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
3.SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等上一个元素被移除之后,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
4.PriorityBlockingQueue 一个具有优先级的无限阻塞队列。
不同的runnableTaskQueue对线程池运行逻辑有很大影响
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
线程执行结束后,保持存活的时间。 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
线程池队列饱和之后的执行策略,默认是采用AbortPolicy。JDK提供四种实现方式:
AbortPolicy:直接抛出异常
CallerRunsPolicy :只用调用者所在线程来运行任务
DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy : 不处理,丢弃掉
keepalive的时间单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
我们来看看 Executors.newCachedThreadPool() 里面的构造:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize 为 0,意味着核心线程数是 0。
maximumPoolSize 是 Integer.MAX_VALUE ,意味这可以一直往线程池提交任务,不会执行 reject 策略。
keepAliveTime 和 unit 决定了线程的存活时间是 60s,意味着一个线程空闲60s后才会被回收。
reject 策略是默认的 AbortPolicy,当线程池超出最大限制时抛出异常。不过这里 CacheThreadPool 的没有最大线程数限制,所以 reject 策略没用。
runnableTaskQueue 是 SynchronousQueue。该队列的特点是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。使用该队列是实现 CacheThreadPool 的关键之一。
SynchronousQueue 的详细原理参考这里
我们看看 CacheThreadPool 的注释介绍,大意是说当有任务提交进来,会优先使用线程池里可用的空闲线程来执行任务,但是如果没有可用的线程会直接创建线程。空闲的线程会保留 60s,之后才会被回收。这些特性决定了,当需要执行很多短时间的任务时,CacheThreadPool 的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool 并不会占用很多资源。
那么问题来了:
CacheThreadPool 如何实现线程保留60s。
CacheThreadPool 如何实现线程复用。
首先我们向线程池提交任务一般用 execute() 方法,我们就从这里入手:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1.返回包含线程数以及线程状态Integer类型的数值
int c = ctl.get();
// 如果工作线程数小于核心线程数,则创建线程并执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果失败,防止外部已经在线程池中加入新任务,重新获取下
c = ctl.get();
}
// 2.只有线程处于RUNNING状态,才执行后半句:置入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池不是RUNNING状态,则将刚加入的移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果之前的线程已经被消费完,则新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 没有被消费完,只将任务放入队列
}
// 3.如果task不能加入到队列,会尝试创建一个新线程。
else if (!addWorker(command, false))
// 如果创建失败,走reject流程
reject(command);
第一步比较简单,如果当前运行的线程少于核心线程,调用 addWorker(),创建一个线程。但是因为 CacheThreadPool 的 corePoolSize 是0,所以会跳过这步,并不会创建核心线程。
关键在第二步,首先判断了线程池是否运行状态,紧接着调用 workQueue.offer() 往对列添加 task 。 workQueue 是一个 BlockingQueue ,我们知道 BlockingQueue.offer() 方法是向队列插入元素,如果成功返回 true ,如果队列没有可用空间返回 false 。
CacheThreadPool 用的是 SynchronousQueue ,前面了解过 SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必须被其他线程取出,才能塞入下一个元素。等会我们再来看看哪里是从 SynchronousQueue 取出元素。
这里当任务入队列成功后,再次检查了线程池状态,还是运行状态就继续。然后检查当前运行线程数量,如果当前没有运行中的线程,调用 addWorker() ,第一个参数为 null 第二个参数是 false ,标明了非核心线程。
为什么这里 addWorker() 第一个方法要用null?带着这个疑问,我们来看看 addWorker() 方法:
大概翻译了下
检查是否可以添加新 worker ,在线程池状态和给定的边界(核心数或最大数)。
如果可以,则计数线程数,并且创建并启动新工作程序,以firstTask作为其运行第一项任务。
如果池已停止或有资格关闭,则此方法返回false。
如果线程工厂在询问时无法创建线程,它也会返回false。
如果线程创建失败,则由于线程工厂返回null,或者由于异常(通常是Thread.start()中的OutOfMemoryError),我们干净地回滚。
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* 使用 corePoolSize 绑定做校验为 true,maximumPoolSize 绑定做校验为 false,
* @param core if true use corePoolSize as bound, else maximumPoolSize.
*
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// continue retry 快速推多层循环嵌套
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 当前线程数量+1
if (compareAndIncrementWorkerCount(c))
break retry;
// 获取当前线程数
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁。持有主锁防止干扰。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 将任务包装成 worker 对象,用线程安全的方式添加到当前工作 HashSet()里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 线程 start 并执行 run方法处理 runWorker() 执行 task
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 创建失败减去线程数
addWorkerFailed(w);
}
return workerStarted;
}
源代码比较长,这里省略了一部分。过程主要分成两步, 第一步是一段 cas 代码通过双重循环检查状态并为当前线程数扩容 +1, 第二部是将任务包装成 worker 对象,用线程安全的方式添加到当前工作 HashSet() 里,并开始执行线程。 终于读到线程开始执行的地方了,里程碑式的胜利啊同志们!
但是我们注意到,task 为 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?
继续跟踪代码,Worker 类继承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:
public void run() {
runWorker(this);
}
继续进入 runWorker() 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取task
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// getTask() 获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 退出自旋,进入finally代码块。调用processWorkerExit方法,注销当前Worker,实现worker的销毁
processWorkerExit(w, completedAbruptly);
}
}
可以看到这里判断了 firstTask 如果为空,就调用 getTask() 方法。getTask() 方法是从 workQueue 拉取任务。 所以到这里之前的疑问就解决了,调用 addWorker(null,false) 的目的是启动一个线程,然后再 workQueue 拉取任务执行。
继续跟踪 getTask() 方法:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 当allowCoreThreadTimeout(运行空闲核心线程超时)
// 或 wc>corePoolSize(当前线程数量大于核心线程数量) 时,timed会标识为true,表示需要进行超时判断。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当wc(当前工作者数量)大于 最大线程数 或 空闲线程的空闲时间大于keepAliveTime(timed && timeout),
// 以及wc>1或(workQueue)任务队列为空时,会进入compareAndDecrementWorkerCount方法,对wc的值减1。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 当compareAndDecrementWorkerCount方法返回true时,则getTask方法会返回null,终止getTask方法的自旋。
// 这时候回到runWorker方法,就会进入到processWorkerExit方法,进行销毁worker。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// timed 为 true 时,进行poll处理,超时后线程就会会被回收
Runnable r = timed ?
// poll(time):取走BlockingQueue里排在首位的对象,
// 若不能立即取出,则可以等time参数规定的时间,取不到时返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// take():取走BlockingQueue里排在首位的对象,
// 若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
终于看到从 workQueue 拉取元素了。
CacheThreadPool 构造的时候 corePoolSize 是 0,allowCoreThreadTimeOut 默认是 false ,因此 timed 一直为 true ,会调用 workQueue.poll() 从队列拉取一个任务,等待 60s, 60s后超时,线程就会会被回收。
如果 60s 内,进来一个任务,会发生什么情况?任务在 execute() 方法里,会被 offer() 进 workQueue ,因为目前队列是空的,所以 offer 进来后,马上会被阻塞的 worker.poll() 拉取出来,然后在 runWorker() 方法里执行,因为线程没有新建所以达到了线程的复用。
至此,我们已经明白了线程复用的秘密,以及线程保留 60s 的实现方法。回到 execute() 方法,还有剩下一个逻辑 如果task不能加入到队列,会尝试创建线程。如果创建失败,走reject流程
else if (!addWorker(command, false))
reject(command);
因为 CacheThreadPool 用的 SynchronousQueue ,所以没有空闲线程, SynchronousQueue 有一个元素正在被阻塞,那么就不能加入到队列里。会走到 addWorker(commond,false) 这里,这个时候因为就会新建线程来执行任务。如果 addWorker() 返回 false 才会走 reject 策略。
那么什么时候 addWorker() 什么时候会返回false呢?我们看代码:
private boolean addWorker(Runnable firstTask, boolean core){
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.线程池已经shutdown,或者提交进来task为ull且队列也是空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 2.如果需要创建核心线程但是当前线程已经大于corePoolSize 返回false,
// 如果是非核心线程但是已经超出maximumPoolSize,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
//省略代码。。。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//省略代码。。。
}
}
}
//省略代码。。。
}
addWorker() 有以下情况会返回 false :
线程池已经 shutdown,或者提交进来 task 为ull且同时任务队列也是空,返回 false。
如果需要创建核心线程但是当前线程已经大于 corePoolSize 返回 false,
如果是非核心线程但是已经超出 maximumPoolSize ,返回 false。
创建线程后,检查是否已经启动。
我们逐条检查。 第一点只有线程池被 shutDown() 才会出现。 第二点由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize 是 Intger.MAX_VALUE ,所以也不会出现。 第三点是保护性错误,我猜因为线程允许通过外部的 ThreadFactory 创建,所以检查了一下是否外部已经 start,如果开发者编码规范,一般这种情况也不会出现。
综上,在线程池没有 shutDown 的情况下,addWorker() 不会返回 false ,不会走reject流程,所以理论上 CacheThreadPool 可以一直提交任务,符合CacheThreadPool注释里的描述。
Executors 还提供了这么一个方法 Executors.newFixedThreadPool(4) 来创建一个有固定线程数量的线程池,我们看看创建的参数:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
参数中核心线程和最大线程一样,线程保留时间 0 ,使用 LinkedBlockingQueue 作为任务队列,这样的线程池有什么样的特性呢?我们看看注释说明,大意是说这是一个有着固定线程数量且使用无界队列作为线程队列的线程池。如果有新的任务提交,但是没有线程可用,这个任务会一直等待直到有可用的线程。如果一个线程因为异常终止了,当线程不够用的时候会再创建一个出来。线程会一直保持,直到线程池 shutDown。
和 CacheThreadPool 相比,FixedThreadPool 注释里描述的特性有几个不同的地方。
因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。
在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。
由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool 占用资源更多。
FixedThreadPool 和 CacheThreadPool 也有相同点,都使用无界队列,意味着可用一直向线程池提交任务,不会触发 reject 策略。
到此,相信大家对“java线程池源码分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/Lien6o/blog/3093824