这篇文章主要讲解了“Java编程生产者消费者实现的方法有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java编程生产者消费者实现的方法有哪些”吧!
实现生产者消费者的四种方式
一、最基础的
二、java.util.concurrent.lock 中的 Lock 框架
三、阻塞队列BlockingQueue的实现
Blockqueue 接口的一些方法
四、信号量 Semaphore 的实现
利用 wait() 和 notify() 方法实现,当缓冲区满或为空时都调用 wait() 方法等待,当生产者生产了一个产品或消费者消费了一个产品后会唤醒所有线程;
package com.practice; public class testMain { private static Integer count = 0; private static final Integer FULL = 10; private static String LOCK = "lock"; public static void main(String[] args) { testMain testMain = new testMain(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } synchronized (LOCK){ while(count == FULL){//缓存空间满了 try{ LOCK.wait();//线程阻塞 }catch (Exception e){ e.printStackTrace(); } } count++;//生产者 System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有"+count); LOCK.notifyAll();//唤醒所有线程 } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } synchronized (LOCK){ while(count == 0){ try{ LOCK.wait(); }catch (Exception e){ } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有 "+count); LOCK.notifyAll();//唤醒所有线程 } } } } }
通过对 lock 的 lock() 方法和 unlock() 方法实现对锁的显示控制,而 synchronize()
则是对锁的隐形控制,可重入锁也叫做递归锁,指的是同一个线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响;
简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获计数器就加1,函数调用结束计数器就减1,然后锁需要释放两次才能获得真正释放,已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private static Integer count = 0; private static Integer FULL = 10; //创建一个锁对象 private Lock lock = new ReentrantLock(); //创建两个条件变量,一个为缓冲非满,一个缓冲区非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args){ ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } // 获取锁 lock.lock(); try { while (count == FULL) { try{ notFull.await(); }catch(InterruptedException e){ e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); }finally { lock.unlock(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); } catch (Exception e){ e.printStackTrace(); } lock.lock(); try{ while(count==0){ try{ notEmpty.await(); }catch (InterruptedException e){ e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有 " + count); }finally { lock.unlock();//解锁 } } } } }
被阻塞的情况主要分为如下两种,BlockingQueue 是线程安全的
1,当队列满了的时候进行入队操作;
2,当队列空的时候进行出队操作
四类方法分别对应于:
1,ThrowsException,如果操作不能马上进行,则抛出异常;
2,SpecialValue 如果操作不能马上进行,将会返回一个特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定时间未执行返回一个特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 实现生产者消费模型 */ public class BlockQueueTest { public static Integer count = 0; //创建一个阻塞队列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } try{ blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有 " + count); }catch (InterruptedException e){ e.printStackTrace(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ blockingQueue.take();//消费 count--; System.out.println(Thread.currentThread().getName() + " 消费者消费,目前总共有 "+ count); }catch (InterruptedException e){ e.printStackTrace(); } } } } }
Semaphore (信号量) 用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Java中的 Semaphone 维护了一个许可集,一开始设定这个许可集的数量,使用 acquire()
方法获得一个许可,当许可不足时会被阻塞,release()
添加一个许可。
下面代码中,还加入了 mutex
信号量,维护消费者和生产者之间的同步关系,保证生产者消费者之间的交替进行
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Integer count = 0; //创建三个信号量 final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1);//互斥锁,控制共享数据的互斥访问 public static void main(String[] args) { SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ notFull.acquire();//获取一个信号量 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有 "+count); } catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release();//添加 notEmpty.release(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch(InterruptedException e){ e.printStackTrace(); } try{ notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有"+count); }catch (InterruptedException e){ e.printStackTrace(); }finally { mutex.release(); notFull.release(); } } } } }
感谢各位的阅读,以上就是“Java编程生产者消费者实现的方法有哪些”的内容了,经过本文的学习后,相信大家对Java编程生产者消费者实现的方法有哪些这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。