ReentrantLock

文章目录
  1. 1. 类关系与锁初始化
  2. 2. 方法调用关系
  3. 3. lock之公平与非公平
  4. 4. unlock执行逻辑
  5. 5. LockSupport与Unsafe

类关系与锁初始化

ReentrantLock类内部有一个属性,Sync:sync,该属性对应的类是定义在ReentrantLock内部的一个静态类(抽象类)。ReentrantLock内部又为Sync定义了两个子类NonfairSyncFairSync。根据类名便可看到非公平与公平之分。我们通过api使用ReentrantLock类时,会调用其构造函数创建一个该类型的对象,构造函数可传参数true|false,确定创建公平或非公平锁。若没传递参数,会默认创建非公平锁。

// 几个类定义关系图
public class ReentrantLock{
...
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {

}
static final class NonfairSync extends Sync {
...
}
static final class FairSync extends Sync {
...
}
}

public abstract class AbstractQueuedSynchronizer{
...
}

// 默认创建非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 参数为 true 创建公平锁,false 创建非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

方法调用关系

调用ReentrantLock:lock方法或是ReentrantLock:unlock方法时,ReentrantLock会委托Sync来执行具体的上锁或者是释放锁的逻辑。① 关于lock,在Sync内部定义了一个抽象方法Sync:lock,在其子类FairSyncNonfairSync均实现了这个方法。这两个实现类的lock方法最终又会调用Sync的父类AQSacquire方法。② 关于unlock,实际上调用的是Sync的父类AQSrelease方法。

/*****ReentrantLock*****/
// 上锁,调用sync的lock方法
public void lock() {
sync.lock();
}
// 解锁,调用sync的父类aqs的release方法
public void unlock() {
sync.release(1);
}
/*****Sync*****/
// 上锁-> 抽象方法,具体逻辑由子类FairSync或NonfairSync来实现
abstract void lock();
// 解锁-> Sync的父类AbstractQueuedSynchronizer定义的方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// FairSync类实现的lock方法
final void lock() {
acquire(1);
}
// NonfairSync类实现的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

lock之公平与非公平

NonfairSync:lock方法在执行时,首先会通过CAS操作直接修改锁状态state的值,若是执行成功,则成功获取到锁,若是执行失败,调用AbstractQueuedSynchronizer:acquire(1)方法来执行普通的上锁逻辑。FairSync:lock方法在执行时,直接调用 AbstractQueuedSynchronizer:acquire(1)方法来执行普通的上锁逻辑。(非公平锁直接忽略了排队等待获取锁的线程)

/*****NonfairSync*****/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/*****FairSync*****/
final void lock() {
acquire(1);
}

② 在上面的分析中,非公平锁的实现类在尝试设置锁失败时,直接调用AQSacquire方法,而公平锁的实现类直接调用AQSacquire方法。AQS:acquire方法属于一个模板方法,内部定义了获取锁的操作步骤。其中tryAcquire方法是其第一个调用的方法,也是一个没有实现的空方法,具体的实现逻辑由其子类来实现。

NonfairSync:tryAcquire方法内部会调用其父类的Sync:nonfairTryAcquire方法,该方法语义为非公平尝试获取。具体逻辑:首先判断锁的状态,若是没有线程获取到锁,执行通过CAS操作修改锁状态,若是成功返回true。若是未成功,继续判断获取锁的线程是否为当前线程,若是当前线程,将锁状态值加上方法传入的值,并将锁状态值更新(可重入),返回true,否则返回false

FairSync:tryAcquire方法的执行逻辑:判断是 否有线程获取到锁,若是没有线程获取到锁,继续判断是否有线程等待获取锁,若是无线程等待获取锁,CAS操作获取锁,成功返回true。操作获取锁失败,或者有线程等待获取锁,或者已有线程获取到锁,继续判断当前获取到锁的线程是否为当前线程,是则执行可重入锁操作并返回true,否则返回false

/*****AbstractQueuedSynchronizer *****/
// AQS中定义的获取锁的模板方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 一个未实现的方法,具体逻辑由子类来实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/*****NonfairSync*****/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/*****Sync*****/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

/*****FairSync*****/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

unlock执行逻辑

调用ReentrantLock:unlock方法时,Sync会调用父类的AbstractQueuedSynchronizer:release方法。

具体逻辑是:① 首先tryRelease方法会被调用,而这个方法在AQS中是一个没有具体实现的空方法,具体逻辑由子类Sync来实现。② 子类中首先会去判断当前线程是否就是获取到锁的线程,若不是直接抛出异常,否则执行下一步的逻辑。③ 获取到锁状态state的值,并用state值减去方法传的参数值(释放多少次),计算出一个结果。④ 若是计算出来的这个结果为0,将state的值设置为0,并将获取锁的线程设置为null,并返回true。⑤ 若是计算出来的这个结果不为0,将state的值设置为计算得到的值,返回false

子类执行完tryRelease方法后,会返回一个值。① 若是该值为trueAQS:release方法会执行其它逻辑,执行完返回true。② 若是该值为falseAQS:release方法直接返回false

/*****ReentrantLock *****/
public void unlock() {
sync.release(1);
}

/*****AbstractQueuedSynchronizer *****/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

/*****Sync*****/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

LockSupport与Unsafe

LockSupport中提供了两个方法,parkunpark方法,而这两个方法实际上又是调用的Unsafe类中的native方法。park是将当前调用线程阻塞,而unpark方法则是唤醒指定的线程。

public class LockSupport {
// jdk内部用于操作内存地址的类
private static final sun.misc.Unsafe UNSAFE;

static {
try {
...
UNSAFE = sun.misc.Unsafe.getUnsafe();
...
} catch (Exception ex) { throw new Error(ex); }
}

// 调用 Unsafe 类的park方法将当前线程阻塞
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 调用 Unsafe 类的unpark方法唤醒指定的线程
public static void unpark(Thread thread) {
if (thread != null) {
UNSAFE.unpark(thread);
}
}
}

// jdk核心类库可操作的api
public final class Unsafe {

private static final Unsafe theUnsafe;
static {
...
theUnsafe = new Unsafe();
...
}
private Unsafe() {}

// 该类中的 theUnsafe 属性为该类的一个实例,对外只提供给jdk核心类库使用。我们在平常的开发中无法通过该方法来获取 theUnsafe 对象。(可以通过万能的反射方式获取)
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

// native方法,阻塞当前线程
public native void park(boolean var1, long var2);
// native方法,唤醒指定的线程
public native void unpark(Object var1);
}

关于可重入锁的Condition分析—-> 等待队列

// ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}

// Sync
final ConditionObject newCondition() {
return new ConditionObject();
}

// AQS->ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;

public ConditionObject() { }

// 添加一个等待的线程
// 若是最后一个节点不为null,并且其 waitStatus 状态值不为 CONDITION。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 添加之前先清理一番
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个节点, 将其加入到同步队列的尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}

// 这个方法的目的就一个: 从等待队列中清除 waitStatus 状态值不为 CONDITION 的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// 在节点 t 不为 null 的情况下,不断的执行循环操作
while (t != null) {
// 首先保存下一个节点,然后判断当前节点
Node next = t.nextWaiter;
// 若是节点的 waitStatus 状态值不为 CONDITION,将节点 t 的 nextWaiter指向为null
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 这种情况是: 第一个节点的 waitStatus状态值已经不为 CONDITION 了,可能 next节点为等待节点,故先将 firstWaiter指向 next。若是当前的 trail不为null, 则 trail的nextWaiter也应该指向 next
if (trail == null) {
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
// 这种情况是: 遍历到了最后一个节点,并且其 waitStatus 状态值不为 CONDITION,故需要在跳出循环之前将lastWaiter节点更新为 trail节点。
if (next == null) {
lastWaiter = trail;
}
// 若是节点的 waitStatus 状态值为 CONDITION
} else {
trail = t;
}
// 下一次循环遍历的节点
t = next;
}
}

/****************************************************/
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取锁状态标识(0: 未上锁 >=1 上锁)
int savedState = getState();
// 释放成功,返回释放前 state 的值
if (release(savedState)) {
failed = false;
return savedState;
// 否则直接抛出异常
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 若是操作失败,将当前节点的waitStatus状态值设置未CANCELLED
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}

// 缩放锁->在调用await方法时,释放掉当前线程所持有的锁
public final boolean release(int arg) {
// tryRelease时 ReentrantLock的内部类Sync的方法,成功释放锁返回true,否则返回 false。
if (tryRelease(arg)) {
Node h = head;
// 当前线程已经进入到await状态,唤醒啦一个线程来获取锁
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
// 其它情况返回 false
return false;
}

// 节点添加到同步队列和条件队列的区别(两点区别判断给定节点当前所处位置)
// 同步队列: 双向列表,节点有 prev 和 next 指针指向前驱节点和后继节点。nextWaiter 状态值为 null。
// 条件队列: 单向列表,节点有 nextWaiter 指针指向后继节点。waitStatus 状态值为 Node.CONDITION
// 若是节点已经在同步队列上,则返回 true
final boolean isOnSyncQueue(Node node) {
// 若是 waitStatus 为 Node.CONDITION,证明在条件队列,返回 false。
// 若不是在条件队列,但是节点的 prev 为 null,也返回 false。
if (node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
// 若是节点的 next 指针不指向 null,直接返回 true
if (node.next != null) {
return true;
}
// 从同步队列的尾部找
return findNodeFromTail(node);
}
// 1. 若是在同步队列中找到了,返回 true。
// 2. 若是在同步队列中没有找到,返回 false。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// 若是找到的化,直接返回 true
if (t == node) {
return true;
}
// 已经找了一遍,但还是没有找到,返回 false
if (t == null) {
return false;
}
// 获取前一个节点判断
t = t.prev;
}
}

//
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}


// 关于await方法
// 1. 若是当前线程已经被中断,直接抛出异常,结束方法的运行。
// 2. 调用 addConditionWaiter 方法, 将当前的线程封装成为一个 Node 节点,添加到等待队列的尾部。
// 3. 调用 fullyRelease 方法,释放当前线程所获取到的锁。
// 4. while循环,调用 isOnSyncQueue 方法判断节点是否在同步队列,没在同步队列执行 while 循环体。挂起当前线程。
//
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 加入到等待队列
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 线程没在同步队列。
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
// 当前线程被唤醒时,执行。检查线程是否被打断,若是已经被打断,结束执行。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}

//
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 前驱节点为 head,尝试获取锁,并获取成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败执行 park 操作。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}





// signal 方法分析
public final void signal() {
// 拥有锁的线程非当前线程,直接抛出异常
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取到条件队列的第一个节点,对其执行 doSignal 操作。
Node first = firstWaiter;
if (first != null) {
doSignal(first);
}
}

// signalAll 方法分析
public final void signalAll() {
// 同 signal
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取都条件队列的第一个节点,对其执行 doSignall 操作
Node first = firstWaiter;
if (first != null) {
doSignalAll(first);
}
}

// 操作是将给定的节点从条件队列转移至同步队列。若是操作失败,会去尝试给定节点的下一个节点,直到条件队列中已经没有节点了。
private void doSignal(Node first) {
do {
// 总会执行的操作: firstWaiter 指向 first 节点的 nextWaiter 节点。
if ( (firstWaiter = first.nextWaiter) == null) {
// 若是 firstWaiter 为 null 了,则 lastWaiter 也将指向 null。
lastWaiter = null;
}
// 下面的操作是将 first 节点添加到 同步队列,故将其 nextWaiter 属性值指向为 null。
first.nextWaiter = null;
// 若是将 first 节点从条件队列转移至同步队列失败并且 firstWaiter 不为 null。
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// Node 节点 waitStatus 属性的几个状态值。
// static final int CANCELLED = 1; 表示节点已经被取消
// static final int SIGNAL = -1; 表示后面节点的线程需要被唤醒。
// static final int CONDITION = -2; 表示节点在条件队列上
// static final int PROPAGATE = -3; 关于共享锁的状态
// 0; 非以上值,取0
// Node 节点 nextWaiter 属性的几个状态值
// static final Node SHARED = new Node(); 标记节点在共享模式下等待
// static final Node EXCLUSIVE = null; 标记节点在排他模式下等待

// 操作: 将节点从条件队列转移至同步队列,操作成功返回 true。
final boolean transferForSignal(Node node) {
// case 操作尝试,失败返回 false。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
return false;
}
// 将该节点添加到同步队列的尾部,并返回该节点的前一个节点
Node p = enq(node);
// 获取返回节点的 waitStatus 状态值
int ws = p.waitStatus;
// ws 节点被取消
// ws 未被取消,尝试将其waitStatus状态值从 ws 设置为 SIGNAL失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
// 唤醒该节点的线程
LockSupport.unpark(node.thread);
}
// 返回 true。
return true;
}

// 将节点插入到队列,必要时初始化。 返回值为 node 节点的前一个节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 同步队列为 null,初始化 head 和 tail。
if (t == null) {
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

// doSignalAll, 唤醒条件队列上的所有线程
private void doSignalAll(Node first) {
// 清除条件队列
lastWaiter = firstWaiter = null;
// 循环操作,从第一个节点开始,将条件队列上的所有节点添加到同步队列
do {
// 保存下一个节点
Node next = first.nextWaiter;
// 清除 first 节点的 nextWaiter属性
first.nextWaiter = null;
// 将 first 节点转移至同不队列
transferForSignal(first);
// 更新 first 节点
first = next;
} while (first != null);
}
}