ReentrantLock 源码分析

前言

本文主要分析 ReentrantLock 是如何实现非公平锁和公平锁的

源码版本为 jdk1.8

构造方法

ReentrantLock 通过在构造方法里传入 boolean 参数来决定锁是否是公平的。

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

sync 是一个 Sync 类型的成员变量,Sync 是 ReentrantLock 的抽象静态内部类

1
2
3
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {}

Sync 的两个实现类 NonfairSync 和 FairSync 分别对应非公平锁和公平锁。

AQS(AbstractQueuedSynchronizer)相关知识

Sync 继承自 AbstractQueuedSynchronizer(以下简称 AQS),AQS 通过内部的同步队列进行管理,向实现者提供了线程阻塞和唤醒的具体方法。

与 ReentrantLock 相关的变量

1
private volatile int state;

在 ReentrantLock 中,state 为 0 表示该锁不被任何线程持有,state 为 1 表示该锁被线程持有(未重入),state 大于 1 表示该锁被线程持有并重入 state 次。state 会被并发访问,用 volatile + cas 的方式保证线程安全。

同步等待队列

AQS 同步等待队列的实现是一个带头尾指针的双向链表

1
2
3
4
5
// 指向队头节点
private transient volatile Node head;

// 指向队尾节点
private transient volatile Node tail;

Node 内部类是链表节点的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class Node {
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

// 等待状态
volatile int waitStatus;

// 指向前一节点
volatile Node prev;
// 指向后一节点
volatile Node next;
// 当前节点代表的线程
volatile Thread thread;

Node nextWaiter;

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
}

主要看下 waitStatus 变量,对于 ReentrantLock,主要有这几种状态:

  • 0:初始化状态
  • -1(SIGNAL):当前节点表示的线程在释放锁后需要唤醒后续节点的线程
  • 1(CANCELLED):等待超时或被中断,取消继续等待

相关方法

compareAndSetHead

通过 CAS 的方式设置队头节点 head

1
2
3
4
5
6
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

compareAndSetTail

通过 CAS 的方式设置队尾节点 tail

1
2
3
4
5
6
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

compareAndSetWaitStatus

通过 CAS 的方式设置等待状态 waitStatus

1
2
3
4
5
6
7
8
9
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}

lock

lock 方法进行加锁:

1
2
3
public void lock() {
sync.lock();
}

调用 sync 的 lock 方法,先看非公平锁的 lock 过程:

NonfairSync#lock

1
2
3
4
5
6
7
8
9
final void lock() {
// 通过 cas 的方式将 state 由 0 置为 1
if (compareAndSetState(0, 1))
// 将锁的持有者设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
// 获取锁失败后执行 acquire 方法
else
acquire(1);
}

当 state 为 0 时,说明当前没有线程持有锁,通过 cas 的方式将 state 置为 1,并将锁的持有者设置为当前线程。获取锁失败时(state >= 1),调用 AQS 的 acquire 方法:

AbstractQueuedSynchronizer#acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

AQS 的 tryAcquire 方法是个空实现,具体实现在 NonfairSync 和 FairSync 中,这里看 NonfairSync 的实现:

NonfairSync#acquire

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

继续调用 Sync 的 nonfairTryAcquire 方法:

Sync#nonfairTryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获得 state 的值
// 再次尝试获得锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程持有锁,进行重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 更新 state 的值(重入时 state + 1)
return true;
}
return false;
}

主要判断当前持有锁的线程,如果是当前线程持有锁,那么可以再次获得锁(可重入)

回到 AQS 的 acquire 方法:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

当 tryAcquire 返回 false 时(当前锁被其他线程持有),继续执行 acquireQueued 方法,先看里面的 addWaiter 方法

AbstractQueuedSynchronizer#addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  private Node addWaiter(Node mode) {
// 创建新节点
Node node = new Node(Thread.currentThread(), mode);
// 得到队尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通过 CAS 的方式更新队尾节点
if (compareAndSetTail(pred, node)) {
// 更新成功后返回
pred.next = node;
return node;
}
}
// 如果更新队尾节点失败(当前没有队尾节点或者其他线程成为队尾节点),继续执行 enq 方法
enq(node);
return node;
}

继续看 enq 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 当前链表为空,设置一个新的节点作为队头和队尾节点,代表当前正在获得锁的线程
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将当前节点设置为新的队尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

小结一下,addWaiter 方法主要做了这些事:创建表示当前线程的节点,当前链表为空时,需要先创建一个新节点作为队头和队尾节点,代表当前正在获得锁的线程,然后当前线程的节点成为新的队尾节点,通过 CAS 加失败重试的方式设置队头和队尾节点,保证了入队过程的线程安全

回到 AQS 的 acquire 方法:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

完成 addWaiter 后,继续看 acquireQueued 方法:

AbstractQueuedSynchronizer#acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获得当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头结点,尝试获得锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否需要阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

得到新节点的前驱节点,如果前驱节点为头结点,就尝试获得锁。如果成功获取到锁,那么就将当前节点设置为新的头结点。如果没有获取到,就会判断是否需要阻塞当前线程,该过程调用 shouldParkAfterFailedAcquire 方法:

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 上一节点的等待状态
if (ws == Node.SIGNAL) // 状态为 SIGNAL
return true;
if (ws > 0) { // 状态为 CANCELLED
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 状态为 0 或 PROPAGATE(ReentrantLock 的节点在初始时状态是 0)
// 将上一节点的等待状态设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

判断前驱节点的等待状态,ReentrantLock 的节点在初始时等待状态为 0,这时将等待状态设置为 SIGNAL,不会阻塞当前线程。如果等待状态为 SIGNAL,这时就会阻塞当前线程,调用 parkAndCheckInterrupt 方法进行阻塞:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞当前线程
return Thread.interrupted();
}

FairSync#lock

看完非公平锁的加锁操作后,再来看下公平锁的加锁操作,从 FairSync 的 lock 方法开始:

1
2
3
final void lock() {
acquire(1);
}

直接进入 AQS 的 acquire 方法:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

继续看 FairSync 的 tryAcquire 方法:

FairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 比起非公平锁,在通过 CAS 获取锁前,先调用 hasQueuedPredecessors 方法
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

比起非公平锁,在通过 CAS 获取锁前,先调用了 AQS 的 hasQueuedPredecessors 方法:

1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

该方法判断等待队列中是否有优先级比当前线程更高的等待线程,只要等待队列有第二个节点且第二个节点不是当前线程,就说明等待队列中有更高优先级的线程,当前线程不能抢夺锁,需要先进入等待队列进行等待

unlock

通过 unlock 方法释放锁:

1
2
3
public void unlock() {
sync.release(1);
}

调用 AQS 的 release 方法:

AbstractQueuedSynchronizer#release

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

AQS 的 tryRelease 是一个空方法,Sync 类重写了这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

在这个方法里,state 减一,如果 state 等于 0,说明当前线程不再持有锁,返回 true。这时如果等待队列不为空,并且头结点的等待状态不为 0,也就是说等待队列里存在阻塞的线程时,就调用 unparkSuccessor 方法唤醒阻塞线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
// 头结点的等待状态设置回初始状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 找到距离头结点最近的正常节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到后唤醒该节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}

找到距离头结点最近的正常节点并唤醒其对应的线程

小结

非公平锁的实现

ReentrantLock 使用同步等待队列来存储等待的线程,由于队列先进先出的特点,先加入等待的线程将会更早地被唤醒,如果严格按照队列的顺序获得锁的话,是公平的。但是在非公平锁的实现里,在进入队列之前,还有两次机会可以获得锁,第一次是在检查是否重入前,第二次是在检查重入的时候。所以假设一个线程执行完毕释放锁后,唤醒等待队列中的下一个线程,然而这时有一个新的线程想要获得锁,并且这个新线程在进入队列前就获得了锁,那么从等待队列唤醒的线程就会获取失败而继续阻塞下去。这样的话显然就不公平了,新来的线程比在等待队列等待了很久的线程更快获得锁

公平锁的实现

和非公平锁不同,使用公平锁时,即使当前没有线程拥有锁,在获取锁之前都要先判断一下同步等待队列里是否有优先级更高的线程,只要等待队列有第二个节点且第二个节点对应的不是当前线程,就说明等待队列中有更高优先级的线程,这时就不能获取锁,只能先加入等待队列,等队列前面的线程都执行完后,再执行当前线程,这样就保证了公平性

为什么非公平锁性能好

非公平锁对锁的竞争是抢占式的,在进入同步等待队列前就有两次机会可以直接获得锁,这样就省去了构造节点以及加入队列的同步开销。在高并发的情况下,如果线程持有锁的时间非常短,短过线程入队的过程,那么这种方式对性能的提升就会非常明显

参考

-------------    本文到此结束  感谢您的阅读    -------------
0%