温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

线程池ThreadPoolExecutor有什么作用

发布时间:2021-07-06 18:17:58 来源:亿速云 阅读:190 作者:chen 栏目:大数据

这篇文章主要讲解了“线程池ThreadPoolExecutor有什么作用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“线程池ThreadPoolExecutor有什么作用”吧!

一、初始化线程池(4种):

1、newFixedThreadPool()

public final static ExecutorService esPool = Execustor.newFixedThreadPool(50);

特点:corePoolSize == maxPoolSize,使用LinkedBlockingQueue作为堵塞队列;没有可执行任务时不会释放线程池;

2、newCachedThreadPool()

public final static ExecutorService esPool  = Execustor.newCachedThreadPool();

特点:默认缓存60s,线程数可以达到Integer.MAX_VALUE,内部使用SynchronousQueue作为堵塞队列;没有可执行任务时,达到keepAliveTime会释放线程资源,新任务没有执行空闲线程就得重新创建新的线程,会导致系统开销;使用时,注意控制并发数,减少创建新线程数;

3、newScheduleThreadPool()

public final static ExecutorService esPool = Execustor.newScheduleThreadPool(50);

特点:指定时间内周期性内执行所提交的任务,一般用于定时同步数据;

4、newSingleThreadExecutor()

特点:初始化只有一个线程的线程池;内部使用LinkedBlockingQueue作为堵塞队列;

二、源码实现:

1、ThreadPoolExecutor构造器

 public ThreadPoolExecutor(
                              int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//线程存活时间(必须在线程数在corePoolSize与maximumPoolSie之间时,存活时间才能生效)
                              TimeUnit unit,//存活时间的单位
                              BlockingQueue<Runnable> workQueue,//堵塞队列
                              RejectedExecutionHandler handler//当拒绝处理任务时的策略
                           ) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

说明:堵塞队列BlockingQueue:1)ArrayBlockingQueue(数组结构的界限堵塞队列,FIFO);2)LinkedBlockQueue(链表结构的堵塞队列,FIFO);3)SynchronousQueue(不存储元素的堵塞队列,每次插入必须等到另一个线程移除);4)PriorityBlockingQueue(具有优先级的堵塞队列)。

拒绝处理任务的策略RejectedExecutionHandler :1)、ThreadPoolExecutor.AbortPolicy(抛弃当前线程,并抛出RejectedExecutionException异常)2)、ThreadPoolExecutor.DiscardPolicy(抛弃当前线程,不抛出异常);3)、ThreadPoolExecutor.DiscardOldestPolicy(抛弃最前面的任务,然后重新尝试此执行过程);4)、CallerRunnsPolicy(调用线程执行此任务)

2、execute()

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        //获取当前线程的数量:workerCountOf()
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//当当前线程数量小于corePoolSize时,就addWorker
                return;
            c = ctl.get();
        }
        //如果当前线程处于Running状态;
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //检查线程池(因为可能在上次检查后,有线程资源被释放),是否有空闲的线程
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果当前线程处于非Running状态;
        else if (!addWorker(command, false))
            reject(command);
    }

3、reject()

   final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

4、submit()

  public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        //封装成一个FutureTask对象,FutureTask类实现Runnable接口
        RunnableFuture<T> ftask = newTaskFor(task);
        //执行execute(),通过execute提交到FutureTask线程池中等待被执行,最终执行FutureTask的run方法
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

5、线程状态

	private static final int COUNT_BITS = Integer.SIZE - 3;

    //即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
	private static final int RUNNING    = -1 << COUNT_BITS;
	
    //即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	
    //即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
	private static final int STOP       =  1 << COUNT_BITS;
	
    //即高3位为010,该状态表示线程池对线程进行整理优化;
	private static final int TIDYING    =  2 << COUNT_BITS;

	//即高3位为011,该状态表示线程池停止工作;
	private static final int TERMINATED =  3 << COUNT_BITS;

6、runWorker()(真正执行任务的接口)

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//啥都没做
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }



 protected void afterExecute(Runnable r, Throwable t) { }

感谢各位的阅读,以上就是“线程池ThreadPoolExecutor有什么作用”的内容了,经过本文的学习后,相信大家对线程池ThreadPoolExecutor有什么作用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI