引言

由于 HashMap 在多线程情况下存在线程安全的问题,因此在多线程情况下不会去选择使用这种结构,相应的我们会有几种方案去解决:

  • 采用线程安全的 HashTable
  • 使用 Collections.synchorizedMap(Map) 来创建线程安全的Map集合;
  • 使用 ConcurrentHashMap

但是由于效率和并发度的原因,往往会舍弃前两种选择第三种方式。

1.7 中的ConcurrentHashMap

1. 数据结构

img

JDK7 中 ConcurrentHashMap 由 Segment数组+HashEntry结点组成,和 HashMap一样,都是数组+链表的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//默认的数组大小16
static final int DEFAULT_INITIAL_CAPACITY = 16;

//扩容因子0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//ConcurrentHashMap中的 Segment 数组
final Segment<K,V>[] segments

//默认并发标准16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//Segment是ReentrantLock子类,因此拥有锁的操作
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table; // HashEntry数组
transient int count; // 结点个数
transient int threshold; // 扩容阈值
final float loadFactor; // 负载因子

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}

//HashEntry结点,和 HashMap中的 Entry结点一样
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value; //不同的是用 volatile进行修饰,保证内存的可见性
volatile HashEntry<K,V> next;
}
//segment中HashEntry[]数组最小长度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用于定位在segments数组中的位置,下面介绍
final int segmentMask;
final int segmentShift;

可以发现,ConcurrentHashMap 内部是由 Segment 数组组成,而 Segment数组又包含着一个 HashEntry 数组,其中 HashEntry 和 Entry结点类似,都是链表中的结点类型。不同的是 HashEntry 结点中的 value和 next 用 volatile 进行了修饰,这主要是为了保证在并发情况下内存的一致性。

问: volatile 的特性是啥?

  1. 保证内存可见性;即如果有多个线程对同一个共享变量进行修改,当前变量的最新值对其它线程来说是立即可见的。
  2. 防止指令重排,比如 int a = 6;这条命令就不是一个原子指令,它实际由3个步骤组成,①定义变量a;②给a赋值为6;在多线程情况下可能刚好执行完步骤①后,另一个线程对a的值进行了修改,这就会出现问题;

2. put 插入数据操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public V put(K key, V value) {
Segment<K,V> s;
// 这里解释了为什么 ConcurrentHashMap 不允许值为空
if (value == null)
throw new NullPointerException();

//根据key计算hash值,key也不能为null,否则hash(key)报空指针
int hash = hash(key);
// 根据hash值计算在segments数组中的位置
int j = (hash >>> segmentShift) & segmentMask;

//若为空,先创建初始化Segment再put值,不为空,直接put值。
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}



final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试去获取对应 Segment 的锁,获取失败会执行 scanAndLockForPut进行自旋获取
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;

try {
HashEntry<K,V>[] tab = table;

// 计算出在 table 中应该插入的位置
int index = (tab.length - 1) & hash;
//找到HashEntry[]中的指定位置的第一个节点
HashEntry<K,V> first = entryAt(tab, index);


for (HashEntry<K,V> e = first;;) {
//如果不为空,遍历这条链
if (e != null) {
K k;
//链中存在对应结点,用新值替换旧值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}

// 否则链表中没有对应结点,则创建新的结点进行插入
else {
if (node != null)
node.setNext(first);
else //该位置为空,则新建一个节点(注意这里采用链表头插入方式)
node = new HashEntry<K,V>(hash, key, value, first);
//键值对数量+1
int c = count + 1;
//如果键值对数量超过阈值
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//扩容
rehash(node);
else //未超过阈值,直接放在指定位置
setEntryAt(tab, index, node);
++modCount;
count = c;
//插入成功返回null
oldValue = null;
break;
}
}

} finally {
//解锁
unlock();
}
//修改成功,返回原值
return oldValue;
}

这里的 put操作和 HashMap中的 put 操作是类似的,只是在put前添加了上锁和解锁两个步骤,目的是为了保证同一时刻只有一个线程拥有修改的权限。

详细分析:

  • 先根据 key 去计算hash值来确定应该插入在 Segment 数组的哪个位置;
  • 尝试获取当前 Segment 对应的锁,获取成功继续插入,获取失败执行 scanAndLockForPut()进行自旋操作;
  • 如果成功获取锁,那么再次通过 hash和数组长度由插入位置 = hash & (length-1)确定在 HashEntry数组中应该插入的位置。定位到该该位置对应的链表头结点,遍历链表查找 key 值和当前key相同的结点,如果找到进行覆盖;如果没有找到,则首先判断是否需要扩容,扩容完毕后生成新结点进行插入。(注意扩容操作中使用的也是头插法);
  • 如果自旋获取锁达到了一定的上限仍未获得,就会改为阻塞式获取锁。

img

问:ConcurrentHashMap 并发度高的原因?

因为 ConcurrentHashMap 使用了分段锁技术,ConcurrentHashMap 中包含的有一个 Segment数组,而Segment类是继承自 ReentrantLock 类,因此在操作每一个 Segment 时能够进行上锁和解锁控制,那么在多线程情况下,对于同一个 Segment 某一个时刻只能有一个线程进行操作,不同的 Segment 可以由不同的线程同时操作互不影响。ConcurrentHashMap 所能支持的最大并发度等于Segment 数组的大小,也就是说如果容量是16,最大并发度就是16。

问:ConcurrentHashMap 的get操作是怎样的?

get操作和 put类似,先根据 key计算 hash值确定到具体应该插入到 Segment数组的哪个位置;然后再根据 hash值和 HashEntry数组的大小确定到具体的插入位置。

由于 HashEntry 结点中的 value和next 都是 volatile 修饰的,因此能够保证内存的可见性,每次获取的都是最新的值。get 操作的整个步骤不需要加锁。

问:ConcurrentHashMap 在 1.7中仍然存在的问题?

由于实现数据结构仍然是 数组+链表,如果hash冲突比较严重那么会导致链表长度过长,在遍历链表进行对应结点查找时效率就比较低。

所以在 JDK8中采用了 数组+链表+红黑树 的数据结构,当链表长度大于等于8时存储结构会转化为红黑树,这样就增大了查询的效率。

1.8中的ConcurrentHashMap

1. 数据结构

JDK8 中移除了 Segement数组,引入了 红黑树的概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//Node 结点依旧相同
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}

// 红黑树 TreeNode 结构
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}


// 最大容量 2^30
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
// 默认并发度
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 树化阈值
static final int TREEIFY_THRESHOLD = 8;
// 由红黑树转化为链表的结点个数阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 树化的最小结点数
static final int MIN_TREEIFY_CAPACITY = 64;

2. put 插入数据操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public V put(K key, V value) {
return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许插入键或值为空的值
if (key == null || value == null) throw new NullPointerException();

// 计算hash值,通过 hash ^ (hash>>>16),和HashMap一样
int hash = spread(key.hashCode());
// 用来统计链表长度,判断是否变换为红黑树
int binCount = 0;

for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;

// 数组为空,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();

// 数组中不存在对应结点,生成新结点尝试使用CAS进行插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS插入,插入失败会一直进行自旋插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}

// 判断是否需要进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

// 否则出现 hash冲突,根据当前的数据结构执行不同的插入方法
else {
V oldVal = null;

// 这里是对数组单个位置进行上锁,JDK7中是对Segment进行上锁
synchronized (f) {
if (tabAt(tab, i) == f) {

// 结点是链表的情况
if (fh >= 0) {
binCount = 1;
// 循环遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果找到链表中对应位置的结点,进行覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 否则在链表尾部插入新结点
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}

// 结构是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

// 判断是否达到了树化阈值
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 初始化数组,使用 sizeCtl来控制并发安全
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {

// sizeCtl小于0时当前线程让出执行权
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin

// 否则CAS操作将 sizeCtl修改为-1,然后进行数组初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

ConcurrentHashMap在进行 put 操作时过程如下:

  • 插入的 key或者value都不允许为空,然后根据 key计算hash值: h^(h>>>16) & HASH_BITS
  • 判断是否需要进行初始化,如果要初始化通过 sizeCtl来控制并发,sizeCtl默认为0,当 sizeCtl小于0时当前线程让出执行权Thread.yield();否则通过CAS操作来将 sizeCtl修改为-1,然后进行数组初始化,初始化完毕后改为0;
  • 根据 hash值和数组长度计算出应该插入的位置:hash & (length-1),如果为空表示当前位置可以插入数据,利用CAS进行写入,失败则自旋保证成功;
  • 否则说明当前存在 hash冲突,在解决冲突前需要判断数组是否需要扩容;
  • 扩容完毕后利用 synchorized锁 写入数据,这里锁住的是数组中那个位置对应的头结点;
  • 根据 binCount判断是否需要转换为红黑树的结构。

3. get 查找操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 根据key计算hash值
int h = spread(key.hashCode());

// 如果能够在table数组中定位到并且不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {

// hash值相等且key相等直接返回(链表头)
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}

// 否则hash值小于0为红黑树的结构,按照红黑树搜索
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;

// 否则剩下的就是链表结构,按照链表结构查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  • 首先会根据传入的 key 计算hash值: h ^ (h>>>16)& HASH_BITS
  • 根据hash值和数组长度计算实际的存储位置,如果该位置有值而且key相等直接返回该处的值;
  • 否则判断hash值是否小于0,小于0即为红黑树结构,那么在红黑树中查找;
  • 否则为链表结构,遍历链表进行查找。

面试问题

(1)Collections.SynchronizedMap 是怎么实现线程安全的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//SynchronizedMap 类,注意是 Collections 类下的一个静态内部类
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;

// 内部维护了一个 Map集合
private final Map<K,V> m; // Backing Map

// 内部维护了一个mutex互斥锁
final Object mutex; // Object on which to synchronize

// 构造方法一,只传入Map
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this; //这里锁住当前对象而不是类
}

// 构造方法二,传入Map和互斥锁变量
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}

首先 SynchronizedMap 是位于 Collections 类下面的一个静态内部类,这个类中包含着 一个Map对象、一个互斥锁参数mutex

构造方法有两种,如果只传入Map对象,那么默认的互斥锁参数就是 this,也就是调用 SynchronizedMap的对象,就是Map。

创建完 SynchronizedMap 之后,剩余操作全部都在 synchronized 修饰的代码块中进行,因此对于同一个Map对象而言保证了多线程环境下的安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}

(2)HashTable 为什么效率低?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// HashTable 的 get方法
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}

// HashTable 的containsKey方法
public synchronized boolean containsKey(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return true;
}
}
return false;
}

HashTable 对它内部的所有方法都加上了锁,即使是一些读取操作(仅仅读取操作在多线程情况下是不会存在线程安全问题的),因此效率会很低。

(3)说一下 HashTable和HashMap的区别?

  1. 键的范围不同:HashMap 允许键为空值,HashTable 和 ConcurrentHashMap 不允许键为空值。
  2. 实现方式不同:HashMap 继承自 AbstractMap类,而 HashTable继承自 Dictionary类;
  3. 容量机制不同:HashMap初始容量为16,HashTable初始容量为11;两者的负载因子都为0.75。HashMap扩容时是将容量扩大为原来的2倍;而HashTable是将容量扩大为原来的2倍+1.
image-20210420161225536
  1. 迭代器不同:HashMap中的迭代器Iterator是 fali-fast类型的;HashTable 中有两个迭代器(IteratorEnumeration)其中Iteratorfail-fast类型的,而Enumerationfail-safe类型的。所以当在迭代遍历过程中如果有其它的线程修改了集合的结果(比如增加、修改、删除),会抛出 ConcurrentModificationException 异常。类似的机制还体现在 ArrayList中,它和 HashMap 都是通过 modCount这个变量的前后对比来保证多线程情况下遍历时数据的统一和安全。

    参考:https://blog.csdn.net/baidu_41267789/article/details/113103905

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//比如这个是 ArrayList 的迭代器的遍历方法
public E next() {
checkForComodification(); // 检查多线程下是否对集合进行了修改
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}

//如何检查
final void checkForComodification() {
if (modCount != expectedModCount) //查看modCount和预期值是否一致
throw new ConcurrentModificationException();
}

(4)fail-fast机制是什么?

快速失败是 Java 集合中的一种机制,在用迭代器修改一个集合对象时,如果遍历过程中对集合的内容进行了修改(增删改),那么就会抛出 ConcurrentModificationException异常。

原理:

​ 在使用迭代器进行集合访问时会维护一个modCount变量,当集合发生修改时就会修改modCount

​ 当迭代器使用 hasNext、next来访问下一个元素时,会首先检查 modCount和预期值是否一样,不一样说明遍历时对集合进行了修改,那么会抛出 ConcurrentModificationException异常。

注意:

​ 这个异常只能用于并发修改的 bug,但是不能依赖于这个异常进行并发操作编程。

补充:

​ 在 java.util 包下的集合类都是快速失败的,不能在多线程情况下迭代时进行修改;

     还有另外一个机制`fail-safe`(fail-safe),java.concurrent 包下的容器都是 `fail-safe`类型的,可以并发修改。

(5)HashTable 为什么不允许空键或值?

参考:https://juejin.cn/post/6844904023363960839

https://blog.csdn.net/qq_40645822/article/details/102538774

HashMap 源码分析:

1
2
3
4
5
6
//JDK8 的HashMap的哈希函数操作
static final int hash(Object key) {
int h;
//如果键为空,则哈希值为0,会将值存储到哈希表的第一位中
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

HashMap 中如果键为空,则直接返回0;否则通过 key.hashcode() ^ (key.hashcode()>>>16)进行计算

HashTable 源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public synchronized V put(K key, V value) {

//这里值如果为空直接报空指针错误
if (value == null) {
throw new NullPointerException();
}

// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}

addEntry(hash, key, value, index);
return null;
}

hashTable 中先判断值是否为空,为空直接返回空指针异常;

如果键为空,那么在计算 key.hashcode()时同样会报空指针异常。

这样设计是因为 Hashtable 可以用在多线程情况下。而多线程时如果允许 value为空,那么对于一个特定的key,它可能存在于表中但是值为空,也可能不存在于表中此时返回的值也为空,这样就容易造成模糊判断。

如果先进行 contains() 判断再获取行不行呢?

1
2
3
4
5
if (map.contains(key)) {
return map.get(key);
} else {
throw new KeyNotFoundException;
}

也不可以,因为假设线程t1调用contains方法并找到密钥,它假定密钥存在并准备好返回值,无论它是否为null。在调用map.get之前,另一个线程t2从地图中删除该键。现在t1恢复并返回null。但是根据代码,t1的正确答案是KeyNotFoundException,因为该键已被删除。但它仍然返回null,因此预期的行为被打破。

而 HashMap由于是在单线程环境下,在通过 map.contains(key)获取到密钥后不会存在其它线程去删除这个键,那么就能够保证结果的准确性。

(6)CAS性能很高,但是为什么 synchronized 在JDK8 之后反而用的更多了?

因为 synchronized在之前一直都是重量级锁,在JDK8 之后 synchronized 采用了锁升级的方式。

先使用偏向锁优化同一线程获取锁;如果这个过程中出现锁竞争而且竞争失败就会升级为轻量级锁,轻量级锁

只能用于解决并发度小的情况,当并发度大锁竞争比较严重时如果轻量级锁获取失败会先进行自旋操作,自旋

到一定的次数就必须升级为重量级锁了。

image-20210421204000145

参考文章:

https://juejin.cn/post/6844903520957644808

https://juejin.cn/post/6844904023003250701

https://blog.csdn.net/u010723709/article/details/48007881