🔵 AQS
2022年6月20日
- Java
🔵 AQS
1. 类注释
概述
- 为阻塞锁 & 同步器(举例:信号量)提供了一种框架,它依赖于一个 FIFO 等待队列
- 该类借助一个原子类 int 来表示状态,可以成为多种同步器的基础
- 该类的子类必须实现本类的 protected 方法来改变这个状态值
两种模式
exclusive
独占模式- 只能有一个线程获取
shared
共享模式- 可以被多个线程同时获取
2. 类图
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// .......
}
3. 内部类
/**
* 等待队列的节点类
*
* 该等待队列是 "CLH" (Craig, Landin, and Hagersten) 的变形
* CLH 通常用来自旋,能确保无饥饿性,提供先来先服务的公平性
* CLH 锁也是一种基于链表的可扩展、高性能、公平的自旋锁,
* 申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
*
* 添加节点,这个节点将成为新的 tail,弹出节点,需要重新设置 head
*
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
*
* 当一个节点被取消,那么它的继任者节点将被重新连接到一个没有被取消的前驱者身后
* 虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系
*/
static final class Node {
static final Node SHARED = new Node(); //标识共享锁
static final Node EXCLUSIVE = null; // 标识独占锁
/** 表示当前结点已取消调度。当超时或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 */
static final int CANCELLED = 1;
/** 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。 */
static final int SIGNAL = -1;
/** 表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 结点将从等待队列转移到同步队列中 */
static final int CONDITION = -2;
/** 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 */
static final int PROPAGATE = -3;
/**
* 在Node节点中一般通过 waitStatus 获得节点不同的状态
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 将节点入队的线程,创建时被初始化,使用之后变为 null
*/
volatile Thread thread;
/**
* 条件队列中等待的下一个节点,在共享模式下为 SHARED
* 只有在独占模式下,才会有条件队列
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回当前节点的前驱节点,若前驱节点为空,抛异常
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
4. 属性
/**
* 等待队列队头,懒初始化
* 只有 setHead 方法才会对其进行修改
*/
private transient volatile Node head;
/**
* 等待队列队尾,懒初始化
* 只有通过 enq 方法添加新等待节点时才会对其进行修改
*/
private transient volatile Node tail;
/**
* 同步状态,在子类重写各种 try() 时会进行 state 的 get() & set()
* 在不同子类中,可充当不同作用,例:
* Rel 中表示 重入次数
* Semaphore 中表示 还可以通行的令牌数
*/
private volatile int state;
/**
* 返回当前状态
*/
protected final int getState() {
return state;
}
/**
* 设置当前状态
*/
protected final void setState(int newState) {
state = newState;
}
/**
* CAS 修改 state 值
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// Queuing utilities
/**
* 小于这个值,单位为纳秒,时,更倾向于自旋,大于这个值,更倾向于 park() 阻塞线程
*/
static final long spinForTimeoutThreshold = 1000L;
5. 模版方法
模版方法被 final 修饰,不可被重写
1) acquire(int arg)
/**
* 独占模式下获取锁,不响应中断
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// 1. 当前尝试获取锁失败
// 2. 将当前线程封装成一个独占节点,并将其加入 CLH 队尾成功
// 3. 自旋,只有 前置节点为头节点 && 当前尝试获锁成功 才返回,若返回 true,说明当前线程被中断
// 满足以上条件后,中断当前线程
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 将节点加入 CLH 队尾,CAS 保证同步
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 尾节点非空,在尾节点后添加 node 作为新的尾节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空,执行 enq,将 node 节点插入到队列
enq(node);
return node;
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 将 node 节点插入到队列
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 之前没有任何节点 - 初始化
if (compareAndSetHead(new Node()))
tail = head; // head == tail == new Node()
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* 只有 前置节点为头节点 && 尝试获锁成功 才返回,若返回 true,说明当前线程被中断;若 false,说明未被中断
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // p 为前置节点
// 前置节点为头节点 && 尝试获锁成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前置节点状态为 signal && 阻塞当前线程 && 当前线程被中断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 需要被子类重写的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 根据前置节点状态判断当前节点是否需要 park 阻塞,只有前置节点状态为 signal 时,才会返回 true
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前置节点正等待被 signal 唤醒,因此当前节点同样需要阻塞
*/
return true;
if (ws > 0) {
/*
* 前置节点因为某些原本被 cancel,因此需要再往前继续查找,直到遇到非 cancel 掉的节点,将其后置节点设为当前节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 前置节点状态 == 0 | 3,此时需要设置前置节点状态为 signal,表示前置节点等待被唤醒
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞当前线程
return Thread.interrupted(); // 返回线程是否被中断,同时清空中断标志位
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
/**
* 取消当前节点的 acquire 过程
*
* @param node the node
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0) // 前置节点处于 cancel 状态
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED; // 当前节点状态为 cancel
// 当前节点就是 tail 那么设置 tail 节点为上一个节点,CAS 删除当前节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**
* 中断当前线程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
2) acquireInterruptibly(int arg)
/**
* 独占模式下获取锁,支持中断
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 当前线程被中断,同时清空中断标志位,抛出异常
// 当前线程尝试获锁失败
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* 当前线程被中断时,立即抛出异常,因此为 支持中断模式
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3) acquireShared(int arg)
/**
* 共享模式下获取变量,忽略中断
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 等待子类重写
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 在 CLH 队列尾部添加 共享节点,返回当前节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前置节点为 头节点
if (p == head) {
// 当前线程再次尝试获锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 设置头节点,唤醒头节点的一个后继结点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 设置头节点,唤醒头节点的一个后继结点
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 暂存原头节点
setHead(node); // 当前节点设为头节点
// 满足以下任一条件,会尝试唤醒其后续节点
// 1. propagate > 0
// 2. 原 头节点为空
// 3. 原 头节点状态 < 0 --> 处于 signal | condition | propagate 状态
// 4. 当前 头节点为空
// 5. 当前 头节点状态 < 0
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 后续节点
if (s == null || s.isShared())
doReleaseShared(); // 唤醒头节点的一个后继结点
}
}
/**
* 设置头节点
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* 唤醒头节点的一个后继结点
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点状态为 signal
if (ws == Node.SIGNAL) {
// 尝试将头节点状态设置为 0,设置失败,跳过当前循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 设置成功,唤醒一个头节点的后继结点
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 头节点状态 == 0 && 设置头节点状态为 PROPAGATE 失败,跳出当前循环,重试
}
// head == null || head == tail 的情况下
if (h == head) // loop if head changed
break;
}
}
/**
* 唤醒一个满足条件的 后继结点,如果能找到的话
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 要唤醒后继结点,那么当前节点状态必定 >= 0,若否,CAS 设置其状态为 0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后继结点 == null || 后继结点状态为 cancel
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找到首个非 cancel 状态的节点
// 这里不能从 s 往后找,因为 s == null,那么肯定无法执行 null.next
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到满足条件的后继结点,解除其阻塞状态
if (s != null)
LockSupport.unpark(s.thread);
}
4) acquireSharedInterruptibly(int arg)
/**
* 共享模式下的获锁,响应中断
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程发生中断,立即抛出异常
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5) release(int arg)
/**
* 在独占模式下释放共享变量
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 尝试释放锁成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒一个满足条件的 后继结点
return true;
}
return false;
}
// 独占模式下,尝试释放锁,等待子类重写
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
6) releaseShared(int arg)
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 共享模式下的尝试释放锁,需要子类重写
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 唤醒头节点的一个后继结点
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点状态为 signal
if (ws == Node.SIGNAL) {
// 尝试将头节点状态设置为 0,设置失败,跳过当前循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 设置成功,唤醒一个头节点的后继结点
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 头节点状态 == 0 && 设置头节点状态为 PROPAGATE 失败,跳出当前循环,重试
}
// head == null || head == tail 的情况下
if (h == head) // loop if head changed
break;
}
}