您的位置:首页 > 编程语言 > Java开发

JAVA 并发类(二) ConcurrentHashMap 原理分析

2017-09-18 15:05 274 查看
HashTable是一个线程安全的类,使用synchronzied来锁住整张Hash表来实现线程安全,而ConcurrentHashMap允许多个修改操作并发执行,关键在于使用了锁分离技术。使用多个锁来控制对hash表的不同部分进行修改。ConcurrentHashMap内部使用段Segment来表示这些不同的部分,其中每个段都是一个HashTable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

还有一些需要跨段的方法比如size()和containsValue(),它们可能需要锁定整个表而不仅仅是某个断,而且需要按照顺序先锁定所有段在按照顺序释放所有的段,不按照顺序很有可能会产生死锁。

一. 实现原理

ConcurrentHashMap使用分段技术,把数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,实现真正的并发访问。



ConcurrentHashMap里有很多个Segment,每个Segment都有拥有一把锁[每个Segment都继承自ReentrantLock]

static final class Segment<K,V> extends ReentrantLock implements Serializable


这样的好处是对每个Segment中的数据需要同步操作的话都说使用每个Segment容器对象自身的锁来实现,只有对全局需要改变时锁定的是所有的Segment.

Segment下包含很多HashEntry列表数组,对于一个key经过三次hash操作确定最终位置:

先对一个key进行一个哈希操作,得到哈希值h1

将h1的高几位进行第二次哈希,得到h2,根据h2确定放在哪个Segment

再对h1的第几位进行第三次哈希,得到h3,根据h3确定放在Segement内的哪个HashEntry中

ConcurrentHashMap主要实体类就是三个:ConcurrentHashMap(整个哈希表),Segment(桶),HashEntry(节点)

/**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments;


ConcurrentHashMap允许多个读操作并发执行,读操作不需要加锁。

HashEntry结构如下:

static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
volatile HashEntry<K,V> next;
}


JDK6中的next指针也定义为final,并且每次插入将新节点作为链表的头结点。每次删除一个节点,会将删除节点之前的所有节点 拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除以后 有两条链存在,因而可以保证即使在同一条链中,有一个线程在删除,而另一个线程在遍历,它们都能工作良好,因为遍历的线程能继续使用原有的链。因而这种实现是一种更加细粒度的happens-before关系,即如果遍历线程在删除线程结束后开始,则它能看到删除后的变化,如果它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。如果这不符合你的需求,还是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()做的包装。

注意在HashMap中的Entry只有key是final的

初始化

构造函数如下:

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}


传入的参数有initCapacity,loadFactor,concurrencyLevel三个。

initCapacity表示新创建的ConcurrentHashMap的初始容量,也就是Entry的数量。默认是16个

loadFactor表示负载因子,表示当ConcurrentHashMap中元素个数大于loadFactor*initCapacity时就需要扩容,默认是0.75

cocurrentyLevel表示并发级别,用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为
static final int DEFAULT_CONCURRENCY_LEVEL = 16
理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因

初始化的一些操作

验证参数的合法性 如果不合法 就抛出异常

cocurrentyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认是
static final int MAX_SEGMENTS = 1 << 16
如果超过就设置为这个值

使用循环找到大于等于concurrencyLevel的第一个2的n次方的数size,这个数就是Segment数组的大小,并记录一共向左按位移动的次数sshift,并且设置segmentShift = 32 - sshift,并且segmentMask的值等于ssize - 1,segmentMask的各个二进制位都为1,目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。

检查给的容量值是否大于允许的最大容量值,如果大于该值,设置为该值。最大容量值为
static final int MAXIMUM_CAPACITY = 1 << 30
;

然后计算每个Segment平均应该放置多少个元素,这个值c是向上取整的值。比如初始容量为15,Segment个数为4,则每个Segment平均需要放置4个元素。

最后创建一个Segment实例,将其当做Segment数组的第一个元素。

put 操作

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}


操作如下:

判断value是否为null,如果是null,直接抛出异常

key通过一个哈希运算得到一个哈希值

将得到的哈希值往右按位移动segmentShift位,然后在于segmentMask做于运算得到segment的索引j.在初始化的时候我们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。

使用Unsafe的方式从Segment数组中获取该索引对应的Segment对象

往这个Segment对象中put值,

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}


put操作需要加锁

get 操作

public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}


操作步骤

和put操作一样,先通过key进行两次hash确定应该去哪个Segment中取数据

使用Unsafe获取对应的Segment,然后再进行一次&运算得到HashEntry链表的位置,然后从链表头开始遍历整个链表(因为Hash可能会有碰撞,所以用一个链表保存),如果找到对应的key,则返回对应的value值,如果链表遍历完都没有找到对应的key,则说明Map中不包含该key,返回null

值得注意的是,get操作是不需要加锁的(如果value为null,会调用readValueUnderLock,只有这个步骤会加锁),通过前面提到的volatile和final来确保数据安全。

size操作

size操作与put和get操作最大的区别在于,size操作需要遍历所有的Segment才能算出整个Map的大小,而put和get都只关心一个Segment。

假设我们当前遍历的Segment为SA,那么在遍历SA过程中其他的Segment比如SB可能会被修改,于是这一次运算出来的size值可能并不是Map当前的真正大小

size的实现方法是:

先不lock所有的Segment,遍历所有Segment,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum;         // sum of modCounts
long last = 0L;   // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creat
111b6
ion
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}


一个Map有4个Segment,标记为S1,S2,S3,S4,现在我们要获取Map的size。计算过程是这样的:第一次计算,不对S1,S2,S3,S4加锁,遍历所有的Segment,假设每个Segment的大小分别为1,2,3,4,更新操作次数分别为:2,2,3,1,则这次计算可以得到Map的总大小为1+2+3+4=10,总共更新操作次数为2+2+3+1=8;第二次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设这次每个Segment的大小变成了2,2,3,4,更新次数分别为3,2,3,1,因为两次计算得到的Map更新次数不一致(第一次是8,第二次是9)则可以断定这段时间Map数据被更新,则此时应该再试一次;第三次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设每个Segment的更新操作次数还是为3,2,3,1,则因为第二次计算和第三次计算得到的Map的更新操作的次数是一致的,就能说明第二次计算和第三次计算这段时间内Map数据没有被更新,此时可以直接返回第三次计算得到的Map的大小。最坏的情况:第三次计算得到的数据更新次数和第二次也不一样,则只能先对所有Segment加锁再计算最后解锁。

containsValue操作

containsValue操作采用了和size操作一样的想法:

public boolean containsValue(Object value) {
// Same idea as size()
if (value == null)
throw new NullPointerException();
final Segment<K,V>[] segments = this.segments;
boolean found = false;
long last = 0;
int retries = -1;
try {
outer: for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
for (int j = 0; j < segments.length; ++j) {
HashEntry<K,V>[] tab;
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null && (tab = seg.table) != null) {
for (int i = 0 ; i < tab.length; i++) {
HashEntry<K,V> e;
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) {
found = true;
break outer;
}
}
}
sum += seg.modCount;
}
}
if (retries > 0 && sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}


hash

private int hash(Object k) {
int h = hashSeed;

if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}

h ^= k.hashCode();

// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h <<  15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h <<   3);
h ^= (h >>>  6);
h += (h <<   2) + (h << 14);
return h ^ (h >>> 16);
}


其他

ConcurrentHashMap的key和value都不可以为null,而HashMap中的key可以为null,Hashtable中的key不能为null.

ConcurrentHashMap是线程安全的类 但是不能保证使用了ConcurrentHashMap的操作都是线程安全的。

ConcurrentHashMap的get操作不需要加锁,put操作需要加锁
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: