ConcurrentHashMap原理分析
2016-08-12 15:28
489 查看
org.jboss.netty.util.internal.ConcurrentHashMap
通过分析Hashtable就知道,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。主要实体类
ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系。ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点/** *Thesegments,eachofwhichisaspecializedhashtable */ finalSegment<K,V>[]segments; staticfinalclassHashEntry<K,V>{ finalKkey; finalinthash; volatileVvalue; finalHashEntry<K,V>next; }
可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
其它为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。
finalSegment<K,V>segmentFor(inthash){ returnsegments[(hash>>>segmentShift)&segmentMask]; }
ConcurrentHashMap的数据成员
publicclassConcurrentHashMap<K,V>extendsAbstractMap<K,V> implementsConcurrentMap<K,V>,Serializable{ finalintsegmentMask; finalintsegmentShift; finalSegment<K,V>[]segments; }
所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。每个Segment相当于一个子Hash表
staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{ transientvolatileintcount; transientintmodCount; transientintthreshold; transientvolatileHashEntry<K,V>[]table; finalfloatloadFactor; }
count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步。loadFactor表示负载因子。
删除操作
publicVremove(Objectkey){ hash=hash(key.hashCode()); returnsegmentFor(hash).remove(key,hash,null); } //操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现: Vremove(Objectkey,inthash,Objectvalue){ lock(); try{ intc=count-1; HashEntry<K,V>[]tab=table; intindex=hash&(tab.length-1); HashEntry<K,V>first=tab[index]; HashEntry<K,V>e=first; while(e!=null&&(e.hash!=hash||!key.equals(e.key))) e=e.next; VoldValue=null; if(e!=null){ Vv=e.value; if(value==null||value.equals(v)){ oldValue=v; ++modCount; HashEntry<K,V>newFirst=e.next; *for(HashEntry<K,V>p=first;p!=e;p=p.next) *newFirst=newHashEntry<K,V>(p.key,p.hash, newFirst,p.value); tab[index]=newFirst; count=c;//write-volatile } } returnoldValue; }finally{ unlock(); } }
整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。中间那个for循环是做什么用的呢?(*号标记)从代码来看,就是将定位之后的所有entry克隆并拼回前面去,但有必要吗?每次删除一个元素就要将那之前的元素克隆一遍?这点其实是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次。至于entry为什么要设置为不变性,这跟不变性的访问不需要同步从而节省时间有关。
整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。
插入操作
Vput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){ lock(); try{ intc=count; if(c++>threshold)//ensurecapacity rehash(); HashEntry<K,V>[]tab=table; intindex=hash&(tab.length-1); HashEntry<K,V>first=tab[index]; HashEntry<K,V>e=first; while(e!=null&&(e.hash!=hash||!key.equals(e.key))) e=e.next; VoldValue; if(e!=null){ oldValue=e.value; if(!onlyIfAbsent) e.value=value; } else{ oldValue=null; ++modCount; tab[index]=newHashEntry<K,V>(key,hash,first,value); count=c;//write-volatile } returnoldValue; }finally{ unlock(); } }
该方法也是在持有段锁(锁定整个segment)的情况下执行的,这当然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时能够rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n。而比较难懂的是这句intindex=hash&(tab.length-1),原来segment里面才是真正的hashtable,即每个segment是一个传统意义上的hashtable,这里就是找出需要的entry在table的哪一个位置,之后得到的entry就是这个链的第一个节点,如果e!=null,说明找到了,这是就要替换节点的值(onlyIfAbsent==false),否则,我们需要new一个entry,它的后继是first,而让tab[index]指向它,什么意思呢?实际上就是将这个新entry插入到链头。
获取操作
Vget(Objectkey,inthash){ if(count!=0){//read-volatile当前桶的数据个数是否为0 HashEntry<K,V>e=getFirst(hash);得到头节点 while(e!=null){ if(e.hash==hash&&key.equals(e.key)){ Vv=e.value; if(v!=null) returnv; returnreadValueUnderLock(e);//recheck } e=e.next; } } returnnull; }
VreadValueUnderLock(HashEntry<K,V>e){
lock();
try{
returne.value;
}finally{
unlock();
}
}
get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是根据hash和key对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。
注意点:
对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。
如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index]=newHashEntry<K,V>(key,hash,first,value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空(在还没执行到this.value=value时,tab[index]已经被赋值并通过e.value使用了),这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值。
HashEntry(Kkey,inthash,ConcurrentHashMap.HashEntry<K,V>next,Vvalue){
this.hash=hash;
this.next=next;
this.key=key;
this.value=value;
}
java.util.concurrent.ConcurrentHashMap
我们首先来看一下ConcurrentHashMap类的声明:publicclassConcurrentHashMap<K,V>extendsAbstractMap<K,V>
implementsConcurrentMap<K,V>,Serializable
其中,这个类继承了java.util.AbstractMap中已有的实现,这个在前面整理HashMap的时候已经提过了,重点看后面实现的接口ConcurrentMap和Serializable。Serializable是做序列化处理的,而ConcurrentMap的定义又如下:[/code]
publicinterfaceConcurrentMap<K,V>extendsMap<K,V>{
VputIfAbsent(Kkey,Vvalue);
booleanremove(Objectkey,Objectvalue);
booleanreplace(Kkey,VoldValue,VnewValue);
Vreplace(Kkey,Vvalue);
}
其中规定了4个方法
VputIfAbsent(Kkey,Vvalue);如果没有这个key,则放入这个key-value,返回null,否则返回key对应的value。
booleanremove(Objectkey,Objectvalue);移除key和对应的value,如果key对应的不是value,移除失败
booleanreplace(Kkey,VoldValue,VnewValue);替代key对应的值,仅当当前值为旧值
Vreplace(Kkey,Vvalue);替代key对应的值,只要当前有值
这些方法都在ConcurrentHashMap实现了,后文会部分提到这些。
1.构造方法和ConcurrentHashMap的Segment实现
先看下构造方法:
publicConcurrentHashMap(intinitialCapacity,
floatloadFactor,intconcurrencyLevel)
有几个重载的方法,说这个参数最全的,有3个参数,除了HashMap中涉及到的loadFactor和initialCapacity外,还有一个concurrencyLevel,翻译过来就是并发级别或者并发度。
与此对应,ConcurrentHashMap中有一个segments数组对象,元素类型是ConcurrentHashMap的内部类Segment,而concurrencyLevel就是这个segments数组的大小。
我们来看下这个Segment类:
staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable
Segment扩展了ReentrantLock并实现了Serializable接口。除此之外,我们还发现这个类里实现的东西和java.util.HashMap非常相似。
实际上,这个类正是整个ConcurrentHashMap实现的关键。我想,作为这篇文章读者的您,应该会用到过各式各样的数据库,就拿masql的innoDB引擎来看,它除了支持表级锁意外,还支持行级锁,意义就在于这减小了锁粒度,当只对某行数据进行操作的时候,很可能没有必要限制同一个表中其它行的数据。在这个类中,这个Segment也是起到了同样的作用。每个Segment本身就是一个ReentrantLock,只有要修改的数据存在在同一个Segment,才有可能会需要锁定,这样就提高了多线程情况下效率,没必要所有线程全部等待锁。
2.get()方法源码分析
我们先看下get()方法的实现。
publicVget(Objectkey){
Segment<K,V>s;//manuallyintegrateaccessmethodstoreduceoverhead
HashEntry<K,V>[]tab;
inth=hash(key);
longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;
if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&
(tab=s.table)!=null){
for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile
(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);
e!=null;e=e.next){
Kk;
if((k=e.key)==key||(e.hash==h&&key.equals(k)))
returne.value;
}
}
returnnull;
}
实际上,就是通过key计算得到的hash值,确定对应的Segment对象,并用原子操作获取到对应的table和table中hash值对应的对象。
我们可以看到,在这个过程中,是没有显式用到锁的,仅仅是通过Unsafe类和原子操作,避免了阻塞,提高了性能。
3.put()和putIfAbsent()方法分析
先看下put()方法的源码:
publicVput(Kkey,Vvalue){
Segment<K,V>s;
if(value==null)
thrownewNullPointerException();
inthash=hash(key);
intj=(hash>>>segmentShift)&segmentMask;
if((s=(Segment<K,V>)UNSAFE.getObject//nonvolatile;recheck
(segments,(j<<SSHIFT)+SBASE))==null)//inensureSegment
s=ensureSegment(j);
returns.put(key,hash,value,false);
}
我们看到其中最后是使用Segment的put()方法的调用,而putIfAbsent()的方法的调用,仅仅是最后一个参数不同。
我们进一步看下Segment的put()方法的调用:
finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){
HashEntry<K,V>node=tryLock()?null:
scanAndLockForPut(key,hash,value);
VoldValue;
try{
HashEntry<K,V>[]tab=table;
intindex=(tab.length-1)&hash;
HashEntry<K,V>first=entryAt(tab,index);
for(HashEntry<K,V>e=first;;){
if(e!=null){
Kk;
if((k=e.key)==key||
(e.hash==hash&&key.equals(k))){
oldValue=e.value;
if(!onlyIfAbsent){
e.value=value;
++modCount;
}
break;
}
e=e.next;
}
else{
if(node!=null)
node.setNext(first);
else
node=newHashEntry<K,V>(hash,key,value,first);
intc=count+1;
if(c>threshold&&tab.length<MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab,index,node);
++modCount;
count=c;
oldValue=null;
break;
}
}
}finally{
unlock();
}
returnoldValue;
}
和java.util.HashMap比起来,基本类似,有两个地方不同。一个是对onlyIfAbsent参数的判断处理。另一个则是对整个操作过程加锁,并在加锁的地方做了稍微巧妙的处理。就是在在等锁的过程中,不断的寻找和构建node节点对象,不管是否这个方法最终是否创建到了node节点,当这个方法返回时,一定是已经获得了这个Segment对应的锁。
而这个寻找和创建节点所在的循环,一方面是做节点的遍历查询,另一方面也是起到了自旋锁的作用,避免直接调用lock()而等锁阻塞,因为这样会在系统实现层面阻塞和唤醒线程,是有一定的切换成本的,对提高效率不利。
4.综述
其实,纵观整个类的实现,都肯定会涉及到Segment的处理和其中方法的调用。对这些方法的调用,分为读操作和写处理,通常读操作没用用锁,而在修改操作,如remove()replace()等方法都和put()类似,在整个操作过程中对Segment进行了尝试加锁和自旋等锁的前提操作,并最后释放锁。这些写操作的等锁基本上和put()中的scanAndLockForPut()方法等同,除了需要创建节点。
从整体上看,由于Segment降低了并发中的锁粒度,并在写操作使用了锁。保证了整个ConcurrentHashMap的线程安全,也保证了并发运行时的效率。
5.其它
在java.util.conccurent包中并没有TreeMap,但对于有序要求的容器,有基于skiplist数据结构的Map实现ConcurrentSkipListMap,而且也有skiplist的ConcurrentSkipListSet,和java.util包中的Set一样,是基于Map的。
相关文章推荐
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- jdk concurrent collection---ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- Java集合---ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析(转载)
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析
- ConcurrentHashMap原理分析