今天小编给大家分享一下java多线程如何通过CompletableFuture组装异步计算单元的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
CompletableFuture是1.8引入的新特性,一些比较复杂的异步计算场景,尤其是需要串联多个异步计算单元的场景,可以考虑使用 CompletableFuture 来实现。
在现实世界中,我们需要解决的复杂问题都是要分为若干步骤。就像我们的代码一样,一个复杂的逻辑方法中,会调用多个方法来一步一步实现。
设想如下场景,植树节要进行植树,分为下面几个步骤:
挖坑 10 分钟
拿树苗 5 分钟
种树苗 20 分钟
浇水 5 分钟
其中 1 和 2 可以并行,1 和 2 都完成了才能进行步骤 3,然后才能进行步骤 4。
我们有如下几种实现方式:
如果现在只有一个人植树,要种 100 棵树,那么只能按照如下顺序执行:
图中仅列举种三棵树示意。可以看到串行执行,只能种完一棵树再种一棵,那么种完 100 棵树需要 40 * 100 = 4000 分钟
。 这种方式对应到程序,就是单线程同步执行。
如何缩短种树时长呢?你肯定想这还不好办,学习了这么久的并发,这肯定难不倒我。不是要种 100 棵树吗?那我找 100 个人一块种,每个人种一棵。那么只需要 40 分钟就可以种完 100 棵树了。
没错,如果你的程序有个方法叫做 plantTree,里面包含了如上四部,那么你起 100 个线程就可以了。但是,请注意,100 个线程的创建和销毁需要消耗大量的系统资源。并且创建和销毁线程都有时间消耗。此外CPU的核数并不能真的支持100个线程并发。如果我们要种1万棵树呢?总不能起一万个线程吧?
所以这只是理想情况,我们一般是通过线程池来执行,并不会真的启动100个线程。
种每一棵树的时候,不依赖的步骤可以分不同的人并行干
这种方式可以进一步缩短种树的时长,因为第一步挖坑和第二步拿树苗可以两个人并行去做,所以每棵树只需要35 分钟。如下图:
如果程序还是 100 个主线程并发运行 plantTree 方法,那么只需要 35 分钟种完 100 颗树。 这里需要注意每个线程中,由于还要并发两个线程去做 1,2 两个步骤。实际运行中会又 100 x 3 = 300 个线程参与植树。但是负责 1,2 步骤的线程只会短暂参与,然后就闲置了。
这种方法和第二种方式也存在大量创建线程的问题。所以也只是理想情况。
可以看到一开始小王挖完第一个坑后,小李已经取回两个树苗,但此时小张才能开始种第一个树苗。此后小张就可以一个接一个的去种树苗了,并且在他种下一棵树苗的时候,小赵可以并行浇水。按照这个流程走下来,种完 100 颗树苗需要 10+20x100+5=2015 分钟。比单线程的4000分钟好了很多,但是远远比不上 100 个线程并发种树的速度。不过不要忘记 100 个线程并发只是理想情况,而本方法只用了 4 个线程。
我们再对分工做下调整。每个人不只干自己的工作,一旦自己的工作做完了就看有没有其他工作可以做。比如小王挖坑完后,发现可以种树苗,那么他就去种树苗。小李拿树苗完成后也可以去挖坑或者种树苗。这样整体的效率就会更高了。如果基于这种思想,那么我们实际上把任务分成了 4 类,每类 100 件,一共 400 件任务。400 件任务全部完成,意味着整个任务就完成了。那么任务的参与者只需要知道任务的依赖,然后不断领取可以执行的任务去执行。这样的效率将会是最高的。
前文说到我们不可能通过100个线程并发来执行任务,所以一般情况下我们都会使用线程池,这和上面的设计思想不谋而合。使用线程池后,由于第四种方式把步骤拆的更细,提高了并发的可能性。因此速度会比第二种方式更快。那么和第三种比起来,哪种更快呢?如果线程数量可以无穷大,这两个方法能达到的最短时间是一样的,都是 35 分钟。不过在线程有限的情况下,第四种方式对线程的使用率会更高,因为每个步骤都可以并行执行(参与种树的人完成自己的工作后,都可以去帮助其他人),线程的调度更为灵活,所以线程池中的线程很难闲下来,一直保持在运转之中。是的,谁都不能偷懒。而第三种由于只能并发在 plantTree 方法及挖坑和拿树苗,所以不如第四种方式灵活
上文讲了这么多,主要是要说明 CompletableFuture 出现的原因。他用来把复杂任务拆解为一个个衔接的异步执行步骤,从而提升整体的效率。我们回一下小节题目:谁都不能偷懒。没错,这就是 CompletableFuture 要达到的效果,通过对计算单元的抽象,让线程能够高效的并发参与每一个步骤。同步的代码通过 CompletableFuture 可以完全改造为异步代码。下面我们就来看看如何使用 CompletableFuture。
CompletableFuture 实现了 Future 接口并且实现了 CompletionStage 接口。Future 接口我们已经很熟悉了,而CompletionStage 接口定了异步计算步骤之间的规范,这样确保一步一步能够衔接上。CompletionStage 定义了38 个 public 的方法用于异步计算步骤间的衔接。接下来我们会挑选一些常用的,相对使用频率较高的方法,来看看如何使用。
如果你已经知道 CompletableFuture 的计算结果,可以使用静态方法 completedFuture。传入计算结果,声明CompletableFuture 对象。在调用 get 方法时会立即返回传入的计算结果,不会被阻塞,如下代码:
public static void main(String[] args) throws Exception{ CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World"); System.out.println("result is " + completableFuture.get()); } // result is Hello World
是不是觉得这种用法没有什么意义?既然知道计算结果了,直接使用就好了,为什么还要通过 CompletableFuture 进行包装?这是因为异步计算单元需要通过 CompletableFuture 进行衔接,所以有的时候我们即使已经知道计算结果,也需要包装为 CompletableFuture,才能融入到异步计算的流程之中。
这是我们最常用的方式。把需要异步计算的逻辑封装为一个计算单元,交由 CompletableFuture 去运行。如下面的代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成
这里我们使用了 CompletableFuture 的 supplyAsync 方法,以 lambda 表达式的方式向其传递了一个 supplier 接口的实现。
可见 completableFuture.get()
拿到的计算结果就是你传入函数执行后 return 的值。那么如果你有需要异步计算的逻辑,那么就可以放到 supplyAsync 传入的函数体中。这段函数是如何被异步执行的呢?如果你跟入代码可以看到其实 supplyAsync 是通过 Executor,也就是线程池来运行这段函数的。completableFuture 默认使用的是ForkJoinPool,当然你也可以通过为 supplyAsync 指定其他 Excutor,通过第二个参数传入 supplyAsync 方法。
supplyAsync 使用场景非常多,举个简单的例子,主程序需要调用多个微服务的接口请求数据,那么就可以启动多个 CompletableFuture,调用 supplyAsync,函数体中是关于不同接口的调用逻辑。这样不同的接口请求就可以异步同时运行,最后再等全部接口返回时,执行后面的逻辑。
supplyAsync 接收的函数是有返回值的。有些情况我们只是一段计算过程,并不需要返回值。这就像 Runnable 的run 方法,并没有返回值。这种情况我们可以使用 runAsync方法,如下面的代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成")); completableFuture.get(); } // 挖坑完成
runAsync 接收 runnable 接口的函数。所以并无返回值。栗子中的逻辑只是打印“挖坑完成”。
当我们通过 supplyAsync 完成了异步计算,返回 CompletableFuture,此时可以继续对返回结果进行加工,如下面的代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenApply(s -> s + ", 并且归还铁锹") .thenApply(s -> s + ", 全部完成。"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 并且归还铁锹, 全部完成。
在调用 supplyAsync 后,我们两次链式调用 thenApply 方法。s 是前一步 supplyAsync 返回的计算结结果,我们对结算结果进行了两次再加工。我们可以通过 thenApply 不断对计算结果进行加工处理。 如果想异步运行 thenApply 的逻辑,可以使用 thenApplyAsync。使用方法相同,只不过会通过线程池异步运行。
这种场景你可以使用thenApply。这个方法可以让你处理上一步的返回结果,但无返回值。参照如下代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenAccept(s -> System.out.println(s + ", 并且归还铁锹")); completableFuture.get(); }
这里可以看到 thenAccept 接收的函数没有返回值,只有业务逻辑。处理后返回 CompletableFuture 类型对象。
此时你可以使用 thenRun 方法,他接收 Runnable 的函数,没有输入也没有输出,仅仅是在异步计算结束后回调一段逻辑,比如记录 log 等。参照下面代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenAccept(s -> System.out.println(s + ", 并且归还铁锹")) .thenRun(() -> System.out.println("挖坑工作已经全部完成")); completableFuture.get(); } // 挖坑完成, 并且归还铁锹 // 挖坑工作已经全部完成
可以看到在 thenAccept 之后继续调用了 thenRun,仅仅是打印了日志而已
我们可以把两个 CompletableFuture 组合起来使用,如下面的代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且归还铁锹")); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 并且归还铁锹
thenApply 和 thenCompose 的关系就像 stream中的 map 和 flatmap。从上面的例子来看,thenApply 和thenCompose 都可以实现同样的功能。但是如果你使用一个第三方的库,有一个API返回的是CompletableFuture 类型,那么你就只能使用 thenCompose方法。
如果你有两个异步操作互相没有依赖,但是第三步操作依赖前两部计算的结果,那么你可以使用 thenCombine 方法来实现,如下面代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿树苗完成"), (x, y) -> x + y + "植树完成"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 拿树苗完成植树完成
挖坑和拿树苗可以同时进行,但是第三步植树则祖尧前两步完成后才能进行。
可以看到符合我们的预期。使用场景之前也提到过。我们调用多个微服务的接口时,可以使用这种方式进行组合。处理接口调用间的依赖关系。 当你需要两个 Future 的结果,但是不需要再加工后向下游传递计算结果时,可以使用 thenAcceptBoth,用法一样,只不过接收的函数没有返回值。
假如我们对微服务接口的调用不止两个,并且还有一些其它可以异步执行的逻辑。主流程需要等待这些所有的异步操作都返回时,才能继续往下执行。此时我们可以使用 CompletableFuture.allOf 方法。它接收 n 个 CompletableFuture,返回一个 CompletableFuture。对其调用 get 方法后,只有所有的 CompletableFuture 全完成时才会继续后面的逻辑。我们看下面示例代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("挖坑完成"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("取树苗完成"); }); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("取肥料完成"); }); CompletableFuture.allOf(future1, future2, future3).get(); System.out.println("植树准备工作完成!"); } // 挖坑完成 // 取肥料完成 // 取树苗完成 // 植树准备工作完成!
在异步计算链中的异常处理可以采用 handle 方法,它接收两个参数,第一个参数是计算及过,第二个参数是异步计算链中抛出的异常。使用方法如下:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("Computation error"); } return "挖坑完成"; }).handle((result, throwable) -> { if (result == null) { return "挖坑异常"; } return result; }); System.out.println("result is " + completableFuture.get()); } // result is 挖坑异常
代码中会抛出一个 RuntimeException,抛出这个异常时 result 为 null,而 throwable 不为null。根据这些信息你可以在 handle 中进行处理,如果抛出的异常种类很多,你可以判断 throwable 的类型,来选择不同的处理逻辑。
以上就是“java多线程如何通过CompletableFuture组装异步计算单元”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。