一、什么是同步器
同步器是用来构建锁或者其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,它能实现大部分的同步需求。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程的并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了状态管理、线程的排队、等待与唤醒等底层操作。锁和同步容器很好地隔离了使用者和实现者所需要关注的领域。
二、同步器的基本成员(介绍常用的类好方法)
Node 是AQS的内部类构成AQS队列的一种数据结构。
成员变量 | 作用 |
---|---|
waitStatus | 记录节点的等待状态。包括如下状态:① CANCELLED,值为1,由于同步队列中等待线程超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。② SIGNAL值为-1,后继节点的线程处于等待状态,而当前线程如果释放了同步状态或者取消,将会通知后继节点,使得后继节点得以运行。③ CONDITION值为-2,节点在等待队列中,节点等待在Condtion上,当其他线程对Condtion调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。④ PROPAGATE值为-3,表示下一次共享式同步状态将会无条件地被传播下去。⑤ INITAL,值为0,初始状态 |
SHARED = new Node() | 表示共享式的node |
EXCLUSIVE = null | 独占式的node |
Node prev | Node的前节点 |
Node next | Node的后节点 |
nextWaitert | 等待队列的中node的下一个节点 |
ConditionObject是AQS的内部类构成类似Object的等待/通知机制。
成员/方法 | 作用 |
---|---|
Node firstWaiter | 等待队列的头节点 |
Node lastWaiter | 等待队列的尾节点 |
await() | 当前线程进入等待状态知道被通知或中断,当前线程进入运行状态且从await()返回的情况如下,包括:① 其它线程调用Interrupt()方法中断当前线程。② 如果当前线程从await()方法返回,那么表明该线程已经获取了Condtion对象锁对应的锁 |
awaitUninterruptibly() | 当前线程进入等待直到被通知,该方法对中断不敏感 |
awaitNanos(long nanosTimeout)) | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余的时间,如果在nanosTimeout纳秒之前被唤醒,那么返回就是(nanosTimeout-实际耗时);如果返回是0或者负数,那么可以认定已经超时了 |
awaitUntil(Date deadline) | 当前线程进入等待状态直到被通知、中断或者某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示超时,方法返回false |
signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关的锁 |
signalAll | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
AQS主要成员
成员变量 | 作用 |
---|---|
state | 维护锁的一个变量(同步状态,很重要)① setState 。② getState。 ③ compareAndSetState。 |
Node head | FIFO同步队列的头结点 。 |
Node tail | FIFO同步队列的尾结点 。 |
AQS主要方法
方法名 | 作用 |
---|---|
acquire() | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法会调用重写的tryAcquire(arg)方法(需要锁自己实现) |
release(int arg) | 独占式释放状态,如果释放状态成功,则会去唤醒头结点;释放状态调用tryRelease(arg)方法(需要自己实现) |
acquireShared(int arg) | 共享式获取同步状态,也就是说可以几个线程同时获取同步状态,如果当前线程未获取同步状态,将会进入同步队列。 |
releaseShared() | 共享式释放状态,释放之后会唤醒头结点 |
acquireInterruptibly() | 响应中断的独占式获取同步状态,当前线程未获取同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出中断异常,并返回 |
tryAcquireNanos() | 在acquireInterruptibly()基础上增加超时限制,如果当前线程在超时时间内没有获取同步状态,那么将返回false,如果获取到了返回true |
acquireSharedInterruptibly() | 响应中断的共享式获取同步状态 |
tryAcquireSharedNanos() | 在acquireSharedInterruptibly()的基础上增加超时限制 |
以上就是AQS的一些基本成员和方法,下面主要从现实的角度分析这些方法,理解这些方法的实现,能刚好的帮助我们去理解锁。
三、AQS的方法实现分析
1)、独占系列的方法
①、acquire()独占式获取同步状态,表示只会有一个线程获取,其它线程进入同步队列。
源代码如下:
// 获取锁的方法(独占模式)
public final void acquire(int arg) {
// tryAcquire(arg) 这个方法需要我们自己去实现,如果获取失败,
// 调用addWaiter构造节点
// acquireQueued
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们可以看见acquire方法内部调用了tryAcquire(arg)方法,这个方法需要构造同步组件的类自己去实现,不过返回值已经被AQS定义好了,返回true代表获取同步状态成功,返回false代表失败,需要将线程构造节点加入同步队列,就是调用acquireQueued这个方法。
acquireQueued这个方法实际是先去调用了addWaiter方法。
addWaiter(),这个方法其实就是把当前节点加入到同步队列,加入成功才返回,其实队列初始化时会制造一个空的节点,然后在空的节点后面设置同步节点(可以理解为每次获取获取锁的那个线程其实就是头结点,它是一个空的节点,结合acquireQueued方法,每次获取锁之后,该节点就会升级为头节点,并且变成一个空节点。)
private Node addWaiter(Node mode) {
// 构造一个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取尾节点
Node pred = tail;
// 判断尾节点是否为空
if (pred != null) {
// 不为空,设置node节点的上一个节点为pred(也就是尾节点)
node.prev = pred;
// cas设置尾节点
if (compareAndSetTail(pred, node)) {
// 成功后,设置pred节点的next节点为node,返回
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued()方法,通过上面的addWaiter方法我们已经把这个节点加入同步队列,接下来需要处理这个节点。首先判断自己的前节点是否是头结点,自己是否获取到同步状态,如果满足,把自己设置尾头结点,返回,如果不是,进入shouldParkAfterFailedAcquire(详情见后面方法分析)方法主要作用是判断自己的前置节点是否是SIGNAL状态,是的话自己就可以阻塞自己了,调用parkAndCheckInterrupt(详情见后面方法分析)方法,直到被唤醒或者中断。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
// p是头结点,自己获取锁成功
if (p == head && tryAcquire(arg)) {
// 设置自己为头节点,变成一个空节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 找到前节点为signal,然后阻塞自己
// 清理等待超时或者中断的节点
// 尝试设置线程的状态为signal
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 出现异常
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法,这个方法主要做三件事,判断自己的前置节点是否是SIGNAL,返回true,就可以阻塞了,不是如果状态大于0,证明前面的节点被中断或者超时了,需要从队列清理了,不是大于0,就利用cas设置前置节点为SIGNAL,返回false。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取node前节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 如果pred节点释放了状态,会通知自己
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 大于0证明,前面的线程等待超时或者已经被中断,需要从节点中移除
// 需要找到不大于0的那个节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 找到小于等于0的前节点,设置为SIGNAL
// 这个地方ws值只会为PROPAGATE或者0
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt方法,这个方法需要前面的方法返回true才会执行,它会阻塞node的这个线程,返回线程的中断中断状态并清理Thread.interrupted(),所以独占式获取同步状态对中断不响应的。
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire方法,在finally块里面,出现异常就会执行这个方法,做一些处理当前node的操作。
// 异常后,finally里面执行的方法
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// node是为节点,设置尾节点是pred
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 不是头结点和尾节点,前节点是SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
附上一张acquire独占式获取同步状态的流程图:
②、release()独占式释放同步状态,释放线程后唤醒节点。
源代码:
其中tryRelease也需要同步组件自己去实现,语义也被AQS所定义,true代表释放成功,false代表失败,如果为true,就需要决定是否去唤醒节点,首先获取同步队列的头节点,判断头结点不是空,证明有同步对别有节点才需要唤醒,判断头结点不是刚刚初始化,如果是刚刚初始化,就还没有阻塞,请参考acquire的acquireQueued处理节点的逻辑,都为true执行unparkSuccessor方法,false返回。
public final boolean release(int arg) {
// 释放锁
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空,证明初始化了
// 证明头结点不是刚刚创建
// 那就可以去唤醒头结点或者它的后继节点
// 为0就证明没有其他节点了,不需要唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor()方法,唤醒节点,唤醒的node的后置节点,因为在获取同步状态是我们阻塞的也是后置节点,唤醒后置节点后,会去找到前节点,也就是当前的结点去获取同步状态,然后再把自己变成头结点。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 没有这个节点或者超时或者被中断了,查找一个可以用的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 证明有这个节点
if (s != null)
LockSupport.unpark(s.thread);
}
独占式释放同步状态的流程图:
③、acquireInterruptibly()响应中断的独占式获取同步状态
可以看出如果线程中断立马返回异常,然后再去执行tryAcquire()获取同步状态,获取失败执行doAcquireInterruptibly方法。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly()方法,和acquire的acquireQueued的方法差不多,区别就是在parkAndCheckInterrupt这个方法如果返回true,就会抛异常InterruptedException,说明这个方法响应异常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
④、tryAcquireNanos()带时间的获取同步状态,在时间内获取到,返回true,超时返回false,首先判断线程中断状态,为true就抛异常,为false就尝试获取同步状态tryAcquire,获取失败执行doAcquireNanos方法。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos()方法,其实这个都是在acquire方法上的改进,我们看看这个方法,首先算下时间也就是deadline,然后加入同步队列addWaiter方法,然后判断node的前节点是否为头结点,是就尝试获取同步状态,都为true就返回,为false就接着算下时间,判断node前节点是否为SIGNAL,也就是shouldParkAfterFailedAcquire这个方法,为true,线程阻塞计算的时间,然后true(等待阻塞时间到)和false都判断线程中断状态,中断就抛出异常,执行异常方法,不为true,继续循环,直到获取锁或者超时。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2)、共享系列的方法
①、acquireShared共享式获取同步状态,获取失败就加入同步队列
AQS也把语义指定好了,返货负数证明没有了,就执行doAcquireShared方法
public final void acquireShared(int arg) {
// 返回负数就证明没有锁了,加入同步队列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared()方法,首先构造节点加入队列addWaiter,然后获取node的前节点,判断node的前节点是否为头结点,如果是,获取资源的个数,如果资源大于等于0,调用setHeadAndPropagate方法,然后返回,不满足,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法和独占式一样。
private void doAcquireShared(int arg) {
// 构建共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前节点
final Node p = node.predecessor();
// 前节点是否是头节点
if (p == head) {
// 获取锁的个数
int r = tryAcquireShared(arg);
// 大于等于0,获取锁成功
if (r >= 0) {
setHeadAndPropagate(node, r); // 设置头结点,如果有多余资源接着唤醒
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()方法,设置头结点,设置waitStatus为Propagate,为如果还有资源,唤醒后面的节点,调用doReleaseShared方法(这个方法会在共享式释放同步状态详解)
private void setHeadAndPropagate(Node node, int propagate) {
// 头结点
Node h = head; // Record old head for check below
// 设置头结点为node
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 还有资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 当前节点的next节点是共享或者没有next节点
if (s == null || s.isShared())
// 唤醒后置节点
doReleaseShared();
}
}
②、releaseShared()共享式释放状态
tryReleaseShared是需要同步组件自己去实现,释放成功调用doReleaseShared唤醒节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法,方法有些复杂,不好理解,我们主要来分析三个if的含义,第一个 if (ws == Node.SIGNAL) 表示当前node需要被唤醒,然后后面利用cas设置waitStatus为0,因为是共享模式可能有多个线程同时来释放同步状态,所以只能有一个释放成功,另外一个重试;第二个else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),其实也是用来处理并发的,当第一次并发失败的线程第二次进入时,可能会看到ws等于0(因为成功的线程设置的),所以利用cas设置为PROPAGATE,表示传递,这里补充一下不管是0或者PROPAGATE,都会被唤醒的线程利用cas设置为SIGNAL(参考shouldParkAfterFailedAcquire方法);第三个(h == head)
被唤醒的线程B会首先执行setHead
因此如果最后h!=head,说明新一轮的唤醒竞争已经开始,当前线程c已经觉察到,因此继续参与竞争,加快唤醒
因此如果最后h==head,说明新一轮的唤醒竞争尚未开始,而被唤醒的线程B必然会开启新一轮的唤醒竞争,而当前线程c可以安心退出唤醒竞选
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 头结点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 表示后面节点需要唤醒
if (ws == Node.SIGNAL) {
// 多线程控制并发 ,可能存在多个线程同时来修改
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果ws等于0,尝试把cas设置waitStatus为PROPAGATE,传递下去
// 请联系shouldParkAfterFailedAcquire方法一起看
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点没有发生变化,表示设置完成,退出循环
// 如果发生变化,加入唤醒的过程(加速唤醒,可能存在多个线程在唤醒这些node,速度比一个接一个要快)
if (h == head) // loop if head changed
break;
}
}
③、acquireSharedInterruptibly()和tryAcquireSharedNanos()一个响应中断,一个响应中断支持添加获取的超时时间(参考独占模式的这些方法)
3)、ConditionObject系列方法
①、await()方法,类似Object的await方法,阻塞线程释放锁。
我们可以看见await的第一步是调用addConditionWaiter方法,它的作用是构建等待节点加入队列的尾部,使用的也是AQS的Node,队列里面顺便也会清理清除Node不为CONDITION的节点;第二步需要释放线程获取的同步状态fullyRelease方法;第三步:阻塞线程,找到线程中断时机,也就是调用signal方法的前后顺序;第四步:调用acquireQueued方法处理节点(阻塞还是其它);第五步:清理节点unlinkCancelledWaiters方法(清除Node不为CONDITION的节点);第六步:响应await语义,await阻塞线程时调用interrupt方法会抛异常reportInterruptAfterWait方法。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入等待队列,清除节点
Node node = addConditionWaiter();
// 释放状态
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞线程
LockSupport.park(this);
// 线程是被中断唤醒的
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 加入同步队列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清理节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 响应中断 await语义
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法每次都是加入队列的尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
// 获取第一个
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 获取第一个的下一个
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
// t需要断开连接
t.nextWaiter = null;
// 第一次trail = null
// firstWaiter = next
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
isOnSyncQueue方法判断node是否在同步队列中
final boolean isOnSyncQueue(Node node) {
// 节点状态为CONDITION ,或者node.prev == null 等待节点没有前置节点
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 等待节点没有next节点
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 循环查找
return findNodeFromTail(node);
checkInterruptWhileWaiting 方法,判断是否是中断唤醒,这方法就是为了确认中断的时机是在signal的前面还是后面signal,因为需要响应中断
private int checkInterruptWhileWaiting(Node node) {
// 判断是否是线程中断唤醒
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 设置成功表示在signal 执行之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 设置成功表示在signal 执行之后
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
acquireQueued和unlinkCancelledWaiters方法前面都介绍过了,一个是加入同步队列,一个是清理节点,介绍下reportInterruptAfterWait方法,它是我为了响应线程Interrupt方法,interruptMode == THROW_IE只在在signal方法后调用Interrupt方法才满足,线程阻塞时调用Interrupt方法会抛异常,这是Object.await里面满足的,请参考checkInterruptWhileWaiting方法里面的transferAfterCancelledWait方法理解其实现。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
②、awaitNanos(long nanosTimeout)和awaitUntil(Date deadline)都是提供了超时时间,和await方法类似,只是加入了时间机制。
③、awaitUninterruptibly不响应中断方法,发现里面都没有判断是都发生中断的标记,只有调用signal唤醒node,循环才会结束,然后调用acquireQueued处理这个节点(阻塞还是其它)
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
④、signal方法,唤醒第一个等待队列的node。
public final void signal() {
// 判断是否获取锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 初始化
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal方法,首先把这个first节点和等待队列断开连接,然后把调用transferForSignal方法把节点从等待队列加入同步队列,唤醒节点的线程,然后被唤醒的线程就会在await方法里面执行acquireQueued这个方法。
private void doSignal(Node first) {
do {
// 队列里面即将没有节点,所以首尾都要为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 把first 断开连接
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
⑤、signalAll唤醒所有等待队列的节点加入同步队列,并且清空等待队列
public final void signalAll() {
// 判断是否获取锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取第一个节点
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
// 队列设置为null
lastWaiter = firstWaiter = null;
// 从首节点开始加入同步队列,知道队列为空
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
四、总结
我们学习AQS其实我觉得主要从三个方面,也就是本文的第三部分,从独占式获取和释放同步状态、共享式获取和释放同步状态和ConditionObject里面的等待/通知机制;这里在说一下独占式释放锁和共享式释放锁,独占式因为只会有一个线程获取同步状态,所以释放时也只会有一个,但是在共享这一块,我们在释放同步同步状态时可能会有多个线程同时来释放,可能出现并发的情况,理解doReleaseShared是理解共享式释放的重点;学习获取和释放同步状态,理解同步队列节点的变化是重点;学习等待/通知理解等待队列和同步队列的关系和节点的转换;只有学习好了AQS才能更好的学习后面JUC的那些锁。
最后感慨下AQS里面的逻辑是真心有些绕,本人有些理解的可能有些不够。
参考《Java 并发编程的艺术》
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。