本篇文章给大家分享的是有关怎么在Java中使用ReentrantReadWriteLock实现多线程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
Java中的集合主要分为四类:1、List列表:有序的,可重复的;2、Queue队列:有序,可重复的;3、Set集合:不可重复;4、Map映射:无序,键唯一,值不唯一。
ReentrantReadWriteLock 是一个读写锁,允许多个读或者一个写线程在执行。
内部的 Sync 继承自 AQS,这个 Sync 包含一个共享读锁 ReadLock 和一个独占写锁 WriteLock。
该锁可以设置公平和非公平,默认非公平。
一个持有写锁的线程可以获取读锁。如果该线程先持有写锁,再持有读锁并释放写锁,称为锁降级。
WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。
public class ReentrantReadWriteLock implements ReadWriteLock { /** 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 持有的AQS子类对象 */ final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} public static class ReadLock implements Lock {} public static class WriteLock implements Lock {} //默认非公平 public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public static class ReadLock implements Lock { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } } public static class WriteLock implements Lock { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } } }
Sync 继承了 AQS,其中有一个 int 的成员变量 state,int 共32位,这里将其视为两部分,高16位表示读的数量,低16位表示写的数量,这里的数量表示线程重入后的总数量。
abstract static class Sync extends AbstractQueuedSynchronizer { //继承的一个int的成员变量,将其拆分为高16位和低16位 //private volatile int state; static final int SHARED_SHIFT = 16; //读一次,锁增加的值 static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //读的数量 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //写的数量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
读锁使用了一个 ThreadLocal<HoldCounter>
让每个线程有一个线程私有的 HoldCounter
,HoldCounter
包含一个线程 id 以及读重入的次数。
查找对应线程的HoldCounter
其实只用一个 ThreadLocalHoldCounter
也足够了。这里为了加快查询,用了两个额外的缓存,即 cachedHoldCounter
、firstReader
和 firstReaderHoldCount
(后两个组合起来相当于一个 HoldCounter
)。
在读锁的相关操作中,先检查 firstReader
是否为当前线程,否则检查 cachedHoldCounter
内部的线程是否为当前线程,如果失败最后会通过 readHolds
来获取当前线程的 HoldCounter
。
static final class HoldCounter { int count = 0; // 使用线程id,而不是线程的引用。这样可以防止垃圾不被回收 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //使用的ThreadLocal private transient ThreadLocalHoldCounter readHolds; //一个缓存 private transient HoldCounter cachedHoldCounter; //组合起来相当于一个缓存 private transient Thread firstReader = null; private transient int firstReaderHoldCount;
下面讲解 tryAcquireShared
和 tryReadLock
,tryReadLock
是一种直接抢占的非公平获取,和 tryAcquireShared
中的非公平获取有所不同。
根据注释
1.检查是否存在其他线程持有的写锁,是的话失败,返回 -1;
2.判断在当前公平状态下能否读,以及是否超过读的最大数量,满足条件则尝试 CAS 修改状态,让 state 加一个单位的读 SHARED_UNIT;修改成功后会根据三种情况,即首次读、firstReader 是当前线程,以及其他情况分别进行处理,成功,返回1;
3.前面未返回结果,会执行 fullTryAcquireShared
。
可以将该方法视为 fullTryAcquireShared
的一次快速尝试,如果尝试失败,会在 fullTryAcquireShared
的自旋中一直执行,直到返回成功或者失败。
//ReadLock public void lock() { sync.acquireShared(1); } //AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //Sync protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); // 如果写的数量不是0,且写线程不是当前线程,失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取读的个数 int r = sharedCount(c); // 如果当前线程想要读,没有被堵塞 // 当前读的数量未超过最大允许的读的个数 // CAS执行成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 第一次读,修改firstReader和firstReaderHoldCount if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 如果当前线程正好是firstReader } else if (firstReader == current) { firstReaderHoldCount++; // 其他情况,经过一系列处理后,使得rh为当前线程的HoldCounter // 对rh的记数加一 } else { HoldCounter rh = cachedHoldCounter; // 如果cached为null或者不是当前线程 if (rh == null || rh.tid != getThreadId(current)) // 从readHolds中get,并修改cached cachedHoldCounter = rh = readHolds.get(); // 如果cached不是null,但记数为null // 这种情况表示当前线程的HoldCounter已经被删除,即为null, // 但cached仍然保留着null之前的那个HoldCounter, // 为了方便,直接将cached设置给ThreadLocal即可 else if (rh.count == 0) readHolds.set(rh); //执行到这里,rh表示当前线程的HoldCounter,记数加1 rh.count++; } return 1; } // 前面未返回结果,执行第三步 return fullTryAcquireShared(current); }
在上述的简单尝试 tryAcquireShared
未能确定结果后,执行第三步 fullTryAcquireShared
自旋来不断尝试获取读锁,直到成功获取锁返回1,或者满足相应条件认定失败返回-1。
1.其他线程持有写锁,失败
2.当前线程读的尝试满足堵塞条件表示当前线程排在其他线程后面,且当前线程没有持有锁即非重入的情况,失败
3.其他情况则不断自旋CAS,达到最大读的数量会抛出异常,其他情况在成功后返回1。
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 存在其他线程持有写锁,返回-1 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //没有写锁,且该线程排在其他线程后面,应该被堵塞 //如果已经持有读锁,此次获取是重入,可以执行else if 之后的操作; //否则,会被堵塞,返回-1。 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly //检查firstReader if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //执行到下一步rh是cached或者readHolds.get(),检查rh rh = readHolds.get(); //在get时,如果不存在,会产生一个新的HoldCounter //记数为0表示不是重入锁,会删除让其重新为null if (rh.count == 0) readHolds.remove(); } } //返回失败 if (rh.count == 0) return -1; } } //达到最大值,不允许继续增加 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //和2.1.1中相似 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
该方法返回当前线程请求获得读锁是否应该被堵塞,在公平锁和非公平锁中的实现不同
在公平锁中,返回在排队的队列中当前线程之前是否存在其他线程,是的话返回 true,当前线程在队列头部或者队列为空返回 false。
// FairSync final boolean readerShouldBlock() { return hasQueuedPredecessors(); } // AQS public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
在非公平锁中,队列中存在两个节点,且第二个节点是独占的写节点,会返回 true,使得新来的读线程堵塞。
这种方式只能在第二个节点是请求写锁的情况下返回 true,避免写锁的无限等待;如果写锁的请求节点在队列的其他位置,返回 false,不影响新来的读线程获取读锁。
如果不按照这种方式处理,而按照队列中的顺序进行处理,则只要存在其他线程在读,每次来一个新的线程请求读锁,总是成功,写锁会一直等待下去。
// NonfairSync final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } // AQS final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
和 fullTryAcquireShared
有相似之处,该方法总是直接去抢占锁,直到其他线程获取写锁返回失败,或者当前当前线程获取读锁返回成功。
//ReadLock public boolean tryLock() { return sync.tryReadLock(); } //Sync final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
tryReleaseShared
在 if/else 中实现了通过 first/cached/readHolds 获取相应的 HoldCounter,并修改其中的记数,记数为0则删除;在 for 中,不断自旋实现 CAS 修改状态 c,如果修改后的状态为0,表示读写锁全部释放,返回 true,否则是 false。
// ReadLockpublic void unlock() { sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}// Syncprotected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 先检查 firstReader是否是当前线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //否则,处理 cached/readHolds中的HoldCounter } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //自旋修改 state for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //只有读写锁均释放干净,才返回true return nextc == 0; }}
下面讲解 tryAcquire
和 tryWriteLock
,tryWriteLock
是一种非公平的获取。
根据注释,tryAcquire 分为三步
1.如果读记数非0,或者写记数非0且写线程不是当前线程,失败
2.写锁的获取应该被堵塞或者CAS失败,失败
3.其他情况,写重入和新来的写线程,均成功
//WriteLockpublic void lock() { sync.acquire(1);}//AQSpublic final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //c分为两部分,写和读 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // c非0,w是0,则读记数非0 || 独占的写线程不是当前线程 // 返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入的情况 setState(c + acquires); return true; } // 写应该被堵塞或者CAS失败,返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 非重入,在CAS成功后,设定独占写线程为当前线程,返回true setExclusiveOwnerThread(current); return true;}
在公平锁中,检查队列前面是否有其他线程在排队,在非公平锁中,总是返回false,即总是不堵塞。
//FairSyncfinal boolean writerShouldBlock() { return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() { return false; // writers can always barge}
和 tryAcquire
在非公平锁的写法基本一样。
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true;}
在 tryRelease
中,修改相应的状态,如果修改后写锁记数为0,则返回 true。
//WriteLockpublic void unlock() { sync.release(1);}//AQSpublic final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}//Syncprotected final boolean tryRelease(int releases) { // 首先检查当前线程是否持有写锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 根据修改后的写记数来确定free boolean free = exclusiveCount(nextc) == 0; // 此时,写锁完全释放,设定写独占线程为null if (free) setExclusiveOwnerThread(null); setState(nextc); // 返回 free return free;}
以上就是怎么在Java中使用ReentrantReadWriteLock实现多线程,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。