这篇文章主要介绍“java并发ThreadPoolExecutor如何使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“java并发ThreadPoolExecutor如何使用”文章能帮助大家解决问题。
当一个任务提交到线程池ThreadPoolExecutor时,该任务的执行如下图所示。
如果当前运行的线程数小于corePoolSzie(核心线程数),则创建新线程来执行任务(需要获取全局锁);
如果当前运行的线程数等于或大于corePoolSzie,则将任务加入BlockingQueue(任务阻塞队列);
如果BlockingQueue已满,则创建新的线程来执行任务(需要获取全局锁);
如果创建新线程会使当前线程数大于maximumPoolSize(最大线程数),则拒绝任务并调用RejectedExecutionHandler的rejectedExecution() 方法。
由于ThreadPoolExecutor存储工作线程使用的集合是HashSet,因此执行上述步骤1和步骤3时需要获取全局锁来保证线程安全,而获取全局锁会导致线程池性能瓶颈,因此通常情况下,线程池完成预热后(当前线程数大于等于corePoolSize),线程池的execute() 方法都是执行步骤2。
通过ThreadPoolExecutor能够创建一个线程池,ThreadPoolExecutor的构造函数签名如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
通过ThreadPoolExecutor创建线程池时,需要指定线程池的核心线程数,最大线程数,线程保活时间,线程保活时间单位和任务阻塞队列,并按需指定线程工厂和饱和拒绝策略,如果不指定线程工厂和饱和拒绝策略,则ThreadPoolExecutor会使用默认的线程工厂和饱和拒绝策略。下面分别介绍这些参数的含义。
参数 | 含义 |
---|---|
corePoolSize | 核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。 |
maximumPoolSize | 最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的任务会加入任务阻塞队列,但是如果任务阻塞队列已满且线程数小于maximumPoolSize,此时会继续创建新的线程来执行任务。该参数规定了线程池允许创建的最大线程数 |
keepAliveTime | 线程保活时间。当线程池的线程数大于核心线程数时,多余的空闲线程会最大存活keepAliveTime的时间,如果超过这个时间且空闲线程还没有获取到任务来执行,则该空闲线程会被回收掉。 |
unit | 线程保活时间单位。通过TimeUnit指定线程保活时间的时间单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么时间单位,ThreadPoolExecutor统一会将其转换为NANOSECONDS。 |
workQueue | 任务阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的任务会添加到workQueue中,所有线程执行完上一个任务后,会循环从workQueue中获取任务来执行。 |
threadFactory | 创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。 |
handler | 饱和拒绝策略。如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。 |
通过ThreadPoolExecutor的execute() 方法,能执行Runnable任务,示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor执行简单无返回值任务() throws Exception { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建两个任务 Runnable firstRunnable = new Runnable() { @Override public void run() { System.out.println("第一个任务执行"); } }; Runnable secondRunnable = new Runnable() { @Override public void run() { System.out.println("第二个任务执行"); } }; // 让线程池执行任务 threadPoolExecutor.execute(firstRunnable); threadPoolExecutor.execute(secondRunnable); // 让主线程睡眠1秒,等待线程池中的任务被执行完毕 Thread.sleep(1000); } }
运行测试程序,结果如下。
通过ThreadPoolExecutor的submit() 方法,能够执行Callable任务,通过submit() 方法返回的RunnableFuture能够拿到异步执行的结果。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor执行简单有返回值任务() throws Exception { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建两个任务,任务执行完有返回值 Callable<String> firstCallable = new Callable<String>() { @Override public String call() throws Exception { return "第一个任务返回值"; } }; Callable<String> secondCallable = new Callable<String>() { @Override public String call() throws Exception { return "第二个任务返回值"; } }; // 让线程池执行任务 Future<String> firstFuture = threadPoolExecutor.submit(firstCallable); Future<String> secondFuture = threadPoolExecutor.submit(secondCallable); // 获取执行结果,拿不到结果会阻塞在get()方法上 System.out.println(firstFuture.get()); System.out.println(secondFuture.get()); } }
运行测试程序,结果如下。
如果ThreadPoolExecutor在执行Callable任务时,在Callable任务中抛出了异常并且没有捕获,那么这个异常是可以通过Future的get() 方法感知到的。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor执行简单有返回值任务时抛出错误() { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建一个任务,任务有返回值,但是执行过程中抛出异常 Callable<String> exceptionCallable = new Callable<String>() { @Override public String call() throws Exception { throw new RuntimeException("发生了异常"); } }; // 让线程池执行任务 Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable); try { System.out.println(exceptionFuture.get()); } catch (Exception e) { System.out.println(e.getMessage()); } } }
运行测试程序,结果如下。
ThreadPoolExecutor可以通过submit() 方法来运行Runnable任务,并且还可以异步获取执行结果。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor通过submit的方式来提交并执行Runnable() throws Exception { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建结果对象 MyResult myResult = new MyResult(); // 创建Runnable对象 Runnable runnable = new Runnable() { @Override public void run() { myResult.setResult("任务执行了"); } }; // 通过ThreadPoolExecutor的submit()方法提交Runnable Future<MyResult> resultFuture = threadPoolExecutor.submit(runnable, myResult); // 获取执行结果 MyResult finalResult = resultFuture.get(); // myResult和finalResult的地址实际相同 Assert.assertEquals(myResult, finalResult); // 打印执行结果 System.out.println(resultFuture.get().getResult()); } private static class MyResult { String result; public MyResult() {} public MyResult(String result) { this.result = result; } public String getResult() { return result; } public void setResult(String result) { this.result = result; } } }
运行测试程序,结果如下。
实际上ThreadPoolExecutor的submit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutor的execute() 方法来执行FutureTask。
关闭线程池可以通过ThreadPoolExecutor的shutdown() 方法,但是shutdown() 方法不会去中断正在执行任务的线程,所以如果线程池里有Worker正在执行一个永远不会结束的任务,那么shutdown() 方法是无法关闭线程池的。示例如下。
public class ThreadPoolExecutorTest { @Test public void 通过shutdown关闭线程池() { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建Runnable对象 Runnable runnable = new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { LockSupport.parkNanos(1000 * 1000 * 1000); } System.out.println(Thread.currentThread().getName() + " 被中断"); } }; // 让线程池执行任务 threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); // 调用shutdown方法关闭线程池 threadPoolExecutor.shutdown(); // 等待3秒观察现象 LockSupport.parkNanos(1000 * 1000 * 1000 * 3L); } }
运行测试程序,会发现在主线程中等待3秒后,也没有得到预期的打印结果。如果上述测试程序中使用shutdownNow,则是可以得到预期打印结果的,示例如下。
public class ThreadPoolExecutorTest { @Test public void 通过shutdownNow关闭线程池() { // 创建一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 创建Runnable对象 Runnable runnable = new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { LockSupport.parkNanos(1000 * 1000 * 1000); } System.out.println(Thread.currentThread().getName() + " 被中断"); } }; // 让线程池执行任务 threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); // 调用shutdown方法关闭线程池 threadPoolExecutor.shutdownNow(); // 等待3秒观察现象 LockSupport.parkNanos(1000 * 1000 * 1000 * 3L); } }
运行测试程序,打印如下。
因为测试程序中的任务是响应中断的,而ThreadPoolExecutor的shutdownNow() 方法会中断所有Worker,所以执行shutdownNow() 方法后,正在运行的任务会响应中断并结束运行,最终线程池关闭。
假如线程池中运行着一个永远不会结束的任务,且这个任务不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都是无法关闭线程池的。
关于“java并发ThreadPoolExecutor如何使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。