您的位置:首页 > 编程语言 > Java开发

Java源码分析:ConcurrentHashMap

2016-03-05 14:59 681 查看

ConcurrentHashMap 源码分析

1、概述

上篇文章和大家分享了HashMap的源码,了解了HashMap的原理。和HashMap比较相似的有Hashtable,Hashtable的代码和HashMap大部分相似,关键的区别在于前者是线程安全的,后者线程不安全。Hashtable主要是对所有关于hash表的操作加锁来实现线程安全。当然这种效率肯定是不够高的,所以有了并发包中ConcurrentHashMap。它可以同步map中的数据,同时又能极高地提高并发效率,所以在多线程环境下使用它是很有必要的。

先从整体上来分析一下ConcurrentHashMap的原理,Hashtable中所有的线程都使用一把锁,导致效率很低,如果将hash表分成多个数据段,容器中存在多个锁,每个数据段对应一把锁,这样当一个线程访问其中一个数据段的时候,别的线程也能访问其他数据段,提高了并发效率。

ConcurrentHashMap使用Segment作为分段,Segment是一种ReentrantLock,一个Segement作为一个类似HashMap的结构,维护这HashEntry。如果想要对Segment中的元素进行操作,必须先获取Segment锁。



二、ConcurrentHashMap具体实现

关键属性

//加载因子,hashEntry个数和数组长度比值
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//数组最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//segment最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//最大segments个数
static final int MAX_SEGMENTS = 1 << 16;

//size方法中如果获取失败的话重新执行的次数,下面会继续解释
static final int RETRIES_BEFORE_LOCK = 2;
//segments的掩码值
final int segmentMask;

//偏移量
final int segmentShift;

//segment数组
final Segment<K,V>[] segments;


下面来看ConcurrentHashMap的构造方法:

//最关键的三个参数就是上面提到的初始化大小、加载因子以及并行等级。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//规范参数
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
//寻找最佳匹配参数,这个匹配参数是一个不小于给定参数的2次幂
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//偏移量
this.segmentShift = 32 - sshift;
//掩码
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//创建数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//这里使用UNSAFE辅助类初始化
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}


在concurrentHashMap中,由于使用了volatile变量,读操作一般不用加锁,对容器做结构性修改才会加锁。下面是put操作代码:

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
//根据hash算出segement位置 j,
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)
//先判断s是否为空,由于非volatile变量。这里在ensureSegment中再次确保
UNSAFE.getObject// nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
s = ensureSegment(j);
//最后使用segment中的put方法
return s.put(key, hash, value, false);
}


所以,concurrentHashMap中的很多方法都被延迟加载到Segment当中。

三、segment

下面主要讲解Segment类的实现。看源码前先整体看一下segment的结构:



可以看到segment实现了一个类似hashMap的结构。ConcurrentHashMap使用的默认并发级别会创建包含16个Segment对象的数组,如下图:



下面来看Segment类的源码,由于篇幅太多,这里只介绍部分:

//可以看到它继承了ReentrantLock
static final class Segment<K,V> extends ReentrantLock implements Serializable {

//这里的table是由HashEntry对象组成的数组
transient volatile HashEntry<K,V>[] table;

//包含hashEntry元素的个数,在统计hasnentry的个数时,就可以不用遍历这个Map,而是累加segment的count值。
transient int count;
//被修改的次数
transient int modCount;

//边界值
transient int threshold;

final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}


由于将table设置成了volatile变量,根据volatile变量的语义,读操作不需要加锁。

下面看看segment的put操作:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock方法是为了检测是否有其他线程持有锁,如果有其他线程持有,则设置为空,如果没有其他线程持有则进入scanAndLockForput方法对put加锁,下面的操作和hashMap类似。锁住的是一个segment对象
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//根据散列码得到下标
int index = (tab.length - 1) & hash;
//找到对应的桶
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {//如果K/V存在
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;//设置VALUE
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {//如果不存在添加一个新链表
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//如果大于边界值则进行重新hash
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();//解锁
}
return oldValue;
}


这里如果在理想状态下,如果并行级别是16,则能支持16个线程操作。再继续看segment的rehash()方法:

private void rehash(HashEntry<K,V> node) {

HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//新的容量等于老容量的两倍
int newCapacity = oldCapacity << 1;

//边界值也增大
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
//不能直接清空每个表,必须保证现有map可读

HashEntry<K,V> e = oldTable[i];

if (e != null) {
HashEntry<K,V> next = e.next;
//计算新索引
int idx = e.hash & sizeMask;
//如果e是单节点,直接赋值
if (next == null)   //  Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//遍历到最后一个末尾节点
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
把最后一个末尾节点放进新table中的索引位
newTable[lastIdx] = lastRun;
// 克隆剩余的节点
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}


可以看到,rehash方法首先对数组扩容,再通过hash & sizeMask算出新位置进行搬移。

再看一下remove方法:

//这里在当前线程缓存中移除该链,但是在其他线程的缓存中仍然存在
final V remove(Object key, int hash, Object value) {
//检测是否被占用
if (!tryLock())
//自旋获取
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
//根据hash值找到下标
int index = (tab.length - 1) & hash;
//找到hash码对应的桶
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
//找到要删除的节点
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {

if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: