这篇文章主要介绍了JUC并发编程LinkedBlockingQueue队列源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇JUC并发编程LinkedBlockingQueue队列源码分析文章都会有所收获,下面我们一起来看看吧。
在JUC包下关于线程安全的队列实现有很多,那么此篇文章讲解LinkedBlockingQueue的实现原理,相信各位读者在线程池中能看到LinkedBlockingQueue或者SynchronousQueue队列来作为储存任务和消费任务的通道。一个并发安全的队列,在多线程中充当着安全的传输任务的责任。
既然是介绍LinkedBlockingQueue,那么从构造方法入手最合适不过。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化一个伪节点,让head和last都指向这个伪节点 // 为什么需要伪节点的存在? // 因为可以保证不会发生极端情况(假设没有伪节点,并且只存在一个节点的情况下,生产者和消费者并发执行就可能出现极端情况) last = head = new Node<E>(null); }
为什么需要存在伪节点,因为可以保证不会发生极端情况(假设没有伪节点,并且只存在一个节点的情况下,生产者和消费者并发执行就可能出现极端情况,用伪节点就能很好的解决这个极端问题)
/** * 因为是队列,用链表实现,所以头尾指针肯定不可少。 * */ transient Node<E> head; private transient Node<E> last; /** * 我们可以很清楚的看到,这里使用了2套ReentrantLock和对应的condition条件等待队列。 * 目的也很明显,让生产者和消费者并行。 * */ private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
2套ReentrantLock和对应的condition条件等待队列,很明显目的是为了让生产者和消费者并行,所以就需要一个伪节点处理极端并发情况。
为了,一些没有接触过队列的读者,所以这里还是介绍一下API把
API | 用途 | 注意事项 |
offer | 生产者 | 不会阻塞,如果插入失败,或者队列已经满了,直接返回 |
poll | 消费者 | 不会阻塞,如果消费失败,或者队列当前为空,直接返回 |
put | 生产者 | 会阻塞,如果插入失败或者队列已经满了,阻塞直到插入成功 |
take | 消费者 | 会阻塞,如果消费失败或者当前队列为空,阻塞直到消费成功 |
public void put(E e) throws InterruptedException { // 不能插入null if (e == null) throw new NullPointerException(); int c = -1; // 创建插入的节点。 Node<E> node = new Node<E>(e); // 拿到生产者的锁对象 final ReentrantLock putLock = this.putLock; // 拿到全局计数器,注意这里用的是AtomicInteger,所以自增的原子性已经保证。 final AtomicInteger count = this.count; // 上的是可响应中断锁。 putLock.lockInterruptibly(); try { // 如果当前队列已经满了,此时我们就要去阻塞,等待队列被消费,我们要被唤醒,醒来生产节点。 while (count.get() == capacity) { // 进入条件等待队列阻塞。 // 注意,只要阻塞,是会释放锁的,其他生产者线程可以抢到锁。 notFull.await(); } // 插入到队列尾部 enqueue(node); // 因为插入了节点,所以全局计数需要+1 // 但是这里请注意细节,getAndIncrement方法返回的是旧值。 c = count.getAndIncrement(); // 这里是一个很sao的点 // 注意,这里只要当前队列没满,唤醒的是生产者的条件等待队列。 // 为什么要这么做? // 很简单,首先需要考虑,生产者和消费者是并发执行了。 // 其次,只要队列没满就能一直生产,那么队列一旦满了后,后来的线程就都去条件队列阻塞,所以线程生产完一个节点就有必要去唤醒等待的同胞(不管有没有同胞在阻塞,这是义务) if (c + 1 < capacity) // 唤醒条件等待队列中头部节点。 notFull.signal(); } finally { putLock.unlock(); } // 这里也是一个很sao的点 // 再次强调,getAndIncrement方法是返回的旧值 // 所以当前生产者如果生产的是第一个节点,那么c ==0 // 而队列中没有节点,消费者是要阻塞的 // 也即,这里给队列生产了一个节点,要唤醒消费者去消费节点。 if (c == 0) signalNotEmpty(); } // 插入到队列尾部 // 因为ReentrantLock保证了整体的原子性,所以这里细节部分不需要保证原子性了。 private void enqueue(Node<E> node) { // 插入到尾部 last = last.next = node; }
第一次看到这个代码难免会发生震撼,为什么在生产者代码里面唤醒生产者?不是正常写的生产者消费者模型,不都是生产者生产一个唤醒消费者消费吗?怎么这里不一样??????
因为这里生产者和消费者并行处理,当队列满了以后,后来的生产者线程都会去阻塞,所以生产者线程生产完一个节点就有必要去唤醒等待的同胞(不管有没有同胞在阻塞,这是义务)
大致流程如下:
创建Node节点
上生产者锁
如果队列已经满了,就去生产者条件队列阻塞
如果没满,或者唤醒后,就插入到last指针的后面
全局节点计数器+1
如果当前队列还有空间,就唤醒在阻塞的同胞。
释放锁
如果在生产之前队列为空,本次生产后就需要唤醒在阻塞的消费者线程,让他们醒来消费我刚生产的节点
public E take() throws InterruptedException { E x; int c = -1; // 全局计数器 final AtomicInteger count = this.count; // 消费者的锁对象 final ReentrantLock takeLock = this.takeLock; // 可响应中断锁。 takeLock.lockInterruptibly(); try { // 如果当前队列中没有节点,此时消费者需要去阻塞,因为不阻塞他只会浪费CPU性能,又消费不到节点。 while (count.get() == 0) { // 去消费者的条件队列阻塞。 notEmpty.await(); } // 醒来后,去消费节点。 x = dequeue(); // 给全局计数器-1,但是这里也要注意,返回的是旧值 c = count.getAndDecrement(); // 如果队列中还有节点就唤醒其他消费者去消费节点。 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 这里也是一个sao点 // 请注意,这里的c是旧值,因为getAndDecrement返回的是旧值 // 所以,如果当前消费线程消费节点之前队列是满的,当消费完毕后,我有必要去唤醒因为队列满了而阻塞等待的生产者,因为当前已经空出一个空间了。 if (c == capacity) // 唤醒生产者 signalNotFull(); return x; } // 消费者消费节点 // 所以需要HelpGC // 不过这里要注意,head都是指向伪节点。 private E dequeue() { // 拿到头节点, Node<E> h = head; // 拿到头节点的next节点,next节点作为下一个head节点。 // 因为head节点是指向伪节点,所以head.next节点就是当前要消费的节点。 Node<E> first = h.next; // 将当前的头结点的next指向自己。 h.next = h; // help GC // 设置新的头结点,也即把当前消费的节点做为下次的伪节点 // head节点指向的都是伪节点 head = first; // 拿到当前消费者想要的数据 E x = first.item; first.item = null; return x; }
这里跟put生产者基本思想一致,只不过这里是消费者,因为是生产者消费者并行,所以这里也是唤醒同胞,因为当队列为空所有的消费者都会阻塞,所以每次消费者线程消费完节点后 ,有义务唤醒同胞。
大致流程如下:
拿到全局计数器
上消费者锁
如果当前队列为空,当前消费者线程就要去阻塞
如果不为空,或者被唤醒以后消费节点,把消费的节点作为下一次的伪节点,也即作为head节点
全局计数器-1
唤醒同胞
释放锁
如果在消费之前队列已经满了,那么可能会有生产者线程在阻塞,所以我有义务去唤醒他们
关于“JUC并发编程LinkedBlockingQueue队列源码分析”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“JUC并发编程LinkedBlockingQueue队列源码分析”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。