这篇文章将为大家详细讲解有关使用BlockingQueue怎么实现阻塞队列,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
package com.shi.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 阻塞队列 * @author shiye * */ public class TestBlockQueue { public static void main(String[] args) throws InterruptedException { //定义一个长度为3的阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println("----------------抛出异常的 情况----------------------"); // blockingQueue.add("aa"); // blockingQueue.add("bb"); // blockingQueue.add("cc"); //blockingQueue.add("dd");//如果队列满了 Exception java.lang.IllegalStateException: Queue full // System.out.println(blockingQueue.element());//检查队列头的信息 : aa // blockingQueue.remove(); // blockingQueue.remove(); // blockingQueue.remove(); //blockingQueue.remove();//如果队列为空 Exception java.util.NoSuchElementException //System.out.println(blockingQueue.element());//如果队列为空 Exception java.util.NoSuchElementException System.out.println("----------------返回true/false----------------------"); // System.out.println(blockingQueue.offer("11"));//插入队列 true // System.out.println(blockingQueue.offer("22"));//插入队列 true // System.out.println(blockingQueue.offer("33"));//插入队列 true // System.out.println(blockingQueue.offer("44"));//插入队列 false // // System.out.println(blockingQueue.peek());//检查队列头元素 11 // // System.out.println(blockingQueue.poll());//输出队列 11 // System.out.println(blockingQueue.poll());//输出队列 22 // System.out.println(blockingQueue.poll());//输出队列 33 // System.out.println(blockingQueue.poll());//输出队列 null System.out.println("----------------队列阻塞等待----------------------"); // blockingQueue.put("zhangsan"); // blockingQueue.put("lisi"); // blockingQueue.put("wangwu"); // //blockingQueue.put("shiye");//线程一直等待无法关闭 // // blockingQueue.take(); // blockingQueue.take(); // blockingQueue.take(); //blockingQueue.take();//线程一直等待 无法响应 System.out.println("----------------队列等待一定时间之后就退出----------------------"); System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//false 等待2s钟之后就退出 } }
SynchronousQueue
package com.shi.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; /** * 不存储数据的队列,即生产一个消费一个的队列 * @author shiye * *结果: AA 存放1 ... BB get 1 AA 存放2 ... BB get 2 AA 存放3 ... BB get 3 */ public class TestSynchroniousQueue { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()-> { try { System.out.println(Thread.currentThread().getName()+ "\t 存放1 ..." ); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+ "\t 存放2 ..." ); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+ "\t 存放3 ..." ); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"AA").start(); new Thread(()-> { try { Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"BB").start(); } }
综合案例(使用阻塞队列实现生产者消费者问题)
package com.shi.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 通过阻塞队列的方式 实现 生产者 消费者 问题 * @author shiye * 使用到的技术: * countDownLatch:闭锁 * volatile 自旋锁 * AtomicInteger 原子整型 * BlockingQueue 阻塞队列 * */ public class TestProducterAndConsumterByQueue { public static void main(String[] args) throws InterruptedException { //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(11); Check check = new Check(new ArrayBlockingQueue<>(3)); //创建线程生产 (启动10个线程去生产) for (int i = 0; i < 10; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t 生产者启动..."); try { check.productor("aaa"); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown();//线程数量减一 },"AA-"+i).start(); } //创建1 个线程消费 new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t 消费者启动..."); try { check.consumter(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown();//线程数量减一 },"BB").start(); Thread.sleep(5000);//等待5秒 停止 check.stop(); countDownLatch.await();//等待上面的线程全部执行完毕,才检查产品数量 System.out.println("5s之后线程停止,总共生产了:"+ check.getTotle() +"件产品"); } } //店员 class Check{ private volatile boolean FLAG = true;//标志位 private AtomicInteger atomicInteger = new AtomicInteger();//统计总数的变量 private BlockingQueue<Object> blockingQueue = null;//定义一个阻塞队列 public Check(BlockingQueue<Object> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println("创建一个 "+blockingQueue.getClass().getName()+" 实例"); } //生产者 public void productor(String num) throws InterruptedException { while(FLAG) { System.out.println( Thread.currentThread().getName() + "\t 生产者生产数据:" + num + "到队列中..."); blockingQueue.offer(num,2l,TimeUnit.SECONDS); //延迟2s插入数据到队列中。。 Thread.sleep(1000);//线程睡眠1s atomicInteger.getAndIncrement();//让总数自加1 } } //消费者 public void consumter() throws InterruptedException { while(FLAG) { Object object = blockingQueue.poll(2, TimeUnit.SECONDS);//最多消费延迟2s if(object != null) { System.out.println( Thread.currentThread().getName() + "\t 消费者消费数据:" + object); } } } //停止 public void stop() { FLAG = false; } public int getTotle() { return atomicInteger.get(); } }
关于使用BlockingQueue怎么实现阻塞队列就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。