Java源码分析:ConcurrentHashMap
2016-03-05 14:59
681 查看
ConcurrentHashMap 源码分析
1、概述
上篇文章和大家分享了HashMap的源码,了解了HashMap的原理。和HashMap比较相似的有Hashtable,Hashtable的代码和HashMap大部分相似,关键的区别在于前者是线程安全的,后者线程不安全。Hashtable主要是对所有关于hash表的操作加锁来实现线程安全。当然这种效率肯定是不够高的,所以有了并发包中ConcurrentHashMap。它可以同步map中的数据,同时又能极高地提高并发效率,所以在多线程环境下使用它是很有必要的。先从整体上来分析一下ConcurrentHashMap的原理,Hashtable中所有的线程都使用一把锁,导致效率很低,如果将hash表分成多个数据段,容器中存在多个锁,每个数据段对应一把锁,这样当一个线程访问其中一个数据段的时候,别的线程也能访问其他数据段,提高了并发效率。
ConcurrentHashMap使用Segment作为分段,Segment是一种ReentrantLock,一个Segement作为一个类似HashMap的结构,维护这HashEntry。如果想要对Segment中的元素进行操作,必须先获取Segment锁。
二、ConcurrentHashMap具体实现
关键属性//加载因子,hashEntry个数和数组长度比值 static final float DEFAULT_LOAD_FACTOR = 0.75f; //并发级别 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //数组最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //segment最小容量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大segments个数 static final int MAX_SEGMENTS = 1 << 16; //size方法中如果获取失败的话重新执行的次数,下面会继续解释 static final int RETRIES_BEFORE_LOCK = 2; //segments的掩码值 final int segmentMask; //偏移量 final int segmentShift; //segment数组 final Segment<K,V>[] segments;
下面来看ConcurrentHashMap的构造方法:
//最关键的三个参数就是上面提到的初始化大小、加载因子以及并行等级。 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //规范参数 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments //寻找最佳匹配参数,这个匹配参数是一个不小于给定参数的2次幂 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //偏移量 this.segmentShift = 32 - sshift; //掩码 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); //创建数组 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //这里使用UNSAFE辅助类初始化 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
在concurrentHashMap中,由于使用了volatile变量,读操作一般不用加锁,对容器做结构性修改才会加锁。下面是put操作代码:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); //根据hash算出segement位置 j, int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>) //先判断s是否为空,由于非volatile变量。这里在ensureSegment中再次确保 UNSAFE.getObject// nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); //最后使用segment中的put方法 return s.put(key, hash, value, false); }
所以,concurrentHashMap中的很多方法都被延迟加载到Segment当中。
三、segment
下面主要讲解Segment类的实现。看源码前先整体看一下segment的结构:可以看到segment实现了一个类似hashMap的结构。ConcurrentHashMap使用的默认并发级别会创建包含16个Segment对象的数组,如下图:
下面来看Segment类的源码,由于篇幅太多,这里只介绍部分:
//可以看到它继承了ReentrantLock static final class Segment<K,V> extends ReentrantLock implements Serializable { //这里的table是由HashEntry对象组成的数组 transient volatile HashEntry<K,V>[] table; //包含hashEntry元素的个数,在统计hasnentry的个数时,就可以不用遍历这个Map,而是累加segment的count值。 transient int count; //被修改的次数 transient int modCount; //边界值 transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; }
由于将table设置成了volatile变量,根据volatile变量的语义,读操作不需要加锁。
下面看看segment的put操作:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //tryLock方法是为了检测是否有其他线程持有锁,如果有其他线程持有,则设置为空,如果没有其他线程持有则进入scanAndLockForput方法对put加锁,下面的操作和hashMap类似。锁住的是一个segment对象 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; //根据散列码得到下标 int index = (tab.length - 1) & hash; //找到对应的桶 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) {//如果K/V存在 K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value;//设置VALUE if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else {//如果不存在添加一个新链表 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //如果大于边界值则进行重新hash if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock();//解锁 } return oldValue; }
这里如果在理想状态下,如果并行级别是16,则能支持16个线程操作。再继续看segment的rehash()方法:
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; //新的容量等于老容量的两倍 int newCapacity = oldCapacity << 1; //边界值也增大 threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { //不能直接清空每个表,必须保证现有map可读 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //计算新索引 int idx = e.hash & sizeMask; //如果e是单节点,直接赋值 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; //遍历到最后一个末尾节点 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } 把最后一个末尾节点放进新table中的索引位 newTable[lastIdx] = lastRun; // 克隆剩余的节点 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
可以看到,rehash方法首先对数组扩容,再通过hash & sizeMask算出新位置进行搬移。
再看一下remove方法:
//这里在当前线程缓存中移除该链,但是在其他线程的缓存中仍然存在 final V remove(Object key, int hash, Object value) { //检测是否被占用 if (!tryLock()) //自旋获取 scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; //根据hash值找到下标 int index = (tab.length - 1) & hash; //找到hash码对应的桶 HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; //找到要删除的节点 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
相关文章推荐
- Java 深拷贝和浅拷贝
- 深入浅出学习struts1框架(四):从MVC模式代码认识struts1框架
- 深入浅出学习Struts1框架(三):彻底去掉TestServlet中的字符串和if-else语句块
- 深入浅出学习Struts1框架(二):重构MVC模式代码中跳转路径和业务逻辑
- PAT 1007. 素数对猜想 (20);判断一个数为素数;java实现
- 深入浅出学习Struts1框架(一):一个简单mvc模式代码示例开始
- 《Java程序设计基础》 第3章手记
- Java I/O之各个类的作用
- java特种兵读书笔记(5-6)——并发之线程池与调度池
- Spring 获取request对象
- 利用java和浏览器导出的cookies进行模拟登录百度贴吧
- JavaSE入门学习26:Java异常处理(下)
- Spring AOP拦截对Controller的请求时的配置失败
- Java File and FileSystem 源代码分析
- Spring 配置文件详解
- JAVA编程指南 --继承
- eclipse默认编码为GBK,修改为UTF8的方法
- Eclipse 如何设置注释的模板
- 20145325张梓靖 《Java程序设计》第1周学习总结
- java调用天气预报webservice