(1) 背景
HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
(2) 简介
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问.
结构
ConcurrentHashMap是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.
基本成员
default_initial_capacitymap默认容量,必须是2的冥
/**
* 默认的初始容量 16
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
default_load_factor默认负载因子(存储的比例)
/**
* 默认的负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
default_concurrency_level默认并发数量,segments数组量(ps:初始化后不能修改)
/**
* 默认的并发数量,会影响segments数组的长度(初始化后不能修改)
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
maximum_capacitymap最大容量
/**
* 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换
* ConcurrentHashMap大小必须是2^n,且小于等于2^30
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
min_segment_table_capacityHashEntry[]默认容量
/**
* 每个segment中table数组的长度,必须是2^n,至少为2
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
max_segments最大并发数,segments数组最大量
/**
* 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
*/
static final int MAX_SEGMENTS = 1 << 16;
retries_before_lock重试次数,在加锁之前
/**
* 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
*/
static final int RETRIES_BEFORE_LOCK = 2;
segmentMask计算segment位置的掩码(segments.length-1)
/**
* 用于segment的掩码值,用于与hash的高位进行取&
*/
final int segmentMask;
segmentShift
/**
* 用于算segment位置时,hash参与运算的位数
*/
final int segmentShift;
segmentssegment数组
/**
* segments数组
*/
final Segment<K,V>[] segments;
HashEntry存储数据的链式结构
static final class HashEntry<K,V> {
// hash值
final int hash;
// key
final K key;
// 保证内存可见性,每次从内存中获取
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 使用volatile语义写入next,保证可见性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
Segment继承ReentrantLock锁,用于存放HashEntry[]
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 对segment加锁时,在阻塞之前自旋的次数
*
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成
* volatile保证可见性
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问
*/
transient int count;
/**
* 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位
* 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性.
* 只能在锁中或其他volatile读保证可见性之间进行访问
*/
transient int modCount;
/**
* 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
*/
transient int threshold;
/**
* 负载因子
*/
final float loadFactor;
/**
* 构造方法
*/
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
/**
* ConcurrentHashMap 构造方法
* @param initialCapacity 初始化容量
* @param loadFactor 负载因子
* @param concurrencyLevel 并发segment,segments数组的长度
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 大于最大segments容量,取最大容量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2^sshift = ssize 例如:sshift = 4,ssize = 16
// 根据concurrencyLevel计算出ssize为segments数组的长度
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) { // 第一次 满足
++sshift; // 第一次 1
ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
}
// segmentShift和segmentMask的定义
this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数
this.segmentMask = ssize - 1; // segments位置范围
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 计算每个segment中table的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// HashEntry[]默认 容量
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 确保cap是2^n
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建segments并初始化第一个segment数组,其余的segment延迟初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
无参构造使用默认参数
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
基本方法
一些UNSAFE方法
HashEntry
setNext
/**
* 使用volatile语义写入next,保证可见性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
entryAt get HashEntry
/**
* 获取给定table的第i个元素,使用volatile读语义
*/
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
setEntryAt set HashEntry
/**
* 设置给定的table的第i个元素,使用volatile写语义
*/
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
put 插入元素
执行流程分析
(1) map的put方法就做了三件事情,找出segments的位置;判断当前位置有没有初始化,没有就调用ensureSegment()方法初始化;然后调用segment的put方法.
(2) segment的put方法.,获取当前segment的锁,成功接着执行,失败调用scanAndLockForPut方法自旋获取锁,成功后也是接着往下执行.
(3) 通过hash计算出位置,获取节点,找出相同的key和hash替换value,返回.没有找到相同的,设置找出的节点为当前创建节点的next节点,设置创建节点前,判断是否需要扩容,需要调用扩容方法rehash();不需要,设置节点,返回,释放锁.
/**
* map的put方法,定位segment
*/
public V put(K key, V value) {
Segment<K,V> s;
// value不能为空
if (value == null)
throw new NullPointerException();
// 获取hash
int hash = hash(key);
// 定位segments 数组的位置
int j = (hash >>> segmentShift) & segmentMask;
// 获取这个segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 为null 初始化当前位置的segment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
/**
* put到table方法
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 是否获取锁,失败自旋获取锁(直到成功)
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 定义位置
int index = (tab.length - 1) & hash;
// 获取第一个桶的第一个元素
// entryAt 底层调用getObjectVolatile 具有volatile读语义
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) { // 证明链式结构有数据 遍历节点数据替换,直到e=null
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { // 找到了相同的key
oldValue = e.value;
if (!onlyIfAbsent) { // 默认值false
e.value = value; // 替换value
++modCount;
}
break; // 结束循环
}
e = e.next;
}
else { // e=null (1) 之前没有数据 (2) 没有找到替换的元素
// node是否为空,这个获取锁的是有关系的
// (1) node不为null,设置node的next为first
// (2) node为null,创建头节点,指定next为first
if (node != null)
// 底层使用 putOrderedObject 方法 具有volatile写语义
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 扩容条件 (1)entry数量大于阈值 (2) 当前table的数量小于最大容量 满足以上条件就扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 扩容方法,方法里面具体讲
rehash(node);
else
// 给table的index位置设置为node,
// node为头结点,原来的头结点first为node的next节点
// 底层也是调用的 putOrderedObject 方法 具有volatile写语义
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
解释下(hash >>> segmentShift) & segmentMask定位segment位置(个人理解)
ensureSegment初始化segment方法
执行流程
(1) 计算位置,使用UNSAFE的方法判断当前位置有没有初始化,然后使用segmets[0]的模板创建一个新的HashEntry[],再次判断当前位置有没有初始化,可能存在多线程同时初始化,然后创建一个新的segment,最后使用自旋cas设置新的segment的位置,保证只有一个线程初始化成功.
/**
*
* @param k 位置
* @return segments
*/
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments; // 当前的segments数组
long u = (k << SSHIFT) + SBASE; // raw offset // 计算原始偏移量,在segments数组的位置
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 判断没有被初始化
Segment<K,V> proto = ss[0]; // use segment 0 as prototype // 获取第一个segment ss[0]
// 这就是为什么要在初始化化map时要初始化一个segment,需要用cap和loadFactoe 为模板
int cap = proto.table.length; // 容量
float lf = proto.loadFactor; // 负载因子
int threshold = (int)(cap * lf); // 阈值
// 初始化ss[k] 内部的tab数组 // recheck
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再次检查这个ss[k] 有没有被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
// 创建一个Segment
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 这里用自旋CAS来保证把segments数组的u位置设置为s
// 万一有多线程执行到这一步,只有一个成功,break
// getObjectVolatile 保证了读的可见性,所以一旦有一个线程初始化了,那么就结束自旋
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
scanAndLockForPut自旋获取锁方法
具体流程看代码注释,解释下这个方法的优化(看的某位大佬的博客)
(1) 我们在put方法获取锁失败,才会进入这个方法,这个方法采用自旋获取锁,直到成功才返回,但是使用了自旋次数的限制,这么做的好处是什么了,就是竞争太激烈的话,这个线程可能一直获取不到锁,自旋也是消耗cpu性能的,所以当达到自旋次数时,就阻塞当前线程,直到有线程释放了锁,通知这些线程.在等待过程中是不消耗cpu的.
(2) 当我们进入这个方法时,说明获取锁失败,那么可别是别的线程在对这个segment进行修改操作,所以说如果别的线程在操作之后,我们自己的工作内存中的数据可能已经不是最新的了,这个时候我们使用具有volatile语义的方法重新读了数据,在自旋过程中遍历这些数据,把最新的数据缓存在工作内存中,当前线程再次获取锁时,我们的数据是最新的,就不用重新去住内存中获取,这样在自旋获取的锁的过程中就预热了这些数据,在获取锁之后的执行中就提升了效率.
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash); // 根据hash获取头结点
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // 是为了找到对应hash桶,遍历链表时找到就停止
while (!tryLock()) { // 尝试获取锁,成功就返回,失败就开始自旋
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) { // 结束遍历节点
if (node == null) // 创造新的节点
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0; // 结束遍历
}
else if (key.equals(e.key)) // 找到节点 停止遍历
retries = 0;
else
e = e.next; // 下一个节点 直到为null
}
else if (++retries > MAX_SCAN_RETRIES) { // 达到自旋的最大次数
lock(); // 进入加锁方法,失败进入队列,阻塞当前线程
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // 头结点变化,需要重新遍历,说明有新的节点加入或者移除
retries = -1;
}
}
return node;
}
rehash扩容方法
解释下节点位置变化这一块的处理,如下图所示.
/**
*扩容方法
*/
private void rehash(HashEntry<K,V> node) {
// 旧的table
HashEntry<K,V>[] oldTable = table;
// 旧的table的长度
int oldCapacity = oldTable.length;
// 扩容原来capacity的一倍
int newCapacity = oldCapacity << 1;
// 新的阈值
threshold = (int)(newCapacity * loadFactor);
// 新的table
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码
int sizeMask = newCapacity - 1;
// 遍历旧的table
for (int i = 0; i < oldCapacity ; i++) {
// table中的每一个链表元素
HashEntry<K,V> e = oldTable[i];
if (e != null) { // e不等于null
HashEntry<K,V> next = e.next; // 下一个元素
int idx = e.hash & sizeMask; // 重新计算位置,计算在新的table的位置
if (next == null) // Single node on list 证明只有一个元素
newTable[idx] = e; // 把当前的e设置给新的table
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e; // 当前e
int lastIdx = idx; // 在新table的位置
for (HashEntry<K,V> last = next;
last != null;
last = last.next) { // 遍历链表
int k = last.hash & sizeMask; // 确定在新table的位置
if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
lastIdx = k; // 记录变化位置
lastRun = last; // 记录变化节点
}
}
// 以下把链表设置到新table分为两种情况
// (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
// (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
// ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
// 下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 处理扩容时那个添加的节点
// 计算位置
int nodeIndex = node.hash & sizeMask; // add the new node
// 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
node.setNext(newTable[nodeIndex]);
// 设置位置
newTable[nodeIndex] = node;
// 新table替换旧的table
table = newTable;
}
get 没有加锁,效率高
注意:get方法使用了getObjectVolatile方法读取segment和hashentry,保证是最新的,具有锁的语义,可见性
分析:为什么get不加锁可以保证线程安全
(1) 首先获取value,我们要先定位到segment,使用了UNSAFE的getObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment.
(2) 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table.
(3) 然后我们获取具体的hashentry,也时使用了UNSAFE的getObjectVolatile具有读的volatile语义,然后遍历查找返回.
(4) 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可.
/**
* get 方法
*/
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 获取segment的位置
// getObjectVolatile getObjectVolatile语义读取最新的segment,获取table
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// getObjectVolatile getObjectVolatile语义读取最新的hashEntry,并遍历
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// 找到相同的key 返回
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
size 尝试3次不加锁获取sum,如果发生变化就全部加锁,size和containsValue方法的思想也是基本类似.
执行流程
(1) 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum.
(2) 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环.
(3) 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环.
(4) 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size.(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞).
/**
* size
* @return
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // 为true表示size溢出32位
long sum; // modCounts的总和
long last = 0L; // previous sum
int retries = -1; // 第一次不计算次数,所以会重试三次
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) { // 重试次数达到3次 对所有segment加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) { // seg不等于空
sum += seg.modCount; // 不变化和size一样
int c = seg.count; // seg 的size
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last) // 没有变化
break;
last = sum; // 变化,记录这一次的变化值,下次循环时对比.
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
remove replace和remove都是用了scanAndLock这个方法
解释下scanAndLock这个方法,和put方法的scanAndLockForPut方法思想类似,都采用了同样的优化手段.
/**
* 删除方法
*/
final V remove(Object key, int hash, Object value) {
if (!tryLock()) // 获取锁
scanAndLock(key, hash); // 自旋获取锁
V oldValue = null;
try {
HashEntry<K,V>[] tab = table; // 当前table
int index = (tab.length - 1) & hash; // 获取位置
HashEntry<K,V> e = entryAt(tab, index);// 找到元素
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next; // 下一个元素
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { // e的key=传入的key
V v = e.value; // 获取value
if (value == null || value == v || value.equals(v)) { // 如果value相等
if (pred == null) // 说明是头结点,让next节点为头结点即可
setEntryAt(tab, index, next);
else // 说明不是头结点,就把当前节点e的上一个节点pred的next节点设置为当前e的next节点
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
isEmpty 其实也和size的思想类似,不过这个始终没有加锁,提高了性能
执行流程
(1) 第一次遍历就干了一件事,确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,都为0,就累加modCount为sum.
(2) 判断sum是否为0,不为0,就代表了map之前是有数据的,被remove和clean了,modCount指的是操作次数,再次确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,并且累减sum,最后判断sum是否为0,为0就代表没有任何变化,不为0,就代表在第一次统计过程中有线程又添加了元素,所以返回false.但是如果在第二次统计时又发生了变化了,所以这个不是那么的准确,但是不加锁提高了性能,也是一个可以接受的方案.
```public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false; // 某一个不为null,立即返回
sum += seg.modCount;
}
}
// 上面执行完 说明不为空,并且过程可能发生了变化
// 发生变化
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L) // 变化值没有为0,说明不为空
return false;
}
// 没有发生变化
return true;
}
1.7 ConcurrentHashMap 使用了分段锁的思想提高了并发的的访问量,就是使用很多把锁,每一个segment代表了一把锁,每一段只能有一个线程获取锁;但是segment的数量初始化了,就不能修改,所以这也代表了并发的不能修改,这也是1.7的一个局限性.从get方法可以看出使用了UNSAFE的一些方法和volatile关键字来代替锁,提高了并发性.在size和containsValue这些方法提供一种尝试思想,先不加锁尝试统计,如果其中没有变化就返回,有变化接着尝试,达到尝试次数再加锁,这样也避免了立即加锁对并发的影响.
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。