🔶 CyclicBarrier
2022年10月10日
- Java
🔶 CyclicBarrier
1. 类注释
基于
Rel
实现 非公平锁示例用法:
- 以下是在并行分解设计中使用屏障的示例:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
// 自定义工作线程
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length; // 行数
Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); }}; // count == 0 时执行的汇总任务
barrier = new CyclicBarrier(N, barrierAction); // 每个线程执行 一行任务的多个任务,每行全部执行完成后,执行 barrierAction 任务
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i)); // 每个线程执行一行任务
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}
2. 类图
public class CyclicBarrier {
// ...
}
3. 属性
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition(); // 条件变量
private final int parties; // 初始总数
private final Runnable barrierCommand; // 被触发时,会执行的 任务
private int count; // 当前等待的数量,当 count == 0 时,会进入下一代
private Generation generation = new Generation(); // 当前分代
// 静态内部类
private static class Generation {
boolean broken = false;
}
private void nextGeneration() {
trip.signalAll(); // 唤醒 条件队列中的所有线程
count = parties; // 重置 count
generation = new Generation(); // 进入下一个分代
}
4. 构造函数
/**
* 指定 parties & 触发时执行的任务
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* 只指定 parties
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
5. 常见方法
/**
* count --,返回 count
*
* @return 返回当前 count
* @throws InterruptedException 当前线程在等待时被中断
* @throws BrokenBarrierException 当前线程在等待时,有其他线程被中断 | 等待超时 | CyclicBarrier 被重置 | CyclicBarrier 处于 broken
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* 等待,直到 count == 0
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/**
* 核心代码
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); // 设置当前代 broken 状态为 true & 重置 count & 唤醒所有等待的线程
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // count == 0 时,执行 runnable 任务
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 有要执行的 runnable 任务,执行
ranAction = true;
nextGeneration(); // 进入下一代
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 循环,直到 count == 0 | 处于 broken 状态 | 被中断 | 超时
for (;;) {
try {
if (!timed) // 不允许超时,加入条件队列中
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index; // 返回还需要等待的线程数
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* 设置当前代 broken 状态为 true & 重置 count & 唤醒所有等待的线程
*/
private void breakBarrier() {
generation.broken = true; // 设置当前代 broken 状态为 true
count = parties; // 重置 count
trip.signalAll(); // 唤醒所有等待的线程
}
// 重置
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}