温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java多线程:CyclicBarrier的使用场景

发布时间:2025-03-19 08:06:39 阅读:99 作者:小樊 栏目:编程语言
Java开发者专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

CyclicBarrier 是 Java 并发包 java.util.concurrent 中的一个同步辅助类,它允许一组线程互相等待,直到所有线程都准备好继续执行。CyclicBarrier 特别适用于那些需要多个线程协同工作,且每个线程在继续执行之前必须等待其他线程达到某个屏障点的场景。

以下是 CyclicBarrier 的一些典型使用场景:

1. 并行计算

将一个大任务拆分成多个小任务,分配给多个线程并行处理。当所有小任务都完成后,主线程或其他线程可以继续执行后续操作。

import java.util.concurrent.CyclicBarrier;

public class ParallelComputation {
    public static void main(String[] args) {
        int numberOfThreads = 4;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("All tasks are completed.");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(new Task(barrier)).start();
        }
    }

    static class Task implements Runnable {
        private final CyclicBarrier barrier;

        public Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is working.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " has finished.");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

2. 数据聚合

多个线程分别处理数据的一部分,然后将结果汇总到一个共享的数据结构中。CyclicBarrier 可以确保所有线程都完成了数据处理后再进行汇总。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

public class DataAggregation {
    public static void main(String[] args) {
        int numberOfThreads = 4;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("All data has been aggregated.");
        });

        List<List<Integer>> results = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final int threadIndex = i;
            new Thread(() -> {
                List<Integer> data = fetchData(threadIndex);
                results.add(data);
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    private static List<Integer> fetchData(int threadIndex) {
        // Simulate fetching data
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            data.add(threadIndex * 10 + i);
        }
        return data;
    }
}

3. 多阶段任务

任务分为多个阶段,每个阶段需要所有线程都完成当前阶段的工作后才能进入下一个阶段。

import java.util.concurrent.CyclicBarrier;

public class MultiStageTask {
    public static void main(String[] args) {
        int numberOfThreads = 4;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("Stage completed.");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(new StageTask(barrier)).start();
        }
    }

    static class StageTask implements Runnable {
        private final CyclicBarrier barrier;

        public StageTask(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is working on stage 1.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " has finished stage 1.");
                barrier.await();

                System.out.println(Thread.currentThread().getName() + " is working on stage 2.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " has finished stage 2.");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4. 资源初始化

多个线程负责初始化不同的资源,所有资源都初始化完成后,主线程或其他线程可以继续执行后续操作。

import java.util.concurrent.CyclicBarrier;

public class ResourceInitialization {
    public static void main(String[] args) {
        int numberOfResources = 4;
        CyclicBarrier barrier = new CyclicBarrier(numberOfResources, () -> {
            System.out.println("All resources have been initialized.");
        });

        for (int i = 0; i < numberOfResources; i++) {
            new Thread(new ResourceInitializer(barrier, i)).start();
        }
    }

    static class ResourceInitializer implements Runnable {
        private final CyclicBarrier barrier;
        private final int resourceId;

        public ResourceInitializer(CyclicBarrier barrier, int resourceId) {
            this.barrier = barrier;
            this.resourceId = resourceId;
        }

        @Override
        public void run() {
            try {
                System.out.println("Initializing resource " + resourceId);
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println("Resource " + resourceId + " has been initialized.");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

CyclicBarrierawait() 方法会阻塞线程,直到所有线程都调用了 await() 方法。当所有线程都到达屏障点时,屏障会自动打开,所有线程继续执行。CyclicBarrier 还支持超时机制,可以在指定时间内等待所有线程到达屏障点。

使用 CyclicBarrier 时需要注意以下几点:

  • 确保所有线程都能调用 await() 方法,否则可能会导致死锁。
  • 处理 BrokenBarrierException,当某个线程在等待时被中断或屏障被重置时,会抛出此异常。
  • 如果屏障被重置(调用 reset() 方法),所有等待的线程会收到 BrokenBarrierException

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI

开发者交流群×