这篇文章主要介绍“java线程池的状态有几种”,在日常操作中,相信很多人在java线程池的状态有几种问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java线程池的状态有几种”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
// 高3位存放状态 低29位存线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
RUNNING = ‐1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011
RUNNING 初始化以后就是这种状态 SHUTDOWN 不再接收新任务 把已有任务完成后就变状态 STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 TIDYING(收拾/整理) 没有任务没有线程 有钩子方法 terminated() TERMINATED 线程池的具体实现 ThreadPoolExecutor 默认线程池 ScheduledThreadPoolExecutor 定时线程池 提交任务
public void execute() //提交任务无返回值
public Future<?> submit() //任务执行完成后有返回值
###参数解释
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime; unit keepAliveTime的单位;
了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到 另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有优先级的无界阻塞队列; threadFactory 它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* 记录着线程池状态和线程数量
*/
int c = ctl.get();
/**
* 如果当前线程数小于核心线程数 创建线程
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
如果在执行execute方法一直处于running状态 执行流程如下
如果wokeCount < corePoolSize 创建一个新的线程来执行任务
如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;
如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新 提交的任务;
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根 据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。 注意一下addWorker(null,false) 创建线程但是没有传入任务,因为之前已经把任务放到队里里面了
firstTask参数 用 于指定新增的线程执行的第一个任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
// 如果设置成功 跳出第一个循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 如果状态变了 回到的第一个循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 用任务创建了worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 放入线程不安全的set里
int s = workers.size();
if (s > largestPoolSize)
// 设置最大的池数量
largestPoolSize = s;
// 标记添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开始执行任务 因为thread是利用woker创建的 所以t.start实际就是调用的worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker对象.
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属 性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创 建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过 getThreadFactory().newThread(this); 来 新 建 一 个 线 程 , newThread 方 法 传 入 的 参 数 是 this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在 启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来 实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的 线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线 程。 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。 此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时 就不应该被中断,看一下tryAquire方法:
// tryAcquire方法是根据state是否是0来判断的,所以, 将state设置为-1是 为了禁止在执行任务前对线程进行中断。
//正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 允许被打断
// 是否异常退出循环
boolean completedAbruptly = true;
try {
// 如果task是null 从队列里拿数据
while (task != null || (task = getTask()) != null) {
// 为什么不用ReentrantLock 因为要做到不可重入
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/**
* 如果线程池正在停止,那么要保证当前线程是中断状态;
* 如果不是的话,则要保证当前线程不是中断状态;
* Thread.interrupted() 获得当前的中断状态 复位
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// gettask结果是null 就会走这 runWorker 结束 销毁线程
processWorkerExit(w, completedAbruptly);
}
}
总结一下runWorker方法的执行过程:
while循环不断地通过getTask()方法获取任务;
getTask()方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是 中断状态;
调用task.run()执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。 这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给 子类来实现。
private Runnable getTask() {
// 判断从阻塞队列里获取任务是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池是非run状态
// 如果>= stop 说明线程正在停止 或者队列是空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少任务数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
到此,关于“java线程池的状态有几种”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/hongyanluori/blog/4551588