温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java中怎么使用BlockingQueue实现并发

发布时间:2021-08-03 15:46:55 来源:亿速云 阅读:132 作者:Leah 栏目:编程语言

Java中怎么使用BlockingQueue实现并发,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。


1 概述

阻塞队列(BlockingQueue)是一个支持两种附加操作的队列。支持附加阻塞的插入和移除操作。

  • 支持阻塞的插入:当队列满时,插入操作会被阻塞,直到队列不满。

  • 支持阻塞的移除:当队列空时,移除操作会被阻塞,直到队列不空。

阻塞队列不可用时,操作处理方式

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e, time, unit)
移除方法remove()poll()take()poll(time, unit)
检查方法element()peek()
  • 抛出异常:队列满时,若继续插入元素会抛出IllegalStateException;当队列为空时,若获取元素则会抛出NoSuchElementException异常。

  • 返回特殊值:向队列插入元素时,会返回是否插入成功true/false;获取元素时,成功则返回元素,失败则返回null。

  • 一直阻塞:当阻塞队列满时,若继续使用put新增元素时会被阻塞,直到队列不为空或者响应中断退出;当阻塞队列为空时,继续使用take获取元素时会被阻塞,直到队列不为空。

  • 超时退出:当阻塞队列满时,使用offer(e, time, unit)新增元素会被阻塞至超时退出;当队列为空时,使用poll(time, unit)获取元素时会被阻塞至超时退出。

注意:

  • 阻塞队列中不允许插入null,会抛出NPE异常。

  • 可以访问阻塞队列中的任意元素,调用remove(Object o)可以将队列之中的特定对象移除,但会遍历全部元素,并不高效。

2 阻塞队列的实现

2.1 ArrayBlockingQueue

由数组构成的有界阻塞队列,内部由数组final Object[] items实现。默认情况下不保证线程公平的访问队列,所谓公平访问队列指阻塞的线程,可以按照阻塞的先后顺序访问队列。

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);  // 使用公平锁/非公平锁
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

队列大小初始化后不可修改。参数fair控制内部ReentrantLock是否采用公平锁。

2.2 LinkedBlockingQueue

链表实现的有界阻塞队列。内部结构是单链表。默认大小为Integer.MAX_VALUE,可以指定大小。

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  // 指定队列大小
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

// 单链表节点Node
static class Node<E> {
  E item;
  Node<E> next;
  Node(E x) { item = x; }
}
2.3 PriorityBlockingQueue

支持优先级的无界阻塞队列。默认情况下采取自然顺序升序排列。也可以自定义compareTo()方法来指定元素的排列顺序,或者初始化队列时,指定构造参数Comparator来对元素进行排序。同优先级顺序无法保证。

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();  // 非公平锁
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}


// offer方法部分代码
Comparator<? super E> cmp = comparator;
if (cmp == null)
  siftUpComparable(n, e, array);
else
  siftUpUsingComparator(n, e, array, cmp);

由offer代码可以看出,Comparator的优先级是大于Comparable.compareTo方法的。

注意:PriorityBlockingQueue不会阻塞数据生产者(队列无界),只会在没有数据时阻塞消费者。生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则将有可能耗尽堆空间。

2.4 DelayQueue

支持延时获取元素的无界队列。队列使用PriorityQueue实现。队列中的元素必须实现java.util.concurrent.Delayed接口,在创建元素时指定多久才能才能从队列中取到元素。

DelayQueue非常有用,可以将DelayQueu应用在以下应用场景。

  • 缓存系统的设计:用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能获取到元素时,表示缓存有限期到了。

  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如TimerQueue就是使用DelayQueue实现的。

2.5 SynchronousQueue

不存储元素的阻塞队列。每个put操作都必须等待一个take操作,反之亦然。

// fair为true,等待线程将以FIFO的顺序进行访问
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

将生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue

3 阻塞队列的原理

利用Lock锁的多条件(Condition)阻塞控制。下面简单分析下ArrayBlockingQueue部分代码。

3.1 ArrayBlockingQueue属性
/** The queued items */
// 数据元素数组
final Object[] items;

/** items index for next take, poll, peek or remove */
// 下一个待获取元素索引
int takeIndex;

/** items index for next put, offer, or add */
// 下一个待插入元素索引
int putIndex;

/** Number of elements in the queue */
// 队列中元素个数
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
// 所有访问的主锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 消费者监视器
private final Condition notEmpty;

/** Condition for waiting puts */
// 生产者监视器
private final Condition notFull;


// 
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
3.2 put操作
// 在队列尾部插入元素,若队列已满则等待队列非满。
public void put(E e) throws InterruptedException {
  // 校验插入元素,为空则抛出NPE
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  // 1. 尝试获取锁(响应中断)
  lock.lockInterruptibly();
  try {
    // 2. 当队列满时
    while (count == items.length)
      // 2.1 若队列满,则阻塞当前线程。等待`notFull.signal()`唤醒。
      notFull.await();
    // 3. 非满则执行入队操作
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

// 在`putIndex`处放置当前元素,只有获取lock锁后才会调用
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  // 在`putIndex`处放置元素
  items[putIndex] = x;
  // putIndex等于数组长度时,重置为0索引。
  if (++putIndex == items.length)
    putIndex = 0;
  // 数量加1
  count++;
  // 4. 唤醒一个等待线程(等待取元素的线程)
  notEmpty.signal();
}

put总体流程:

  1. 获取lock锁,拿到锁后继续执行,否则自旋竞争锁。

  2. 判断阻塞队列是否满。满了了则调用await阻塞当前线程。同时释放lock锁。

  3. 如果没满,则调用enqueue方法将元素put进阻塞队列。此时还有一种可能是:第2步中被阻塞的线程被唤醒且又拿到了lock锁。

  4. 唤醒一个标记为notEmpty(消费者)的线程。

3.3 take操作
// 从头部获取元素,若队列为空则等待队列非空。
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 1. 获取锁
  lock.lockInterruptibly();
  try {
    // 2. 当队列为空时
    while (count == 0)
      // 2.1 当队列为空时,阻塞当前线程。等待`notEmpty.signal()`唤醒。
      notEmpty.await();
    // 3. 非空则进行入队操作
    return dequeue();
  } finally {
    lock.unlock();
  }
}

// 从`takeIndex`位置获取当前元素,只有获取到lock锁后才会调用
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  // 从`takeIndex`位置获取元素,然后清除该位置元素
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  // 
  if (++takeIndex == items.length)
    takeIndex = 0;
  // 队列元素减1
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 4. 唤醒一个标记为notFull(生产者)的线程
  notFull.signal();
  return x;
}

take的整体流程:

  1. 获取lock锁,拿到锁则执行下一步流程;未拿到则自旋竞争锁。

  2. 当前队列是否为空,若为空则调用notEmpty.await阻塞当前线程,同时释放锁,等待被唤醒。

  3. 若非空,则调用dequeue进行出队操作。此时还有一种可能:第2步中的阻塞的线程被唤醒并且又拿到了lock锁。

  4. 唤醒一个被标记为notFull(生产者)的线程。

3.4 总结
  1. puttake操作都需要先获得锁,没有获得锁的线程无法进行操作。

  2. 拿到锁后,并不一定能顺利执行put/take操作,还需要判断队列是否可用(是否满/空),不可用则会被阻塞,并释放锁。

  3. 在2中被阻塞的线程会被唤醒,但唤醒之后依然需要拿到锁之后才能继续向下执行。否则,自旋拿锁,拿到锁后再while判断队列是否可用。

看完上述内容,你们掌握Java中怎么使用BlockingQueue实现并发的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI