温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java中的异步与线程池怎么创建使用

发布时间:2022-11-23 09:57:56 来源:亿速云 阅读:177 作者:iii 栏目:开发技术

这篇文章主要介绍“Java中的异步与线程池怎么创建使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java中的异步与线程池怎么创建使用”文章能帮助大家解决问题。

    初始化线程的4种方式

    1.继承Thread

    	Thread01 thread01 = new Thread01();
    	thread01.start();
    
        public static  class Thread01 extends Thread{
            @Override
            public void run() {
                System.out.println("当前线程:"+Thread.currentThread().getId());
                int i = 10 / 2;
                System.out.println("运行结果:"+i);
            }
        }

    2.实现Runnable 接口

    	Runnable01 runnable01 = new Runnable01();
        new Thread(runnable01).start();
    
        public static class Runnable01 implements Runnable{
            @Override
            public void run() {
                System.out.println("当前线程:"+Thread.currentThread().getId());
                int i = 10 / 2;
                System.out.println("运行结果:"+i);
            }
        }

    3.实现Callable 接口+ FutureTask (可以拿到返回结果,可以处理异常)

        Callabel01 callabel01 = new Callabel01();
    
        FutureTask<Integer> integerFutureTask = new FutureTask<>(callabel01);
        //阻塞等待整个线程执行完成,获取返回结果
        Integer integer = integerFutureTask.get();
        new Thread(integerFutureTask).start();
    
        public static class Callabel01 implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                System.out.println("当前线程:"+Thread.currentThread().getId());
                int i = 10 / 2;
                System.out.println("运行结果:"+i);
                return i;
            }
        }

    在业务代码里面不建议使用以上三种启动线程的方式 

    4.线程池

    应该将所有的多线程异步任务都交给线程池执行,进行有效的资源控制

    //当前系统中池只有一两个,每一个异步任务直接提交给线程池,让他自己去执行
    ExecutorService service = Executors.newFixedThreadPool(10);
    //执行
    service.execute(new Runnable01());

    区别

    1/2两种方式都不能获取返回值

    1/2/3都不能达到资源控制的效果

    只有4能控制资源,系统性能是稳定的

    创建线程池(ExecutorService)

    1.Executors 工具类创建

    //当前系统中池只有一两个,每一个异步任务直接提交给线程池,让他自己去执行
    ExecutorService service = Executors.newFixedThreadPool(10);
    //执行
    service.execute(new Runnable01());

    2.原生方法创建线程池

    ThreadPoolExecutor需要传入七大参数

    • corePoolSize 核心线程数【一直存在,除非设置了允许线程超时的设置:allowCoreThreadTimeOut】,保留在池中的线程数,线程池创建后好后就准备就绪的线程数,就等待异步任务去执行,new 好了 Thread,等待异步任务

    • maximumPoolSize 池中最大线程数量,控制资源并发

    • keepAliveTime 存活时间,当前正在运行的线程数量,大于核心线程数,就会释放空闲的线程,只要线程空闲大于指定存活时间,释放的线程是指最大的线程数量减去核心线程数,

    • unit 时间单位

    • BlockingQueue workQueue 阻塞队列,如果任务有很多,就会将目前多的队伍放在队列里面,只要有空闲的线程,就会去队列里面取出新的任务继续执行。

    • new LinkedBlockingQueue<>() 默认值是Integer的最大值,会导致内存不够,一定要传入业务定制的大小,可以通过压测得出峰值

    • threadFactory 线程的创建工厂

    • handler 如果队列满了,按照我们指定的拒绝策略拒绝执行任务

    3.线程池的运行流程 线程池创建

    准备好core 数量的核心线程,准备接受任务新的任务进来,用core 准备好的空闲线程执行。

    (1) 、core 满了,就将再进来的任务放入阻塞队列中。空闲的core 就会自己去阻塞队列获取任务执行

    (2) 、阻塞队列满了,就直接开新线程执行,最大只能开到max 指定的数量

    (3) 、max 都执行好了。Max-core 数量空闲的线程会在keepAliveTime 指定的时间后自动销毁。最终保持到core 大小

    (4) 、如果线程数开到了max 的数量,还有新任务进来,就会使用reject 指定的拒绝策略进行处理所有的线程创建都是由指定的factory 创建的。

    一个线程池core 7; max 20 ,queue:50,100 并发进来怎么分配的;先有7 个能直接得到执行,接下来50 个进入队列排队,在多开13 个继续执行。

    现在70 个被安排上了。剩下30 个默认拒绝策略。拒绝策略一般是抛弃,如果不想抛弃还要执行,可以使用同步的方式执行,或者丢弃最老的

    Java中的异步与线程池怎么创建使用

    4. 四种常见的线程池

    • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。核心线程固定是0,所有都可回收

    • newFixedThreadPool创建一个固定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。固定大小,核心 = 最大

    • newScheduledThreadPool创建一个固定长线程池,支持定时及周期性任务执行。定时任务线程池

    • newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。后台从队列里面获取任务 挨个执行

    为什么要使用线程池

    降低资源的消耗

    通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

    提高响应速度

    因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

    提高线程的可管理性

    线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

    CompletableFuture 异步编排

    业务场景:

    查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间

    Java中的异步与线程池怎么创建使用

    假如商品详情页的每个查询,需要如下标注的时间才能完成那么,用户需要5.5s 后才能看到商品详情页的内容。很显然是不能接受的。

    如果有多个线程同时完成这6 步操作,也许只需要1.5s 即可完成响应。

    CompletableFuture 和FutureTask 同属于Future 接口的实现类,都可以获取线程的执行结果。

    Java中的异步与线程池怎么创建使用

    1.创建异步对象

    Java中的异步与线程池怎么创建使用

    1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的

    2、可以传入自定义的线程池,否则就用默认的线程池;

    没有返回结果的

    static ExecutorService service = Executors.newFixedThreadPool(10);
    CompletableFuture.runAsync(()->{
         System.out.println("当前线程:"+Thread.currentThread().getId());
         int i = 10 / 2;
         System.out.println("运行结果:"+i);
     },service);

    有返回结果的

    static ExecutorService service = Executors.newFixedThreadPool(10);
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    	 System.out.println("当前线程:" + Thread.currentThread().getId());
    	 int i = 10 / 2;
    	 System.out.println("运行结果:" + i);
    	 return i;
    }, service);
    	Integer integer = future.get();
    	System.out.println("main----end"+integer);

    2.计算完成时(线程执行成功)回调方法

    Java中的异步与线程池怎么创建使用

    whenComplete 可以处理正常和异常的计算结果,虽然可以得到异常信息,但是不能修改返回数据exceptionally 处理异常情况。

    可以感知异常并返回默认值whenComplete 和whenCompleteAsync 的区别:

    • whenComplete:是执行当前任务的线程执行继续执行whenComplete 的任务。

    • whenCompleteAsync:是执行把whenCompleteAsync 这个任务继续提交给线程池来进行执行。

    方法不以Async 结尾,意味着Action 使用相同的线程执行,而Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main----start");
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 0;
                System.out.println("运行结果:" + i);
                return i;
            }, service).whenComplete((res,excption)->{
                System.out.println("异步任务成功完成:结果是::::"+res+"异常是:"+excption);
            }).exceptionally(throwable->{
                //可以感知异常,同时返回数据
                return 10;
            });
            Integer integer = future.get();
            System.out.println("main----end"+integer);
        }

    3.handle 方法(可对结果做最后的处理(可处理异常),可改变返回值)

    Java中的异步与线程池怎么创建使用

    和complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

            /* 方法完成后的处理*/
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("运行结果:" + i);
                return i;
            }, service).handle((res,exption)->{
                if (res != null){
                    return res*2;
                }
                if (exption != null){
                    return 0;
                }
                return 0;
            });

    4.线程串行化

    Java中的异步与线程池怎么创建使用

    • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

    • thenAccept 方法:能接收上一步的返回结果,但是不能改变返回值。

    • thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,不能改变返回值带有Async 默认是异步执行的。同之前。

    以上都要前置任务成功完成。

    Function<? super T,? extends U>

    • T:上一个任务返回结果的类型

    • U:当前任务的返回值类型

    • thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,不能改变返回值

    static ExecutorService service = Executors.newFixedThreadPool(10);
    
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("运行结果:" + i);
                return i;
            }, service).thenRunAsync(() -> {
                System.out.println("任务2启动了");
            }, service);
    • thenAccept 方法:能接收上一步的返回结果,但是不能改变返回值。

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("运行结果:" + i);
                return i;
            }, service).thenAccept((res)->{
                System.out.println("异步启动了:"+res);
            });
    • thenApplyAsync 技能接收上一步的结果,又能改变返回值

     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("运行结果:" + i);
                return i;
            }, service);
            future.thenApplyAsync((res) -> {
                System.out.println("任务2启动了:" + res);
                return res + "hello";
            }, service);
    
            System.out.println("main----end"+future.get());

    5.两任务组合- 都要完成

    Java中的异步与线程池怎么创建使用

    Java中的异步与线程池怎么创建使用

    两个任务必须都完成,触发该任务。

    • thenCombine:组合两个future,获取两个future 的返回结果,并返回当前任务的返回值

    • thenAcceptBoth:组合两个future,获取两个future 任务的返回结果,然后处理任务,没有返回值。

    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("任务1运行结果:" + i);
                return i;
            }, service);
    
            CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程:" + Thread.currentThread().getId());
                System.out.println("任务2运行结果:");
                return "hello";
            }, service);
    
            future01.thenAcceptBothAsync(future02, (f1, f2) -> {
                System.out.println("任务3开始之前的结果---f1=" + f1 + "f2=" + f2);
            }, service);
    • runAfterBoth:组合两个future,不获取前两个的结果,只需两个future 处理完任务后,处理该任务。

            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("任务1运行结果:" + i);
                return i;
            }, service);
    
            CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程:" + Thread.currentThread().getId());
                System.out.println("任务2运行结果:");
                return "hello";
            }, service);
    
            future01.runAfterBothAsync(future02,()->{
                System.out.println("任务3开始");
            },service);

    6.两任务组合- 一个完成

    Java中的异步与线程池怎么创建使用

    Java中的异步与线程池怎么创建使用

    当两个任务中,任意一个future 任务完成的时候,执行任务。

    • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并自己有新的返回值。

            CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("任务1运行结果:" + i);
                return i;
            }, service);
    
            CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程:" + Thread.currentThread().getId());
                System.out.println("任务2运行结果:");
                return "hello";
            }, service);
    
            future01.applyToEitherAsync(future02,(t) -> {
                System.out.println("任务3开始"+t);
                return t.toString() + "niubi";
            }, service);
    • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,自己没有新的返回值。

     CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("任务1运行结果:" + i);
                return i;
            }, service);
    
            CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程:" + Thread.currentThread().getId());
                System.out.println("任务2运行结果:");
                return "hello";
            }, service);
    
            future01.acceptEitherAsync(future02,(t) -> {
                System.out.println("任务3开始"+t);
            }, service);
    • runAfterEither:两个任务有一个执行完成,不获取future 的结果,处理任务,自己也没有返回值。

    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程:" + Thread.currentThread().getId());
                int i = 10 / 4;
                System.out.println("任务1运行结果:" + i);
                return i;
            }, service);
    
            CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程:" + Thread.currentThread().getId());
                System.out.println("任务2运行结果:");
                return "hello";
            }, service);
    
            future01.runAfterEitherAsync(future02,() -> {
                System.out.println("任务3开始");
            }, service);

    7.多任务组合

    Java中的异步与线程池怎么创建使用

    • allOf:等待所有任务完成

    CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
                System.out.println("查询商品的图片信息");
                return "hello.png";
            }, service);
            CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
                System.out.println("查询商品的属性");
                return "黑色+256g";
            }, service);
            CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
                System.out.println("查询商品的介绍");
                return "华为";
            }, service);
    
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
            completableFuture.get(); //等待所有结果完成
    • anyOf:只要有一个任务完成

    整合SpringBoot

    1.添加配置类,新建线程池

    package cn.cloud.xmall.product.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description: ···
     * @author: Freedom
     * @QQ: 1556507698
     * @date:2022/3/21 17:41
     */
    @Configuration
    public class MyThreadConfig {
    
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(){
           return new ThreadPoolExecutor(
                    20,
                    200,
                    10,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100000),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
                    );
        };
    }

    2.想要在配置文件中手动的配置参数

    新建一个配置属性类

    package cn.cloud.xmall.product.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    /**
     * @Description: ···
     * @author: Freedom
     * @QQ: 1556507698
     * @date:2022/3/21 17:47
     */
    @ConfigurationProperties(prefix = "xmall.thread")
    @Component  //加入容器
    @Data
    public class ThreadPollConfigProperties {
        private Integer coreSize;
        private Integer maxSize;
        private Integer keepAliveTime;
    }

    注:可以在依赖种添加此依赖,在配置文件中就会有我们自己配置属性的提示

    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-configuration-processor</artifactId>
          <optional>true</optional>
      </dependency>

    3.配置文件配置属性

    #线程池配置
    xmall:
      thread:
        core-size: 20
        max-size: 200
        keep-alive-time: 10

    4.使用配置文件中的属性

    @EnableConfigurationProperties(ThreadPollConfigProperties.class),如果配置文件类没有添加@Component加入容器可以使用这种方式

    package cn.cloud.xmall.product.config;
    
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description: ···
     * @author: Freedom
     * @QQ: 1556507698
     * @date:2022/3/21 17:41
     */
    //@EnableConfigurationProperties(ThreadPollConfigProperties.class)
    @Configuration
    public class MyThreadConfig {
    
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(ThreadPollConfigProperties pool){
           return new ThreadPoolExecutor(
                    pool.getCoreSize(),
                    pool.getMaxSize(),
                    pool.getKeepAliveTime(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100000),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
                    );
        };
    }

    5.注入线程池

        @Autowired
        private ThreadPoolExecutor executor;

    6.异步编排

        @Override
        public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    
            SkuItemVo skuItemVo = new SkuItemVo();
    
            //1.使用自己的线程池来新建异步任务
            CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
                //1.查询基本信息 pms_sku_info
                SkuInfoEntity info = getById(skuId);
                skuItemVo.setInfo(info);
                return info;
            }, executor);
    
            //2.根据一号任务来继续调用
            CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
                //3.获取当前spu的销售属性组合
                List<SkuItemSaleAttrVo> saleAttrVos = saleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
                skuItemVo.setSaleAttr(saleAttrVos);
            }, executor);
    
            //3.根据一号任务来继续调用
            CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
                //4.获取Spu的介绍 pms_spu_info_desc
                SpuInfoDescEntity spuInfo = spuInfoDescService.getById(res.getSpuId());
                skuItemVo.setDesc(spuInfo);
            }, executor);
    
            //4.根据一号任务来继续调用
            CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
                //5.获取spu的规格参数信息
                List<SpuItemAttrGroupVo> attrGroups = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
                skuItemVo.setGroupAttrs(attrGroups);
            }, executor);
    
            //此任务不需要根据一号任务的返回调用,所以开一个新线程
            CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> {
                //2.获取sku的图片信息 pms_sku_images
                List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);
                skuItemVo.setImages(images);
            }, executor);
    
            //等待所有任务都完成
            //TODO 可以选择有异常情况下的处理结果
            CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imagesFuture).get();        
    
            return skuItemVo;
        }

    关于“Java中的异步与线程池怎么创建使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI