这篇文章主要介绍“JUC的CyclicBarrier怎么实现”,在日常操作中,相信很多人在JUC的CyclicBarrier怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”JUC的CyclicBarrier怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
前面介绍的 CountDownLatch 同步器是基于 AQS 实现的,而本文要介绍的 CyclicBarrier 则没有直接继承 AQS 的 AbstractQueuedSynchronizer 抽象类,而是基于 ReentrantLock 锁进行实现。首先来看一下 CyclicBarrier 的字段定义,如下:
public class CyclicBarrier { /** 支撑 CyclicBarrier 的重入锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 条件队列,已经到达屏障的线程会在条件队列中等待其它线程 */ private final Condition trip = lock.newCondition(); /** 参与的线程数 */ private final int parties; /** 当所有线程都到达屏障时的回调函数 */ private final Runnable barrierCommand; /** 当前年代对象 */ private Generation generation = new Generation(); /** 当前剩余未完成的线程数 */ private int count; // ... 省略方法定义 }
上述各个字段的含义如代码注释,这里我们进一步解释一下 generation 字段,该字段为 Generation 类型,用于表示当前 CyclicBarrier 同步器的年代信息。Generation 内部类定义如下:
private static class Generation { boolean broken = false; }
当新建一个 CyclicBarrier 对象时会初始化 CyclicBarrier#generation
字段。此外,当所有参与的线程都到达屏障后(也称 tripped),或者 CyclicBarrier 被重置(即调用 CyclicBarrier#reset
方法)时,会新建一个 Generation 对象赋值给 CyclicBarrier#generation
字段,表示年代的更替。
Generation 定义的 Generation#broken
属性用于标识当前屏障是否被打破。当 CyclicBarrier 被重置,或者参与到该屏障的某个线程被中断、等待超时,亦或是执行回调函数发生异常,都会导致屏障被打破。破损的屏障(即 broken=true
)会导致当前参与等待的线程,以及已经处于等待状态的线程抛出 BrokenBarrierException 异常,并退出当前屏障进程。
因为 CyclicBarrier 的复用性,导致在程序运行期间可能并存多个年代信息,但是任何时刻只有一个年代对象是活跃的,剩余的年代对象对应的 CyclicBarrier 要么是已经用完的(tripped),要么就是已经破损的。
介绍完了字段定义,下面来分析一下 CyclicBarrier 的方法实现,首先来看一下构造方法。CyclicBarrier 定义了两个构造方法,实现如下:
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) { throw new IllegalArgumentException(); } this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中 parties 参数用于指定当前参与的线程数,参数 barrierAction 用于指定当所有参与的线程都到达屏障时的回调逻辑。你可能有些疑问,既然设置了 parties 字段,为什么还要设置一个 count 字段呢?
考虑 CyclicBarrier 是可重用的,所以需要有一个字段记录参与线程的数目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在运行期间其值是会随着参与线程逐一到达屏障而递减的,所以 count 值始终记录的是当前未到达屏障的线程数。当 CyclicBarrier 被重置时,我们需要依据 parties 字段值来重置 count 字段值。
继续来看一下 CyclicBarrier 除构造方法以外的剩余方法实现,主要分析一下 CyclicBarrier#await
方法和 CyclicBarrier#reset
方法。首先来看一下 CyclicBarrier#reset
方法,当我们希望复用 CyclicBarrier 对象时可以调用该方法,用于重置 count 值、年代信息,并唤醒所有位于条件队列中等待的线程。方法实现如下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { this.breakBarrier(); // break the current generation this.nextGeneration(); // start a new generation } finally { lock.unlock(); } } private void breakBarrier() { // 标识当前屏障被打破 generation.broken = true; // 重置 count 字段值 count = parties; // 唤醒所有等待的线程 trip.signalAll(); } private void nextGeneration() { // 唤醒所有等待的线程 trip.signalAll(); // 重置 count 值 count = parties; generation = new Generation(); }
再来看一下 CyclicBarrier#await
方法,该方法用于阻塞当前线程,以在屏障处等待其它线程到达,CyclicBarrier 还为该方法定义了超时等待版本。当一个线程因调用 CyclicBarrier#await
方法进入等待状态时,该线程将会在满足以下条件之一时退出等待状态:
所有参与的线程都已经到达了屏障。
当前线程被中断,或者其它处于等待状态的线程被中断。
如果启用了超时机制,并且某个参与的线程等待超时。
CyclicBarrier 被重置。
方法 CyclicBarrier#await
的普通版本和超时版本在实现上都是直接委托给 CyclicBarrier#dowait
方法执行,所以下面主要来分析一下该方法,实现如下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 获取当前年代信息 final Generation g = generation; // 当前屏障被打破,抛出异常 if (g.broken) { throw new BrokenBarrierException(); } // 当前线程被中断,打破屏障,并唤醒所有等待的线程 if (Thread.interrupted()) { this.breakBarrier(); throw new InterruptedException(); } int index = --count; // 如果 count 值为 0,说明所有的线程都已经到达屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 如果设置了回调,则执行 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; // 唤醒所有等待的线程,并重置屏障 this.nextGeneration(); return 0; } finally { // 如果执行回调异常 if (!ranAction) { this.breakBarrier(); } } } // count 值不为 0,说明存在还未到达屏障的线程,则进入条件队列等待 // loop until tripped, broken, interrupted, or timed out for (; ; ) { try { if (!timed) { // 进入条件队列等待 trip.await(); } else if (nanos > 0L) { // 进入条件队列超时等待 nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { // 当前线程被中断,响应中断 if (g == generation && !g.broken) { this.breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not been interrupted, // so this interrupt is deemed to "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 屏障被打破 if (g.broken) { throw new BrokenBarrierException(); } // 当前 CyclicBarrier 已经被重置 if (g != generation) { return index; } // 等待超时 if (timed && nanos <= 0L) { this.breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁 lock.unlock(); } }
由上述实现我们可以总结线程在调用 CyclicBarrier#await
方法时的整体执行流程。如果当前线程不是最后一个到达屏障的线程(递减 count 值之后仍然大于 0),则调用 Condition#await
方法(或超时版本)将当前线程添加到条件队列中等待。如果当前线程是最后一个到达屏障的线程(递减 count 值之后为 0),则在线程到达屏障后执行:
如果指定了回调逻辑,则执行该回调,如果期间发生任何异常,则打破屏障、重置 count 值,并唤醒条件队列中所有等待的线程;
否则,继续调用 CyclicBarrier#nextGeneration
方法唤醒条件队列中所有等待的线程,并重置 count 值和年代信息。
在上述过程中如果当前线程或处于等待状态的线程被中断、屏障被打破、年代信息发生变化,或者等待超时(如果允许的话),则线程将会从 Condition#await
方法中退出,即当前屏障失效。
到此,关于“JUC的CyclicBarrier怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。