🔸 CountDownLatch
2022年10月10日
- Java
🔸 CountDownLatch
1. 类注释
- 基于
AQS
实现 - 使用给定的计数初始化 CountDownLatch。
- 等待方法阻塞,直到当前计数由于调用倒计时方法而达到零,之后释放所有等待线程,并立即返回任何后续的等待调用。
- 这是一种一次性现象——计数无法重置。如果需要重置计数的版本,请考虑使用 CyclicBarrier。
- 比
join
的优点在于,join
在使用 线程池 的情况下,无法使用,它只能使用Thread.join()
实现 - 示例用法 1:
- 这里有一对类,其中一组工作线程使用两个倒计时锁存器:
- 第一个是启动信号,它阻止任何工人继续工作,直到驾驶员准备好让他们继续工作;
- 第二个是完成信号,允许驾驶员等待所有工人完成。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1); // 表示只需要驾驶员下达一次指令
CountDownLatch doneSignal = new CountDownLatch(N); // 只有减了 N 次才会继续执行,表示 N 个工人全部完成工作
// 创建 N 个线程,表示 N 个工人
for (int i = 0; i < N; ++i)
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // 下达开始 指令,使得 startSignal 降到 0,所有线程执行 run()
doSomethingElse();
doneSignal.await(); // 等待所有 工人 执行完毕
}
}
// 工人线程
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
- 示例用法 2:
- 将问题划分为N个部分,每个 Runnable 执行其中一部分,并将所有 Runnable 排队给执行器。
- 当所有子部分完成时,协调线程将能够通过等待。(若线程必须以这种方式重复倒计时时,请考虑使用 CyclicBarrier。)
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ... // 创建线程池
// 创建 N 个线程,将工作的 每个小部分由 一个线程完成,将 N 个线程提交给线程池执行
for (int i = 0; i < N; ++i)
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // 阻塞,直到所有工作执行完成
}
}
// 自定义工作线程
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i); // 做整个工作的第 i 部分
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
2. 类图
public class CountDownLatch {
// ...
}
3. 内部类
/**
* 继承 AQS 实现
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 构造函数
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
4. 属性
private final Sync sync;
5. 构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
6. 常见方法
/**
* 当前线程等待,直到 count == 0,除非线程被中断
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 当前线程等待,直到 count == 0,除非线程被中断 | 等待时长达到
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* count --
*/
public void countDown() {
sync.releaseShared(1);
}
// 只有 get 而没有 set 方法,说明当 CountDownLatch 降到 0 时,无法重置,因此只能使用一次
public long getCount() {
return sync.getCount();
}