ConcurrentHashMap浅析
2015-08-14 21:43
302 查看
概述:
ConcurrentHashMap是HashMap的升级版,我们都知道HashMap是不可靠的,线程不安全的,而Hashtable在同步的时候又会将整张表都锁住,从而在多并发的情况下效率低下。于是ConcurrentHashMap出现了,综合了两者的优点,所以一直是高并发情况下开发者的首选,但是相对的,它也有自身的一些不足,我们来分析一下它的原理。ConcurrentHashMap结构分析:
ConcurrentHashMap胜于HashMap,却又不同于Hashtable,怎么解释?ConcurrentHashMap将哈希数组分成了好多段,这里的哈希数组可以想象成HashMap中存放链表头结点的地址的数组,但是却又不同,这个哈希数组中每一段中又存放着一个类似于Hashtable的结构,也就可以说ConcurrentHashMap中每一个段就是一个Hashtable。如图:(图片来自:http://www.cnblogs.com/ITtangtang/p/3948786.html)我们可以看到ConcurrentHashMap是由很多段组成的,而每一个段就是一个Hashtable。这样处理的原因是,我们同步的时候,不用锁住一张表,我们只需要锁住一个段就可以,其他的段也可以进行相应的操作,这也称为“分段锁”。有了大致的理解后,我们来看看具体的信息:成员信息
final Segment<K,V>[] segments;这个就是我们上面说的段数组,默认为16个,每一个段数组内部都是一个Hashtable,而且我们注意到,这个是final类型的,也就是分配好之后就不变了,这样做的原因就是如果ConcurrentHashMap需要rehash的时候,就不会影响Segments数组的大小,只需要改变每一个段内table的数组长度即可。如下是每个段内的成员 分布:
transient volatile int count;//table内元素的数量 //修改次数 transient int modCount; //临界值 transient int threshold; //存放链表头结点的数组,相当于Hashtable中的哈希数组 transient volatile HashEntry<K,V>[] table; //加载因子 final float loadFactor;我们注意到这里的table并不是final,这也就说明了我们上面的观点,rehash时只需要调整table的长度即可。再来看每个链表节点的信息:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; ......}我们可以看到每个节点的key,hash,next都是final的,这也就说明我们在插入删除的时候不能从链表的中间插入删除,只能在表头处理。(下面会详细讲解)。因为像在HashMap在链表的中间插入删除,如果读操作不加锁会导致读取到不一致的数据,想象一个场景:线程A将节点2的数据读取出来,刚好此时线程B将节点2删除了,那么此时读取到的数据便是不一致的,因为原来的节点2的next是指向节点3 的,而现在是指向节点4的。如果我们只允许在表头处理,那么就保证了HashEntry几乎是不可变的(删除的时候会变)。而value的注意到是volatile修饰,就说明value读取的时候一定是最新的值。-----------------------------------------------------------------------------------------------------------------------------------------------------
构造函数:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //寻找小于2的n次方 segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }创建一个ConcurrentHashMap,如果不指定参数,那么默认的是(16,0.75,16)。如果指定的段的数量不是2的n次方,那么我们要找到小于给定数最大的2的n次方。segmentShift 和segmentMask 是在hash算法的时候要用到,后面我们会看到。划分好各个段以后,我们还要对每个段进行初始化。C表示一个段内可以存放多少HashEntry,也就是一个段内hashEntry数组的长度,那么cap就是不大于c的2的n次方。然后我们根据cap和加载因子建立Hashentry数组。至此构造一个新的ConcurrentHashMap完成,我们该往里面放点值了。
Put方法
public V put(K key, V value) { if (value == null)//说明不支持值为null的,但是支持键为null的,后面会说到原因 throw new NullPointerException(); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false); }说put方法之前,我们来说说定位,也就是hash。
private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }重新计算key的hashcode的哈希值,这里在哈希的目的是减少哈希冲突,是使每个元素都能够均匀的分布在不同的段上,从而提高容器的存取效率。segmentShift默认28,segmentMask默认15,将计算到的哈希值无符号右移28位,即让高四位进行运算。
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); //扩容 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); //-------1----------------- count = c; // write-volatile //-----------3-------------- } return oldValue; } finally { unlock(); } }Put方法一执行,首先就会先加锁,然后接着判断是否需要扩容,然后定位到待插入元素的链表头,接着循环判断是否存在key键相同的,若有存在的便将旧值保存起来,用新值覆盖旧值。如果没找到,将旧值置为null,创建一个新的节点,放到链表的头部。然后更新count的值。--------------------------------------------------------------------------------------------------------------------------------------------------------------------
get方法
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }这里的hash定位和上面一样,我们可以看到这里一共要定位两次:首先定位到是在哪一个段,然后在这个段的HashEntry数组中定位到是哪一个索引,即哪一个链表。
V get(Object key, int hash) { if (count != 0) { // read-volatile //--------------2------------------ HashEntry<K,V> e = getFirst(hash); //--------------4------------------ while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }这里的思路也很简单:首先判断count是否为0,如果为0说明不存在,直接返回null。若存在则定位到链表头部位,遍历链表,如果找到键相同的,那么就返回对应的值,没找到返回null。我们简单的分析完会产生几个问题。
1、为什么判断count?
首先来看count的定义:transient volatile int count;发现是volatile,接着我们在源码中发现,无论是删除还是插入都会修改count,这就说明无论是哪一个线程更改了数据,count对所有线程都是可见的,而且都是最新值。所以在这里判断一下,确保ConcurrentHashMap中的确没有数据。
2、if (v != null)是什么意思?
一般来说,我们已经在链表中找到我们需要的键了,那么对应的值肯定是存在的,因为ConcurrentHashMap中不允许值为null,那么这句话不是多此一举吗?no,no,no,我们想象一个场景:线程A在执行到put代码块中的第1行(虚线注释的第1行),此时正在执行HashEntry的构造函数,就在此时线程B执行get代码块中的第2行(虚线注释的第2行),由于线程A的构造函数还没赋值,当然后续的第3行,也就没有给count赋值,此时线程B就读到没有值,就会返回null,可是事实是我们已经put进去了,但是没有读到值。所以为了防止这类情况发生,我们需要在判断一次,如果value等于null,进入readValueUnderLock(e)(图中recheck那一行),代码如下:V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }我们可以明确的看出来,这里是先要获得锁,也就是保证其他人先不能修改数据,然后返回value。-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
remove方法:
public V remove(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null); }同样的先定位数据。
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }我们定位到链表的头结点以后,遍历找寻key,如果没有对应的键,那么就返回null,如果发现了,我们要把待删节点之前的数据节点,逐个拷贝出来(因为前面说过,next是不可变的,所以只能在表头操作),从前往后拷贝,拷贝第1个节点,放到待删节点的前1个节点,依次类推,可能还有点模糊,我们看下面的图:图片忘记复制的哪里的。依照图上面的假如我们要删除e3,我们就需要先拷贝e1,作为e3的前驱,让其挂上e3的后继,然后删除e3,依次类推。------------------------------------------------------------------------------------------------------------------------------------------------------------
Size方法
我们来看看会锁定全段的方法。public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { // Resort to locking all segments sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; }size方法首先不是锁定全段,首先进行遍历,对段上的数据进行遍历,期间用到了modCount,并且将每个段的修改次数都写进了一个新数组,然后累加所有的和,接下来在第二次遍历时,判断第一次得到的修改次数和第二次的是否一样,如果完全一样而且累加和都相等,说明在求取size的过程中没有其他任何线程对其进行修改;相反如果在求取size的过程中,另一个线程修改了数据,就会造成第一个遍历的和第二次遍历得到的修改次数不一样,那么此时会在这样判断一次,也就是整体的在循环一次,因为RETRIES_BEFORE_LOCK默认是2。完成后如果还不能无法避免其他线程在修改,那么此时锁定所有的段,,然后对其进行求取size,最后释放所有段的锁。containsVlaue的方法和上面类似。------------------------------------------------------------------------------------------------------------------------------------------------------------------
迭代器的那些事:
说到迭代器肯定是弱一致性的,其实ConcurrentHashMap中的get、clear方法等都是弱一致性的。那么什么是弱一致性?拿get来说,刚才我们看到如果我们网ConcurrentHashMap中put了一条数据,我们希望立即读到的时候,有可能会读取不到的,但是ConcurrentHashMap中的确存在这条数据。这就是弱一致性。换成clear,也就是当我清空了一部分的节点空间,然后其余线程又再put数据,因为clear不是同步的,所以其他线程有可能刚好put到我们清空的空间中,这样clear返回的时候,ConcurrentHashMap中就可能还有数据产生。迭代器也是一样,当迭代遍历的时候,其他线程修改了遍历过的数据,那我们遍历得到的结果就和ConcurrentHashMap中实际存在的数据就会不一致。所以相对于的Hashtable便是强一致的,当在遍历时,只要某个线程修改了数据,那就会抛出ConcurrentModificationException异常。ConcurrentHashMap的弱一致性是为了提高效率,是一致性和效率之间的权衡。如果要保持绝对的一致性,可以选择Hashtable或者同步后的HashMap。小结
ConcurrentHashMap是HashMap的线程安全升级版,是Hashtable的改进版。但是不能完全替代Hashtable,因为在某些必须保证一致性的前提下,我们会选择Hashtable。ConcurrentHashMap的键可以为null,值却不能为null,这和以前我们见到的HashMap都不一样(HashMap是键值都可为null,Hashtable是键值都不可为null),那么究竟是为什么呢?我们刚才在上面的get代码快中发现,如果get的key存在,但是value却为null,则需要重新加锁后重读,所以值为null是有特殊用处的。ConcurrentHashMap和Hashtable:Hashtable利用synchronized锁住整张表,当Hashtable的数量增大到一定程度时,迭代时就会锁住整张表,就造成了性能和效率的下降,而ConcurrentHashMap则使用分段锁,每次只用锁住一个段,不影响其他的段进行操作。ConcurrentHashMap在读取的时候不用加锁,所以也造成了弱一致性。而Hashtable无论任何情况都会加锁,所以也成就了强一致性。相关文章推荐
- list3
- HDU 1166 敌兵布阵 (线段树 单点增减, 区间求和)
- hdu 4370 0 or 1 (最短路)
- HAVOK只碰撞一个shape的一面
- CocoaPods第三方库管理 iOS
- poj-2367 Genealogical tree
- 6174问题
- hdu5024
- 进程间通信的方式
- 排序算法整理
- 第四天:内存管理-property参数
- android调用httpclient.excute报android.os.NetworkOnMainThreadException 异常处理
- Html5 Canvas笔记(1)-CanvasAppTemplate代码
- android学习之Service的笔记,里面service里有监听用户通话状态的实例
- Hibernate的一对多增改级联操作
- iOS_UIImage中 + imageNamed: 和 + imageWithContentsOfFile:两个方法的区别
- Number of 1 Bits
- 算法的时间复杂度
- hdu 1002 A + B Problem II(大正整数相加)
- 112 Path Sum