AQS

文章目录
  1. 1. AQS类内部结构
  2. 2. acquire方法
  3. 3. release方法
  4. 4. Condition与ConditionObject
  5. 5. await方法
  6. 6. signal方法
  7. 7. signalAll方法

AQS类内部结构
public abstract class AbstractQueuedSynchronizer {

private transient volatile Node head; // 同步队列头节点
private transient volatile Node tail; // 同步队列尾节点
private volatile int state; // 锁状态值

static final class Node {
/*nextWaiter 属性的几个状态值*/
static final Node SHARED = new Node(); // 标记节点在共享模式下等待
static final Node EXCLUSIVE = null; // 标记节点在排他模式下等待
/*waitStatus 属性的几个状态值*/
static final int CANCELLED = 1; // 表示节点已经被取消
static final int SIGNAL = -1; // 表示后面节点的线程需要被唤醒。
static final int CONDITION = -2; // 表示节点在条件队列上
static final int PROPAGATE = -3; // 关于共享锁的状态

volatile int waitStatus;
volatile Node prev; // 前驱
volatile Node next; // 后继
volatile Thread thread; // 节点封装的线程
Node nextWaiter; // 下一个等待的节点
// addWaiter方法调用时,会构造一个Node节点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
}

acquire方法

该方法定义为获取,ReentrantLock内部获取锁的操作会调用该方法。该方法的执行流程为:

① 执行tryAcquire,若是该方法返回为true,直接结束。否则执行下一步操作。(tryAcquire是一个空方法,具体逻辑由子类来实现)。

② 执行addWaiter方法,参数值为Node.EXCLUSIVE。① 该操作首先会将当前线程封装成为一个Node节点,该Node节点的thread值为当前线程,nextWaiter值为null。② 将创建好的该节点添加到同步队列的尾部。③ 将该节点返回。

③ 执行 acquireQueued(Node node, int arg)方法。该方法以自旋开始。① 获取到给定节点node的前驱节点p。若是p为头节点并且尝试获取锁成功,则将node节点设置为头节点(threadprev属性设置为null),将p节点清除,返回线程中断表标识。② 若上面条件不满足或执行失败,执行shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法。③ 首先是会执行shouldParkAfterFailedAcquire(p, node)方法的,该方法是判断当前线程获取锁失败后,是否需要挂起。执行逻辑是:判断前一个节点pwaitStatus状态值,若是该值为Node.SIGNAL,返回true。其它情况返回false。④ 若是shouldParkAfterFailedAcquire(p, node)方法返回值为true,执行parkAndCheckInterrupt方法,否则执行 ①。⑤ parkAndCheckInterrupt方法首先将当前线程挂起,然后返回线程的中断标识,若是为true,会将interrupted属性设置为true

④ 由于acquireQueued(Node node, int agr)方法返回的是当前线程的中断标识,若是返回为true,立马中断当前线程。

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 一个空方法,具体逻辑由子类来实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// addWaiter方法,添加一个节点到同步队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 添加一个节点到同步队列,直到成功为止
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

// 一段自旋操作中,成功获取到锁,该方法才会结束。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}

// 将给定的节点设置为头节点,① head指向node ② node的thread属性指向null ③ node的prev属性指向null
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

// 当尝试获取锁失败后,判断是否应该将当前的线程挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前一个节点waitStatus状态值为SIGNAL(-1),线程可以被安全的挂起
if (ws == Node.SIGNAL) {
return true;
}
// 若是前一个节点被取消了(1),跳过该节点
if (ws > 0) {
do {
// pred和node.prev重新赋值
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将pred的next指向node节点
pred.next = node;
} else {
// 其它情况,CAS操作将 pred的状态值修改为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 挂起当前线程并且返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

release方法

ReentrantLockunlock方法中,会直接调用此方法。该方法也是一个模板方法,① 首先会去调用tryRelease(arg)方法,此方法在AQS中是一个空方法,具体逻辑由子类来实现。 ② 若是 ① 调用后返回值为true,执行unparkSuccessor(Node node)方法。unparkSuccessor(Node node)方法会去唤醒node节点的下一个节点。③ 若是 ① 调用后返回值为false,直接返回false

// 首先执行子类的tryRelease方法,根据其返回值执行具体的操作
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
// 抽象方法,由具体的子类来实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 若是node节点存在后继节点,则将其唤醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// 当node节点的next节点为null或者其已经被取消时,从尾节点向前遍历,找到一个节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 找到了一个不为null的节点,将其唤醒
if (s != null) {
LockSupport.unpark(s.thread);
}
}
Condition与ConditionObject

在AQS内部定义了一个类ConditionObject,该类实现了Condition接口。类直接的定义关系如下:

public abstract class AbstractQueuedSynchronizer{
...
public class ConditionObject implements Condition {
// 条件队列头节点
private transient Node firstWaiter;
// 条件队列尾节点
private transient Node lastWaiter;
}
...
}

await方法

该方法是将当前线程添加到条件队列上。执行逻辑:① 若是当前线程已经被打断,直接抛出异常。 ② 调用addConditionWaiter方法,将当前的线程封装成为一个 Node 节点,添加到等待队列的尾部,并返回该节点。③ 调用fullyRelease方法释放当前线程获取到的锁。④ while循环,调用 isOnSyncQueue 方法判断节点是否在同步队列,没在同步队列执行 while 循环体,挂起当前线程。

public final void await() throws InterruptedException {
// 判断中断,抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加到等待队列
Node node = addConditionWaiter();
// 释放该线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 添加一个新的节点到等待队列。
// 1. 清除被取消的节点
// 2. 将当前线程封装成Node节点,添加到等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 释放当前线程获取到的锁
// 1. 获取到锁状态 state 的值
// 2. 调用 release 方法释放锁
// 3. 释放成功,返回state 之前的值。
// 4. 释放失败,抛出异常。
// 5. 若是释放失败,将当前线程节点的 waitStatus 状态值设置为 Node.CANCELLED(取消)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}
// 判定给定的节点是否在同步队列上
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
if (node.next != null) {
return true;
}
return findNodeFromTail(node);
}

signal方法

① 调用isHeldExclusively方法,返回false,直接抛出异常,否则执行下一步。② 获取到等待队列的第一个节点,将其做为参数,调用doSignal方法。③ doSignal方法会将给定的节点从等待队列放到同步队列的尾部。

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) {
doSignal(first);
}
}
// 判断获取到锁的线程是否为当前线程
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// ReentrantLock的Sync的内部实现
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 将 first 节点从等待队列移动到同步队列
// 1. 将first节点从等待队列中去除
// 2. 调用 transferForSignal 方法将 first节点放到同步队列
// 3. 若是上一步失败,尝试下一个节点
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// 将 node 节点添加到同步队列的尾部
// 1. cas操作将 node 节点的 waitStatus 状态值由 Node.CONDITION修改为0,修改失败返回false。
// 2. 将node节点添加到同步队列的尾部
// 3. 若是node节点的waitStatus状态值大于0,或者将node节点的状态值设置为SIGNAL失败,唤醒该节点线程。
// 4. 返回 true。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
return false;
}
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signalAll方法

具体逻辑:大体和signal方法相同,但其会执行doSignalAll(first)方法而不是doSignal(first)方法。

public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 1. 清除等待队列 lastWaiter = firstWaiter = null;
// 2. 从 first节点开始,依次调用 transferForSignal方法,直到节点为null。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}