温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

SynchronousQueue 1.8 源码解析

发布时间:2020-08-04 09:57:03 来源:网络 阅读:981 作者:wx5c78c8b1dbb1b 栏目:编程语言

[TOC]

SynchronousQueue 1.8 源码解析

一,简介

SynchronousQueue 是一个很奇怪的队列,感觉都不能叫队列,因为内部没有数据的存储空间,队列不能peek,因为不存在元素,任何入队的线程都会阻塞,直到有线程来出队,也就是这个队列是一组操作,入队和出队要一起离开,出队也是一样,必须等入队,必须结伴而行;队列支持公平和非公平的模式(指的是队列匹配线程的顺序),公平模式的数据结构是队列(FIFO),非公平模式使用的是栈(LIFO)。

二,UML 图

SynchronousQueue 1.8 源码解析

三,基本成员
abstract static class Transferer<E> {
    // 出队入队都是这一个方法
    abstract E transfer(E e, boolean timed, long nanos);
}

    // npu数
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    // 带超时时间的自旋次数
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    // 没有超时的自旋次数
    static final int maxUntimedSpins = maxTimedSpins * 16;
TransferStack 非公平的实现,主要成员

TransferStack 继承 Transferer

注意:这几个状态很重要,因为继承了Transferer,所以出队和入队都是使用的transfer方法,状态是用来区分的,后面方法部分会详解

        /** 0表示消费者 */
        static final int REQUEST    = 0;
        /** 1表示数据的生产者 */
        static final int DATA       = 1;
        /** 2 表示数据正在匹配 */
        static final int FULFILLING = 2;

        static final class SNode {
            volatile SNode next;        // 下一个节点
            volatile SNode match;       // 匹配的节点
            volatile Thread waiter;     // 等待的线程
            Object item;                // 数据
            int mode;                   // 模式 0 , 1 , 2
        }
    /** 头结点 */
    volatile SNode head;
TransferQueue 公平实现,主要成员

TransferQueue 继承 Transferer

    static final class QNode {
            volatile QNode next;          // next 节点
            volatile Object item;         // 数据项
            volatile Thread waiter;       // 等待线程
            final boolean isData;         // 区分生产和消费
    }

        /** 头结点 */
        transient volatile QNode head;
        /** 尾节点 */
        transient volatile QNode tail;
四,常用方法
构造方法
    public SynchronousQueue() {
        this(false);
    }

    // 构造方法,fair表示公平或者非公平
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
TransferStack 非公平常用方法
offer 方法
    public boolean offer(E e) {
        // e 不能为null
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            // e 不能为null
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
put 方法
public void put(E e) throws InterruptedException {
    // e 不能为null
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            // 支持中断
            Thread.interrupted();
            throw new InterruptedException();

    }
poll 方法
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
take 方法
public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
transfer 方法
    E transfer(E e, boolean timed, long nanos) {

            SNode s = null; // constructed/reused as needed
            //  根据所传元素判断为生产or消费
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) { // 无限循环
                SNode h = head; // 获取头结点
                if (h == null || h.mode == mode) {  // 头结点为空或者当前节点状态(0,1,2)和头结点相同
                    if (timed && nanos <= 0) {      // can't wait 设置有时间
                        // 节点不为null并且为取消状态
                        if (h != null && h.isCancelled())
                            // 弹出取消的节点
                            casHead(h, h.next);     // pop cancelled node
                        else
                            // 超时直接返回null
                            return null;
                     // 没有设置超时
                    } else if (casHead(h, s = snode(s, e, h, mode))) { // 将h设为自己的next节点
                        // 空旋或者阻塞直到s结点被FulFill操作所匹配
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {                // wait was cancelled 节点被取消了
                            clean(s);
                            return null;
                        }
                        // 找到匹配的线程了
                        // h == head 可能已经已经被匹配
                        // h.next 等于s 不同类型
                        if ((h = head) != null && h.next == s)
                            // 弹出h 和 s
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                // 未匹配
                } else if (!isFulfilling(h.mode)) { // try to fulfill // 尝试匹配节点
                    if (h.isCancelled())            // already cancelled // 节点被取消
                        casHead(h, h.next);         // pop and retry // 修改头结点
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            // 没有下一个节点了,结束这次循环,走最外层循环重新开始
                            if (m == null) {        // all waiters are gone // m等于null
                                casHead(s, null);   // pop fulfill node // cas 设置head
                                s = null;           // use new node next time
                                break;              // restart main loop // 结束循环
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {     // 尝试匹配,成功
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match // 失败,说明m背的线程匹配了,或者取消了
                                s.casNext(m, mn);   // help unlink //  修改next节点
                        }
                    }
                } else {                            // help a fulfiller 正在匹配
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone 匹配完成了
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
awaitFulfill 方法
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {

            // 计算时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 获取当前线程
            Thread w = Thread.currentThread();
            // shouldSpin控制自旋
            // shouldSpin 用于检测当前节点是否需要自旋
            // 如果栈为空、该节点是首节点或者该节点是匹配节点,则先采用自旋,否则阻塞
            int spins = (shouldSpin(s) ?
                    (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) { // 死循环
                if (w.isInterrupted()) // 线程被中断
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)  // 存在匹配节点 ,返回
                    return m;
                if (timed) { // 存在超时设置
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                // 自旋;每次自旋的时候都需要检查自身是否满足自旋条件,满足就 - 1,否则为0
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                // 设置node的线程
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                // 如果不是超时,就阻塞
                else if (!timed)
                    LockSupport.park(this);
                // 设置超时阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
clean 方法
    void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            // next节点
            SNode past = s.next;
            // next节点也被中断了,直接删除
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            // 从栈顶开始找,清除取消的节点
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            // 从有效的头节点开始p ,到s的后继节点,继续清除
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

分析transfer 方法:

  • 我们可以发现transfer 是通过e是空来判断是offer方法还是poll方法的,也就是入队者和出对者的区分。

  • 第一种情况,如果队列为空head,或者队列存在的head节点和自己的模式相同,首先判断有没有超时或者取消,有就执行这些操作,没有就执行入队操作,然后把新加入的节点加入栈顶,然后调用awaitFulfill方法阻塞线程,直到被中断,超时或者匹配成功,为什么要阻塞了因为大家的模式都相同没法匹配,所以只能阻塞线程,直到一个不同模式的线程匹配成功,唤醒自己,这也是awaitFulfill方法的结束流程。

  • 第二种情况,如果入队的模式不同,通过isFulfilling方法判断head节点有没有在匹配,没有就执行匹配流程,

    首先判断节点是否被取消了,没有在判断自己是不是唯一的一个节点,如果是循环,重新开始流程,如果不是上面的这些情况,就可以开始匹配节点了,调用tryMatch方法,成功唤醒另一个节点,然后一起出栈,返回结果,失败就向后推进,找下一个节点,这里可能别的线程会竞争的匹配。

  • 第三种情况,入队的模式不同,但是head节点正在匹配,那就帮助它匹配完成,然后重新走整个循环。
TransferQueue 公平常用方法

入队和出队的方法是一样的,我们主要看下transfer 方法吧。

transfer 方法
    E transfer(E e, boolean timed, long nanos) {

            QNode s = null; // constructed/reused as needed
            // 判断是生产者 还是消费者
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                // 没有初始化
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
                // h==t 刚刚初始化 t.isData == isData,尾节点和当前节点的类型一样
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next; // 获取尾节点
                    if (t != tail)      // 尾节点发生变了变化             // inconsistent read
                        continue;
                    if (tn != null) {   // 重设为节点             // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)  // 超时了       // can't wait
                        return null;
                    if (s == null) // 构建新节点
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))       // failed to link in 有竞争
                        continue;

                    // 替换尾节点
                    advanceTail(t, s);              // swing tail and wait
                    // 自旋/阻塞 返回的是中断取消/匹配的节点
                    Object x = awaitFulfill(s, e, timed, nanos);
                    // 中断
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    // 匹配成功了,需要执行出队操作
                    if (!s.isOffList()) {
                        // not already unlinked
                        // 修改头结点
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    // 出队从头元素开始
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head) // 队列发生变化重来
                        continue;                   // inconsistent read

                    Object x = m.item;
                    // isData == (x != null) 判断模式是否相同,不相同才能匹配
                    // x == m 说明已经被中断或者超时了
                    // m.casItem(x, e) 匹配
                    if (isData == (x != null) ||    // m already fulfilled
                            x == m ||                   // m cancelled
                            !m.casItem(x, e)) {// lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
                    // 匹配成功
                    // 替换头
                    advanceHead(h, m);              // successfully fulfilled
                    // 唤醒等待线程
                    LockSupport.unpark(m.waiter); // 唤醒线程
                    return (x != null) ? (E)x : e;
                }
            }
        }

分析transfer方法:

  • 也是通过e来判断是入队还是出队的,都是调用transfer方法,transfer方法可以看出两部分,入队和匹配。
  • 第一部分入队,入队的模式也是相同的,入队是从尾节点开始,获取尾节点,判断尾节点有没有发生变化,可能存在多线程的情况,发生改变就重新遍历,没有就判断尾节点有没有next节点,有就说明别的线程添加了新的节点,需要更新尾节点,然后构造新的节点加入当前尾节点的next节点,更新尾节点,然后调用awaitFulfill阻塞当前节点,直到中断,超时或者匹配,然后清除匹配成功的节点,调用clean方法。
  • 第二部分匹配(出队),出队是从头节点开始,然后判断模式是否不同,是否被取消,cas设置item,其实也就是数据的传递,如果匹配成功,唤醒等待在m的线程,这里注意把m设置成了头结点,其实就是把m节点弹出了,因为我们匹配取得头结点的next节点。
五,总结

SynchronousQueue 的实现还是很复杂的,我们可以发现虽然是个阻塞队列,可是没有使用锁;这个队列适合传递的场景,队列没有存储元素的队列,出队和入队必须结伴而行。

参考 《Java 并发编程的艺术》

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI