您的位置:首页 > 其它

ConcurrentHashMap浅析

2015-08-14 21:43 302 查看

概述:

ConcurrentHashMap是HashMap的升级版,我们都知道HashMap是不可靠的,线程不安全的,而Hashtable在同步的时候又会将整张表都锁住,从而在多并发的情况下效率低下。于是ConcurrentHashMap出现了,综合了两者的优点,所以一直是高并发情况下开发者的首选,但是相对的,它也有自身的一些不足,我们来分析一下它的原理。

ConcurrentHashMap结构分析:

ConcurrentHashMap胜于HashMap,却又不同于Hashtable,怎么解释?ConcurrentHashMap将哈希数组分成了好多段,这里的哈希数组可以想象成HashMap中存放链表头结点的地址的数组,但是却又不同,这个哈希数组中每一段中又存放着一个类似于Hashtable的结构,也就可以说ConcurrentHashMap中每一个段就是一个Hashtable。如图:(图片来自:http://www.cnblogs.com/ITtangtang/p/3948786.html)我们可以看到ConcurrentHashMap是由很多段组成的,而每一个段就是一个Hashtable。这样处理的原因是,我们同步的时候,不用锁住一张表,我们只需要锁住一个段就可以,其他的段也可以进行相应的操作,这也称为“分段锁”。有了大致的理解后,我们来看看具体的信息:

成员信息

final Segment<K,V>[] segments;
这个就是我们上面说的段数组,默认为16个,每一个段数组内部都是一个Hashtable,而且我们注意到,这个是final类型的,也就是分配好之后就不变了,这样做的原因就是如果ConcurrentHashMap需要rehash的时候,就不会影响Segments数组的大小,只需要改变每一个段内table的数组长度即可。如下是每个段内的成员 分布:
       transient volatile int count;//table内元素的数量        //修改次数               transient int modCount;        //临界值               transient int threshold;        //存放链表头结点的数组,相当于Hashtable中的哈希数组            transient volatile HashEntry<K,V>[] table;        //加载因子        final float loadFactor;
我们注意到这里的table并不是final,这也就说明了我们上面的观点,rehash时只需要调整table的长度即可。再来看每个链表节点的信息:
static final class HashEntry<K,V> {        final K key;        final int hash;        volatile V value;     final HashEntry<K,V> next;        ......}
我们可以看到每个节点的key,hash,next都是final的,这也就说明我们在插入删除的时候不能从链表的中间插入删除,只能在表头处理。(下面会详细讲解)。因为像在HashMap在链表的中间插入删除,如果读操作不加锁会导致读取到不一致的数据,想象一个场景:线程A将节点2的数据读取出来,刚好此时线程B将节点2删除了,那么此时读取到的数据便是不一致的,因为原来的节点2的next是指向节点3 的,而现在是指向节点4的。如果我们只允许在表头处理,那么就保证了HashEntry几乎是不可变的(删除的时候会变)。而value的注意到是volatile修饰,就说明value读取的时候一定是最新的值。-----------------------------------------------------------------------------------------------------------------------------------------------------

构造函数:

public ConcurrentHashMap(int initialCapacity,                             float loadFactor, int concurrencyLevel) {        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)            throw new IllegalArgumentException();        if (concurrencyLevel > MAX_SEGMENTS)            concurrencyLevel = MAX_SEGMENTS;        // Find power-of-two sizes best matching arguments        int sshift = 0;        int ssize = 1;        while (ssize < concurrencyLevel) {            ++sshift;            ssize <<= 1;        }  //寻找小于2的n次方        segmentShift = 32 - sshift;        segmentMask = ssize - 1;        this.segments = Segment.newArray(ssize);          if (initialCapacity > MAXIMUM_CAPACITY)            initialCapacity = MAXIMUM_CAPACITY;        int c = initialCapacity / ssize;        if (c * ssize < initialCapacity)            ++c;        int cap = 1;        while (cap < c)            cap <<= 1;        for (int i = 0; i < this.segments.length; ++i)            this.segments[i] = new Segment<K,V>(cap, loadFactor);    }
创建一个ConcurrentHashMap,如果不指定参数,那么默认的是(16,0.75,16)。如果指定的段的数量不是2的n次方,那么我们要找到小于给定数最大的2的n次方。segmentShift 和segmentMask 是在hash算法的时候要用到,后面我们会看到。划分好各个段以后,我们还要对每个段进行初始化。C表示一个段内可以存放多少HashEntry,也就是一个段内hashEntry数组的长度,那么cap就是不大于c的2的n次方。然后我们根据cap和加载因子建立Hashentry数组。至此构造一个新的ConcurrentHashMap完成,我们该往里面放点值了。

Put方法

public V put(K key, V value) {        if (value == null)//说明不支持值为null的,但是支持键为null的,后面会说到原因            throw new NullPointerException();        int hash = hash(key.hashCode());        return segmentFor(hash).put(key, hash, value, false);    }
说put方法之前,我们来说说定位,也就是hash。
 private static int hash(int h) {        // Spread bits to regularize both segment and index locations,        // using variant of single-word Wang/Jenkins hash.        h += (h <<  15) ^ 0xffffcd7d;        h ^= (h >>> 10);        h += (h <<   3);        h ^= (h >>>  6);        h += (h <<   2) + (h << 14);        return h ^ (h >>> 16);    } final Segment<K,V> segmentFor(int hash) {        return segments[(hash >>> segmentShift) & segmentMask];       }
重新计算key的hashcode的哈希值,这里在哈希的目的是减少哈希冲突,是使每个元素都能够均匀的分布在不同的段上,从而提高容器的存取效率。segmentShift默认28,segmentMask默认15,将计算到的哈希值无符号右移28位,即让高四位进行运算。
V put(K key, int hash, V value, boolean onlyIfAbsent) {            lock();            try {                int c = count;                if (c++ > threshold) // ensure capacity                    rehash();  //扩容                HashEntry<K,V>[] tab = table;                int index = hash & (tab.length - 1);                HashEntry<K,V> first = tab[index];                HashEntry<K,V> e = first;                while (e != null && (e.hash != hash || !key.equals(e.key)))                    e = e.next;                V oldValue;                if (e != null) {                    oldValue = e.value;                    if (!onlyIfAbsent)                        e.value = value;                }                else {                    oldValue = null;                    ++modCount;                    tab[index] = new HashEntry<K,V>(key, hash, first, value);                     //-------1-----------------                    count = c; // write-volatile                                                //-----------3--------------                }                return oldValue;            } finally {                unlock();            }           }
Put方法一执行,首先就会先加锁,然后接着判断是否需要扩容,然后定位到待插入元素的链表头,接着循环判断是否存在key键相同的,若有存在的便将旧值保存起来,用新值覆盖旧值。如果没找到,将旧值置为null,创建一个新的节点,放到链表的头部。然后更新count的值。--------------------------------------------------------------------------------------------------------------------------------------------------------------------

get方法

public V get(Object key) {        int hash = hash(key.hashCode());        return segmentFor(hash).get(key, hash);       }
这里的hash定位和上面一样,我们可以看到这里一共要定位两次:首先定位到是在哪一个段,然后在这个段的HashEntry数组中定位到是哪一个索引,即哪一个链表。
V get(Object key, int hash) {            if (count != 0) { // read-volatile                         //--------------2------------------                HashEntry<K,V> e = getFirst(hash);                    //--------------4------------------                while (e != null) {                    if (e.hash == hash && key.equals(e.key)) {                        V v = e.value;                        if (v != null)                            return v;                        return readValueUnderLock(e); // recheck                          }                    e = e.next;                }            }            return null;           }
这里的思路也很简单:首先判断count是否为0,如果为0说明不存在,直接返回null。若存在则定位到链表头部位,遍历链表,如果找到键相同的,那么就返回对应的值,没找到返回null。我们简单的分析完会产生几个问题。

1、为什么判断count?

首先来看count的定义:
transient volatile int count;
发现是volatile,接着我们在源码中发现,无论是删除还是插入都会修改count,这就说明无论是哪一个线程更改了数据,count对所有线程都是可见的,而且都是最新值。所以在这里判断一下,确保ConcurrentHashMap中的确没有数据。

2、if (v != null)是什么意思?

一般来说,我们已经在链表中找到我们需要的键了,那么对应的值肯定是存在的,因为ConcurrentHashMap中不允许值为null,那么这句话不是多此一举吗?no,no,no,我们想象一个场景:线程A在执行到put代码块中的第1行(虚线注释的第1行),此时正在执行HashEntry的构造函数,就在此时线程B执行get代码块中的第2行(虚线注释的第2行),由于线程A的构造函数还没赋值,当然后续的第3行,也就没有给count赋值,此时线程B就读到没有值,就会返回null,可是事实是我们已经put进去了,但是没有读到值。所以为了防止这类情况发生,我们需要在判断一次,如果value等于null,进入readValueUnderLock(e)(图中recheck那一行),代码如下:
 V readValueUnderLock(HashEntry<K,V> e) {            lock();            try {                return e.value;            } finally {                unlock();            }        }
我们可以明确的看出来,这里是先要获得锁,也就是保证其他人先不能修改数据,然后返回value。-----------------------------------------------------------------------------------------------------------------------------------------------------------------------

remove方法:

public V remove(Object key) {	int hash = hash(key.hashCode());        return segmentFor(hash).remove(key, hash, null);    }
同样的先定位数据。
V remove(Object key, int hash, Object value) {            lock();            try {                int c = count - 1;                HashEntry<K,V>[] tab = table;                int index = hash & (tab.length - 1);                HashEntry<K,V> first = tab[index];                HashEntry<K,V> e = first;                while (e != null && (e.hash != hash || !key.equals(e.key)))                    e = e.next;                V oldValue = null;                if (e != null) {                    V v = e.value;                    if (value == null || value.equals(v)) {                        oldValue = v;                        // All entries following removed node can stay                        // in list, but all preceding ones need to be                        // cloned.                        ++modCount;                        HashEntry<K,V> newFirst = e.next;                        for (HashEntry<K,V> p = first; p != e; p = p.next)                            newFirst = new HashEntry<K,V>(p.key, p.hash,                                                          newFirst, p.value);                        tab[index] = newFirst;                        count = c; // write-volatile                    }                }                return oldValue;            } finally {                unlock();            }           }
我们定位到链表的头结点以后,遍历找寻key,如果没有对应的键,那么就返回null,如果发现了,我们要把待删节点之前的数据节点,逐个拷贝出来(因为前面说过,next是不可变的,所以只能在表头操作),从前往后拷贝,拷贝第1个节点,放到待删节点的前1个节点,依次类推,可能还有点模糊,我们看下面的图:图片忘记复制的哪里的。依照图上面的假如我们要删除e3,我们就需要先拷贝e1,作为e3的前驱,让其挂上e3的后继,然后删除e3,依次类推。------------------------------------------------------------------------------------------------------------------------------------------------------------

Size方法

我们来看看会锁定全段的方法。
 public int size() {        final Segment<K,V>[] segments = this.segments;        long sum = 0;        long check = 0;        int[] mc = new int[segments.length];        // Try a few times to get accurate count. On failure due to        // continuous async changes in table, resort to locking.        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {            check = 0;            sum = 0;            int mcsum = 0;            for (int i = 0; i < segments.length; ++i) {                sum += segments[i].count;                mcsum += mc[i] = segments[i].modCount;            }            if (mcsum != 0) {                for (int i = 0; i < segments.length; ++i) {                    check += segments[i].count;                    if (mc[i] != segments[i].modCount) {                        check = -1; // force retry                        break;                    }                }            }            if (check == sum)                break;        }        if (check != sum) { // Resort to locking all segments            sum = 0;            for (int i = 0; i < segments.length; ++i)                segments[i].lock();            for (int i = 0; i < segments.length; ++i)                sum += segments[i].count;            for (int i = 0; i < segments.length; ++i)                segments[i].unlock();        }        if (sum > Integer.MAX_VALUE)            return Integer.MAX_VALUE;        else            return (int)sum;    }
size方法首先不是锁定全段,首先进行遍历,对段上的数据进行遍历,期间用到了modCount,并且将每个段的修改次数都写进了一个新数组,然后累加所有的和,接下来在第二次遍历时,判断第一次得到的修改次数和第二次的是否一样,如果完全一样而且累加和都相等,说明在求取size的过程中没有其他任何线程对其进行修改;相反如果在求取size的过程中,另一个线程修改了数据,就会造成第一个遍历的和第二次遍历得到的修改次数不一样,那么此时会在这样判断一次,也就是整体的在循环一次,因为RETRIES_BEFORE_LOCK默认是2。完成后如果还不能无法避免其他线程在修改,那么此时锁定所有的段,,然后对其进行求取size,最后释放所有段的锁。containsVlaue的方法和上面类似。------------------------------------------------------------------------------------------------------------------------------------------------------------------

迭代器的那些事:

说到迭代器肯定是弱一致性的,其实ConcurrentHashMap中的get、clear方法等都是弱一致性的。那么什么是弱一致性?拿get来说,刚才我们看到如果我们网ConcurrentHashMap中put了一条数据,我们希望立即读到的时候,有可能会读取不到的,但是ConcurrentHashMap中的确存在这条数据。这就是弱一致性。换成clear,也就是当我清空了一部分的节点空间,然后其余线程又再put数据,因为clear不是同步的,所以其他线程有可能刚好put到我们清空的空间中,这样clear返回的时候,ConcurrentHashMap中就可能还有数据产生。迭代器也是一样,当迭代遍历的时候,其他线程修改了遍历过的数据,那我们遍历得到的结果就和ConcurrentHashMap中实际存在的数据就会不一致。所以相对于的Hashtable便是强一致的,当在遍历时,只要某个线程修改了数据,那就会抛出ConcurrentModificationException异常。ConcurrentHashMap的弱一致性是为了提高效率,是一致性和效率之间的权衡。如果要保持绝对的一致性,可以选择Hashtable或者同步后的HashMap。

小结

ConcurrentHashMap是HashMap的线程安全升级版,是Hashtable的改进版。但是不能完全替代Hashtable,因为在某些必须保证一致性的前提下,我们会选择Hashtable。ConcurrentHashMap的键可以为null,值却不能为null,这和以前我们见到的HashMap都不一样(HashMap是键值都可为null,Hashtable是键值都不可为null),那么究竟是为什么呢?我们刚才在上面的get代码快中发现,如果get的key存在,但是value却为null,则需要重新加锁后重读,所以值为null是有特殊用处的。ConcurrentHashMap和Hashtable:Hashtable利用synchronized锁住整张表,当Hashtable的数量增大到一定程度时,迭代时就会锁住整张表,就造成了性能和效率的下降,而ConcurrentHashMap则使用分段锁,每次只用锁住一个段,不影响其他的段进行操作。ConcurrentHashMap在读取的时候不用加锁,所以也造成了弱一致性。而Hashtable无论任何情况都会加锁,所以也成就了强一致性。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: