温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

JUC的LinkedBlockingQueue如何实现

发布时间:2021-12-21 10:23:36 来源:亿速云 阅读:129 作者:iii 栏目:大数据

本篇内容介绍了“JUC的LinkedBlockingQueue如何实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

由 Blocking 字样可以推断出 LinkedBlockingQueue 是阻塞队列,前面我们介绍过阻塞队列和非阻塞队列在实现上的区别,知道阻塞队列一般是基于锁机制来保证线程安全,本文我们就一起来分析一下 LinkedBlockingQueue 是如何基于锁构建线程安全队列的。

同样由 Linked 关键字我们可以推断出 LinkedBlockingQueue 底层依赖于链表实现,在 LinkedBlockingQueue 的内部实现了一个单链表,用以存放队列元素。其中,结点 Node 类定义如下:

static class Node<E> {

    E item;
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

其中 next 指针的指向分为 3 种情况:

  1. 指向某个具体的后继结点。

  2. 指向自己,意味着后继结点为 head.next

  3. 指向 null,说明当前结点是队列的尾结点,没有后继结点。

LinkedBlockingQueue 定义了 head 和 last 指针分别指向队列的头结点和尾结点。此外,LinkedBlockingQueue 还定义了如下字段:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

    /** 当前队列的容量上限 */
    private final int capacity;
    /** 记录当前队列的元素个数 */
    private final AtomicInteger count = new AtomicInteger();

    /** 队列头结点 */
    transient Node<E> head;
    /** 队列尾结点 */
    private transient Node<E> last;

    /** 用于控制 take、poll 等操作,保证同一时间只有一个线程从队列获取元素 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 条件队列,记录出队列时因为队列为空而等待的线程 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 用户控制 put、offer 等操作,保证同一时间只有一个线程往队列添加元素 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 条件队列,记录入队列时因为队列已满而等待的线程 */
    private final Condition notFull = putLock.newCondition();

    // ... 省略方法定义

}

由上述字段定义可以看出,LinkedBlockingQueue 限制了队列的容量上限,并使用 AtomicInteger 类型字段对队列中的元素个数进行计数。虽然 LinkedBlockingQueue 底层依赖于链表实现,理论上是无界的,但是 LinkedBlockingQueue 在实现上却限制了队列的容量上限(默认为 Integer.MAX_VALUE)。

此外,针对出队列和入队列操作,LinkedBlockingQueue 分别设置了一把独占可重入锁,即 takeLock 和 putLock,从而保证同一时间只有一个线程执行出队列操作,只有一个线程执行入队列操作,且出队列的线程与入队列的线程彼此之间不相互影响。针对一些阻塞版本的出队列入队列方法,如果队列为空,则出队列线程会被记录到条件队列 notEmpty 中进行等待,如果队列已满,则入队列线程会被记录到条件队列 notFull 中进行等待。

BlockingQueue 接口

BlockingQueue 接口继承自 Queue 接口,用于描述阻塞队列。当队列无法及时响应用户请求时,例如当我们尝试从空队列中获取元素,或者继续往已满的有界队列中添加元素,BlockingQueue 定义了以下 4 种响应形式:

  1. 抛出异常。

  2. 立即返回特殊值,例如 null 或 false。

  3. 无限期阻塞当前请求,直到队列状态变为可用。

  4. 超时阻塞当前请求,直到队列状态变为可用。

BlockingQueue 接口的定义如下:

public interface BlockingQueue<E> extends Queue<E> {

    boolean offer(E e);
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean add(E e);
    void put(E e) throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    E take() throws InterruptedException;

    boolean remove(Object o);

    boolean contains(Object o);
    int remainingCapacity();

    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);

}

针对各方法的含义说明如下:

  • offer:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则返回 false,而不是抛出异常。BlockingQueue 同时还声明了超时版本的 offer 方法。

  • add:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则抛出 IllegalStateException 异常。

  • put:往队列中添加元素,对于有界队列来说,如果队列已满则阻塞当前请求,期间支持响应中断。

  • poll:移除队列头结点,并返回结点元素值,如果队列为空则等待指定时间,并在超时时返回 null,期间支持响应中断。

  • take:仅获取头结点元素值而不删除结点,如果队列为空则阻塞等待,期间支持响应中断。

  • remove:接收一个参数,从队列中删除值等于该参数的结点,如果存在多个结点满足要求,则删除第一个。

  • contains:接收一个参数,判断队列中是否存在值等于该参数的结点。

  • remainingCapacity:返回队列的剩余容量,如果是无界队列,则返回 Integer.MAX_VALUE

  • drainTo:从队列中移除所有(或指定个数)结点,并将结点元素放入参数指定的集合中返回,相对于逐个移除更加高效。

核心方法实现

LinkedBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。

添加元素:offer & add & put

针对添加元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#offerLinkedBlockingQueue#addLinkedBlockingQueue#put 方法,其中 LinkedBlockingQueue#add 是对 LinkedBlockingQueue#offer 的封装,并在队列已满时抛出 IllegalStateException 异常。

下面主要展开分析 LinkedBlockingQueue#offerLinkedBlockingQueue#put 方法的实现。首先来看一下 LinkedBlockingQueue#offer 方法,实现如下:

public boolean offer(E e) {
    // 不允许添加 null 元素
    if (e == null) {
        throw new NullPointerException();
    }
    final AtomicInteger count = this.count;
    // 当前队列已满,直接返回 false
    if (count.get() == capacity) {
        return false;
    }
    int c = -1;
    // 创建待添加元素对应的结点对象
    Node<E> node = new Node<>(e);
    final ReentrantLock putLock = this.putLock;
    // 加锁
    putLock.lock();
    try {
        // 再次校验队列是否已满
        if (count.get() < capacity) {
            // 往队列末端追加结点
            this.enqueue(node);
            // 队列元素个数计数加 1,并返回添加之前队列的大小
            c = count.getAndIncrement();
            // 当前队列在执行添加操作之后仍然存在空闲位置,尝试唤醒一个之前因为队列已满而等待的线程
            if (c + 1 < capacity) {
                notFull.signal();
            }
        }
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // c == 0 说明队列中至少存在一个元素(当前添加的),尝试唤醒一个之前因为队列为空而等待的线程
    if (c == 0) {
        this.signalNotEmpty();
    }
    return c >= 0;
}

与 ConcurrentLinkedQueue 一样,LinkedBlockingQueue 同样不允许往其中添加 null 元素。如果队列已满,则上述方法会直接返回 false,表示添加失败,否则创建待添加元素对应的结点对象,并继续执行:

  1. 加锁,保证同一时间只有一个线程在执行添加操作;

  2. 再次校验队列是否已满,如果已满则跳转至步骤 5,否则执行 LinkedBlockingQueue#enqueue 方法往队列末端插入结点;

  3. 结点个数计数加 1;

  4. 如果在完成本次添加操作之后,队列仍然未满,则尝试唤醒一个之前因为队列已满而等待的线程;

  5. 释放锁;

  6. 如果本次成功添加了一个元素,则调用 LinkedBlockingQueue#signalNotEmpty 方法尝试唤醒一个之前因为队列为空而等待的线程;

  7. 返回。

其中 LinkedBlockingQueue#signalNotEmpty 方法的实现比较简单,读者可以参考源码实现。这里简单提一下 LinkedBlockingQueue#enqueue 方法,实现如下:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

在 LinkedBlockingQueue 对象被构造出来时,head 和 last 指针均指向一个元素值为 null 的标记结点。由上述方法的实现可以看出当执行入队列操作时,是将结点赋值给 last 结点的 next 指针,并没有移除队列头部的 null 结点,下文在介绍出队列操作时返回的都是 head.next 结点元素值,理解了上述插入操作的执行过程也就不会疑惑为什么出队列时不是直接返回 head 结点的元素值。

LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#offer(E, long, TimeUnit) 方法,当队列已满时,该方法会阻塞等待指定的时间。

下面再来看一下 LinkedBlockingQueue#put 方法,相对于上面介绍的 LinkedBlockingQueue#offer 方法,对于有界队列而言,如果队列已满则该方法将无限期阻塞,方法实现如下:

public void put(E e) throws InterruptedException {
    // 不允许添加 null 元素
    if (e == null) {
        throw new NullPointerException();
    }
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 加锁,期间支持响应中断
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is not protected by lock.
         * This works because count can only decrease at this point (all other puts are shut out by lock),
         * and we (or some other waiting put) are signalled if it ever changes from capacity.
         * Similarly for all other uses of count in other wait guards.
         */

        // 队列已满,则等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 执行入队列操作
        this.enqueue(node);
        // 队列元素个数计数加 1,并返回添加之前队列的大小
        c = count.getAndIncrement();
        // 当前队列在执行添加操作之后仍然存在空闲位置,尝试唤醒一个之前因为队列已满而等待的线程
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // c == 0 说明队列中至少存在一个元素(当前添加的),尝试唤醒一个之前因为队列为空而等待的线程
    if (c == 0) {
        this.signalNotEmpty();
    }
}

由上述实现可以看出,相对于 LinkedBlockingQueue#offer 方法在队列已满时的直接返回,方法 LinkedBlockingQueue#put 会将当前线程添加到条件队列中等待其它线程释放队列空间。

获取元素:poll & peek & take

针对获取元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#pollLinkedBlockingQueue#peekLinkedBlockingQueue#take 方法,其中 LinkedBlockingQueue#peek 方法仅获取队列头结点元素值,而不移除头结点,实现上比较简单。下面展开分析 LinkedBlockingQueue#pollLinkedBlockingQueue#take 方法的实现机制。

首先来看一下 LinkedBlockingQueue#poll 方法,LinkedBlockingQueue 针对该方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:

public E poll() {
    final AtomicInteger count = this.count;
    // 当前队列为空,直接返回 null
    if (count.get() == 0) {
        return null;
    }
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lock();
    try {
        // 如果当前队列不为空
        if (count.get() > 0) {
            // 获取队列头结点元素,并移除头结点
            x = this.dequeue();
            // 队列元素计数值减 1,这里返回的是减 1 之前的值
            c = count.getAndDecrement();
            // 队列在执行移除操作后至少还存在一个元素,尝试唤醒一个之前因为队列为空而阻塞的线程
            if (c > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    /*
     * 之前队列已满,但是经过本次 poll 操作之后,至少有一个空闲位置,
     * 尝试唤醒一个之前因为队列已满而阻塞的线程
     */
    if (c == capacity) {
        this.signalNotFull();
    }
    return x;
}

如果队列为空则上述方法会直接返回 null,否则继续执行:

  1. 加锁,保证同一时间只有一个线程在执行获取操作;

  2. 再次校验队列是否为空,如果为空则跳转至步骤 5,否则执行 LinkedBlockingQueue#dequeue 方法移除队列头结点,并返回结点元素值;

  3. 结点个数计数减 1;

  4. 如果在完成本次移除操作之后,队列仍然非空,则尝试唤醒一个之前因为队列为空而等待的线程;

  5. 释放锁;

  6. 如果本次成功移除了一个元素,则调用 LinkedBlockingQueue#signalNotFull 方法尝试唤醒一个之前因为队列已满而等待的线程;

  7. 返回。

其中 LinkedBlockingQueue#signalNotFull 方法的实现比较简单,读者可以参考源码实现。前面我们分析了入队列 LinkedBlockingQueue#enqueue 方法,下面来看一下出队列方法,实现如下:

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    // 自引用,等待 GC 回收
    h.next = h; // help GC
    head = first;
    // 获取真正队列头结点的元素值
    E x = first.item;
    // 将队列头结点元素值置为 null
    first.item = null;
    return x;
}

理解了前面入队列的过程,则上述出队列的实现也就一目了然,只要清楚队列的头结点一直是一个值为 null 的结点,而真正有效的队列头结点是该结点的 next 结点。

LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#poll(long, TimeUnit) 方法,当队列为空时,该方法会阻塞等待指定的时间。

下面再来看一下 LinkedBlockingQueue#take 方法,相对于上面介绍的 LinkedBlockingQueue#poll 方法,对于有界队列而言,如果队列为空则该方法将无限期阻塞,方法实现如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列为空,则等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 获取队列头结点元素,并移除头结点
        x = this.dequeue();
        // 队列元素计数值减 1,这里返回的是减 1 之前的值
        c = count.getAndDecrement();
        // 队列在执行移除操作后至少还存在一个元素,尝试唤醒一个之前因为队列为空而阻塞的线程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    /*
     * 之前队列已满,但是经过本次 poll 操作之后,至少有一个空闲位置,
     * 尝试唤醒一个之前因为队列已满而阻塞的线程
     */
    if (c == capacity) {
        this.signalNotFull();
    }
    return x;
}

由上述实现可以看出,相对于 LinkedBlockingQueue#poll 方法在队列为空时的直接返回,方法 LinkedBlockingQueue#take 会将当前线程添加到条件队列中等待其它线程添加新的队列元素。

移除元素:remove

针对移除元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#remove 方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 LinkedBlockingQueue#poll 方法执行的。下面来分析一下有参版本的实现,如下:

public boolean remove(Object o) {
    if (o == null) {
        return false;
    }
    // 锁定出队列、入队列操作
    this.fullyLock();
    try {
        // 从头开始遍历队列
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果找到第一个目标元素,则移除
            if (o.equals(p.item)) {
                // 移除 p 结点,如果执行移除之后队列有空闲位置,
                // 则尝试唤醒一个之前因为队列已满而阻塞的线程
                this.unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 释放出队列、入队列操作
        this.fullyUnlock();
    }
}

上述方法接收一个参数,并执行删除元素值等于该参数的结点,如果存在多个满足条件的结点,则删除第一个。在执行删除操作之前会获取 putLock 和 takeLock 两把锁,以防止删除期间有其它线程执行出队列或入队列操作。

其它操作:size & contains

最后来看一下 LinkedBlockingQueue#containsLinkedBlockingQueue#size 方法的实现,前者用于检查队列是否包含值等于参数的结点,实现如下:

public boolean contains(Object o) {
    if (o == null) {
        return false;
    }
    // 锁定出队列、入队列操作
    this.fullyLock();
    try {
        // 从头开始遍历链表,并逐一比对
        for (Node<E> p = head.next; p != null; p = p.next) {
            if (o.equals(p.item)) {
                return true;
            }
        }
        return false;
    } finally {
        // 释放出队列、入队列操作
        this.fullyUnlock();
    }
}

方法 LinkedBlockingQueue#size 用于返回队列的结点个数,前面已经介绍了 LinkedBlockingQueue 定义了一个 AtomicInteger 类型的字段用于计数队列的结点个数,所以 LinkedBlockingQueue#size 方法能够精确的返回,且几乎没有性能开销,同时在实现上非常简单,如下:

public int size() {
    return count.get();
}

“JUC的LinkedBlockingQueue如何实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI