🔶 CyclicBarrier

吞佛童子2022年10月10日
  • Java
  • concurrency
大约 2 分钟

🔶 CyclicBarrier

1. 类注释

  1. 基于 Rel 实现 非公平锁

  2. 示例用法:

    • 以下是在并行分解设计中使用屏障的示例:
 class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;

   // 自定义工作线程
   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await();
         } catch (InterruptedException ex) {
           return;
         } catch (BrokenBarrierException ex) {
           return;
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length; // 行数
     Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); }}; // count == 0 时执行的汇总任务
     barrier = new CyclicBarrier(N, barrierAction); // 每个线程执行 一行任务的多个任务,每行全部执行完成后,执行 barrierAction 任务

     List<Thread> threads = new ArrayList<Thread>(N);
     for (int i = 0; i < N; i++) {
       Thread thread = new Thread(new Worker(i)); // 每个线程执行一行任务
       threads.add(thread);
       thread.start();
     }

     // wait until done
     for (Thread thread : threads)
       thread.join();
   }
 }

2. 类图

public class CyclicBarrier {
    // ...
}

img_8.png


3. 属性

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition(); // 条件变量

    private final int parties; // 初始总数
    private final Runnable barrierCommand; // 被触发时,会执行的 任务
    private int count; // 当前等待的数量,当 count == 0 时,会进入下一代
    
    private Generation generation = new Generation(); // 当前分代
    // 静态内部类
    private static class Generation {
        boolean broken = false;
    }
    private void nextGeneration() {
        trip.signalAll(); // 唤醒 条件队列中的所有线程
        count = parties; // 重置 count
        generation = new Generation(); // 进入下一个分代
    }

4. 构造函数

    /**
     * 指定 parties & 触发时执行的任务 
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * 只指定 parties
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

5. 常见方法

    /**
     * count --,返回 count
     *
     * @return 返回当前 count
     * @throws InterruptedException 当前线程在等待时被中断
     * @throws BrokenBarrierException 当前线程在等待时,有其他线程被中断 | 等待超时 | CyclicBarrier 被重置 | CyclicBarrier 处于 broken
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    /**
     * 等待,直到 count == 0
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
    /**
     * 核心代码
     */
    private int dowait(boolean timed, long nanos)
                            throws InterruptedException, 
                                   BrokenBarrierException,
                                   TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier(); // 设置当前代 broken 状态为 true & 重置 count & 唤醒所有等待的线程
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // count == 0 时,执行 runnable 任务
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run(); // 有要执行的 runnable 任务,执行
                    ranAction = true;
                    nextGeneration(); // 进入下一代
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 循环,直到 count == 0 | 处于 broken 状态 | 被中断 | 超时
            for (;;) {
                try {
                    if (!timed) // 不允许超时,加入条件队列中
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index; // 返回还需要等待的线程数

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 设置当前代 broken 状态为 true & 重置 count & 唤醒所有等待的线程
     */
    private void breakBarrier() {
        generation.broken = true; // 设置当前代 broken 状态为 true
        count = parties; // 重置 count
        trip.signalAll(); // 唤醒所有等待的线程
    }
    
    // 重置
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou