🔹 StampedLock
2022年10月10日
- Java
🔹 StampedLock
1. 类注释
三种锁:
- 写锁
- 独占 & 不可重入
- 相同时间只能有一个线程获取锁,其他线程请求读锁和写锁都会被阻塞,当前没有线程持有读锁或写锁的时候才可以获得获取到该锁
- 读锁
- 悲观读锁是一个共享锁,没有线程占用写锁的情况下,多个线程可以同时获取读锁。如果其他线程已经获得了写锁,则阻塞当前线程
- 乐观读锁
- 在操作数据前并没有加锁(使用CAS方式更新锁的状态),而是采用试探的方式。只要当前没有写锁就可以获得一个非0的stamp,如果已经存在写锁则返回一个为0的stamp
- 由于没有使用
CAS
方法,也没有真正的加锁,所以并发性能要比 readLock 还要高。 - 但是由于没有使用真正的锁,如果数据中途被修改,就会造成数据不一致问题。
- 如果某个线程已经获取了写锁,这时候再尝试获取乐观锁也是可以获取的,只是得到的stamp为0,无法通过validate验证。
- 如果因为其他线程增加写锁,则会导致stamp发生变化,从而validate失败。这种情况下需要重新获取乐观读锁。
使用
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // 写锁模式
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // 读模式
long stamp = sl.tryOptimisticRead(); // 尝试乐观读
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { // 乐观读不满足条件,升级为 普通读
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) { // 锁的升级
long stamp = sl.readLock(); // 普通读
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp); // 升级为 写锁
if (ws != 0L) { // 升级成功,替换 标记戳
stamp = ws;
x = newX;
y = newY;
break;
}
else { // 升级失败,释放读锁,重新生成 读锁 标记戳,循环
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
2. 类图
public class StampedLock implements java.io.Serializable {
// 构造函数
public StampedLock() {
state = ORIGIN;
}
}
3. 属性
private static final long serialVersionUID = -6001602636862214147L;
/** 处理器数量,用于自旋控制 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 入队 前的最大重试次数 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** aqcuire 阻塞前的最大重试次数 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** 重新阻塞前的最大重试次数 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** 等待溢出自旋锁时的让步周期 */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
/** 溢出前用于 读锁 的位数 */
private static final int LG_READERS = 7;
// 锁定状态和标记操作的值
private static final long RUNIT = 1L;
private static final long WBIT = 1L << LG_READERS; // 写锁占据除了 低 7 位以外的高位
private static final long RBITS = WBIT - 1L; // 读锁占据 低 7 位
private static final long RFULL = RBITS - 1L;
private static final long ABITS = RBITS | WBIT;
private static final long SBITS = ~RBITS; // note overlap with ABITS
// 锁定状态的初始值;避免失败值为零
private static final long ORIGIN = WBIT << 1;
// 取消获取方法的特殊值,因此调用者可以抛出 IE
private static final long INTERRUPTED = 1L;
// 节点状态的值
private static final int WAITING = -1;
private static final int CANCELLED = 1;
// 节点的模式(int 不是布尔值以允许算术)
private static final int RMODE = 0;
private static final int WMODE = 1;
/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
/** Lock sequence/state */
private transient volatile long state;
/** 读锁已满时,这个记录超出读锁最大个数的 个数 */
private transient int readerOverflow;
4. 内部类
// 读锁
final class ReadLockView implements Lock {
public void lock() { readLock(); } // 调用主类的 方法
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
public boolean tryLock() { return tryReadLock() != 0L; } // 不可重入
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockRead(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
// 写锁
final class WriteLockView implements Lock {
public void lock() { writeLock(); }
public void lockInterruptibly() throws InterruptedException {
writeLockInterruptibly();
}
public boolean tryLock() { return tryWriteLock() != 0L; } // 不可重入
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryWriteLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockWrite(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
// 读写锁
final class ReadWriteLockView implements ReadWriteLock {
public Lock readLock() { return asReadLock(); }
public Lock writeLock() { return asWriteLock(); }
}
5. 常用方法
1) 写锁相关
public long writeLock() {
long s, next;
// state == 0 说明没有线程占据
// CAS 修改 state ++
// 修改成功,则返回 state ++ 的值
// 修改失败,执行 acquireWrite(false, 0L) 自旋入队
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
// 尝试获取 写锁
// 获取成功,返回 state ++ 的值;获取失败,返回 0
public long tryWriteLock() {
long s, next;
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : 0L);
}
// 指定时间内尝试获取写锁
public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
long next, deadline;
if ((next = tryWriteLock()) != 0L)
return next; // 尝试获取成功,返回
if (nanos <= 0L)
return 0L; // 等待时间到,返回
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L; // 获取当前剩余等待时间
if ((next = acquireWrite(true, deadline)) != INTERRUPTED) // 指定时间内获锁
return next;
}
throw new InterruptedException();
}
// 独占获取锁,必要时阻塞,直到可用或当前线程被中断。
public long writeLockInterruptibly() throws InterruptedException {
long next;
if (!Thread.interrupted() &&
(next = acquireWrite(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}
// 释放写锁
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
// 尝试释放写锁
public boolean tryUnlockWrite() {
long s; WNode h;
if (((s = state) & WBIT) != 0L) {
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return true;
}
return false;
}
2) 读锁相关
public long readLock() {
long s = state, next;
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
public long tryReadLock() {
for (;;) {
long s, m, next;
if ((m = (s = state) & ABITS) == WBIT) // 超出读锁最大占用数
return 0L;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
} // m == RFULL,用 readerOverflow 记录超出范围的读锁数量
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}
public long readLockInterruptibly() throws InterruptedException {
long next;
if (!Thread.interrupted() &&
(next = acquireRead(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}
// 释放读锁
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
// 尝试释放读锁
public boolean tryUnlockRead() {
long s, m; WNode h;
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return true;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}
3) 乐观锁相关
public long tryOptimisticRead() {
long s;
// 没有被写锁占据,返回 一个标记戳,用于后续的 validate;否则,返回 0
// 没有加锁,没有 CAS 操作,之间返回当前生成的 标记戳,需要和 validate 搭配使用
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
// 根据给定 标记戳,和当前状态得到的 标记戳,对比,如果期间没有改变,则说明成功获取 乐观锁
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
// 如果锁状态与给定的标记匹配,则释放相应的锁模式。
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L)
break;
else if (m == WBIT) {
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}
// 如果锁定状态与给定标记匹配,则执行以下操作之一。
// 1. 如果标记表示持有写锁,则返回它。
// 2. 如果读锁,如果写锁可用,则释放读锁并返回写戳。
// 3. 如果是乐观读取,则仅在立即可用时才返回写入戳记。
// 4. 在所有其他情况下,此方法返回零。
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
else if (m == WBIT) {
if (a != m)
break;
return stamp;
}
else if (m == RUNIT && a != 0L) {
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
return next;
}
else
break;
}
return 0L;
}
// 如果锁定状态与给定标记匹配,则执行以下操作之一。
// 1. 如果标记表示持有写锁,则释放它并获得读锁。
// 2. 如果是读锁,则返回它。
// 3. 如果是乐观读取,则获取读取锁并仅在立即可用时返回读取标记。
// 4. 在所有其他情况下,此方法返回零。
public long tryConvertToReadLock(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = s + (WBIT + RUNIT);
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a != 0L && a < WBIT)
return stamp;
else
break;
}
return 0L;
}
// 如果锁定状态与给定标记匹配,则
// 1. 如果标记表示持有锁,则释放它并返回观察标记。
// 2. 如果是乐观读取,则在验证后返回。
// 3. 此方法在所有其他情况下都返回零,因此可用作“tryUnlock”的一种形式。
public long tryConvertToOptimisticRead(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
U.loadFence();
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS))
break;
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
return s;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return next & SBITS;
}
}
else if ((next = tryDecReaderOverflow(s)) != 0L)
return next & SBITS;
}
return 0L;
}