温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

java线程池的状态有几种

发布时间:2021-07-29 19:22:19 来源:亿速云 阅读:167 作者:chen 栏目:大数据

这篇文章主要介绍“java线程池的状态有几种”,在日常操作中,相信很多人在java线程池的状态有几种问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java线程池的状态有几种”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

线程池5种状态
// 高3位存放状态 低29位存线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

RUNNING = ‐1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

RUNNING 初始化以后就是这种状态 SHUTDOWN 不再接收新任务 把已有任务完成后就变状态 STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 TIDYING(收拾/整理) 没有任务没有线程 有钩子方法 terminated() TERMINATED 线程池的具体实现 ThreadPoolExecutor 默认线程池 ScheduledThreadPoolExecutor 定时线程池 提交任务

public void execute() //提交任务无返回值
public Future<?> submit() //任务执行完成后有返回值

###参数解释

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。

maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线

程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime; unit keepAliveTime的单位;

workQueue 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供

了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到 另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有优先级的无界阻塞队列; threadFactory 它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。

线程池监控
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

源码分析

execute方法
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /**
         * 记录着线程池状态和线程数量
         */
        int c = ctl.get();
        /**
         * 如果当前线程数小于核心线程数 创建线程
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

如果在执行execute方法一直处于running状态 执行流程如下

  1. 如果wokeCount < corePoolSize 创建一个新的线程来执行任务

  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;

  3. 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新 提交的任务;

  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根 据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。 注意一下addWorker(null,false) 创建线程但是没有传入任务,因为之前已经把任务放到队里里面了

addWorker方法

firstTask参数 用 于指定新增的线程执行的第一个任务

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    // 如果设置成功 跳出第一个循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    // 如果状态变了 回到的第一个循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 用任务创建了worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 放入线程不安全的set里
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 设置最大的池数量
                            largestPoolSize = s;
                        // 标记添加worker成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 开始执行任务 因为thread是利用woker创建的 所以t.start实际就是调用的worker的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker对象.

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属 性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创 建的线程,是用来处理任务的线程。

在调用构造方法时,需要把任务传入,这里通过 getThreadFactory().newThread(this); 来 新 建 一 个 线 程 , newThread 方 法 传 入 的 参 数 是 this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在 启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来 实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;

  2. 如果正在执行任务,则不应该中断线程;

  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;

  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是否是空闲状态;

  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的 线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线 程。 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。 此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时 就不应该被中断,看一下tryAquire方法:

// tryAcquire方法是根据state是否是0来判断的,所以,   将state设置为-1是 为了禁止在执行任务前对线程进行中断。
//正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。

protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts 允许被打断
        // 是否异常退出循环
        boolean completedAbruptly = true;
        try {
            // 如果task是null 从队列里拿数据
            while (task != null || (task = getTask()) != null) {
                // 为什么不用ReentrantLock 因为要做到不可重入
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                /**
                 * 如果线程池正在停止,那么要保证当前线程是中断状态;
                 * 如果不是的话,则要保证当前线程不是中断状态;
                 * Thread.interrupted() 获得当前的中断状态 复位
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // gettask结果是null 就会走这 runWorker 结束 销毁线程
            processWorkerExit(w, completedAbruptly);
        }
    }

总结一下runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;

  2. getTask()方法从阻塞队列中取任务;

  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是 中断状态;

  4. 调用task.run()执行任务;

  5. 如果task为null则跳出循环,执行processWorkerExit()方法;

  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。 这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给 子类来实现。

getTask方法
private Runnable getTask() {
        // 判断从阻塞队列里获取任务是否超时
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果线程池是非run状态
            // 如果>= stop 说明线程正在停止 或者队列是空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 减少任务数
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

到此,关于“java线程池的状态有几种”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI