🔸 CountDownLatch

吞佛童子2022年10月10日
  • Java
  • concurrency
大约 3 分钟

🔸 CountDownLatch

1. 类注释

  • 基于 AQS 实现
  • 使用给定的计数初始化 CountDownLatch。
  • 等待方法阻塞,直到当前计数由于调用倒计时方法而达到零,之后释放所有等待线程,并立即返回任何后续的等待调用。
  • 这是一种一次性现象——计数无法重置。如果需要重置计数的版本,请考虑使用 CyclicBarrier。
  • join 的优点在于,join 在使用 线程池 的情况下,无法使用,它只能使用 Thread.join() 实现
  • 示例用法 1:
    • 这里有一对类,其中一组工作线程使用两个倒计时锁存器:
    • 第一个是启动信号,它阻止任何工人继续工作,直到驾驶员准备好让他们继续工作;
    • 第二个是完成信号,允许驾驶员等待所有工人完成。
class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1); // 表示只需要驾驶员下达一次指令
     CountDownLatch doneSignal = new CountDownLatch(N); // 只有减了 N 次才会继续执行,表示 N 个工人全部完成工作

     // 创建 N 个线程,表示 N 个工人
     for (int i = 0; i < N; ++i) 
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // 下达开始 指令,使得 startSignal 降到 0,所有线程执行 run()
     doSomethingElse();
     doneSignal.await();           // 等待所有 工人 执行完毕
   }
 }
 
 // 工人线程
 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }
  • 示例用法 2:
    • 将问题划分为N个部分,每个 Runnable 执行其中一部分,并将所有 Runnable 排队给执行器。
    • 当所有子部分完成时,协调线程将能够通过等待。(若线程必须以这种方式重复倒计时时,请考虑使用 CyclicBarrier。)
 class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ... // 创建线程池

     // 创建 N 个线程,将工作的 每个小部分由 一个线程完成,将 N 个线程提交给线程池执行
     for (int i = 0; i < N; ++i) 
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // 阻塞,直到所有工作执行完成
   }
 }

 // 自定义工作线程
 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   
   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }
   public void run() {
     try {
       doWork(i); // 做整个工作的第 i 部分
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

2. 类图

public class CountDownLatch {
    // ...
}

img_7.png


3. 内部类

    /**
     * 继承 AQS 实现
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        // 构造函数
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

4. 属性

    private final Sync sync;

5. 构造函数

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

6. 常见方法

    /**
     * 当前线程等待,直到 count == 0,除非线程被中断
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 当前线程等待,直到 count == 0,除非线程被中断 | 等待时长达到
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * count --
     */
    public void countDown() {
        sync.releaseShared(1);
    }
    
    // 只有 get 而没有 set 方法,说明当 CountDownLatch 降到 0 时,无法重置,因此只能使用一次
    public long getCount() {
        return sync.getCount();
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou