CountDownLatch

文章目录
  1. 1. 应用场景
  2. 2. 源码分析
应用场景

① 下一步的任务需要等待上一步任务执行完成才能执行的场景。比如:有一批任务交给线程池来处理,我们需要知道任务从开始到结束一共执行了多长的时间。下面的代码展示了这种场景的使用。

public void unite(JobExecutionContext jobContext) throws Exception {
Set<String> batchNoSet = getBatchSet(jobContext);
Set<String> idCardSet = updateLoanInfoFromSrc(batchNoSet);
// 任务开始之前获取到系统现在的时间
Long start = System.currentTimeMillis();
// 创建 CountDownLatch 对象,以任务个数做为该对象的初始化参数
CountDownLatch latch = new CountDownLatch(idCardSet.size());
for (String idCard : idCardSet) {
// 创建任务线程
CaseUnitedThread thread = new CaseUnitedThread(idCard.trim(), latch);
// 任务交给线程池处理->每执行一次任务,CountDownLatch对象的 state值减去1
threadPool.execute(thread);
}
// 等待归户完毕
latch.await();
logger.info("case united finish: total time = {}", System.currentTimeMillis() - start);
}
源码分析

类结构

CountDownLatch类内部定义比较简单,有一个类型为Sync:sync属性,而CountDownLatch最重要的两个方法countDownawait的内部实现是由Sync:sync来完成的。而Sync类又继承自AQS

// jdk1.8
public class CountDownLatch {
// 内部属性,相关操作会交由该对象来完成
private final Sync sync;
// 构造函数,初始化 sync 属性
public CountDownLatch(int count) {
// 参数值必须大于等于0
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

...
...
// 静态内部类,继承了AQS
private static final class Sync extends AbstractQueuedSynchronizer {
// 对外提供的构造函数,初始化对象时,会将 AQS 中的 state 值设置为 count
Sync(int count) {
setState(count);
}
...
// 尝试获取共享锁,若是 state 值为0,返回 1,否则返回 -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享锁
protected boolean tryReleaseShared(int releases) {
// 自旋,可能多个线程同时尝试释放共享锁,同时修改 state 值,造成线程安全问题
for (;;) {
int c = getState();
// 若是 state 的值已经为 0,则尝试释放共享锁失败。
if (c == 0) {
return false;
}
// 通过 CAS 操作将原有 state 的值减去1,失败重试
int nextc = c-1;
if (compareAndSetState(c, nextc)) {
// state值为0时,返回 true
return nextc == 0;
}
}
}
}
}

countDown方法

当某个线程调用CountDownLatchcountDown方法时,内部操作的原理其实是将AQS中的state属性值减1


① 当一个线程调用countDown方法时,该方法内部会调用Sync:syncreleaseShared(int arg)方法。而releaseShared(int arg)方法则是由Sync的父类AQS定义的方法。

AQSreleaseShared(int arg)方法首先调用tryReleaseShared(int arg)方法,该方法在AQS中是一个空方法,具体逻辑在其子类Sync中实现。

SynctryReleaseShared(int arg)方法尝试释放共享锁。由于可能会有多个线程同时调用countDown方法修改state的值,故对state值的修改需要使用自旋CAS操作。若修改前读到state的值为0,直接返回false。若修改前state的值不为0,通过CAS操作将其值修改为state-1,若修改后的值为0,返回true

④ 第 ③ 方法其实是执行了两个操作,第一是将state的值减去1,第二是判断修改后的state值是否为0。若是第 ③ 步返回值为true,证明state的值已经为0,可以做一些操作来唤醒调用await方法的线程。故第 ② 步判断返回值为true的话,执行doReleaseShared()方法。


doReleaseShared释放共享锁,一段自旋操作。① 该方法首先获取到头节点,使用变量h保存。若是当前h节点不为空并且h节点不等于尾节点时,执行操作(这么判断的原因在于,若是头节点等于尾节点,就没必要唤醒线程。因为若是只有一个头节点时,没有需要唤醒的线程)。② 若节点hwaitStatus属性值为Node.SIGNAL,则CAS操作将其设置为0。若设置成功,调用unparkSuccessor方法唤醒线程。若设置失败,继续执行for循环。③ 若节点hwaitStatus属性值为0,则通过CAS操作将其设置为Node.PROPAGATE。若设置失败,继续执行for循环。若设置成功,继续执行后面的语句,即当h==head时,跳出for循环。

⑥ 在上述步骤 ⑤ 中,将waitStatus状态值由Node.SIGNAL设置为0成功后,调用了unparkSuccessor方法,该方法会唤醒传入节点的下一个节点的线程。① 该方法首先会获取到传入节点nodewaitStatus属性值,若是该值小于0,通过CAS操作将该值修改为0。② 获取到node节点的下一个节点s,该节点需要是waitStatus属性值小于等于0的。③ 调用LockSupportunpark方法唤醒s节点所对应的线程。

// CountDownLatch
public void countDown() {
// 调用sync的releaseShared方法
sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
// tryReleaseShared
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// AQS
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

// Sync
protected boolean tryReleaseShared(int releases) {
// 自旋,可能多个线程同时尝试释放共享锁,同时修改 state 值,造成线程安全问题
for (;;) {
int c = getState();
// 若是 state 的值已经为 0,则尝试释放共享锁失败。
if (c == 0) {
return false;
}
// 通过 CAS 操作将原有 state 的值减去1,失败重试
int nextc = c-1;
if (compareAndSetState(c, nextc)) {
// state值为0时,返回 true
return nextc == 0;
}
}
}

// AQS-> 释放共享模式的动作
private void doReleaseShared() {
for (;;) {
// 保存头节点
Node h = head;
// 头节点不为空并且头节点不为尾节点
if (h != null && h != tail) {
// 获取到头节点的等待状态
int ws = h.waitStatus;
// 若是头节点的等待状态为SIGNAL
if (ws == Node.SIGNAL) {
// 节点的状态由 Node.SIGNAL更新为0失败,继续执行自旋
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
// 节点状态由 Node.SIGNAL更新为0成功,调用unparkSuccessor(Node node)唤醒后继节点
unparkSuccessor(h);
// 若是该节点的状态为0,并且由0设置为PROPAGATE失败,继续执行自旋
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
}
if (h == head) {
break;
}
}
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// ws小于0的状态有三个 SIGNAL(-1) CONDITION(-2) PROPAGATE(-3)
// 若是 ws 小于0,CAS操作将其修改为0
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// ws大于0仅有一种情况,即 CANCELLED(1)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点找到一个距离node节点最近的状态值<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 若是该节点不为null,直接唤醒该节点线程
if (s != null) {
LockSupport.unpark(s.thread);
}
}

await

当一个线程调用一个CountDownLatch对象的await方法时,该线程会等到其state值为0时执行,否则不会执行。

① 当调用 CountDownLatchawait 方法时,会调用Sync:syncacquireSharedInterruptibly(int arg)方法,而acquireSharedInterruptibly(int arg)方法是由Sync:sync的父类AQS定义的方法。

AQSacquireSharedInterruptibly(int arg)方法首先会去判断当前线程是否被中断,若是当前线程已经被中断,则直接抛出中断异常。若是当前线程没被中断,会去调用tryAcquireShared(int arg)方法。而该tryAcquireShared(int arg)方法在AQS中是一个空方法,具体的逻辑是由子类Sync来实现的。

Sync重写父类的tryAcquireShared(int arg)方法,其方法的目的就是判断当前state的值是否为0,若是该值为0,返回1,否则返回-1

④ 步骤 ② 会根据步骤 ③ 方法返回的值进行判断。若是返回值不小于0,该方法什么也不做(不会挂起当前的线程);若是返回值小于0,证明state的值不为0,当前线程不能执行,需要执行进一步的操作,即调用doAcquireSharedInterruptibly(int arg)方法。


doAcquireSharedInterruptibly方法,添加到等待队列,AQS里面的方法。① 该方法首先会在队列中添加一个共享模式的节点。② 一段自旋操作,获取到该节点node的前一个节点p,若是p节点正好为head节点,调用tryAcquireShared方法尝试获取共享锁(state0返回1,否则返回-1),若是获取共享锁成功,调用setHeadAndPropagate方法,该方法可能会唤醒当前线程。 ③ 若是上一步操做获取共享锁失败,首先调用shouldParkAfterFailedAcquire方法判断当前线程是否可以被挂起。若是该方法返回值为true,调用parkAndCheckInterrupt方法挂起当前线程,并返回当前线程的中断标识。若是中断标识为true,立即抛出中断异常。

// CountDownLatch
// 1.委托给sync的acquireSharedInterruptibly方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS
// 2.尝试获取,若获取不到在调用doAcquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 若是当前线程已经被中断,直接抛出中断异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 若是当前 state的值不为0
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
// AQS
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// CountDownLatch-Sync
protected int tryAcquireShared(int acquires) {
// 若是当前 state 的值为0,返回1,否则返回-1
return (getState() == 0) ? 1 : -1;
}

// AQS->获取共享锁的方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 将当前线程封装成为 Node 节点,添加到等待队列(此时waitStatus值为0)
final Node node = addWaiter(Node.SHARED);
// 判断是否执行失败的标识,失败时会将该节点清除
boolean failed = true;
try {
for (;;) {
// 获取 node 节点的前一个节点p
final Node p = node.predecessor();
// 若是节点p为等待队列的头节点
if (p == head) {
// 若是p为头节点,尝试一下获取共享锁(具体逻辑由Sync实现)
int r = tryAcquireShared(arg);
// r >= 0 证明 state 的值已经为 0,即获取到了共享锁。CountDownLatch:Sync返回1
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 1. 当获取锁失败后判断是否需要挂起线程
// 2. 若是需要挂起线程,则挂起线程并检查中断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} finally {
// 若是执行失败,调用 cancelAcquire(Node node)方法将这个节点从等待队列中删除
if (failed) {
cancelAcquire(node);
}
}
}
// 在自旋操作期间成功获取到共享锁执行的操作(此处传入的propagate值为1,节点为当前节点新加入队列的节点)
private void setHeadAndPropagate(Node node, int propagate) {
// 首先保留原始头节点,然后将当前的节点设置为头节点
Node h = head;
setHead(node);
// 已经将node节点设置为了头结点,需要唤醒下一个节点啦
// 1.若是传入的值大于0
// 2.若是传入的值小于等于0,原始头结点为null
// 3.若是传入的值小于等于0,原始头节点不为null,原始头结点的waitStatus状态值小于0
// 4.给h重新赋了一次值后h为null
// 5.给h重新赋了一次值后h不为null,其waitStatus状态值小于0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取传入节点的下一个节点s
Node s = node.next;
// 若是s为null或者是s节点是共享模式,执行doReleaseShared方法
if (s == null || s.isShared()) {
doReleaseShared();
}
}
}


private void doReleaseShared() {
for (;;) {
// 首先保存头结点
Node h = head;
// 若是头结点不为空并且头结点不等于尾节点(证明不只有一个节点)
if (h != null && h != tail) {
// 获取头结点的waitStatus状态值
int ws = h.waitStatus;
// 若是该状态值为SIGNAL
if (ws == Node.SIGNAL) {
// 尝试将节点h的waitStatus状态值由SIGNAL修改为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
// 若是修改成功,执行unparkSuccessor方法,唤醒当前节点的下一个节点
unparkSuccessor(h);
// 若是该状态值为0,将其设置为PROPAGATE
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
}
if (h == head) {
break;
}
}
}

// 唤醒node节点的下一个节点
private void unparkSuccessor(Node node) {
// ws小于0,存在三种状态
// SIGNAL(-1) CONDITION(-2) PROPAGATE(-3)
int = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 获取node节点的下一个节点
Node s = node.next;
// 去除一些无用的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 唤醒操作
if (s != null) {
LockSupport.unpark(s.thread);
}
}