温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何理解AQS源码

发布时间:2021-10-19 16:42:17 来源:亿速云 阅读:129 作者:iii 栏目:编程语言

本篇内容介绍了“如何理解AQS源码”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

AQS abstractQueueSynchronizer(抽象队列同步器),是什么?

答:它是用来构建锁 或者 其他同步器组件的重量级基础框架,是整个JUC体系的基础。通过内置FIFO队列来完成获取线程取锁的排队工作,并通过一个int类型变量标识持有锁的状态;

    前置知识点:

    1、可重入锁(递归锁):

    sync(隐式锁,jvm管理)和ReentrantLock(Lock显式锁,就是手动加解)是重入锁的典型代表,为可以重复使用的锁。一个变成多个流程,可以获取同一把锁。

    可重入锁概念: 是指一个线程,在外层方法获取锁的时候,再次进入该线程的内层方法会自动获取锁(必须是同一个对象),不会被阻塞。可避免死锁

    举例: 递归调用同一个 sync修饰的方法或者代码块。必须是一个对象才行。一个线程调用一个对象的method1,method1 调用method2,method2调用method3, 3个方法都是被sync修饰,这样也是一个可重入锁的例子 。

    再比如下面这种

static Object lock = new Object();
public void mm(){
    synchronized (lock){
        System.out.println("===========mm method");
        synchronized (lock){
            System.out.println("=========== method");
        }
    }
}

     只有一个对象 和同步代码块,如果sycn中嵌套sync 并都是lock对象,那么该线程就会持有当前对象的锁,并可重入。反编译后发现

public void mm();
    Code:
       0: getstatic     #7                  // Field lock:Ljava/lang/Object;
       3: dup
       4: astore_1
       5: monitorenter
       6: getstatic     #8                  // Field java/lang/System.out:Ljava/io/PrintStream;
       9: ldc           #9                  // String ===========mm method
      11: invokevirtual #10                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
      14: aload_1
      15: monitorexit
      16: goto          24
      19: astore_2
      20: aload_1
      21: monitorexit
      22: aload_2
      23: athrow
      24: return
    Exception table:
       from    to  target type
           6    16    19   any
          19    22    19   any

    sync同步代码块加解锁,使用的命令为monitorenter 和 monitorexit(同步方法标识是ACC_SYNCHRONIZED,在flag中),enter 为加锁,必须成对出现,但这里却又两个exit。原因为第一个exit为程序正常运行后的解锁命令,并执行完后会执行goto到return ,也就是第24行,

    第二个exit 为当程序出现异常时,需要执行的解锁命令;

如上就是可重入锁的相关概念

    2、什么是LockSupport?

    根据jdk8 的api文档显示定义为: 用于创建锁和其他同步类的基本线程阻塞原语; 

    是一个线程阻塞工具类,所有方法均为静态,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法。

    先复习下object 对象的wait 和 notify 和Lock 的condition

  • wait 和notify 必须在sync 代码块中才能使用,否则报错。非法的监视器

  • condition的await 和 signal方法也必须在lock 和unlock方法前执行,否则报错,非法的监视器

  • 线程一定要先 等待 ,再 被 唤醒,顺序不能换

      LockSupport 有两个关键函数 park 和unpark,该类使用了Permit(许可证)的概念来阻塞和唤醒线程的功能。每个线程都会有一个Permit,该Permit 只有两个值 0 和1 ,默认是0。类似于信号量,但上限是1;

     来看park方法

public static void park() {
    //unsafe的方法。初始为0
    UNSAFE.park(false, 0L);
}

    禁止当前线程进行线程调度,除非Permit可用,就是1

    如果Permit 为1(有可用证书) 将变更为0(线程仍然会处理业务逻辑),并且立即返回。否则当前线程对于线程调度目的将被禁用,并处于休眠状态。直至发生三件事情之一:

  • 一些其他线程调用当前线程作为目标的unpark ; 要么

  • 其他一些线程当前线程为interrupts ; 要么

  • 电话虚假(也就是说,没有理由)返回。

     这种方法不报告是哪个线程导致该方法返回。 来电者应重新检查导致线程首先停放的条件。 呼叫者还可以确定线程在返回时的中断状态。

     小结:Permit默认0,所以一开始调用park,当前线程被阻塞,直到别的线程将当前线程的Permit修改为1,从park方法处被唤醒,处理业务,然后会将permit修改为0,并返回;如果permit为1,调用park时会将permit修改为0,在执行业务逻辑到线程生命周期。与park方法定义吻合。

     在看unpark方法:

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

     在调用unpark方法后,会将Thread线程的许可permit设置成1,会自动唤醒thread线程,即,之前阻塞中的LockSupport.park方法会立即返回,然后线程执行业务逻辑 。 且 unpark可以在park之前执行。相当于执行park没有效果。

    3、AQS abstractQueueSynchronizer 源码

    剩余前置知识为: 公平锁、非公平锁、自旋锁、链表、模板设计模式

     AQS使用volatile修饰的int类型的变量 标识锁的状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成node节点实现锁的分配,通过CAS(自旋锁)完成对state值的修改 ;

如何理解AQS源码

    (1)node节点源码

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
    	//共享节点
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
    	//独占节点
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
    	//线程被取消状态
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
    	//	后续线程需要唤醒
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
    	//邓丹condition唤醒
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
    	//共享室同步状态获取 将会无条件传播下去
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
    	//初始为0,状态是上面几种,标识当前节点在队列中的状态
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
    	//前置节点
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
    	//后置节点
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
    	//当线程对象
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

    node节点就是每一个等待执行的线程。还有一个waitState状态字段,标识当前等待中的线程状态

根据node节点 绘画一个aqs基本结构图

如何理解AQS源码

     解释:state为状态位,aqs为同步器。有head 和tail两个 头 尾节点,当state = 1时,表明同步器被占用(或者说当前有线程持有了同一个对象的锁),将后续线程添加到队列中,并用双向链表连接,遵循FIFO。

     (2)以ReentrantLock的实现分析。因为他也实现了Lock 并内部持有同步器sync和AQS(以银行柜台例子)

     new ReentrantLock()或 new ReentrantLock(false)时,创建的是非公平锁,而 ReentrantLock对象内部还有 两个类 分别为公平同步器和非公平同步器

static final class NonfairSync extends Sync
//公平锁 有一个判断队列中是否有排队的线程,这是与上面锁不同的获取方式
static final class FairSync extends Sync

    公平锁解释:先到先得,新线程在获取锁时,如果这个同步器的等待队列中已经有线程在等待,那么当前线程会先进入等待队列;

    非公平锁解释:新进来的线程不管是否有等待的线程,如果可以获取锁,则立刻占有锁。

    这里还有一个关键的模板设计模式: 在查询aqs的tryAcquire方法时发现,该方法直接抛出异常,这就是典型的模板设计模式,强制要求子类重写该方法。否则不让用

    1.1  当线程a到柜台办理业务时,会调用sync 的lock,即 a线程调用lock方法

final void lock() {
    //利用cas将当前对象的state 从0 设置成1,当然里面还有一个偏移量
    //意思就是如果是0 就设置为1成功返回true
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

    因为是第一个运行的线程,肯定是true所以,将当前运行的线程设置为a,即a线程占用了同步器,获取了锁

    1.2 当b线程运行lock时,发现不能将0设置成1(cas思想),就会运行acquire(1)方法

public final void acquire(int arg) {
    //这里用到了模板设计模式,强制子类实现该方法
    //因为默认使用非公平锁,所以看NonfairSync 
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

//该方法就是非公平锁执行 等待的方法
final boolean nonfairTryAcquire(int acquires) {
    //获取当前b线程
    final Thread current = Thread.currentThread();
    //获取当前锁的状态是1,因为a线程已经获取,并将state修改为1
    int c = getState();
    //有可能b在设置state时,a正办理,到这儿时,a办理完了。state为0了。
    if (c == 0) {
        //乐观的将state 从0 修改为 1
        if (compareAndSetState(0, acquires)) {
            //设置当前获取锁的线程为b
            setExclusiveOwnerThread(current);
            return true;
        }
    }

    //有可能 a线程办完业务。又回头办理了一个,所以当前线程持有锁的线程依旧是a
    else if (current == getExclusiveOwnerThread()) {
        //2
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //设置state的值
        setState(nextc);
        return true;
    }
    //如果b线程走到这里,就证明b必须到等待队列里去了
    return false;
}

     再来看逻辑运算符后面的逻辑

private Node addWaiter(Node mode) {
    //实例化一个节点,并将b 和 节点类型封装成node
    Node node = new Node(Thread.currentThread(), mode);
    //等待队列为null 所以tail初始化肯定是null
    //如果是线程c在b之后进来,tail就是b 节点
    Node pred = tail;
    //c节点之后都走这个方法
    if (pred != null) {
        //node的前置为tail
        //c 的前置设置为b
        node.prev = pred;
        //cas乐观,比较 如果当前节点仍然是b 就将b 设置成c
        //b就是tail尾节点,将tail设置成c
        //这里根据源码可知,就是将tail的值设置成c 并不影响pred的值,还是b
        if (compareAndSetTail(pred, node)) {
            //b 的下一个节点设置成c
            pred.next = node;
            return node;
        }
    }
    //线程b 入等待队列
    enq(node);
    return node;
}

//入队列方法
private Node enq(final Node node) {
    //该方法类似于while(true)
    for (;;) {
        //获取tail节点
        Node t = tail;
        //初始化锁等待队列
        if (t == null) { // Must initialize
            //设置头部节点为新的节点
            //这里看出,锁等待队列的第一个节点并非b,而是一个空node,该node为站位节点或者叫哨兵节点
            if (compareAndSetHead(new Node()))
                //将头尾都指向该节点
                tail = head;
        } else {
            //第二次循环时,t为空node,将b的前置设置为空node
            node.prev = t;
            //设置tail节点为b节点
            if (compareAndSetTail(t, node)) {
                //空node节点的下一个节点为b node节点
                t.next = node;
                return t;
            }
        }
    }
}
/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

     以上b线程的进入等待队列的操作就完成了 ,但线程还是活跃的,如何阻塞的呢?

     下面接着看acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋执行
        for (;;) {
            //如果是b线程,这里p就是b节点的前置节点
            final Node p = node.predecessor();
            //空节点就是head节点,但又调用了一次tryAcquire方法,想再尝试获取锁资源
            //如果a线程未处理完,那么这里返回false
            //如果a线程处理完成,那么这里就可以获取到锁
            if (p == head && tryAcquire(arg)) {
                //将head设置成b节点
                setHead(node);
                //原空节点的下连接断开
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //第一次空节点进入should方法。返回false
            //当第二次循环到此处should方法返回true
            //执行parkAndCheckInterrupt方法,会将当前线程park,并获取b线程的中断状态,如果未中断返回false,并再次自旋一次 ,中断为true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //head节点就是空节点所以w=0
    //空节点第二次进入时就是-1
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    //如果b节点状态是其他,则将节点连接变化一下
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //ws = 0时,使用cas将验证pred 和ws 的值,是空节点和0  并将ws修改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

     以上b线程未获取锁 并被挂起的操作就完成了

    1.3  当a线程调用unlock方法时:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    //判断a线程是否完成业务。并释放锁,state=0
    if (tryRelease(arg)) {
        //获取头部节点,就是空节点
        Node h = head;
        //空节点在b获取锁时,状态变更为-1,所以这里是true
        if (h != null && h.waitStatus != 0)
            //唤醒线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//aqs父类的模板方法,强制要求子类实现该方法
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected final boolean tryRelease(int releases) {
    //将锁的状态设置为0
    int c = getState() - releases;
    //判断当前线程与 锁的独占线程是否一致
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    //所得状态标识
    boolean free = false;
    if (c == 0) {
        free = true;
        //如果state=0证明a完成了业务。那么锁的独占状态就应该恢复为null
        setExclusiveOwnerThread(null);
    }
    //恢复锁的state状态
    setState(c);
    return free;
}

    注意:这里state是减1操作。如果ReentrantLock不断可重入,那么这里是不能一次就归零的。所以才会有ReentrantLock 调了几次lock 就是要调几次unlock

    a线程唤醒b线程

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        //空节点-1,设置成0
        compareAndSetWaitStatus(node, ws, 0);

	//获取b节点,非null 且 waitState=0
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //将b线程唤醒
        LockSupport.unpark(s.thread);
}

     而 b线程还在acquireQueued方法里自旋呢,不过自旋后就会获取锁。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //b线程在a未释放锁之前一直在自旋,
        for (;;) {
            final Node p = node.predecessor();
            //当a释放锁后,b获取到锁,将state设置为1
            //并将空节点的所有连接断开等待GC回收
            //并返回当前线程 b 的中断状态
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //park当前线程b 并获取b的中断状态,肯定是false,且调用的是带参的native方法,多次调用会重置b线程的中断状态
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//调用该方法的if判断如果进入了。会将b 中断,并在不断循环中再重置中断状态为false

     1.4 c线程的执行流程与b线程类似。会将b节点充等待队列中移除,遵循FIFO

“如何理解AQS源码”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI