Condition介绍
上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充。ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现。
常用方法
Condition中主要的方法有2个
这里的await(),signal()必须在ReentrantLock#lock()和ReentrantLock#unlock()之间调用。
Condition实现分析
Condition的实现也是利用AbstractQueuedSynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,然后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成ReentrantLock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。
await()方法源码分析
await()源码如下
public final void await() throws InterruptedException { //判断是否当前线程是否被中断中断则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //加入等待队列 Node node = addConditionWaiter(); //释放当前线程锁 int savedState = fullyRelease(node); int interruptMode = 0; //判断是否在同步阻塞队列,如果不在一直循环到被加入 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //判断是否被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //获取锁,如果获取中被中断则设置中断状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清除等待队列中被"激活"的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //如果当前线程被中断,处理中断逻辑 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
主要分以下几步
下面来看下addConditionWaiter()实现
private Node addConditionWaiter() { //获取等待队列尾部节点 Node t = lastWaiter; //如果尾部状态不为CONDITION,如果已经被"激活",清理之,然后重新获取尾部节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建以当前线程为基础的节点,并将节点模式设置成CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果尾节点不存在,说明队列为空,将头节点设置成当前节点 if (t == null) firstWaiter = node; //如果尾节点存在,将此节点设置成尾节点的下个节点 else t.nextWaiter = node; //将尾节点设置成当前节点 lastWaiter = node; return node; }
addConditionWaiter()的逻辑很简单,就是创建以当前线程为基础的节点并把节点加入等待队列的尾部待其他线程处理。
下面来看下fullyRelease(Node node)实现
final int fullyRelease(Node node) { boolean failed = true; try { //获取阻塞队列中当前线程节点的锁状态值 int savedState = getState(); //释放当前线程节点锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { //释放失败讲节点等待状态设置成关闭 if (failed) node.waitStatus = Node.CANCELLED; } }
调用getState()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示多次重入,然后调用release(savedState)释放所有锁,如果释放成功返回锁状态值。
下面来看下isOnSyncQueue(Node node)实现
final boolean isOnSyncQueue(Node node) { //判断当前节点是否是CONDITION或者前置节点是否为空如果为空直接返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果下个节点存在,则在同步阻塞队列中返回true if (node.next != null) // If has successor, it must be on queue return true; //遍历查找当前节点是否在同步阻塞队列中 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。
下面看下checkInterruptWhileWaiting(Node node)实现
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态THROW_IE和REINTERRUPT,THROW_IE是调用signal()前被中断返回,REINTERRUPT在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferAfterCancelledWait(node)取判断是那种中断状态,transferAfterCancelledWait(node)方法分2步
如果使用await()方法上面2步其实是没什么作用其最后一定会返回false,因为await()被激活只能调用 signal()方法,而signal()方法肯定已经将节点加入到同步阻塞队列中。所以以上逻辑是给await(long time, TimeUnit unit)等带超时激活方法用的。
acquireQueued(node, savedState)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkCancelledWaiters()方法
private void unlinkCancelledWaiters() { //获取等待队列头节点 Node t = firstWaiter; Node trail = null; while (t != null) { //获取下个节点 Node next = t.nextWaiter; //如果状态不为CONDITION说明已经加入阻塞队列需要清理掉 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else //获取下个节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
此方法就是从头开始查找状态不为CONDITION的节点并清理,状态不为CONDITION节点说明此节点已经加入到阻塞队列,已经不需要维护。
下面来看下reportInterruptAfterWait(interruptMode)方法
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //如果是THROW_IE模式直接抛出异常 if (interruptMode == THROW_IE) throw new InterruptedException(); //如果是REINTERRUPT模式标记线程中断由上层处理中断 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
此方法处理中断逻辑。如果是THROW_IE模式直接抛出异常,如果是REINTERRUPT模式标记线程中断由上层处理中断。
signal()方法源码分析
signal()源码如下
public final void signal() { //是否当前线程持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //通知"激活"头节点线程 if (first != null) doSignal(first); }
先调用isHeldExclusively()判断锁是否被当前线程持有,然后检查等待队列是否为空,不为空就是可以取第一个节点调用doSignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,所以上下文中带""的激活都是这种解释。
下面看下isHeldExclusively()实现
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
实现就是比较下当前线程和持有锁的线程是否同一个
下面看下doSignal(first)的实现
private void doSignal(Node first) { do { //头指头后移一位,如果后面的节点为空,则将尾指头也指向空,说明队列为空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //清空头节点的下个节点 first.nextWaiter = null; //如果"激活"失败者取下个继续,直到成功或者遍历完 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。
下面来看下transferForSignal(first)方法
final boolean transferForSignal(Node node) { //将CONDITION状态设置成0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //加入到同步阻塞队列 Node p = enq(node); int ws = p.waitStatus; //状态异常直接激活 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
(1)此方法先先将CONDITION状态设置成0,因为如果是CONDITION状态加入到同步阻塞队列,激活的时候是不识别的。
(2)加入到同步阻塞队列的尾部。所以同步阻塞队列中前面如果有多个在排队,调用unlock()不会马上激活此节点。
(3)状态异常直接调用unpark激活,这边按理说如果状态异常情况下激活,await()在调用unlock()被激活后会进行相应的异常处理,但看await()代码没有处理则是正常执行。
这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。
以上所述是小编给大家介绍的Java并发编程之Condition源码分析详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对亿速云网站的支持!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。