本文最后更新于 184 天前,其中的信息可能已经有所发展或是发生改变。
ConcurrentHashMap 类
ConcurrentHashMap 1.7
在JDK1.7中ConcurrentHashMap采用了数组+分段锁的方式实现。
Segment(分段锁)-减少锁的粒度
ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
存储结构
Java 7 版本 ConcurrentHashMap 的存储结构如图:
ConcurrnetHashMap 由很多个 Segment 组合,而每一个 Segment 是一个类似于 HashMap 的结构,所以每一个 HashMap 的内部可以进行扩容。
但是 Segment 的个数一旦初始化就不能改变,默认 Segment 的个数是 16 个,所以可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。
初始化
通过 ConcurrentHashMap 的无参构造:
| |
| |
| |
| |
| public ConcurrentHashMap() { |
| this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); |
| } |
无参构造中调用了有参构造,传入了三个参数的默认值,他们的值是
| |
| |
| |
| static final int DEFAULT_INITIAL_CAPACITY = 16; |
| |
| |
| |
| |
| static final float DEFAULT_LOAD_FACTOR = 0.75f; |
| |
| |
| |
| |
| static final int DEFAULT_CONCURRENCY_LEVEL = 16; |
Segment 下面 entryset 数组的大小是用 DEFAULT_INITIAL_CAPACITY/DEFAULT_CONCURRENCY_LEVEL 求出来的。
接着看下这个有参构造函数的内部实现逻辑:
| @SuppressWarnings("unchecked") |
| public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { |
| |
| if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) |
| throw new IllegalArgumentException(); |
| |
| if (concurrencyLevel > MAX_SEGMENTS) |
| concurrencyLevel = MAX_SEGMENTS; |
| |
| |
| int sshift = 0; |
| int ssize = 1; |
| |
| while (ssize < concurrencyLevel) { |
| ++sshift; |
| ssize <<= 1; |
| } |
| |
| this.segmentShift = 32 - sshift; |
| |
| this.segmentMask = ssize - 1; |
| |
| if (initialCapacity > MAXIMUM_CAPACITY) |
| initialCapacity = MAXIMUM_CAPACITY; |
| |
| |
| int c = initialCapacity / ssize; |
| if (c * ssize < initialCapacity) |
| ++c; |
| int cap = MIN_SEGMENT_TABLE_CAPACITY; |
| |
| while (cap < c) |
| cap <<= 1; |
| |
| |
| Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), |
| (HashEntry<K,V>[])new HashEntry[cap]); |
| Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; |
| UNSAFE.putOrderedObject(ss, SBASE, s0); |
| this.segments = ss; |
| } |
总结一下在 Java 7 中 ConcurrnetHashMap 的初始化逻辑:
- 必要参数校验
- 校验并发级别 concurrencyLevel 大小,如果大于最大值,重置为最大值。无参构造默认值是 16
- 寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方值,作为初始化容量大小,默认是 16
- 记录 segmentShift 偏移量,这个值为【容量 = 2 的N次方】中的 N,在后面 Put 时计算位置时会用到,默认是 32 - sshift = 28.
- 记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15
- 初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容。
put 操作
接着上面的初始化参数继续查看 put 方法源码:
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| public V put(K key, V value) { |
| Segment<K,V> s; |
| if (value == null) |
| throw new NullPointerException(); |
| int hash = hash(key); |
| |
| |
| |
| |
| |
| |
| int j = (hash >>> segmentShift) & segmentMask; |
| |
| if ((s = (Segment<K,V>)UNSAFE.getObject |
| (segments, (j << SSHIFT) + SBASE)) == null) |
| |
| s = ensureSegment(j); |
| |
| return s.put(key, hash, value, false); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @SuppressWarnings("unchecked") |
| private Segment<K,V> ensureSegment(int k) { |
| final Segment<K,V>[] ss = this.segments; |
| long u = (k << SSHIFT) + SBASE; |
| Segment<K,V> seg; |
| |
| if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { |
| Segment<K,V> proto = ss[0]; |
| |
| int cap = proto.table.length; |
| |
| float lf = proto.loadFactor; |
| |
| int threshold = (int)(cap * lf); |
| |
| HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; |
| if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { |
| |
| Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); |
| |
| while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) |
| == null) { |
| |
| if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) |
| break; |
| } |
| } |
| } |
| return seg; |
| } |
上面的源码分析了 ConcurrentHashMap 在 put 一个数据时的处理流程,下面梳理下具体流程:
- 计算要 put 的 key 的位置,获取指定位置的 Segment。
- 如果指定位置的 Segment 为空,则初始化这个 Segment.
初始化 Segment 流程:
- 检查计算得到的位置的 Segment 是否为null
- 为 null 继续初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组
- 再次检查计算得到的指定位置的 Segment 是否为null
- 使用创建的 HashEntry 数组初始化这个 Segment
- 自旋判断计算得到的指定位置的 Segment 是否为null,使用 CAS 在这个位置赋值为 Segment
- Segment.put 插入 key,value 值。
上面探究了获取 Segment 段和初始化 Segment 段的操作。
最后一行的 Segment 的 put 方法还没有查看,继续分析:
| final V put(K key, int hash, V value, boolean onlyIfAbsent) { |
| |
| HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); |
| V oldValue; |
| try { |
| HashEntry<K,V>[] tab = table; |
| |
| int index = (tab.length - 1) & hash; |
| |
| HashEntry<K,V> first = entryAt(tab, index); |
| for (HashEntry<K,V> e = first;;) { |
| if (e != null) { |
| |
| K k; |
| if ((k = e.key) == key || |
| (e.hash == hash && key.equals(k))) { |
| oldValue = e.value; |
| if (!onlyIfAbsent) { |
| e.value = value; |
| ++modCount; |
| } |
| break; |
| } |
| e = e.next; |
| } |
| else { |
| |
| if (node != null) |
| node.setNext(first); |
| else |
| node = new HashEntry<K,V>(hash, key, value, first); |
| int c = count + 1; |
| |
| if (c > threshold && tab.length < MAXIMUM_CAPACITY) |
| rehash(node); |
| else |
| |
| setEntryAt(tab, index, node); |
| ++modCount; |
| count = c; |
| oldValue = null; |
| break; |
| } |
| } |
| } finally { |
| unlock(); |
| } |
| return oldValue; |
| } |
由于 Segment 继承了 ReentrantLock,所以 Segment 内部可以很方便的获取锁,put 流程就用到了这个功能。
- tryLock() 获取锁,获取不到使用
scanAndLockForPut
方法继续获取
- 计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry
- 遍历 put 新元素,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。
- 如果这个位置上的 HashEntry 不存在:
- 如果当前容量大于扩容阀值,小于最大容量,进行扩容
- 直接头插法插入。
- 如果这个位置上的 HashEntry 存在:
- 判断链表当前元素 Key 和 hash 值是否和要 put 的 key 和 hash 值一致。一致则替换值
- 不一致,获取链表下一个节点,直到发现相同进行值替换,或者链表表里完毕没有相同的。
- 如果当前容量大于扩容阀值,小于最大容量,进行扩容
- 直接链表头插法插入
- 如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null
这里面的第一步中的 scanAndLockForPut
操作这里没有介绍,这个方法做的操作就是不断的自旋 tryLock()
获取锁。
当自旋次数大于指定次数时,使用 lock()
阻塞获取锁。在自旋时顺表获取下 hash 位置的 HashEntry。
下面结合源码查看一下:
| private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { |
| HashEntry<K,V> first = entryForHash(this, hash); |
| HashEntry<K,V> e = first; |
| HashEntry<K,V> node = null; |
| int retries = -1; |
| |
| while (!tryLock()) { |
| HashEntry<K,V> f; |
| if (retries < 0) { |
| if (e == null) { |
| if (node == null) |
| node = new HashEntry<K,V>(hash, key, value, null); |
| retries = 0; |
| } |
| else if (key.equals(e.key)) |
| retries = 0; |
| else |
| e = e.next; |
| } |
| else if (++retries > MAX_SCAN_RETRIES) { |
| |
| lock(); |
| break; |
| } |
| else if ((retries & 1) == 0 && |
| (f = entryForHash(this, hash)) != first) { |
| e = first = f; |
| retries = -1; |
| } |
| } |
| return node; |
| } |
rehash 扩容
ConcurrentHashMap 的扩容只会扩容到原来的两倍。老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+ oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置。
| private void rehash(HashEntry<K,V> node) { |
| HashEntry<K,V>[] oldTable = table; |
| |
| int oldCapacity = oldTable.length; |
| |
| int newCapacity = oldCapacity << 1; |
| |
| threshold = (int)(newCapacity * loadFactor); |
| |
| HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; |
| |
| int sizeMask = newCapacity - 1; |
| for (int i = 0; i < oldCapacity ; i++) { |
| |
| HashEntry<K,V> e = oldTable[i]; |
| if (e != null) { |
| HashEntry<K,V> next = e.next; |
| |
| int idx = e.hash & sizeMask; |
| if (next == null) |
| |
| newTable[idx] = e; |
| else { |
| |
| HashEntry<K,V> lastRun = e; |
| int lastIdx = idx; |
| |
| |
| for (HashEntry<K,V> last = next; last != null; last = last.next) { |
| int k = last.hash & sizeMask; |
| if (k != lastIdx) { |
| lastIdx = k; |
| lastRun = last; |
| } |
| } |
| |
| newTable[lastIdx] = lastRun; |
| |
| for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { |
| |
| V v = p.value; |
| int h = p.hash; |
| int k = h & sizeMask; |
| HashEntry<K,V> n = newTable[k]; |
| newTable[k] = new HashEntry<K,V>(h, p.key, v, n); |
| } |
| } |
| } |
| } |
| |
| int nodeIndex = node.hash & sizeMask; |
| node.setNext(newTable[nodeIndex]); |
| newTable[nodeIndex] = node; |
| table = newTable; |
| } |
这里第一个 for 是为了寻找这样一个节点,这个节点后面的所有 next 节点的新位置都是相同的,然后把这个作为一个链表赋值到新位置。
第二个 for 循环是为了把剩余的元素通过头插法插入到指定位置链表。
get 操作
到这里就很简单了,get 方法只需要两步即可:
- 计算得到 key 的存放位置。
- 遍历指定位置查找相同 key 的 value 值。
| public V get(Object key) { |
| Segment<K,V> s; |
| HashEntry<K,V>[] tab; |
| int h = hash(key); |
| long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; |
| |
| if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && |
| (tab = s.table) != null) { |
| for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile |
| (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); |
| e != null; e = e.next) { |
| |
| K k; |
| if ((k = e.key) == key || (e.hash == h && key.equals(k))) |
| return e.value; |
| } |
| } |
| return null; |
| } |
ConcurrentHashMap 1.8
存储结构
可以发现 Java8 的 ConcurrentHashMap 相对于 Java7 来说变化比较大,不再是之前的 Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表 / 红黑树。
当冲突链表达到一定长度时,链表会转换成红黑树。
CAS 操作
JDK 1.8 的 ConcurrentHashMap 保证线程安全是依赖于 CAS 操作,因此先来介绍一下这个
CAS(Compare-and-Swap/Exchange),即比较并替换,是一种实现并发常用到的技术。
CAS核心算法:
- 执行函数:CAS (V,E,N)
- V 表示准备要被更新的变量 (内存的值)
- E 表示我们提供的 期望的值 (期望的原值)
- N 表示新值 ,准备更新 V 的值 (新值)
算法思路:
- V是共享变量
- 我们拿着自己准备的这个E,去跟V去比较,
- 如果 E == V :说明当前没有其它线程在操作,所以我们把 N 这个值 写入对象的 V 变量中
- 如果 E != V :说明我们准备的这个 E 已经过时了,所以我们要重新准备一个最新的E ,去跟V 比较
- 比较成功后才能更新 V 的值为 N
如果多个线程同时使用CAS操作一个变量的时候,只有一个线程能够修改成功。
其余的线程提供的期望值已经与共享变量的值不一样了,所以均会失败。
由于CAS操作属于乐观派,它总是认为自己能够操作成功,所以操作失败的线程将会再次发起操作,而不是被OS挂起。
所以说,即使 CAS操作没有使用同步锁,其它线程也能够知道对共享变量的影响。
因为其它线程没有被挂起,并且将会再次发起修改尝试,所以无锁操作即CAS操作天生免疫死锁。
另外一点需要知道的是,CAS是系统原语,CAS操作是一条CPU的原子指令,所以不会有线程安全问题。
注意:
- ABA问题:
- E 和 E‘ 对比相同是不能保证百分百保证,其他线程没有在自己线程执行计算的过程里抢锁成功过
- 有可能其他线程操作后新 E’ 值和旧 E 值一样
- 解决方案:
- 在 E 对象里加个操作次数变量就行,每次判断时对比两个,E和操作次数就OK了
- 因为 ABA 问题中就算 E 相同操作次数也绝不相同
另外,CAS是靠硬件实现的,从而在硬件层面提升效率。实现方式是基于硬件平台的汇编指令,在intel的CPU中,使用的是 cmpxchg 指令。
但是在多核CPU的情况下,这个指令也不能保证原子性,需要在前面加上 lock 指令。lock 指令可以保证一个 CPU 核心在操作期间独占一片内存区域。这个实现方式为:总线锁和缓存锁。
在多核处理器的结构中,CPU 核心并不能直接访问内存,而是统一通过一条总线访问。
总线锁就是锁住这条总线,使其他核心无法访问内存。这种方式代价太大了,会导致其他核心停止工作。
而缓存锁并不锁定总线,只是锁定某部分内存区域。当一个 CPU 核心将内存区域的数据读取到自己的缓存区后,它会锁定缓存对应的内存区域。锁住期间,其他核心无法操作这块内存区域。
初始化 initTable
| |
| |
| |
| private final Node<K,V>[] initTable() { |
| Node<K,V>[] tab; int sc; |
| while ((tab = table) == null || tab.length == 0) { |
| |
| if ((sc = sizeCtl) < 0) |
| |
| Thread.yield(); |
| else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { |
| try { |
| if ((tab = table) == null || tab.length == 0) { |
| int n = (sc > 0) ? sc : DEFAULT_CAPACITY; |
| @SuppressWarnings("unchecked") |
| Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; |
| table = tab = nt; |
| sc = n - (n >>> 2); |
| } |
| } finally { |
| sizeCtl = sc; |
| } |
| break; |
| } |
| } |
| return tab; |
| } |
从源码中可以发现 ConcurrentHashMap 的初始化是通过自旋和 CAS 操作完成的。里面需要注意的是变量 sizeCtl
,它的值决定着当前的初始化状态。
- -1 说明正在初始化
- -N 说明有N-1个线程正在进行扩容
- 表示 table 初始化大小,如果 table 没有初始化
- 表示 table 容量,如果 table 已经初始化
put 操作
直接过一遍 put 源码:
| public V put(K key, V value) { |
| return putVal(key, value, false); |
| } |
| |
| |
| final V putVal(K key, V value, boolean onlyIfAbsent) { |
| |
| if (key == null || value == null) throw new NullPointerException(); |
| int hash = spread(key.hashCode()); |
| int binCount = 0; |
| for (Node<K,V>[] tab = table;;) { |
| |
| Node<K,V> f; int n, i, fh; |
| if (tab == null || (n = tab.length) == 0) |
| |
| tab = initTable(); |
| else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { |
| |
| if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) |
| break; |
| } |
| else if ((fh = f.hash) == MOVED) |
| tab = helpTransfer(tab, f); |
| else { |
| V oldVal = null; |
| |
| synchronized (f) { |
| if (tabAt(tab, i) == f) { |
| |
| if (fh >= 0) { |
| binCount = 1; |
| |
| for (Node<K,V> e = f;; ++binCount) { |
| K ek; |
| if (e.hash == hash && |
| ((ek = e.key) == key || |
| (ek != null && key.equals(ek)))) { |
| oldVal = e.val; |
| if (!onlyIfAbsent) |
| e.val = value; |
| break; |
| } |
| Node<K,V> pred = e; |
| if ((e = e.next) == null) { |
| pred.next = new Node<K,V>(hash, key, |
| value, null); |
| break; |
| } |
| } |
| } |
| else if (f instanceof TreeBin) { |
| |
| Node<K,V> p; |
| binCount = 2; |
| if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, |
| value)) != null) { |
| oldVal = p.val; |
| if (!onlyIfAbsent) |
| p.val = value; |
| } |
| } |
| } |
| } |
| if (binCount != 0) { |
| if (binCount >= TREEIFY_THRESHOLD) |
| treeifyBin(tab, i); |
| if (oldVal != null) |
| return oldVal; |
| break; |
| } |
| } |
| } |
| addCount(1L, binCount); |
| return null; |
| } |
过程概述:
- 根据 key 计算出 hashcode
- 判断是否需要进行初始化
- 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功
- 如果当前位置的
hashcode == MOVED == -1
,则需要进行扩容
- 如果都不满足,则利用 synchronized 锁写入数据
- 如果数量大于
TREEIFY_THRESHOLD
则要转换为红黑树
get 操作
get 流程比较简单,直接来吧:
| public V get(Object key) { |
| Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; |
| |
| int h = spread(key.hashCode()); |
| if ((tab = table) != null && (n = tab.length) > 0 && |
| (e = tabAt(tab, (n - 1) & h)) != null) { |
| |
| if ((eh = e.hash) == h) { |
| if ((ek = e.key) == key || (ek != null && key.equals(ek))) |
| |
| return e.val; |
| } |
| else if (eh < 0) |
| |
| return (p = e.find(h, key)) != null ? p.val : null; |
| while ((e = e.next) != null) { |
| |
| if (e.hash == h && |
| ((ek = e.key) == key || (ek != null && key.equals(ek)))) |
| return e.val; |
| } |
| } |
| return null; |
| } |
总结一下 get 过程:
- 根据 hash 值计算位置
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value
- 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之
- 如果是链表,遍历查找之
总结
Java7 中 ConcurrentHashMap 使用的分段锁,也就是每一个 Segment 上同时只有一个线程可以操作,每一个 Segment 都是一个类似 HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。但是 Segment 的个数一但初始化就不能改变。
Java8 中的 ConcurrentHashMap 使用的 Synchronized 锁加 CAS 的机制。结构也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。