详解JDK锁02:AQS

详解JDK锁02:AQS

1. AQS简述

这一部分,我将从是什么、干什么、怎么用三个角度简单讲述一下AQS

1.1 是什么?

AQS全称为AbstractQueuedSynchronizer,中文名称为队列同步器。

拆分一下中文就可知,一定离不开 队列同步 这两个概念,下面进一步讲解其作用。

1.2 干什么?

AQS是用来构建锁或者其他同步组件的基础框架。

学Java并发的话就重点关住于AQS是如何构建锁的,因为同步器是实现锁的关键!

  1. AQS用一个 int 成员变量来表示同步状态。通过修改同步状态,以此达到获取锁与释放锁的目的。

    比如说一个线程获取到了锁,那么就相当于它此时获取到了同步状态。

    一个线程执行完了它的任务,它去释放锁,就相当于释放同步状态。

  2. AQS通过内置的 FIFO队列 完成线程的排队工作。

    这一点其实并不难理解。当一个线程获取锁失败之后,可以选择陷入阻塞状态,也可以进行非阻塞地自旋重试;当有多个线程独占式地去获取锁时,只有一个线程可以获取成功,其它均会失败。那么应当如何管理这些竞争失败的锁呢?这便是队列的作用。

    • 线程获取锁失败时,便进行入队列操作,成为队列的尾结点,进入等待状态

1.3 怎么用?

AQS的实现方式是继承:子类通过继承同步器并实现它的抽象方法来管理同步状态。

AQS支持独占式地获取同步状态与共享式地获取同步状态

其实AQS的精髓就在于它简化了锁的实现方式,我们不需要关心同步状态管理、线程排队、等待与唤醒等底层操作逻辑,我们只需要将精力放在锁的核心功能:加锁与解锁。

可以这样理解:

  • 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节
  • AQS是面向锁的实现者,它定义了锁的实现者与同步器交互的接口,隐藏了实现细节

之后会写一个实战案例去用AQS实现一个锁。

2. AQS方法简述

下面三个方法是用于管理同步状态

  1. getState():用于获取同步状态
  2. setState(int newState):用于设置同步状态
  3. compareAndSetState(int expect, int update):使用CAS设置同步状态,保证原子性

之前第一部分提到:AQS已经帮我们实现了队列的维护逻辑,我们实现锁时只需要重写获取锁的方法

  1. tryAcquire(int arg):独占式地获取同步状态,返回值为布尔类型,true为获取成功,false为获取失败。
  2. tryRelease(int arg):独占式地释放同步状态,返回值为布尔类型,true为释放成功,false为释放失败。
  3. tryAcquireShared(int arg):共享式地获取同步状态,返回值为int类型。
    • 返回0表示成功,且没有剩余资源
    • 返回大于0的值表示成功,仍有剩余资源
    • 返回负数代表获取失败
  4. tryReleaseShared(int arg):共享式地释放同步状态,返回值为布尔类型。
    • 如果释放后允许唤醒后续等待节点时,返回true;否则返回false
  5. isHeldExclusively():当前同步器是否被线程独占

通过对于这些方法进行简单理解,便能初步体会到:

  • 当同步状态state为0时,其他线程才有可能获取到同步状态,即获取到锁。

  • 对于可重入锁,当线程独占锁之后,会将同步状态state进行自增。如果该线程一直重复地获取该锁,则state会一直累加;该线程去释放该锁时,必须将state自减到0,才算是完全释放成功。

3. AQS实战案例

通过使用AQS,简单地实现一个独占不可重入锁,也就是说该锁的state只有0与1两种状态。

重点关注继承自AQS的Sync内部类,这里面自定义了获取同步状态与释放同步状态的核心逻辑。

这个案例印证了这句话:AQS是面向锁的实现者,它定义了锁的实现者与同步器交互的接口,隐藏了实现细节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class ExclusiveLock implements Lock {

private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 当通过CAS设置state为1时,代表加锁成功
if (compareAndSetState(0 ,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
// 释放锁时如果发现该锁已被释放,说明有异常
if (getState() == 0) throw new IllegalArgumentException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 当state==1时表示处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

4. AQS源码详解

4.1 同步队列

之前提到,同步器通过同步队列实现了对于线程与同步状态的管理。

进一步解释:

  1. 当当前线程获取同步状态(锁)失败时,同步器会将当前线程构造为一个节点Node,并加入到同步队列中,阻塞当前线程
  2. 当同步状态(锁)被释放后,会将队列首节点线程唤醒,然后使该线程再次去尝试获取同步状态

如下图所示:head 指向当前队列的头节点,其已获取到同步状态(锁)。

获取同步状态失败的节点,会将其依次添加到队列中,tail 维护尾节点。

image-20221017231708935

队列节点状态:

  1. CANCELLED(1):值为1,表示当前节点由于等待超时或被中断,需要取消等待,节点进入该状态后将不会再变化。
  2. SIGNAL(-1):值为-1,表示后继节点线程处于等待状态,等待当前节点唤醒。
  3. CONDITION(-2):值为-2,表示节点等待在Condition上,如果其他线程对Condition调用了signal方法后,CONDITION状态的节点会从等待队列转移到同步队列中。
  4. PROPAGATE(-3):值为-3,共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  5. INITIAL(0):值为0,代表初始状态

观察节点状态发现:负值表示处于有效等待状态,而正值表示节点已被取消。

4.2 独占式获取

核心方法为 acquire() ,如下所示

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

而该方法又由 tryAcquireacquireQueuedaddWaiter 三个方法组成。

这里先给出该方法的执行流程:

  1. 调用 tryAcquire 方法尝试去获取同步状态,其返回值为boolean类型:
    • 若返回True,代表获取同步状态成功。而其取反后,为False,该方法便到此为止了,直接返回
    • 若返回False,代表获取同步状态失败,取反后为True,则需要进一步执行之后的方法
  2. tryAcquire 返回False之后,继续执行 addWaiter 方法,构造同步节点,并通过该方法,将构造好的Node节点以独占模式加入到队列的尾部
  3. acquireQueued 方法使得该节点以死循环方式获取同步状态,一直到获取到才返回
    • 如果在死循环的过程中被中断过,则返回True,否则返回False
  4. 如果线程节点在等待过程中被中断,是不会响应的。只有等到获取到同步状态后,才会执行 selfInterrupt 方法

接着,一起来看一看这三个核心方法的源码

  • tryAcquire

    1
    2
    3
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }

    这个核心方法竟然只有一行代码,而且是抛出异常的代码。如果只看源码,确实很让人疑惑。

    但是结合之前第一部分的内容以及实战案例:我们在实现一个锁时,需要在锁的内部创建一个内部类,并让其继承AQS,重写AQS中的核心方法。而这些核心方法中就包括 tryAcquire。这个获取同步状态(锁)的逻辑需要我们自己实现。

    这也就进一步解释了:AQS简化了锁的实现方式,我们不需要关心同步状态管理、线程排队、等待与唤醒等底层操作逻辑,我们只需要将精力放在锁的核心功能加锁与解锁。

  • addWaiter:直接看注释

    其中if块中执行的内容就是将当前节点添加队列尾部

    • 注意需要使用 compareAndSetTail 方法来确保节点以线程安全的方式被添加到尾部。

      如果仅使用简单的LinkedList来处理节点关系,则会导致多个线程并发地被添加到链表中,造成节点数量和顺序混乱。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private Node addWaiter(Node mode) {
    // 将当前线程构造为同步队列中的节点
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾结点,tail为AQS类的成员变量
    Node pred = tail;
    // 若尾节点不为空
    if (pred != null) {
    // 将当前节点的prev设为尾结点
    node.prev = pred;
    // 将当前节点设置为新的尾结点
    if (compareAndSetTail(pred, node)) {
    // 将旧的尾结点的next设为当前节点
    pred.next = node;
    return node;
    }
    }
    // 若尾节点为空,则调用enq自旋进行入队列操作
    enq(node);
    return node;
    }
    • enq:直接看注释

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      private Node enq(final Node node) {
      // 这是一种自旋方式。一直自旋,直到Node被添加到队尾
      for (;;) {
      // 获取当前队列的尾结点
      Node t = tail;
      if (t == null) {
      // 如果tail为空,表示队列为空,需要新创建一个空节点作为head与tail节点
      if (compareAndSetHead(new Node()))
      tail = head;
      } else {
      // 如果tail不为空,说明队列中存在线程节点,进行入队列操作
      node.prev = t;
      if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
      }
      }
      }
      }

      这里需要注意的是:当 addWaiter 中尾结点为空时,又调用了 enq 方法以自旋方式添加节点,这样可以保证节点被添加成功

  • acquireQueued:直接看注释

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    final boolean acquireQueued(final Node node, int arg) {
    // failed表示是否获取同步状态(锁)失败
    boolean failed = true;
    try {
    // interrupted表示在获取过程中是否被打断
    boolean interrupted = false;
    for (;;) {
    // 通过node.predecessor()拿到其前驱节点
    final Node p = node.predecessor();
    // 如果前驱节点为头节点head,代表当前节点可以尝试去获取同步状态(锁)
    if (p == head && tryAcquire(arg)) {
    // 如果获取成功,则当前节点成为新的头节点head
    // setHead方法会将当前节点的prev设置为null
    setHead(node);
    // 将旧的头节点的next设置为null,有助于gc回收旧的head节点
    p.next = null;
    // 代表成功获取到同步状态(锁)
    failed = false;
    // 返回是否被中断
    return interrupted;
    }
    // 如果当前线程节点可以休息,那就进入等待状态,直至被unpark
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // 如果等待过程被中断,则标记为true
    interrupted = true;
    }
    } finally {
    // 如果等待过程中没有成功获取到同步状态(被中断或超时),则将该线程节点设置为CANCELLED状态
    if (failed)
    cancelAcquire(node);
    }
    }
    • shouldParkAfterFailedAcquire:该方法用于检查状态,判断当前线程节点的前驱节点是否仍然有效。看注释

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      // 前驱节点状态有效,返回true,当前线程节点便可进入等待状态,等待唤醒
      if (ws == Node.SIGNAL)
      return true;
      // 前驱节点状态大于0,说明无效。
      if (ws > 0) {
      // while循环继续寻找前驱节点之前的有效节点
      do {
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      // 将当前线程节点排到有效节点的后面
      pred.next = node;
      } else {
      // 如果前驱节点有效,将其状态以CAS方式设置为SIGNAL,以便于释放同步状态后通知当前线程节点
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      // 返回false,在acquireQueued方法内继续自旋检查状态
      return false;
      }
    • parkAndCheckInterrupt:该方法用于进入等待状态

      1
      2
      3
      4
      private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);
      return Thread.interrupted();
      }

4.3 独占式释放

release方法用于释放同步状态,并且会唤醒当前线程节点的后继节点,使得后继节点尝试去获取同步状态。源码如下:

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
  • 其中tryRelease的设计模式与tryAcquire一样,均需要我们自己去实现

  • unparkSuccessor:唤醒后继节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private void unparkSuccessor(Node node) {
    // node为当前线程节点,获取其状态
    int ws = node.waitStatus;
    // 如果状态小于0,需要将其置为0(从有效到无效),该过程允许失败
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    // 获取当前节点的后继节点,一般节点不为空且有效
    Node s = node.next;
    // 但如果该节点为空或无效(处于CANCELLED状态),则需要找到有效节点
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }

4.4 共享式获取

先来看看共享式与独占式地获取同步状态的区别:

  • 在同一时刻,共享式获取允许多个线程同时获取到同步状态,而独占式则只有一个线程可以获取到,其他线程均阻塞

共享式获取的一个经典应用便是Semaphore,其作用为控制同时访问某一特定资源的线程数量。

因此,在接下来的解释中,用 资源 代替 同步状态

调用acquireShared方法共享式地获取资源

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

其中tryAcquireShared需要我们自己去手动实现,但是返回结果类型已经被定义好了:

  • 返回负数时,代表获取失败,剩余资源不足
  • 返回非负数时,代表获取成功。如果为0,说明已没有剩余资源了;如果为正数,则代表还有剩余资源。

当为负数时,需要调用doAcquireShared方法,进入等待队列,直到获取到资源时才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void doAcquireShared(int arg) {
// 将当前线程节点添加到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋重复尝试获取资源
for (;;) {
final Node p = node.predecessor();
// 当前线程节点的前驱节点为头节点时
if (p == head) {
// 再次尝试获取资源
int r = tryAcquireShared(arg);
// 资源数大于等于0,说明获取成功
if (r >= 0) {
// 设置头节点,若r > 0即还有剩余资源,则唤醒之后的线程节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

4.5 共享式释放

共享式释放会调用releaseShared方法,释放指定数量的资源。其中tryReleaseShared方法仍然需要我们自己去实现

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

其中doReleaseShared方法主要用于释放完资源后,去唤醒后续处于等待状态的各个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void doReleaseShared() {
// 通过循环与CAS操作确保资源被线程安全地释放
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

5. 写在后面

参考文献:

  1. JDK5.0源码

  2. 《Java并发编程的艺术》

  3. 黑马Java并发编程教程

这个系列大概会有5篇左右的样子,我尽可能把自己对于JUC的理解通俗易懂地写出来

但如果有错误的地方,请大家指出来,我会及时去学习与改进~

如果大家觉得我的内容写的还不错,可以在评论区留言支持一下呀~

欢迎大家来逛一逛我的个人博客~

此外,从2022.10.1开始,为激励自己持续刷题,我开始持续日更Leetcode题解啦~

所有题解均已放到Github仓库中,欢迎大家Star并提出Issue~


详解JDK锁02:AQS
https://ltyzzzxxx.github.io/2022/10/21/详解JDK锁02:AQS/
作者
ltyzzz
发布于
2022年10月21日
许可协议