引言
由于 HashMap 在多线程情况下存在线程安全的问题,因此在多线程情况下不会去选择使用这种结构,相应的我们会有几种方案去解决:
- 采用线程安全的 HashTable
- 使用 Collections.synchorizedMap(Map) 来创建线程安全的Map集合;
- 使用 ConcurrentHashMap
但是由于效率和并发度的原因,往往会舍弃前两种选择第三种方式。
1.7 中的ConcurrentHashMap
1. 数据结构

JDK7 中 ConcurrentHashMap 由 Segment数组
+HashEntry结点组成
,和 HashMap一样,都是数组+链表的结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
final Segment<K,V>[] segments
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; transient int threshold; final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } } static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
final int segmentMask; final int segmentShift;
|
可以发现,ConcurrentHashMap 内部是由 Segment 数组组成,而 Segment数组又包含着一个 HashEntry 数组,其中 HashEntry 和 Entry结点类似,都是链表中的结点类型。不同的是 HashEntry 结点中的 value和 next 用 volatile
进行了修饰,这主要是为了保证在并发情况下内存的一致性。
问: volatile 的特性是啥?
- 保证内存可见性;即如果有多个线程对同一个共享变量进行修改,当前变量的最新值对其它线程来说是立即可见的。
- 防止指令重排,比如
int a = 6;
这条命令就不是一个原子指令,它实际由3个步骤组成,①定义变量a;②给a赋值为6;在多线程情况下可能刚好执行完步骤①后,另一个线程对a的值进行了修改,这就会出现问题;
2. put 插入数据操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j); return s.put(key, hash, value, false); }
final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } }
} finally { unlock(); } return oldValue; }
|
这里的 put操作和 HashMap中的 put 操作是类似的,只是在put前添加了上锁和解锁两个步骤,目的是为了保证同一时刻只有一个线程拥有修改的权限。
详细分析:
- 先根据 key 去计算hash值来确定应该插入在 Segment 数组的哪个位置;
- 尝试获取当前 Segment 对应的锁,获取成功继续插入,获取失败执行
scanAndLockForPut()
进行自旋操作;
- 如果成功获取锁,那么再次通过 hash和数组长度由
插入位置 = hash & (length-1)
确定在 HashEntry数组中应该插入的位置。定位到该该位置对应的链表头结点,遍历链表查找 key 值和当前key相同的结点,如果找到进行覆盖;如果没有找到,则首先判断是否需要扩容,扩容完毕后生成新结点进行插入。(注意扩容操作中使用的也是头插法);
- 如果自旋获取锁达到了一定的上限仍未获得,就会改为阻塞式获取锁。

问:ConcurrentHashMap 并发度高的原因?
因为 ConcurrentHashMap 使用了分段锁技术,ConcurrentHashMap 中包含的有一个 Segment数组,而Segment类是继承自 ReentrantLock 类,因此在操作每一个 Segment 时能够进行上锁和解锁控制,那么在多线程情况下,对于同一个 Segment 某一个时刻只能有一个线程进行操作,不同的 Segment 可以由不同的线程同时操作互不影响。ConcurrentHashMap 所能支持的最大并发度等于Segment 数组的大小,也就是说如果容量是16,最大并发度就是16。
问:ConcurrentHashMap 的get操作是怎样的?
get操作和 put类似,先根据 key计算 hash值确定到具体应该插入到 Segment数组的哪个位置;然后再根据 hash值和 HashEntry数组的大小确定到具体的插入位置。
由于 HashEntry 结点中的 value和next 都是 volatile 修饰的,因此能够保证内存的可见性,每次获取的都是最新的值。get 操作的整个步骤不需要加锁。
问:ConcurrentHashMap 在 1.7中仍然存在的问题?
由于实现数据结构仍然是 数组+链表,如果hash冲突比较严重那么会导致链表长度过长,在遍历链表进行对应结点查找时效率就比较低。
所以在 JDK8中采用了 数组+链表+红黑树 的数据结构,当链表长度大于等于8时存储结构会转化为红黑树,这样就增大了查询的效率。
1.8中的ConcurrentHashMap
1. 数据结构
JDK8 中移除了 Segement数组,引入了 红黑树的概念
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } }
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } } private static final int MAXIMUM_CAPACITY = 1 << 30; private static final int DEFAULT_CAPACITY = 16; private static final int DEFAULT_CONCURRENCY_LEVEL = 16; private static final float LOAD_FACTOR = 0.75f; static final int TREEIFY_THRESHOLD = 8; static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64;
|
2. put 插入数据操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
|
ConcurrentHashMap在进行 put 操作时过程如下:
- 插入的 key或者value都不允许为空,然后根据 key计算hash值:
h^(h>>>16) & HASH_BITS
;
- 判断是否需要进行初始化,如果要初始化通过
sizeCtl
来控制并发,sizeCtl
默认为0,当 sizeCtl
小于0时当前线程让出执行权Thread.yield()
;否则通过CAS操作来将 sizeCtl
修改为-1,然后进行数组初始化,初始化完毕后改为0;
- 根据 hash值和数组长度计算出应该插入的位置:
hash & (length-1)
,如果为空表示当前位置可以插入数据,利用CAS进行写入,失败则自旋保证成功;
- 否则说明当前存在 hash冲突,在解决冲突前需要判断数组是否需要扩容;
- 扩容完毕后利用 synchorized锁 写入数据,这里锁住的是数组中那个位置对应的头结点;
- 根据
binCount
判断是否需要转换为红黑树的结构。
3. get 查找操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
|
- 首先会根据传入的 key 计算hash值:
h ^ (h>>>16)& HASH_BITS
;
- 根据hash值和数组长度计算实际的存储位置,如果该位置有值而且key相等直接返回该处的值;
- 否则判断hash值是否小于0,小于0即为红黑树结构,那么在红黑树中查找;
- 否则为链表结构,遍历链表进行查找。
面试问题
(1)Collections.SynchronizedMap 是怎么实现线程安全的?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; final Object mutex;
SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; }
SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; }
|
首先 SynchronizedMap 是位于 Collections 类下面的一个静态内部类,这个类中包含着 一个Map对象、一个互斥锁参数mutex
。
构造方法有两种,如果只传入Map对象,那么默认的互斥锁参数就是 this,也就是调用 SynchronizedMap的对象,就是Map。
创建完 SynchronizedMap 之后,剩余操作全部都在 synchronized 修饰的代码块中进行,因此对于同一个Map对象而言保证了多线程环境下的安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} }
public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} }
|
(2)HashTable 为什么效率低?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public synchronized V get(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; }
public synchronized boolean containsKey(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return true; } } return false; }
|
HashTable 对它内部的所有方法都加上了锁,即使是一些读取操作(仅仅读取操作在多线程情况下是不会存在线程安全问题的),因此效率会很低。
(3)说一下 HashTable和HashMap的区别?
- 键的范围不同:HashMap 允许键为空值,HashTable 和 ConcurrentHashMap 不允许键为空值。
- 实现方式不同:HashMap 继承自 AbstractMap类,而 HashTable继承自 Dictionary类;
- 容量机制不同:HashMap初始容量为16,HashTable初始容量为11;两者的负载因子都为0.75。HashMap扩容时是将容量扩大为原来的2倍;而HashTable是将容量扩大为原来的2倍+1.
迭代器不同:HashMap中的迭代器Iterator是 fali-fast
类型的;HashTable 中有两个迭代器(Iterator
和 Enumeration
)其中Iterator
是fail-fast
类型的,而Enumeration
是fail-safe
类型的。所以当在迭代遍历过程中如果有其它的线程修改了集合的结果(比如增加、修改、删除),会抛出 ConcurrentModificationException
异常。类似的机制还体现在 ArrayList中,它和 HashMap 都是通过 modCount
这个变量的前后对比来保证多线程情况下遍历时数据的统一和安全。
参考:https://blog.csdn.net/baidu_41267789/article/details/113103905
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public E next() { checkForComodification(); int i = cursor; if (i >= size) throw new NoSuchElementException(); Object[] elementData = ArrayList.this.elementData; if (i >= elementData.length) throw new ConcurrentModificationException(); cursor = i + 1; return (E) elementData[lastRet = i]; }
final void checkForComodification() { if (modCount != expectedModCount) throw new ConcurrentModificationException(); }
|
(4)fail-fast机制是什么?
快速失败是 Java 集合中的一种机制,在用迭代器修改一个集合对象时,如果遍历过程中对集合的内容进行了修改(增删改),那么就会抛出 ConcurrentModificationException
异常。
原理:
在使用迭代器进行集合访问时会维护一个modCount
变量,当集合发生修改时就会修改modCount
。
当迭代器使用 hasNext、next
来访问下一个元素时,会首先检查 modCount
和预期值是否一样,不一样说明遍历时对集合进行了修改,那么会抛出 ConcurrentModificationException
异常。
注意:
这个异常只能用于并发修改的 bug,但是不能依赖于这个异常进行并发操作编程。
补充:
在 java.util 包下的集合类都是快速失败的,不能在多线程情况下迭代时进行修改;
还有另外一个机制`fail-safe`(fail-safe),java.concurrent 包下的容器都是 `fail-safe`类型的,可以并发修改。
(5)HashTable 为什么不允许空键或值?
参考:https://juejin.cn/post/6844904023363960839
https://blog.csdn.net/qq_40645822/article/details/102538774
HashMap 源码分析:
1 2 3 4 5 6
| static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
|
HashMap 中如果键为空,则直接返回0;否则通过 key.hashcode() ^ (key.hashcode()>>>16)
进行计算
HashTable 源码分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public synchronized V put(K key, V value) { if (value == null) { throw new NullPointerException(); }
Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") Entry<K,V> entry = (Entry<K,V>)tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } }
addEntry(hash, key, value, index); return null; }
|
hashTable 中先判断值是否为空,为空直接返回空指针异常;
如果键为空,那么在计算 key.hashcode()
时同样会报空指针异常。
这样设计是因为 Hashtable 可以用在多线程情况下。而多线程时如果允许 value为空,那么对于一个特定的key,它可能存在于表中但是值为空,也可能不存在于表中此时返回的值也为空,这样就容易造成模糊判断。
如果先进行 contains() 判断再获取行不行呢?
1 2 3 4 5
| if (map.contains(key)) { return map.get(key); } else { throw new KeyNotFoundException; }
|
也不可以,因为假设线程t1调用contains方法并找到密钥,它假定密钥存在并准备好返回值,无论它是否为null。在调用map.get之前,另一个线程t2从地图中删除该键。现在t1恢复并返回null。但是根据代码,t1的正确答案是KeyNotFoundException,因为该键已被删除。但它仍然返回null,因此预期的行为被打破。
而 HashMap由于是在单线程环境下,在通过 map.contains(key)
获取到密钥后不会存在其它线程去删除这个键,那么就能够保证结果的准确性。
(6)CAS性能很高,但是为什么 synchronized 在JDK8 之后反而用的更多了?
因为 synchronized在之前一直都是重量级锁,在JDK8 之后 synchronized 采用了锁升级的方式。
先使用偏向锁优化同一线程获取锁;如果这个过程中出现锁竞争而且竞争失败就会升级为轻量级锁,轻量级锁
只能用于解决并发度小的情况,当并发度大锁竞争比较严重时如果轻量级锁获取失败会先进行自旋操作,自旋
到一定的次数就必须升级为重量级锁了。

参考文章:
https://juejin.cn/post/6844903520957644808
https://juejin.cn/post/6844904023003250701
https://blog.csdn.net/u010723709/article/details/48007881