CyclicBarrier
是 Java 并发包 java.util.concurrent
中的一个同步辅助类,它允许一组线程互相等待,直到所有线程都准备好继续执行。CyclicBarrier
特别适用于那些需要多个线程协同工作,且每个线程在继续执行之前必须等待其他线程达到某个屏障点的场景。
以下是 CyclicBarrier
的一些典型使用场景:
将一个大任务拆分成多个小任务,分配给多个线程并行处理。当所有小任务都完成后,主线程或其他线程可以继续执行后续操作。
import java.util.concurrent.CyclicBarrier;
public class ParallelComputation {
public static void main(String[] args) {
int numberOfThreads = 4;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
System.out.println("All tasks are completed.");
});
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new Task(barrier)).start();
}
}
static class Task implements Runnable {
private final CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is working.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has finished.");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
多个线程分别处理数据的一部分,然后将结果汇总到一个共享的数据结构中。CyclicBarrier
可以确保所有线程都完成了数据处理后再进行汇总。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
public class DataAggregation {
public static void main(String[] args) {
int numberOfThreads = 4;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
System.out.println("All data has been aggregated.");
});
List<List<Integer>> results = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
final int threadIndex = i;
new Thread(() -> {
List<Integer> data = fetchData(threadIndex);
results.add(data);
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
private static List<Integer> fetchData(int threadIndex) {
// Simulate fetching data
List<Integer> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(threadIndex * 10 + i);
}
return data;
}
}
任务分为多个阶段,每个阶段需要所有线程都完成当前阶段的工作后才能进入下一个阶段。
import java.util.concurrent.CyclicBarrier;
public class MultiStageTask {
public static void main(String[] args) {
int numberOfThreads = 4;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
System.out.println("Stage completed.");
});
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new StageTask(barrier)).start();
}
}
static class StageTask implements Runnable {
private final CyclicBarrier barrier;
public StageTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is working on stage 1.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has finished stage 1.");
barrier.await();
System.out.println(Thread.currentThread().getName() + " is working on stage 2.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has finished stage 2.");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
多个线程负责初始化不同的资源,所有资源都初始化完成后,主线程或其他线程可以继续执行后续操作。
import java.util.concurrent.CyclicBarrier;
public class ResourceInitialization {
public static void main(String[] args) {
int numberOfResources = 4;
CyclicBarrier barrier = new CyclicBarrier(numberOfResources, () -> {
System.out.println("All resources have been initialized.");
});
for (int i = 0; i < numberOfResources; i++) {
new Thread(new ResourceInitializer(barrier, i)).start();
}
}
static class ResourceInitializer implements Runnable {
private final CyclicBarrier barrier;
private final int resourceId;
public ResourceInitializer(CyclicBarrier barrier, int resourceId) {
this.barrier = barrier;
this.resourceId = resourceId;
}
@Override
public void run() {
try {
System.out.println("Initializing resource " + resourceId);
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Resource " + resourceId + " has been initialized.");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier
的 await()
方法会阻塞线程,直到所有线程都调用了 await()
方法。当所有线程都到达屏障点时,屏障会自动打开,所有线程继续执行。CyclicBarrier
还支持超时机制,可以在指定时间内等待所有线程到达屏障点。
使用 CyclicBarrier
时需要注意以下几点:
await()
方法,否则可能会导致死锁。BrokenBarrierException
,当某个线程在等待时被中断或屏障被重置时,会抛出此异常。reset()
方法),所有等待的线程会收到 BrokenBarrierException
。亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。