温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java并行处理的实现方法

发布时间:2021-07-14 13:39:52 来源:亿速云 阅读:494 作者:chen 栏目:开发技术

本篇内容介绍了“Java并行处理的实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

目录
  • 1. 背景

  • 2.知识

  • 3. Java 中的并行处理

  • 4. 扩展

    • 线程池方式实现并行处理

    • 使用 fork/join框架

  • 5.参考:

    1. 背景

    本文是一个短文章,介绍Java 中的并行处理。
    说明:10多分钟读完的文章我称之为短文章,适合快速阅读。

    2.知识

    并行计算(parallel computing)一般是指许多指令得以同时进行的计算模式。在同时进行的前提下,可以将计算的过程分解成小部分,之后以并发方式来加以解决。

    也就是分解为几个过程:

    1、将一个大任务拆分成多个子任务,子任务还可以继续拆分。
    2、各个子任务同时进行运算执行。
    3、在执行完毕后,可能会有个 " 归纳 " 的任务,比如 求和,求平均等。

    再简化一点的理解就是: 先拆分  -->  在同时进行计算  --> 最后“归纳”
    为什么要“并行”,优点呢?

    1、为了获得 “节省时间”,“快”。适合用于大规模运算的场景。从理论上讲,在 n 个并行处理的执行速度可能会是在单一处理机上执行的速度的 n 倍。
    2、以前的计算机是单核的,现代的计算机Cpu都是多核的,服务器甚至都是多Cpu的,并行计算可以充分利用硬件的性能。

    3. Java 中的并行处理

    JDK 8 新增的Stream API(java.util.stream)将生成环境的函数式编程引入了Java库中,可以方便开发者能够写出更加有效、更加简洁的代码。

    steam 的另一个价值是创造性地支持并行处理(parallel processing)。示例:

    final Collection< Task > tasks = Arrays.asList(
        new Task( Status.OPEN, 5 ),
        new Task( Status.OPEN, 13 ),
        new Task( Status.CLOSED, 8 ) 
    );
    
    // 并行执行多个任务,并 求和
    final double totalPoints = tasks
       .stream()
       .parallel()
       .map( task -> task.getPoints() ) // or map( Task::getPoints ) 
       .reduce( 0, Integer::sum );
     
    System.out.println( "Total points (all tasks): " + totalPoints );

    对于上面的tasks集合,上面的代码计算所有任务的点数之和。
    它使用 parallel 方法并行处理所有的task,并使用 reduce 方法计算最终的结果。

    4. 扩展

    线程池方式实现并行处理

    jdk1.5引入了并发包,其中包括了ThreadPoolExecutor,相关代码如下:

    public class ExecutorServiceTest {
     
        public static final int THRESHOLD = 10_000;
        public static long[] numbers;
     
        public static void main(String[] args) throws Exception {
            numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
            CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
            int taskSize = (int) (numbers.length / THRESHOLD);
            for (int i = 1; i <= taskSize; i++) {
                final int key = i;
                completionService.submit(new Callable<Long>() {
     
                    @Override
                    public Long call() throws Exception {
                        return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                    }
                });
            }
            long sumValue = 0;
            for (int i = 0; i < taskSize; i++) {
                sumValue += completionService.take().get();
            }
            // 所有任务已经完成,关闭线程池
            System.out.println("sumValue = " + sumValue);
            executor.shutdown();
        }
     
        private static long sum(int start, int end) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

    使用 fork/join框架

    分支/合并框架的目的是以递归的方式将可以并行的认为拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果;相关代码如下:

    public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
        
        private static final long serialVersionUID = 1L;
        private final long[] numbers;
        private final int start;
        private final int end;
        public static final long THRESHOLD = 10_000;
     
        public ForkJoinTest(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
     
        private ForkJoinTest(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
     
        @Override
        protected Long compute() {
            int length = end - start;
            if (length <= THRESHOLD) {
                return computeSequentially();
            }
            ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
            leftTask.fork();
            ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
            Long rightResult = rightTask.compute();
            // 注:join方法会阻塞,因此有必要在两个子任务的计算都开始之后才执行join方法
            Long leftResult = leftTask.join();
            return leftResult + rightResult;
        }
     
        private long computeSequentially() {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
     
        public static void main(String[] args) {
            System.out.println(forkJoinSum(10_000_000));
        }
     
        public static long forkJoinSum(long n) {
            long[] numbers = LongStream.rangeClosed(1, n).toArray();
            ForkJoinTask<Long> task = new ForkJoinTest(numbers);
            return new ForkJoinPool().invoke(task);
        }
    }

    上面的代码实现了 递归方式拆分子任务,并放入到线程池中执行。

    5.参考:

    https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97

    “Java并行处理的实现方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI