1、认识AQS
使用过Java中的锁对象,一定会对一个锁很熟悉—ReentrantLock。这是一个可重入的锁。大部分情况是作为一些情况替换synchronized这个关键字的方案。synchronized这个关键字是Java内部实现同步机制的,那么ReentrantLock实现的方式是什么?
它背后的大佬就是大名鼎鼎的AQS(AbstractQueuedSynchronizer)
AQS——简单来说,就是提供一个实现阻塞式锁和相关同步器的框架。它的内部是依赖一个FIFO阻塞队列实现以上功能的。
1.1 、AQS的两种模式
AQS中有两种获取和释放资源的模式,两个方式都很容易理解。
独占式:只允许同时一个线程获取同一个许可。可以对应为一些锁ReetrantLock
共享式:允许同时多个线程获取同一个许可。可以对应一些同步器类似Semaphore
Exclusive vs Shared 模式
在此队列中,通过一个非常简单的方式区分两种模式。就是以下两个静态常量。
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
1.2、AQS的子类
除了ReentrantLock之外还有以下的类使用到了AQS。这个版本是jdk1.8
基于AQS的相关类.png
2 、AQS内部结构
AQS内部的结构不算太复杂,这里先将其中重点的几个字段列出来
public abstract class AbstractQueuedSynchronizer{
private volatile int state;
private volatile Node head; // 队列的头节点
private volatile Node tail; // 队列的尾节点
...
}
state字段代表的同步的状态,它有什么用?我们暂时留个疑问,只需要知道它是用来决定同步的一个状态。后面分析流程时,再来分析这个字段。
这里我们先重点分析AQS的内部类Node
2.1、锁队列节点Node
AQS的锁队列就是依靠此内部类实现的。它是一个双向队列,内部存储了线程的信息。
static final class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 节点的线程信息
volatile int waitStatus; // 线程的等待状态
Node nextWaiter; // 用于条件队列使用
...
}
前三个字段其实很好理解,既然是双向队列,那就会有前驱和后继节点,而线程信息也是需要保存的。waitStatus这个字段是什么作用的?
waitStatus
首先我们看看这个状态的值有什么意义。它在内部中定义了waitStatus只能有以下四个值。
CANCELLED:值为1。代表此节点中的线程因为超时或者中断而被取消了。这种线程不会被再次阻塞。
SIGNAL:值为-1。代表此节点的后继结点马上要或者已经阻塞了。所以,当前节点必须在它释放或者取消的时候,取消它后继结点的阻塞状态。为了避免出现竞争,获取许可的时候先要表明需要一个信号signal。然后开始原子式的获取,失败后就阻塞。
CONDITION:值为-2。队列在Condition Queue中,等待某一个条件。
PROPAGATE:值为-3。代表后继节点会传播唤醒的操作,此值在共享模式下启用。
对于一个线程来说,判断它能否获得许可,就需要根据waitStatus的值。
2.2、条件队列ConditionObject
除了锁队列之外,AQS还有一个条件队列,本文暂不分析。
3 、独占式AQS解析
介绍上述内容后,依然对上面提到的变量还有模式很模糊,后面我们开始分析具体的AQS流程,在流程中,我们将逐一讲解上述的内容的实际意义。
3.1、acquire操作
在AQS中的入口即是acquire。AQS 获取独占式的许可,需要调用acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里方法传进来了一个arg参数,arg的意义AQS不做具体定义和处理。简单来说,它是可以跟AQS 中的state字段进行联合使用的,用来判断现在是否能完成同步操作,而怎么使用state这个字段应该是交给子类去实现的,
3.1.1、tryAcquire
tryAcquire方法是AQS交给子类处理state与传入的arg的方法,子类需要判断是否允许当前线程在独占模式下进行acquire。
tryAcquire是一个空方法,具体实现方式在子类中。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里我们以ReentrantLock内的公平锁作为例子,分析具体情况下,子类的tryAcquire应该做什么工作。
protected final boolean tryAcquire(int acquires) {
...
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
...
setState(nextc);
return true;
}
return false;
}
这里我们不展开分析方法的具体意义。在ReentrantLock的公平锁中,state代表的是可重入的次数,简单来说就是多少对lock和unlock方法。
如果state等于0代表没有使用过lock方法,那就会进行两个条件判断。
AQS队列中是不是没有其他还在等待的线程;
CAS更新State字段成功与否。
如果上面两个条件都是true,那么就认为当前线程可以持有这个锁。
如果state不等于0,这就代表此次调用的lock方法不是第一对。那么只要当前线程与存储的当前持有锁的线程为同一个线程,就会state+1。表示当前lock的对数为2对。以此来实现可重入的锁!
3.1.2、addWaiter
在tryAcquire获取许可权失败后,addWaiter就是用来将当前失败的线程,添加进等待队列。传入的参数也就是AQS的模式,在1.1小节中提到的final两个字段。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 当等待队列尾部不为空时,有在等待的线程
if (pred != null) {
// 将新增的节点的前驱节点链接到尾部节点。
node.prev = pred;
// 采用CAS的方式进行数据的更新,更新成功后,则将原尾节点的后继节点链接到新增的节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队列为空或者当前的尾节点已经变化了,则直接向队列中添加这个新的节点
enq(node);
return node;
}
addWaiter的大致流程在代码中注释中已经说明了,需要解释的一点是为什么采用CAS更新并连接线程节点的前驱节点。Doug Lea在AQS的文章中提到过,目前并没有很好的原子方式去更新双向队列。所以采用了上述的择中方案。
Q1:为什么采用CAS更新前驱节点保证其原子性,而对后继节点采用普通的赋值方式?
AQS采用的CLH锁的变体.
CLH锁中每个节点都回去对自身的前驱节点判断是否可以访问然后进行自旋,而AQS将自旋改为了LockSupport的park阻塞过程(这个过程在后文会解释!)。因此需要保证前驱节点一定是原子性,在不断的访问前驱节点过程中一定是最新的。这样即使next域不是最新的,也并不影响在队列中阻塞的过程,因为他们都是访问前驱节点。
然后,我们重点关注的是enq过程。
// 将新的节点插入到等待队列的尾部。如果等待队列不存在,一定要对其初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空情况下,CAS操作将头节点进行更新,将头节点设置为空节点。
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾节点存在的情况下,代表AQS等待队列已经初始化完成。
// 与addWaiter一样,先将新增节点的前驱节点链接到尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
// 当更新尾节点成功后,就将原尾节点的后继节点连接为新增的节点
t.next = node;
return t;
}
}
}
}
AQS中对锁队列的初始化过程并不是在类建立的时候初始化,而是第一次有线程开始获取同步资源时。它初始化时将头节点设定为一个空的Node节点,新增的节点都添加在头节点后的后继节点中。
Q2:为什么要初始化成一个空节点作为头节点?
这个与后面文章的内容acquireQueue方法中入队操作有关。Q1问题提到了AQS借用了CLH的思想,也就是不断对前驱节点的状态进行访问。那么在判断是否可以获取资源时,也需要再次访问前驱节点。如果初始化的时候不将头节点赋值为空节点,那么前驱节点为null,这个与不是初始化阶段的队列情况不同,势必会复杂后面的流程。这也是AQS的简单优化之一。
AQS的初始化结构.png
新增线程节点的AQS结构图.png
3.1.3、acquireQueued
在将新的线程节点入队后,这个方法将会执行阻塞,并根据当前线程信息更新队列。它采用一个循环去处理队列中的信息,只有队列中获取节点成功才能从循环中弹出。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 该节点的前驱节点是头节点,就尝试去获取一下许可
// 当获取许可成功后,就可以将头节点设置为当前的节点
// 并将后继节点置空,重置状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当获取许可失败后,会判断需不需要对线程进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果执行过程中出现了了错误,或者中断后,就会对数据进行处理,完成销毁的过程。
if (failed)
cancelAcquire(node);
}
}
这里的核心在于两个if语句的内容:
第一个if语句主要实现对前驱节点的判定,只有是头节点才会再次尝试能否拿到资源的许可访问权。当拿到了许可权后,也就是持有独占权,那么就会将头节点更新为当前的线程节点。返回的状态为interrupted状态。
第二个if语句是前驱节点不是头节点或者在许可获取失败后,对线程节点的状态进行更新,并对合适的线程进行阻塞操作。
3.1.4、shouldParkAfterFailedAcquire
在线程不是头节点或者许可获取失败后,会调用此方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 只有SIGNAL状态才能阻塞线程
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// 向队列的前驱节点找,一直到找到一个状态码小于0的,也就是非CANCEL状态的
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把找到节点后继节点链接到当前节点
pred.next = node;
} else {
// 如果前驱节点的状态是小于0的,CAS方式将前驱节点的状态更新为SIGNAL。同时不对线程阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这个函数是整个循环中处理信号状态的唯一方法。信号是什么?也就是我们在Node章节提到的waitStatus状态。
waitStatus状态初始化的值是0。如代码中描述的,waitStatus只有前驱节点是SIGNAL状态才会将当前的线程阻塞住。其他情况下,waitStatus为非正数时,就会将前驱节点的状态更新为SIGNAL。而状态为正数时,代表节点被取消了,就会不断向前寻找,直到找到未被取消的节点。取消的节点通通会被垃圾回收机制处理掉。
【举个栗子!就是一个栗子】这样理解起来可能还是比较困难,我们可以试想一个情形,两个线程依次争夺独占权,线程A先获得了独占权,获得后的AQS等待队列的状态是如下图所示
线程A获得了独占权后的等待队列结构.png
在线程A释放独占权之前,线程B也发起了独占权的争夺。于是,按照之前的过程分析,在获取占有权失败后,会调用addWaiter方法,线程B的节点会被添加到等待队列中。添加后的结构就如下图所示