您的位置:首页 > 其它

ConcurrentHashMap原理分析

2016-08-12 15:28 489 查看

org.jboss.netty.util.internal.ConcurrentHashMap

通过分析Hashtable就知道,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。



主要实体类

ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系。ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点

/**
*Thesegments,eachofwhichisaspecializedhashtable
*/
finalSegment<K,V>[]segments;

staticfinalclassHashEntry<K,V>{
finalKkey;
finalinthash;
volatileVvalue;
finalHashEntry<K,V>next;
}


可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
其它为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。

finalSegment<K,V>segmentFor(inthash){
returnsegments[(hash>>>segmentShift)&segmentMask];
}


ConcurrentHashMap的数据成员

publicclassConcurrentHashMap<K,V>extendsAbstractMap<K,V>
implementsConcurrentMap<K,V>,Serializable{
finalintsegmentMask;
finalintsegmentShift;
finalSegment<K,V>[]segments;
}


所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。每个Segment相当于一个子Hash表

staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{
transientvolatileintcount;
transientintmodCount;
transientintthreshold;
transientvolatileHashEntry<K,V>[]table;
finalfloatloadFactor;
}


count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步。loadFactor表示负载因子。

删除操作

publicVremove(Objectkey){
hash=hash(key.hashCode());
returnsegmentFor(hash).remove(key,hash,null);
}
//操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现:
Vremove(Objectkey,inthash,Objectvalue){
lock();
try{
intc=count-1;
HashEntry<K,V>[]tab=table;
intindex=hash&(tab.length-1);
HashEntry<K,V>first=tab[index];
HashEntry<K,V>e=first;
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))
e=e.next;

VoldValue=null;
if(e!=null){
Vv=e.value;
if(value==null||value.equals(v)){
oldValue=v;
++modCount;
HashEntry<K,V>newFirst=e.next;
*for(HashEntry<K,V>p=first;p!=e;p=p.next)
*newFirst=newHashEntry<K,V>(p.key,p.hash,
newFirst,p.value);
tab[index]=newFirst;
count=c;//write-volatile
}
}
returnoldValue;
}finally{
unlock();
}
}


整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。中间那个for循环是做什么用的呢?(*号标记)从代码来看,就是将定位之后的所有entry克隆并拼回前面去,但有必要吗?每次删除一个元素就要将那之前的元素克隆一遍?这点其实是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次。至于entry为什么要设置为不变性,这跟不变性的访问不需要同步从而节省时间有关。

整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

插入操作

Vput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){
lock();
try{
intc=count;
if(c++>threshold)//ensurecapacity
rehash();
HashEntry<K,V>[]tab=table;
intindex=hash&(tab.length-1);
HashEntry<K,V>first=tab[index];
HashEntry<K,V>e=first;
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))
e=e.next;

VoldValue;
if(e!=null){
oldValue=e.value;
if(!onlyIfAbsent)
e.value=value;
}
else{
oldValue=null;
++modCount;
tab[index]=newHashEntry<K,V>(key,hash,first,value);
count=c;//write-volatile
}
returnoldValue;
}finally{
unlock();
}
}


该方法也是在持有段锁(锁定整个segment)的情况下执行的,这当然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时能够rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n。而比较难懂的是这句intindex=hash&(tab.length-1),原来segment里面才是真正的hashtable,即每个segment是一个传统意义上的hashtable,这里就是找出需要的entry在table的哪一个位置,之后得到的entry就是这个链的第一个节点,如果e!=null,说明找到了,这是就要替换节点的值(onlyIfAbsent==false),否则,我们需要new一个entry,它的后继是first,而让tab[index]指向它,什么意思呢?实际上就是将这个新entry插入到链头。

获取操作

Vget(Objectkey,inthash){
if(count!=0){//read-volatile当前桶的数据个数是否为0
HashEntry<K,V>e=getFirst(hash);得到头节点
while(e!=null){
if(e.hash==hash&&key.equals(e.key)){
Vv=e.value;
if(v!=null)
returnv;
returnreadValueUnderLock(e);//recheck
}
e=e.next;
}
}
returnnull;
}

VreadValueUnderLock(HashEntry<K,V>e){
  lock();
  try{
    returne.value;
  }finally{
    unlock();
  }
}



get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是根据hash和key对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。

注意点:

对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index]=newHashEntry<K,V>(key,hash,first,value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空(在还没执行到this.value=value时,tab[index]已经被赋值并通过e.value使用了),这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值。

HashEntry(Kkey,inthash,ConcurrentHashMap.HashEntry<K,V>next,Vvalue){
this.hash=hash;
this.next=next;
this.key=key;
this.value=value;
}


java.util.concurrent.ConcurrentHashMap

我们首先来看一下ConcurrentHashMap类的声明:

publicclassConcurrentHashMap<K,V>extendsAbstractMap<K,V>
implementsConcurrentMap<K,V>,Serializable


其中,这个类继承了java.util.AbstractMap中已有的实现,这个在前面整理HashMap的时候已经提过了,重点看后面实现的接口ConcurrentMap和Serializable。Serializable是做序列化处理的,而ConcurrentMap的定义又如下:[/code]
publicinterfaceConcurrentMap<K,V>extendsMap<K,V>{

VputIfAbsent(Kkey,Vvalue);

booleanremove(Objectkey,Objectvalue);

booleanreplace(Kkey,VoldValue,VnewValue);

Vreplace(Kkey,Vvalue);
}

其中规定了4个方法

VputIfAbsent(Kkey,Vvalue);如果没有这个key,则放入这个key-value,返回null,否则返回key对应的value。

booleanremove(Objectkey,Objectvalue);移除key和对应的value,如果key对应的不是value,移除失败

booleanreplace(Kkey,VoldValue,VnewValue);替代key对应的值,仅当当前值为旧值

Vreplace(Kkey,Vvalue);替代key对应的值,只要当前有值

这些方法都在ConcurrentHashMap实现了,后文会部分提到这些。

1.构造方法和ConcurrentHashMap的Segment实现
先看下构造方法:

publicConcurrentHashMap(intinitialCapacity,
floatloadFactor,intconcurrencyLevel)

有几个重载的方法,说这个参数最全的,有3个参数,除了HashMap中涉及到的loadFactor和initialCapacity外,还有一个concurrencyLevel,翻译过来就是并发级别或者并发度。

与此对应,ConcurrentHashMap中有一个segments数组对象,元素类型是ConcurrentHashMap的内部类Segment,而concurrencyLevel就是这个segments数组的大小。

我们来看下这个Segment类:

staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable

Segment扩展了ReentrantLock并实现了Serializable接口。除此之外,我们还发现这个类里实现的东西和java.util.HashMap非常相似。

实际上,这个类正是整个ConcurrentHashMap实现的关键。我想,作为这篇文章读者的您,应该会用到过各式各样的数据库,就拿masql的innoDB引擎来看,它除了支持表级锁意外,还支持行级锁,意义就在于这减小了锁粒度,当只对某行数据进行操作的时候,很可能没有必要限制同一个表中其它行的数据。在这个类中,这个Segment也是起到了同样的作用。每个Segment本身就是一个ReentrantLock,只有要修改的数据存在在同一个Segment,才有可能会需要锁定,这样就提高了多线程情况下效率,没必要所有线程全部等待锁。

2.get()方法源码分析
我们先看下get()方法的实现。

publicVget(Objectkey){
Segment<K,V>s;//manuallyintegrateaccessmethodstoreduceoverhead
HashEntry<K,V>[]tab;
inth=hash(key);
longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;
if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&
(tab=s.table)!=null){
for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile
(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);
e!=null;e=e.next){
Kk;
if((k=e.key)==key||(e.hash==h&&key.equals(k)))
returne.value;
}
}
returnnull;
}


实际上,就是通过key计算得到的hash值,确定对应的Segment对象,并用原子操作获取到对应的table和table中hash值对应的对象。

我们可以看到,在这个过程中,是没有显式用到锁的,仅仅是通过Unsafe类和原子操作,避免了阻塞,提高了性能。

3.put()和putIfAbsent()方法分析
先看下put()方法的源码:

publicVput(Kkey,Vvalue){
Segment<K,V>s;
if(value==null)
thrownewNullPointerException();
inthash=hash(key);
intj=(hash>>>segmentShift)&segmentMask;
if((s=(Segment<K,V>)UNSAFE.getObject//nonvolatile;recheck
(segments,(j<<SSHIFT)+SBASE))==null)//inensureSegment
s=ensureSegment(j);
returns.put(key,hash,value,false);
}


我们看到其中最后是使用Segment的put()方法的调用,而putIfAbsent()的方法的调用,仅仅是最后一个参数不同。

我们进一步看下Segment的put()方法的调用:

finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){
HashEntry<K,V>node=tryLock()?null:
scanAndLockForPut(key,hash,value);
VoldValue;
try{
HashEntry<K,V>[]tab=table;
intindex=(tab.length-1)&hash;
HashEntry<K,V>first=entryAt(tab,index);
for(HashEntry<K,V>e=first;;){
if(e!=null){
Kk;
if((k=e.key)==key||
(e.hash==hash&&key.equals(k))){
oldValue=e.value;
if(!onlyIfAbsent){
e.value=value;
++modCount;
}
break;
}
e=e.next;
}
else{
if(node!=null)
node.setNext(first);
else
node=newHashEntry<K,V>(hash,key,value,first);
intc=count+1;
if(c>threshold&&tab.length<MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab,index,node);
++modCount;
count=c;
oldValue=null;
break;
}
}
}finally{
unlock();
}
returnoldValue;
}

和java.util.HashMap比起来,基本类似,有两个地方不同。一个是对onlyIfAbsent参数的判断处理。另一个则是对整个操作过程加锁,并在加锁的地方做了稍微巧妙的处理。就是在在等锁的过程中,不断的寻找和构建node节点对象,不管是否这个方法最终是否创建到了node节点,当这个方法返回时,一定是已经获得了这个Segment对应的锁。

而这个寻找和创建节点所在的循环,一方面是做节点的遍历查询,另一方面也是起到了自旋锁的作用,避免直接调用lock()而等锁阻塞,因为这样会在系统实现层面阻塞和唤醒线程,是有一定的切换成本的,对提高效率不利。

4.综述
其实,纵观整个类的实现,都肯定会涉及到Segment的处理和其中方法的调用。对这些方法的调用,分为读操作和写处理,通常读操作没用用锁,而在修改操作,如remove()replace()等方法都和put()类似,在整个操作过程中对Segment进行了尝试加锁和自旋等锁的前提操作,并最后释放锁。这些写操作的等锁基本上和put()中的scanAndLockForPut()方法等同,除了需要创建节点。

从整体上看,由于Segment降低了并发中的锁粒度,并在写操作使用了锁。保证了整个ConcurrentHashMap的线程安全,也保证了并发运行时的效率。

5.其它
在java.util.conccurent包中并没有TreeMap,但对于有序要求的容器,有基于skiplist数据结构的Map实现ConcurrentSkipListMap,而且也有skiplist的ConcurrentSkipListSet,和java.util包中的Set一样,是基于Map的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: