温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger

发布时间:2020-07-29 20:46:57 来源:网络 阅读:718 作者:SeekerWu 栏目:建站服务器

       CountDownLatch允许一个或者多个线程等待其他线程完成操作,之后再对结果做统一处理;

        适用场景,分布式系统中对多个微服务的调用,并发执行并且必须等待全部执行完成才能继续执行后续操作;



        其实在java中默认的实现是join()方法,join()方法主要的作用是当前线程必须等待直到join线程执行完成之后才能继续执行后续的操作,

        其本质就是轮询判断join线程是否存活,如果存活则主线程继续等待,否则,通过调用this.notifyAll()方法来继续执行主线程。

        实例代码如下:

public static void main(String[] args) throws InterruptedException {

        Thread thread1 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("this is thread 1");

        }

        });

        Thread thread2 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("Thread2 is finish");

        }

        });


        thread1.start();

        thread2.start();

        /*thread1.join();

        thread2.join();*/ (1)

        System.out.println("all parser finish");

        }

        现在的代码是注释掉了两个join()方法的调用,那么输出结果将不能被保证,三个sout的输出打印是乱序的。

        如果将上述的注释(1)去掉,则根据join()方法的定义,可以知main线程会先等待thread1的执行结束才会执行thread2的执行,直到thread2执行结束才会继续往下执行输出:

        "all parser finish";从而保证执行顺序固定,即线程thread1先执行,其次是thread2的执行,最后main线程执行最后的输出。


        那么同样的如果我们用CountDownLatch来实现,则应用代码如下:

static CountDownLatch c =  new CountDownLatch(2);//定义成员变量

public static void main(String[] args) throws InterruptedException{

        new Thread(new Runnable() {

@Override

public void run() {

        System.out.println(1);

        c.countDown();

        System.out.println(2);

        c.countDown();

        }

        }).start();

        c.await();

        System.out.println(3);

        }


        其中定义的CountDownLatch(2)表示等待两个点完成,即当c变成0以后当前线程才会继续执行后续代码,否则由于await()方法,线程会一直等待;

        而每次调用countDown()方法则c就会减一,上述代码在输出1,2之后,因为调用两次countDown之后c变成0,那么c.await()方法会失效,然后main()线程执行最后输出3;


        如果我们要等待的是多个线程的并发执行,则代码如下

static CountDownLatch c  = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {

        Thread thread1 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("this is thread 1");

        try {

        Thread.sleep(100l);

        } catch (InterruptedException e) {

        e.printStackTrace();

        }

        c.countDown();

        }

        });

        Thread thread2 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("Thread2 is finish");

        c.countDown();

        }

        });


        thread1.start();

        thread2.start();

        c.await();

        System.out.println("all parser finish");

        }

        上述代码中我们可以保证的是main线程会等待thread1和thread2线程的执行完成,之后再执行最后的打印,但是不保证thead1和thread2执行的先后顺序即有可能thread1先执行,也有可能thread2先执行;在这一点上有别与join方法,join方法可以保证其是按照调用顺序来执行的。

        注意:在使用CountDownLatch()的过程中必须保证count次数大于0,因为只有count次数大于0才能保证await()方法调用的阻塞。

        等待多线程完成的CountDownLatch和join()方法的使用就到这里结束了。

        CyclicBarrier:同步屏障,作用是使得一组线程到达一个同步点时被阻塞,直到所有线程都到达屏障时,屏障才会消失,所有被拦截的线程才可以继续执行。

        CyclicBarrier的使用方式和CountDownLatch类似;实例代码如下:

static CyclicBarrier c = new CyclicBarrier(2);


public static void main(String[] args) throws InterruptedException {

        new Thread(new Runnable() {

@Override

public void run() {

        try {

        c.await();

        } catch (Exception e) {

        e.printStackTrace();

        }

        System.out.println(1);


        }

        }).start();

        try {

        c.await();

        } catch (Exception e) {

        e.printStackTrace();

        }

        System.out.println(2);


        }

        上述代码的执行结果可能是1,2也可能是2,1,c并没有保证线程的顺序,从目前来看,CyclicBarrier和CountDownLatch几乎实现的是一样的功能。

        但是CyclicBarrier有更强大的功能,即通过构造函数:new CyclicBarrier(int parties,Runnable barrierAction)来保证线程到达同步点的时候,优先执行barrierAction中的任务。实例代码如下:

static CyclicBarrier c = new CyclicBarrier(2, new PrThread());


public static void main(String[] args) {

        new Thread(new Runnable() {

@Override

public void run() {

        try {

        c.await();

        } catch (InterruptedException e) {

        e.printStackTrace();

        } catch (BrokenBarrierException e) {

        e.printStackTrace();

        }

        System.out.println(1);

        }

        }).start();

        try {

        c.await();

        } catch (InterruptedException e) {

        e.printStackTrace();

        } catch (BrokenBarrierException e) {

        e.printStackTrace();

        }

        System.out.println(2);

        }


static class PrThread implements Runnable {


    @Override

    public void run() {

        System.out.println(3);

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

其中输出顺序被保证为3,1,2,因为count设置为2,所以必须在第一个线程和线程PrThread执行完成之后才能执行主线程,完成输出。

        CyclicBarrier适用于多线程计算最后合并结果的场景。

        还有一点就是CountDownLatch()方法只能用一次,而CyclicBarrier可以通过reset()方法重复调用。至于其他方法比如getNumberWaiting可以获取CyclicBarrier阻塞的线程数等。

        对CyclicBarrier的使用到这就结束了。

        Semaphone:信号量是用来控制同时访问特定资源的线程数量,以保证合理使用有限的公共资源。常用场景是流量控制。实例代码如下:

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);


public static void main(String[] args) {

        for (int i = 0; i <THREAD_COUNT ; i++) {

        threadPool.execute(new Runnable() {

@Override

public void run() {

        try {

        s.acquire();

        System.out.println("DO SOMETHING FOR YOURSELF"+s.getQueueLength());

        s.release();

        } catch (InterruptedException e) {

        e.printStackTrace();

        }

        }

        });




        }

        threadPool.shutdown();

        }

        ,其中构造线程池大小为30,而在同一时刻只允许10个线程执行输出;s.acquire()要求获取一个许可,而s.release()表示释放获取到的许可,当线程数超过可用的许可数,则进入等待状态,直到有许可可用,才会继续执行下一个任务。


        信号量的使用到这里就结束了,我们最后再说线程间的交换数据,其实就是线程之间的数据传递:

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);


    public static void main(String[] args) {

        threadPool.execute(new Runnable() {

            @Override

            public void run() {

                try {

                    String thread1 = "要交换的数据1";

                    exgr.exchange(thread1);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        });

        threadPool.execute(new Runnable() {

            @Override

            public void run() {

                try {

                    String thread2 = "要交换的数据1";

                    String exchange = exgr.exchange("thread2");

                    System.out.println(exchange.equals(thread2));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }


            }

        });

        threadPool.shutdown();

    }


};

其中exchange为从thread1中拿到的要对比的数据,然后和thread2做对比,如果是相当则输出true.

        其中exgr携带了需要交互的数据信息。

        到此Exchanger的使用结束。

        并发编程中用的比较多的就是CountDownLatch和CyclicBarrier和Semaphore。所以了解这些有助于我们以后更好的编程


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI