这篇文章主要讲解了“Java线程池的原理和作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java线程池的原理和作用是什么”吧!
“首当其冲的问题就是,如何复用线程。”
/** * Created by Anur IjuoKaruKas on 2019/7/16 */ public class ThreadPoolExecutor { private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); private final Runnable runnable = () -> { try { while (true) { Runnable take = workQueue.poll(); if (take == null) { Thread.sleep(200); } else { take.run(); } } } catch (InterruptedException e) { e.printStackTrace(); } }; public ThreadPoolExecutor() { new Thread(runnable).start(); } public void execute(Runnable command) { workQueue.offer(command); } }
肥宅埋过了一眼,很快就发现其中玄妙之处:在小奈的 ThreadPoolExecutor
中,定制了一套 runnable
流程,负责不断从 workQueue
这个队列中拉取由 #execute
方法提交过来的任务,并执行其 run()
方法。这样,无论提交过来多少个任务,始终都是这个线程池内置的线程在执行任务。当获取不到任务的时候,线程池会自己进入休眠状态。
“虽然这达到了线程复用,但是你的这个线程完全没办法自动创建和销毁啊?甚至它的线程池数量都是不可控制的。”肥宅埋虽然感叹于对方可以这么快实现线程复用,但还是持续展开攻势。
“既然要实现线程池可控,最直截了当的想法便是将方才的那套 runnable
流程封装成一个对象,我们只需控制这个对象的创建、销毁、以及复用即可。”作为一只长期浸泡在 OOP
思维中的程序媛,这种问题难不倒小奈。她很快就写出了一个内部类,叫做 Worker
,其中 #runWorker(this);
就是刚才那个 runnable
流程,负责不断从队列中获取任务,并调用它的 #run()
方法。
private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = threadFactory.newThread(this); } @Override public void run() { runWorker(this); } }
小奈为后续将要完成的 worker
线程数量控制打下了基石:ThreadPoolExecutor
中增加了一个散列集,用于存放 worker
,增加了一个 ThreadFactory
,供使用者定制化 worker
线程的创建。
其中比较核心的方法叫做 #addWorker()
,负责创建并初始化 worker
线程,并将其纳入散列集中管理。当然,这个线程池还无法自动创建,不过已经可以自动销毁了。可以看到,在拉取不到任务时,#getTask()
则返回空,会跳出 #runWorker()
的 while
循环,之后调用 #processWorkerExit();
,将 worker
线程从散列集中移除。
/** * Created by Anur IjuoKaruKas on 2019/7/16 */ public class ThreadPoolExecutor { private final HashSet<Worker> workers = new HashSet<>(); private volatile ThreadFactory threadFactory; private final BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.workQueue = workQueue; } public void execute(Runnable command) { workQueue.offer(command); } /** * 新建一个 worker 线程、启动并纳入 workers */ private boolean addWorker(Runnable firstTask) { Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; } /** * worker 线程池不断从 workQueue 中拉取 task 进行消费 */ private void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { task.run(); } processWorkerExit(w); } /** * 当线程执行完毕之前,将其从 workers 中移除 */ private void processWorkerExit(Worker w) { workers.remove(w); } private Runnable getTask() { return workQueue.poll(); } }
看到这里,肥宅埋已经能预测到接下来的思路了。
线程池需要加入一个变量 maximumPoolSize
,以防无限创建线程,每次进行 #addWorker()
时,需要判断一下是否可以继续添加 worker
,如果可以,则添加新的 worker
,否则将任务丢入队列:
#addWorker()
中加入拒绝的逻辑,确保不能无限创建 worker
。
再修改一下 #execute()
方法,优先创建 worker
,如果创建 worker
失败( workers.size() >= maximumPoolSize
),则直接将任务丢入队列。
public void execute(Runnable command) { if (addWorker(command)) { return; } workQueue.offer(command); } /** * 新建一个 worker 线程、启动并纳入 workers */ private boolean addWorker(Runnable firstTask) { int ws = workers.size(); if (ws >= maximumPoolSize) { return false; } Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; }
已经写到这里小奈可谓是趾高气扬,仿佛实现一个线程池已经不在话下。
“这样貌似有点问题啊?虽然说你已经实现了线程的动态创建与销毁,但在任务没有那么紧凑的情况下,基本是每个任务都进来都需要创建一次线程,再销毁一次线程,说好的复用到哪里去了?”肥宅埋给了膨胀的小奈当头一棒。
“咳咳......嘛,销毁的时候做一下判断就可以了,我们加入一个新的变量,叫做 keepAliveTime
,当拿不到任务的时候,就进行阻塞休眠,比如 20ms
,每次对 keepAliveTime
减 20ms
,直到小于等于 0
,再销毁线程。”小奈反应迅速,很快给出了答案,并准备动手对线程池进行改动。
肥宅埋叹了一口气,“我看你是被膨胀蒙蔽了双眼,既然我们已经使用了阻塞队列,那么就可以充分利用阻塞队列的特性!阻塞队列中内置了一个显式锁,利用锁的 condition
对象,使用它的 #awaitNanos()
与 #notify()
方法,就可以直接精准地实现线程调度了。”毕竟肥宅埋也是一只学霸,听到小奈的想法后提出了更具有建设性的设计。
小奈也很快反应过来,阻塞队列有一个 #poll()
方法,底层是借助 condition
对象封装的 LockSupport.parkNanos(this, nanosTimeout);
来实现的,会阻塞直到有新的元素加入,当有新的元素加入,这个 condition
就会被唤醒,来实现 当调用阻塞队列的 #poll()
时,如果阻塞队列为空,会进行一段时间的休眠,直到被唤醒,或者休眠超时。
肥宅埋一手接管了改造线程池的大权,马上大刀阔斧地改了起来。
改动十分简单,原先的 #getTask()
是直接调用阻塞队列的 #take()
方法,如果队列为空,则直接返回,只要将其改为 #poll
方法即可。
/** * 当 runWorker 一定时间内获取不到任务时,就会 processWorkerExit 销毁 */ private Runnable getTask() { boolean timedOut = false; while (true) { try { if (timedOut) { return null; } Runnable r = workQueue.poll(keepAliveTime, unit); if (r != null) { return r; } else { timedOut = true; } } catch (InterruptedException e) { timedOut = false; } } }
“一般来说,我们的任务提交都不会太过于均匀,如果我们平常不需要那么多线程来消费,但又想避免任务一直被堆积导致某些任务迟迟不被消费,就需要引入**核心线程 corePoolSize
** 与 **最大线程 maximumPoolSize
** 的概念。”肥宅埋想到了一个简单的可以优化的点,头头是道地分析道:“我们可以不用做那么复杂的动态 worker
消费池,最简单的,如果我们的阻塞队列满了,就继续创建更多的线程池,这样,堆积的任务能比以往更快速的降下来。”
说起来好像复杂,实际上代码十分简单。小奈看见肥宅埋修改了 #addWorker()
方法,增加了一个参数 core
,其作用只有一个,如果是核心线程,则创建时,数量必须小于等于 corePoolSize
,否则数量必须小于等于 maximumPoolSize
。
另外, #execute()
方法的改动也十分简单,前面的改动不大,主要是,当任务 #offer()
失败后,创建非核心 worker
线程。
/** * 优先创建核心线程,核心线程满了以后,则优先将任务放入队列 * * 队列满了以后,则启用非核心线程池,以防任务堆积 */ public void execute(Runnable command) { if (getPoolSize() < corePoolSize) { if (addWorker(command, true)) { return; } } if (!workQueue.offer(command)) { addWorker(command, false); } } /** * 新建一个 worker 线程、启动并纳入 workers */ private boolean addWorker(Runnable firstTask, boolean core) { int ws = workers.size(); if (ws >= (core ? corePoolSize : maximumPoolSize)) { return false; } Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; }
“现在这个版本的线程池看起来真是有模有样呢 ~ 可以动态创建与销毁线程,线程也能复用,还可以动态增加更多的线程来消费堆积的线程!” 肥宅埋满意地看着两人的杰作,“其实我还发现有个地方不太友好,在推送任务时,调用方可能并不知道自己的任务是否失败。”
“这个简单鸭,只需要在调用 #execute()
时返回 flase
来代表添加失败,或者抛出对应的异常即可。”小奈给出了很直观的设计。
“这确实不失为一个好方法,但是对于调用方来说,如果所有使用线程池的地方都需要去做这个判断,那岂不是太麻烦了!”肥宅埋对方案进行了补充:“这个是面向切面编程的一种思想,我们可以提供一个如何处理这些队列已经放不下,且无法创建更多消费线程的切面入口,就叫它 AbortPolicy
吧!”
肥宅埋修改了一下 #execute()
方法,如果在创建非核心线程池的时候失败,就直接将任务拒绝掉。
/** * 优先创建核心线程,核心线程满了以后,则优先将任务放入队列 * * 队列满了以后,则启用非核心线程池,以防任务堆积 * * 如果非核心线程池创建失败,则拒绝这个任务 */ public void execute(Runnable command) { if (getPoolSize() < corePoolSize) { if (addWorker(command, true)) { return; } } if (!workQueue.offer(command)) { if (!addWorker(command, false)) { reject(command); } } }
如何去拒绝任务,交给调用者去实现,#reject()
的实现非常简单,就是调用一下 BiConsumer
,这个可以供调用方自由定制。
private void reject(Runnable command) { abortPolicy.accept(command, this); }
小奈与肥宅埋已经完成了她们的线程池,现在需要测试一下线程池是否可以正常使用,比较细心的肥宅埋写了测试用例如下:
核心线程数为5,最大线程数为10,紧接着每个线程在拉取不到任务时会存活一分钟,有一个长度为 5 的并发阻塞队列,采用默认的 ThreadFactory
,最后,使用了 DiscardPolicy
,当任务被拒绝后,直接丢弃任务,并打印日志。
她们运行了代码,日志打印如下。完全符合预期,在阻塞队列还未装满之前,只有 5 个核心线程在消费任务,当阻塞队列满了以后,会逐步创建更多的线程,而当无法创建更多线程后,则触发丢弃策略。
感谢各位的阅读,以上就是“Java线程池的原理和作用是什么”的内容了,经过本文的学习后,相信大家对Java线程池的原理和作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。