ConcurrentHashMap 源码分析(1.8)

概述

ConcurrentHashMap 是一个加强版的 HashMap,它支持并发操作。比起 HashTable 直接锁住整个数组,ConcurrentHashMap 锁的粒度更细,锁的是单个数组元素,而且有些操作例如初始化,对数组元素赋值等使用的是 CAS 方法。并且在扩容的时候支持并发扩容,速度更快。

下面就来看下 ConcurrentHashMap 的源码,主要分析它的初始化、扩容、put、get 操作及相关延伸。

主要属性

1
2
3
4
5
6
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
transient volatile Node<K,V>[] table; // 比起 HashMap,新增 volatile 修饰

这些属性的含义和值和 HashMap(本文说的 HashMap 都是指 1.8 版本)的一样

下面是 ConcurrentHashMap 区别于 HashMap 的主要属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
* 用于控制初始化和扩容,它的值的含义:
* -1:表示正在初始化
* -(1+n):有 n 的线程在扩容
* 当 table 为 null 时,保存 table 初始化时的大小,默认为 0
* 初始化后,保存扩容阀值
*/
private transient volatile int sizeCtl;

/**
* 扩容时,下一个需要拆分的索引
*/
private transient volatile int transferIndex;

/**
* 用于扩容,只有在扩容时才不为空
*/
private transient volatile Node<K,V>[] nextTable;

static final int MOVED = -1; // ForwardingNode 的 hash 值

static final int HASH_BITS = 0x7fffffff; // 等于 Integer.MAX_VALUE

private static final long SIZECTL; // sizeCtl 的内存偏移量

static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU 的数量

主要内部类

Node

1
2
3
4
5
6
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

普通的节点类型,和 HashMap 不同的是,val 和 next 用 volatile 修饰

ForwardingNode

1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

}

ForwardingNode 在扩容时使用,它的 hash 值为 MOVED

构造方法

ConcurrentHashMap()

1
2
public ConcurrentHashMap() {
}

默认的构造方法什么也没做

ConcurrentHashMap(int)

1
2
3
4
5
6
7
8
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

该构造方法指定初始化容量,tableSizeFor 方法保证初始化容量是 2 的幂,同时将初始容量保存到 sizeCtl 中。

ConcurrentHashMap(int, float)

1
2
3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

指定初始化容量和加载因子,并发数为 1

ConcurrentHashMap(int, float, int)

1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

指定初始化容量、加载因子、并发数。根据初始化和加载因子确定初始容量,并保持在 sizeCtl

ConcurrentHashMap(Map<? extends K, ? extends V>)

1
2
3
4
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

根据已有集合创建并初始化 ConcurrentHashMap。

一些重要方法

Unsafe 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 原子操作:返回数组指定位置的元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// CAS 原子操作:数组指定位置设置元素 v,c 为之前的元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子操作:数组指定位置设置元素 v
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

initTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 有其他线程在初始化或者扩容时,线程挂起,初始化工作只能由一个线程进行
// 因为后面初始化时要将 sizeCtl 置为 -1,所以这样用 sc 保存 sizeCtl 的值,
if ((sc = sizeCtl) < 0)
Thread.yield();
// 利用 CAS 把 sizeCtl 置为 -1(要修改的值有第二个参数的内存偏移量确定),表示本线程正在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 确定初始容量,如果用户没有指定,就使用默认初始容量
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // 指定扩容阀值(加载因子为 0.75)
}
} finally {
sizeCtl = sc; // sizeCtl 更新为新的扩容阀值
}
break;
}
}
return tab;
}

该方法根据初始容量对数组进行初始化。利用 CAS 保证只能由一个线程来完成初始化工作。初始化完成后 sizeCtl 设置为扩容阀值(初始容量 * 0.75)。

transfer

进行扩容操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 记录每个 CPU 处理的数组元素个数,最少处理 16 个(也就是向前移动的跨度)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range

// 开始扩容时,初始化 nextTable,其长度为当前数组长度的两倍
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;

// 扩容时,某位置处理完毕后,就将头结点置为 fwd,表示正在扩容,该位置的元素已经被转移走
// 其他需要该位置元素的线程在看到该节点后,就可以来协助扩容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // 判断当前要处理的位置是否要向前移动
boolean finishing = false; // 扩容结束的标志
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;

// i 向前移动(移动多少看情况),i 用于记录处理的位置
// 几个局部变量的含义:
// bound: 当 i 的值 >= bound 时,i 位置都由本线程处理
// nextIndex: 记录 transferIndex 的值,当 i < bound 时,确定 i 向前移动后新的值
// nextBound: 确定下一个 bound 的值
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}

// 处理结束,或者说是没有找到要处理的位置(0 ~ n-1)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 结束扩容,由最后一个处理的线程提交新数组并更新扩容阀值
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 有其他线程帮忙扩容,本线程提前结束,由其它线程收尾
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 数组位置为 null,用 fwd 标记扩容即可,不用做其他处理
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置已经有线程处理了,继续向前
else if ((fh = f.hash) == MOVED)
advance = true;
// 处理这个位置的元素
else {
synchronized (f) { // 锁住该位置
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 处理链表,方法和 HashMap 类似,不过处理后链表节点的顺序有变化
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将划分后的链表分别放在新数组的相应位置和相应位置加上旧数组长度
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 处理完后,旧数组相应位置的头节点设为 fwd,作为扩容的标记
setTabAt(tab, i, fwd);
// 继续向前,处理前面位置的节点
advance = true;
}
// 处理红黑树,和 HashMap 的处理方法类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

开始扩容时,先计算每个线程一次处理多少个位置,最少处理 16 个。

第一个进行扩容的线程需要初始化 nextTable,其长度是当前数组长度的两倍,扩容完成后它会作为新数组。

创建一个 ForwardingNode 节点,每处理完一个位置后,旧数组的头结点都会置为这个 ForwardingNode 节点,作为扩容的标志,其他线程要访问元素时,发现这个标记,就会来协助扩容,多线程并发扩容,可以提高扩容的速度。

在处理数组元素的时候,是从后往前处理的,一个线程处理完它要处理的数量后,如果还没有扩容完毕,就会继续向前移动,直到处理完所有位置。处理某个位置的元素时,处理方法和 HashMap 类似,也是根据节点 hash 值和旧数组容量的与运算值,划分为两部分,分别放在原位置和原位置后移旧数组容量的位置。在处理完后,需要把旧数组的头结点置为 ForwardingNode 节点,标记该处位置的元素已被移动。

put

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

和 HashMap 类似,直接调用 putVal

putVal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
  final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为 null
if (key == null || value == null) throw new NullPointerException();

int hash = spread(key.hashCode()); // 计算 key 的 hash 值,计算方式和 HashMap 类似
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f 为插入数组位置的头结点,n 为数组长度,i 为数组插入位置,fh 是 f 的 hash 值
Node<K,V> f; int n, i, fh;

if (tab == null || (n = tab.length) == 0) // 第一次插入时,初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 插入的数组位置为 null
// 通过 CAS 的方式在该位置放入新节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED) // 该位置的元素已被移动,说明正在扩容,去协助扩容
tab = helpTransfer(tab, f);
else { // 添加节点到该数组位置的链表或红黑树上
V oldVal = null;
synchronized (f) { // 插入过程对插入的数组位置加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash >= 0, 说明是链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 遍历链表
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 已存在相同节点,判断是否要更新 value
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 不存在相同节点,将新节点插入到链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 插入到红黑树中
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { // 检测节点个数
// 当链表节点达到树化阀值(8),将链表转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 更新相关容量,达到扩容阀值时进行扩容
return null;
}

第一次插入元素时,初始化数组,初始化时利用 CAS 进行同步。

如果插入的数组位置为空,通过 CAS 的方式在该位置放入新节点。

如果发现节点被移动,说明其他线程正在扩容,就去协助扩容。

如果要插入的数组位置不为空,就对该位置的数组元素加锁,然后节点类型,插入到链表或红黑树中。如果是插入的是链表,会遍历链表,发现相同节点就看下要不要更新 value,然后提取退出遍历,返回旧 value,否则就将新节点插入到链表的尾部。

插入完成后,更新容量,达到扩容阀值时调用 transfer 进行扩容。

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 计算 key 的 hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // 头结点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // 从红黑树查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 从链表查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

get 方法就比较简单,除了在得到相应位置头结点时用了 Unsafe 的原子操作,其他地方都没有加锁,支持并发查找。查找流程和 HashMap 一样,也是根据节点类型分情况查找。

参考

-------------    本文到此结束  感谢您的阅读    -------------
0%