详解JDK锁02:AQS
详解JDK锁02:AQS
1. AQS简述
这一部分,我将从是什么、干什么、怎么用三个角度简单讲述一下AQS
1.1 是什么?
AQS全称为AbstractQueuedSynchronizer,中文名称为队列同步器。
拆分一下中文就可知,一定离不开 队列 与 同步 这两个概念,下面进一步讲解其作用。
1.2 干什么?
AQS是用来构建锁或者其他同步组件的基础框架。
学Java并发的话就重点关住于AQS是如何构建锁的,因为同步器是实现锁的关键!
AQS用一个
int
成员变量来表示同步状态。通过修改同步状态,以此达到获取锁与释放锁的目的。比如说一个线程获取到了锁,那么就相当于它此时获取到了同步状态。
一个线程执行完了它的任务,它去释放锁,就相当于释放同步状态。
AQS通过内置的
FIFO队列
完成线程的排队工作。这一点其实并不难理解。当一个线程获取锁失败之后,可以选择陷入阻塞状态,也可以进行非阻塞地自旋重试;当有多个线程独占式地去获取锁时,只有一个线程可以获取成功,其它均会失败。那么应当如何管理这些竞争失败的锁呢?这便是队列的作用。
- 线程获取锁失败时,便进行入队列操作,成为队列的尾结点,进入等待状态
1.3 怎么用?
AQS的实现方式是继承:子类通过继承同步器并实现它的抽象方法来管理同步状态。
AQS支持独占式地获取同步状态与共享式地获取同步状态
其实AQS的精髓就在于它简化了锁的实现方式,我们不需要关心同步状态管理、线程排队、等待与唤醒等底层操作逻辑,我们只需要将精力放在锁的核心功能:加锁与解锁。
可以这样理解:
- 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节
- AQS是面向锁的实现者,它定义了锁的实现者与同步器交互的接口,隐藏了实现细节
之后会写一个实战案例去用AQS实现一个锁。
2. AQS方法简述
下面三个方法是用于管理同步状态
getState()
:用于获取同步状态setState(int newState)
:用于设置同步状态compareAndSetState(int expect, int update)
:使用CAS设置同步状态,保证原子性
之前第一部分提到:AQS已经帮我们实现了队列的维护逻辑,我们实现锁时只需要重写获取锁的方法
tryAcquire(int arg)
:独占式地获取同步状态,返回值为布尔类型,true为获取成功,false为获取失败。tryRelease(int arg)
:独占式地释放同步状态,返回值为布尔类型,true为释放成功,false为释放失败。tryAcquireShared(int arg)
:共享式地获取同步状态,返回值为int类型。- 返回0表示成功,且没有剩余资源
- 返回大于0的值表示成功,仍有剩余资源
- 返回负数代表获取失败
tryReleaseShared(int arg)
:共享式地释放同步状态,返回值为布尔类型。- 如果释放后允许唤醒后续等待节点时,返回true;否则返回false
isHeldExclusively()
:当前同步器是否被线程独占
通过对于这些方法进行简单理解,便能初步体会到:
当同步状态state为0时,其他线程才有可能获取到同步状态,即获取到锁。
对于可重入锁,当线程独占锁之后,会将同步状态state进行自增。如果该线程一直重复地获取该锁,则state会一直累加;该线程去释放该锁时,必须将state自减到0,才算是完全释放成功。
3. AQS实战案例
通过使用AQS,简单地实现一个独占不可重入锁,也就是说该锁的state只有0与1两种状态。
重点关注继承自AQS的Sync内部类,这里面自定义了获取同步状态与释放同步状态的核心逻辑。
这个案例印证了这句话:AQS是面向锁的实现者,它定义了锁的实现者与同步器交互的接口,隐藏了实现细节
1 |
|
4. AQS源码详解
4.1 同步队列
之前提到,同步器通过同步队列实现了对于线程与同步状态的管理。
进一步解释:
- 当当前线程获取同步状态(锁)失败时,同步器会将当前线程构造为一个节点Node,并加入到同步队列中,阻塞当前线程
- 当同步状态(锁)被释放后,会将队列首节点线程唤醒,然后使该线程再次去尝试获取同步状态
如下图所示:head
指向当前队列的头节点,其已获取到同步状态(锁)。
获取同步状态失败的节点,会将其依次添加到队列中,tail
维护尾节点。
队列节点状态:
- CANCELLED(1):值为1,表示当前节点由于等待超时或被中断,需要取消等待,节点进入该状态后将不会再变化。
- SIGNAL(-1):值为-1,表示后继节点线程处于等待状态,等待当前节点唤醒。
- CONDITION(-2):值为-2,表示节点等待在Condition上,如果其他线程对Condition调用了signal方法后,CONDITION状态的节点会从等待队列转移到同步队列中。
- PROPAGATE(-3):值为-3,共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- INITIAL(0):值为0,代表初始状态
观察节点状态发现:负值表示处于有效等待状态,而正值表示节点已被取消。
4.2 独占式获取
核心方法为 acquire()
,如下所示
1 |
|
而该方法又由 tryAcquire
、 acquireQueued
与 addWaiter
三个方法组成。
这里先给出该方法的执行流程:
- 调用
tryAcquire
方法尝试去获取同步状态,其返回值为boolean类型:- 若返回True,代表获取同步状态成功。而其取反后,为False,该方法便到此为止了,直接返回
- 若返回False,代表获取同步状态失败,取反后为True,则需要进一步执行之后的方法
tryAcquire
返回False之后,继续执行addWaiter
方法,构造同步节点,并通过该方法,将构造好的Node节点以独占模式加入到队列的尾部acquireQueued
方法使得该节点以死循环方式获取同步状态,一直到获取到才返回- 如果在死循环的过程中被中断过,则返回True,否则返回False
- 如果线程节点在等待过程中被中断,是不会响应的。只有等到获取到同步状态后,才会执行
selfInterrupt
方法
接着,一起来看一看这三个核心方法的源码
tryAcquire
1
2
3protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}这个核心方法竟然只有一行代码,而且是抛出异常的代码。如果只看源码,确实很让人疑惑。
但是结合之前第一部分的内容以及实战案例:我们在实现一个锁时,需要在锁的内部创建一个内部类,并让其继承AQS,重写AQS中的核心方法。而这些核心方法中就包括
tryAcquire
。这个获取同步状态(锁)的逻辑需要我们自己实现。这也就进一步解释了:AQS简化了锁的实现方式,我们不需要关心同步状态管理、线程排队、等待与唤醒等底层操作逻辑,我们只需要将精力放在锁的核心功能加锁与解锁。
addWaiter
:直接看注释其中if块中执行的内容就是将当前节点添加队列尾部
注意需要使用
compareAndSetTail
方法来确保节点以线程安全的方式被添加到尾部。如果仅使用简单的LinkedList来处理节点关系,则会导致多个线程并发地被添加到链表中,造成节点数量和顺序混乱。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private Node addWaiter(Node mode) {
// 将当前线程构造为同步队列中的节点
Node node = new Node(Thread.currentThread(), mode);
// 获取尾结点,tail为AQS类的成员变量
Node pred = tail;
// 若尾节点不为空
if (pred != null) {
// 将当前节点的prev设为尾结点
node.prev = pred;
// 将当前节点设置为新的尾结点
if (compareAndSetTail(pred, node)) {
// 将旧的尾结点的next设为当前节点
pred.next = node;
return node;
}
}
// 若尾节点为空,则调用enq自旋进行入队列操作
enq(node);
return node;
}enq
:直接看注释1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private Node enq(final Node node) {
// 这是一种自旋方式。一直自旋,直到Node被添加到队尾
for (;;) {
// 获取当前队列的尾结点
Node t = tail;
if (t == null) {
// 如果tail为空,表示队列为空,需要新创建一个空节点作为head与tail节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果tail不为空,说明队列中存在线程节点,进行入队列操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}这里需要注意的是:当
addWaiter
中尾结点为空时,又调用了enq
方法以自旋方式添加节点,这样可以保证节点被添加成功
acquireQueued
:直接看注释1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33final boolean acquireQueued(final Node node, int arg) {
// failed表示是否获取同步状态(锁)失败
boolean failed = true;
try {
// interrupted表示在获取过程中是否被打断
boolean interrupted = false;
for (;;) {
// 通过node.predecessor()拿到其前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点head,代表当前节点可以尝试去获取同步状态(锁)
if (p == head && tryAcquire(arg)) {
// 如果获取成功,则当前节点成为新的头节点head
// setHead方法会将当前节点的prev设置为null
setHead(node);
// 将旧的头节点的next设置为null,有助于gc回收旧的head节点
p.next = null;
// 代表成功获取到同步状态(锁)
failed = false;
// 返回是否被中断
return interrupted;
}
// 如果当前线程节点可以休息,那就进入等待状态,直至被unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果等待过程被中断,则标记为true
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取到同步状态(被中断或超时),则将该线程节点设置为CANCELLED状态
if (failed)
cancelAcquire(node);
}
}shouldParkAfterFailedAcquire:该方法用于检查状态,判断当前线程节点的前驱节点是否仍然有效。看注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点状态有效,返回true,当前线程节点便可进入等待状态,等待唤醒
if (ws == Node.SIGNAL)
return true;
// 前驱节点状态大于0,说明无效。
if (ws > 0) {
// while循环继续寻找前驱节点之前的有效节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将当前线程节点排到有效节点的后面
pred.next = node;
} else {
// 如果前驱节点有效,将其状态以CAS方式设置为SIGNAL,以便于释放同步状态后通知当前线程节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,在acquireQueued方法内继续自旋检查状态
return false;
}parkAndCheckInterrupt:该方法用于进入等待状态
1
2
3
4private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
4.3 独占式释放
release方法用于释放同步状态,并且会唤醒当前线程节点的后继节点,使得后继节点尝试去获取同步状态。源码如下:
1 |
|
其中tryRelease的设计模式与tryAcquire一样,均需要我们自己去实现
unparkSuccessor:唤醒后继节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private void unparkSuccessor(Node node) {
// node为当前线程节点,获取其状态
int ws = node.waitStatus;
// 如果状态小于0,需要将其置为0(从有效到无效),该过程允许失败
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的后继节点,一般节点不为空且有效
Node s = node.next;
// 但如果该节点为空或无效(处于CANCELLED状态),则需要找到有效节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
4.4 共享式获取
先来看看共享式与独占式地获取同步状态的区别:
- 在同一时刻,共享式获取允许多个线程同时获取到同步状态,而独占式则只有一个线程可以获取到,其他线程均阻塞
共享式获取的一个经典应用便是Semaphore,其作用为控制同时访问某一特定资源的线程数量。
因此,在接下来的解释中,用 资源
代替 同步状态
。
调用acquireShared方法共享式地获取资源
1 |
|
其中tryAcquireShared需要我们自己去手动实现,但是返回结果类型已经被定义好了:
- 返回负数时,代表获取失败,剩余资源不足
- 返回非负数时,代表获取成功。如果为0,说明已没有剩余资源了;如果为正数,则代表还有剩余资源。
当为负数时,需要调用doAcquireShared方法,进入等待队列,直到获取到资源时才返回。
1 |
|
4.5 共享式释放
共享式释放会调用releaseShared方法,释放指定数量的资源。其中tryReleaseShared方法仍然需要我们自己去实现
1 |
|
其中doReleaseShared方法主要用于释放完资源后,去唤醒后续处于等待状态的各个节点
1 |
|
5. 写在后面
参考文献:
JDK5.0源码
《Java并发编程的艺术》
这个系列大概会有5篇左右的样子,我尽可能把自己对于JUC的理解通俗易懂地写出来
但如果有错误的地方,请大家指出来,我会及时去学习与改进~
如果大家觉得我的内容写的还不错,可以在评论区留言支持一下呀~
欢迎大家来逛一逛我的个人博客~
此外,从2022.10.1开始,为激励自己持续刷题,我开始持续日更Leetcode题解啦~
所有题解均已放到Github仓库中,欢迎大家Star并提出Issue~