本篇内容介绍了“如何理解AQS源码”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
AQS abstractQueueSynchronizer(抽象队列同步器),是什么?
答:它是用来构建锁 或者 其他同步器组件的重量级基础框架,是整个JUC体系的基础。通过内置FIFO队列来完成获取线程取锁的排队工作,并通过一个int类型变量标识持有锁的状态;
前置知识点:
1、可重入锁(递归锁):
sync(隐式锁,jvm管理)和ReentrantLock(Lock显式锁,就是手动加解)是重入锁的典型代表,为可以重复使用的锁。一个变成多个流程,可以获取同一把锁。
可重入锁概念: 是指一个线程,在外层方法获取锁的时候,再次进入该线程的内层方法会自动获取锁(必须是同一个对象),不会被阻塞。可避免死锁
举例: 递归调用同一个 sync修饰的方法或者代码块。必须是一个对象才行。一个线程调用一个对象的method1,method1 调用method2,method2调用method3, 3个方法都是被sync修饰,这样也是一个可重入锁的例子 。
再比如下面这种
static Object lock = new Object(); public void mm(){ synchronized (lock){ System.out.println("===========mm method"); synchronized (lock){ System.out.println("=========== method"); } } }
只有一个对象 和同步代码块,如果sycn中嵌套sync 并都是lock对象,那么该线程就会持有当前对象的锁,并可重入。反编译后发现
public void mm(); Code: 0: getstatic #7 // Field lock:Ljava/lang/Object; 3: dup 4: astore_1 5: monitorenter 6: getstatic #8 // Field java/lang/System.out:Ljava/io/PrintStream; 9: ldc #9 // String ===========mm method 11: invokevirtual #10 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 14: aload_1 15: monitorexit 16: goto 24 19: astore_2 20: aload_1 21: monitorexit 22: aload_2 23: athrow 24: return Exception table: from to target type 6 16 19 any 19 22 19 any
sync同步代码块加解锁,使用的命令为monitorenter 和 monitorexit(同步方法标识是ACC_SYNCHRONIZED,在flag中),enter 为加锁,必须成对出现,但这里却又两个exit。原因为第一个exit为程序正常运行后的解锁命令,并执行完后会执行goto到return ,也就是第24行,
第二个exit 为当程序出现异常时,需要执行的解锁命令;
如上就是可重入锁的相关概念
2、什么是LockSupport?
根据jdk8 的api文档显示定义为: 用于创建锁和其他同步类的基本线程阻塞原语;
是一个线程阻塞工具类,所有方法均为静态,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法。
先复习下object 对象的wait 和 notify 和Lock 的condition
wait 和notify 必须在sync 代码块中才能使用,否则报错。非法的监视器
condition的await 和 signal方法也必须在lock 和unlock方法前执行,否则报错,非法的监视器
线程一定要先 等待 ,再 被 唤醒,顺序不能换
LockSupport 有两个关键函数 park 和unpark,该类使用了Permit(许可证)的概念来阻塞和唤醒线程的功能。每个线程都会有一个Permit,该Permit 只有两个值 0 和1 ,默认是0。类似于信号量,但上限是1;
来看park方法:
public static void park() { //unsafe的方法。初始为0 UNSAFE.park(false, 0L); }
禁止当前线程进行线程调度,除非Permit可用,就是1
如果Permit 为1(有可用证书) 将变更为0(线程仍然会处理业务逻辑),并且立即返回。否则当前线程对于线程调度目的将被禁用,并处于休眠状态。直至发生三件事情之一:
一些其他线程调用当前线程作为目标的unpark ; 要么
其他一些线程当前线程为interrupts ; 要么
电话虚假(也就是说,没有理由)返回。
这种方法不报告是哪个线程导致该方法返回。 来电者应重新检查导致线程首先停放的条件。 呼叫者还可以确定线程在返回时的中断状态。
小结:Permit默认0,所以一开始调用park,当前线程被阻塞,直到别的线程将当前线程的Permit修改为1,从park方法处被唤醒,处理业务,然后会将permit修改为0,并返回;如果permit为1,调用park时会将permit修改为0,在执行业务逻辑到线程生命周期。与park方法定义吻合。
在看unpark方法:
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
在调用unpark方法后,会将Thread线程的许可permit设置成1,会自动唤醒thread线程,即,之前阻塞中的LockSupport.park方法会立即返回,然后线程执行业务逻辑 。 且 unpark可以在park之前执行。相当于执行park没有效果。
3、AQS abstractQueueSynchronizer 源码
剩余前置知识为: 公平锁、非公平锁、自旋锁、链表、模板设计模式
AQS使用volatile修饰的int类型的变量 标识锁的状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成node节点实现锁的分配,通过CAS(自旋锁)完成对state值的修改 ;
(1)node节点源码
static final class Node { /** Marker to indicate a node is waiting in shared mode */ //共享节点 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ //独占节点 static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ //线程被取消状态 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 后续线程需要唤醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ //邓丹condition唤醒 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ //共享室同步状态获取 将会无条件传播下去 static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ //初始为0,状态是上面几种,标识当前节点在队列中的状态 volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ //前置节点 volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ //后置节点 volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ //当线程对象 volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
node节点就是每一个等待执行的线程。还有一个waitState状态字段,标识当前等待中的线程状态
根据node节点 绘画一个aqs基本结构图
解释:state为状态位,aqs为同步器。有head 和tail两个 头 尾节点,当state = 1时,表明同步器被占用(或者说当前有线程持有了同一个对象的锁),将后续线程添加到队列中,并用双向链表连接,遵循FIFO。
(2)以ReentrantLock的实现分析。因为他也实现了Lock 并内部持有同步器sync和AQS(以银行柜台例子)
new ReentrantLock()或 new ReentrantLock(false)时,创建的是非公平锁,而 ReentrantLock对象内部还有 两个类 分别为公平同步器和非公平同步器
static final class NonfairSync extends Sync //公平锁 有一个判断队列中是否有排队的线程,这是与上面锁不同的获取方式 static final class FairSync extends Sync
公平锁解释:先到先得,新线程在获取锁时,如果这个同步器的等待队列中已经有线程在等待,那么当前线程会先进入等待队列;
非公平锁解释:新进来的线程不管是否有等待的线程,如果可以获取锁,则立刻占有锁。
这里还有一个关键的模板设计模式: 在查询aqs的tryAcquire方法时发现,该方法直接抛出异常,这就是典型的模板设计模式,强制要求子类重写该方法。否则不让用
1.1 当线程a到柜台办理业务时,会调用sync 的lock,即 a线程调用lock方法
final void lock() { //利用cas将当前对象的state 从0 设置成1,当然里面还有一个偏移量 //意思就是如果是0 就设置为1成功返回true if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
因为是第一个运行的线程,肯定是true所以,将当前运行的线程设置为a,即a线程占用了同步器,获取了锁
1.2 当b线程运行lock时,发现不能将0设置成1(cas思想),就会运行acquire(1)方法
public final void acquire(int arg) { //这里用到了模板设计模式,强制子类实现该方法 //因为默认使用非公平锁,所以看NonfairSync if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } //该方法就是非公平锁执行 等待的方法 final boolean nonfairTryAcquire(int acquires) { //获取当前b线程 final Thread current = Thread.currentThread(); //获取当前锁的状态是1,因为a线程已经获取,并将state修改为1 int c = getState(); //有可能b在设置state时,a正办理,到这儿时,a办理完了。state为0了。 if (c == 0) { //乐观的将state 从0 修改为 1 if (compareAndSetState(0, acquires)) { //设置当前获取锁的线程为b setExclusiveOwnerThread(current); return true; } } //有可能 a线程办完业务。又回头办理了一个,所以当前线程持有锁的线程依旧是a else if (current == getExclusiveOwnerThread()) { //2 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置state的值 setState(nextc); return true; } //如果b线程走到这里,就证明b必须到等待队列里去了 return false; }
再来看逻辑运算符后面的逻辑
private Node addWaiter(Node mode) { //实例化一个节点,并将b 和 节点类型封装成node Node node = new Node(Thread.currentThread(), mode); //等待队列为null 所以tail初始化肯定是null //如果是线程c在b之后进来,tail就是b 节点 Node pred = tail; //c节点之后都走这个方法 if (pred != null) { //node的前置为tail //c 的前置设置为b node.prev = pred; //cas乐观,比较 如果当前节点仍然是b 就将b 设置成c //b就是tail尾节点,将tail设置成c //这里根据源码可知,就是将tail的值设置成c 并不影响pred的值,还是b if (compareAndSetTail(pred, node)) { //b 的下一个节点设置成c pred.next = node; return node; } } //线程b 入等待队列 enq(node); return node; } //入队列方法 private Node enq(final Node node) { //该方法类似于while(true) for (;;) { //获取tail节点 Node t = tail; //初始化锁等待队列 if (t == null) { // Must initialize //设置头部节点为新的节点 //这里看出,锁等待队列的第一个节点并非b,而是一个空node,该node为站位节点或者叫哨兵节点 if (compareAndSetHead(new Node())) //将头尾都指向该节点 tail = head; } else { //第二次循环时,t为空node,将b的前置设置为空node node.prev = t; //设置tail节点为b节点 if (compareAndSetTail(t, node)) { //空node节点的下一个节点为b node节点 t.next = node; return t; } } } } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }
以上b线程的进入等待队列的操作就完成了 ,但线程还是活跃的,如何阻塞的呢?
下面接着看acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋执行 for (;;) { //如果是b线程,这里p就是b节点的前置节点 final Node p = node.predecessor(); //空节点就是head节点,但又调用了一次tryAcquire方法,想再尝试获取锁资源 //如果a线程未处理完,那么这里返回false //如果a线程处理完成,那么这里就可以获取到锁 if (p == head && tryAcquire(arg)) { //将head设置成b节点 setHead(node); //原空节点的下连接断开 p.next = null; // help GC failed = false; return interrupted; } //第一次空节点进入should方法。返回false //当第二次循环到此处should方法返回true //执行parkAndCheckInterrupt方法,会将当前线程park,并获取b线程的中断状态,如果未中断返回false,并再次自旋一次 ,中断为true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //head节点就是空节点所以w=0 //空节点第二次进入时就是-1 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; //如果b节点状态是其他,则将节点连接变化一下 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //ws = 0时,使用cas将验证pred 和ws 的值,是空节点和0 并将ws修改为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
以上b线程未获取锁 并被挂起的操作就完成了
1.3 当a线程调用unlock方法时:
public void unlock() { sync.release(1); } public final boolean release(int arg) { //判断a线程是否完成业务。并释放锁,state=0 if (tryRelease(arg)) { //获取头部节点,就是空节点 Node h = head; //空节点在b获取锁时,状态变更为-1,所以这里是true if (h != null && h.waitStatus != 0) //唤醒线程 unparkSuccessor(h); return true; } return false; } //aqs父类的模板方法,强制要求子类实现该方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected final boolean tryRelease(int releases) { //将锁的状态设置为0 int c = getState() - releases; //判断当前线程与 锁的独占线程是否一致 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //所得状态标识 boolean free = false; if (c == 0) { free = true; //如果state=0证明a完成了业务。那么锁的独占状态就应该恢复为null setExclusiveOwnerThread(null); } //恢复锁的state状态 setState(c); return free; }
注意:这里state是减1操作。如果ReentrantLock不断可重入,那么这里是不能一次就归零的。所以才会有ReentrantLock 调了几次lock 就是要调几次unlock
a线程唤醒b线程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //空节点-1,设置成0 compareAndSetWaitStatus(node, ws, 0); //获取b节点,非null 且 waitState=0 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //将b线程唤醒 LockSupport.unpark(s.thread); }
而 b线程还在acquireQueued方法里自旋呢,不过自旋后就会获取锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //b线程在a未释放锁之前一直在自旋, for (;;) { final Node p = node.predecessor(); //当a释放锁后,b获取到锁,将state设置为1 //并将空节点的所有连接断开等待GC回收 //并返回当前线程 b 的中断状态 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //park当前线程b 并获取b的中断状态,肯定是false,且调用的是带参的native方法,多次调用会重置b线程的中断状态 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //调用该方法的if判断如果进入了。会将b 中断,并在不断循环中再重置中断状态为false
1.4 c线程的执行流程与b线程类似。会将b节点充等待队列中移除,遵循FIFO
“如何理解AQS源码”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。