这篇文章主要介绍“java线程池的状态有几种”,在日常操作中,相信很多人在java线程池的状态有几种问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java线程池的状态有几种”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
// 高3位存放状态 低29位存线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; RUNNING = ‐1 << COUNT_BITS; //高3位为111 SHUTDOWN = 0 << COUNT_BITS; //高3位为000 STOP = 1 << COUNT_BITS; //高3位为001 TIDYING = 2 << COUNT_BITS; //高3位为010 TERMINATED = 3 << COUNT_BITS; //高3位为011
RUNNING 初始化以后就是这种状态 SHUTDOWN 不再接收新任务 把已有任务完成后就变状态 STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 TIDYING(收拾/整理) 没有任务没有线程 有钩子方法 terminated() TERMINATED 线程池的具体实现 ThreadPoolExecutor 默认线程池 ScheduledThreadPoolExecutor 定时线程池 提交任务
public void execute() //提交任务无返回值 public Future<?> submit() //任务执行完成后有返回值
###参数解释
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime; unit keepAliveTime的单位;
了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到 另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有优先级的无界阻塞队列; threadFactory 它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。
public long getTaskCount() //线程池已执行与未执行的任务总数 public long getCompletedTaskCount() //已完成的任务数 public int getPoolSize() //线程池当前的线程数 public int getActiveCount() //线程池中正在执行任务的线程数量
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** * 记录着线程池状态和线程数量 */ int c = ctl.get(); /** * 如果当前线程数小于核心线程数 创建线程 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
如果在执行execute方法一直处于running状态 执行流程如下
如果wokeCount < corePoolSize 创建一个新的线程来执行任务
如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;
如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新 提交的任务;
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根 据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。 注意一下addWorker(null,false) 创建线程但是没有传入任务,因为之前已经把任务放到队里里面了
firstTask参数 用 于指定新增的线程执行的第一个任务
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 如果设置成功 跳出第一个循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 如果状态变了 回到的第一个循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 用任务创建了worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 放入线程不安全的set里 int s = workers.size(); if (s > largestPoolSize) // 设置最大的池数量 largestPoolSize = s; // 标记添加worker成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 开始执行任务 因为thread是利用woker创建的 所以t.start实际就是调用的worker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker对象.
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属 性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创 建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过 getThreadFactory().newThread(this); 来 新 建 一 个 线 程 , newThread 方 法 传 入 的 参 数 是 this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在 启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来 实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的 线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线 程。 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。 此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时 就不应该被中断,看一下tryAquire方法:
// tryAcquire方法是根据state是否是0来判断的,所以, 将state设置为-1是 为了禁止在执行任务前对线程进行中断。 //正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts 允许被打断 // 是否异常退出循环 boolean completedAbruptly = true; try { // 如果task是null 从队列里拿数据 while (task != null || (task = getTask()) != null) { // 为什么不用ReentrantLock 因为要做到不可重入 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 如果线程池正在停止,那么要保证当前线程是中断状态; * 如果不是的话,则要保证当前线程不是中断状态; * Thread.interrupted() 获得当前的中断状态 复位 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // gettask结果是null 就会走这 runWorker 结束 销毁线程 processWorkerExit(w, completedAbruptly); } }
总结一下runWorker方法的执行过程:
while循环不断地通过getTask()方法获取任务;
getTask()方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是 中断状态;
调用task.run()执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。 这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给 子类来实现。
private Runnable getTask() { // 判断从阻塞队列里获取任务是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果线程池是非run状态 // 如果>= stop 说明线程正在停止 或者队列是空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 减少任务数 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
到此,关于“java线程池的状态有几种”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。