🔹 StampedLock

吞佛童子2022年10月10日
  • Java
  • Concurrency
大约 8 分钟

🔹 StampedLock

1. 类注释

三种锁:

  • 写锁
    • 独占 & 不可重入
    • 相同时间只能有一个线程获取锁,其他线程请求读锁和写锁都会被阻塞,当前没有线程持有读锁或写锁的时候才可以获得获取到该锁
  • 读锁
    • 悲观读锁是一个共享锁,没有线程占用写锁的情况下,多个线程可以同时获取读锁。如果其他线程已经获得了写锁,则阻塞当前线程
  • 乐观读锁
    • 在操作数据前并没有加锁(使用CAS方式更新锁的状态),而是采用试探的方式。只要当前没有写锁就可以获得一个非0的stamp,如果已经存在写锁则返回一个为0的stamp
    • 由于没有使用 CAS 方法,也没有真正的加锁,所以并发性能要比 readLock 还要高。
    • 但是由于没有使用真正的锁,如果数据中途被修改,就会造成数据不一致问题。
    • 如果某个线程已经获取了写锁,这时候再尝试获取乐观锁也是可以获取的,只是得到的stamp为0,无法通过validate验证。
    • 如果因为其他线程增加写锁,则会导致stamp发生变化,从而validate失败。这种情况下需要重新获取乐观读锁。

使用

 class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();

   void move(double deltaX, double deltaY) { // 写锁模式
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }

   double distanceFromOrigin() { // 读模式
     long stamp = sl.tryOptimisticRead(); // 尝试乐观读
     double currentX = x, currentY = y;
     if (!sl.validate(stamp)) { // 乐观读不满足条件,升级为 普通读
        stamp = sl.readLock();
        try {
          currentX = x;
          currentY = y;
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

   void moveIfAtOrigin(double newX, double newY) { // 锁的升级
     long stamp = sl.readLock(); // 普通读
     try {
       while (x == 0.0 && y == 0.0) {
         long ws = sl.tryConvertToWriteLock(stamp); // 升级为 写锁
         if (ws != 0L) { // 升级成功,替换 标记戳
           stamp = ws;
           x = newX;
           y = newY;
           break;
         }
         else { // 升级失败,释放读锁,重新生成 读锁 标记戳,循环
           sl.unlockRead(stamp);
           stamp = sl.writeLock();
         }
       }
     } finally {
       sl.unlock(stamp);
     }
   }
 }

2. 类图

public class StampedLock implements java.io.Serializable {
    // 构造函数
    public StampedLock() {
        state = ORIGIN;
    }
}

3. 属性

    private static final long serialVersionUID = -6001602636862214147L;

    /** 处理器数量,用于自旋控制 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** 入队 前的最大重试次数 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

    /** aqcuire 阻塞前的最大重试次数 */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

    /** 重新阻塞前的最大重试次数 */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

    /** 等待溢出自旋锁时的让步周期 */
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

    /** 溢出前用于 读锁 的位数 */
    private static final int LG_READERS = 7;

    // 锁定状态和标记操作的值
    private static final long RUNIT = 1L;
    private static final long WBIT  = 1L << LG_READERS; // 写锁占据除了 低 7 位以外的高位
    private static final long RBITS = WBIT - 1L; // 读锁占据 低 7 位
    private static final long RFULL = RBITS - 1L;
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    // 锁定状态的初始值;避免失败值为零
    private static final long ORIGIN = WBIT << 1;

    // 取消获取方法的特殊值,因此调用者可以抛出 IE
    private static final long INTERRUPTED = 1L;

    // 节点状态的值
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;

    // 节点的模式(int 不是布尔值以允许算术)
    private static final int RMODE = 0;
    private static final int WMODE = 1;

    /** Wait nodes */
    static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;

    // views
    transient ReadLockView readLockView;
    transient WriteLockView writeLockView;
    transient ReadWriteLockView readWriteLockView;

    /** Lock sequence/state */
    private transient volatile long state;
    /** 读锁已满时,这个记录超出读锁最大个数的 个数 */
    private transient int readerOverflow;

4. 内部类

    // 读锁
    final class ReadLockView implements Lock {
        public void lock() { readLock(); } // 调用主类的 方法
        public void lockInterruptibly() throws InterruptedException {
            readLockInterruptibly();
        }
        public boolean tryLock() { return tryReadLock() != 0L; } // 不可重入
        public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
            return tryReadLock(time, unit) != 0L;
        }
        public void unlock() { unstampedUnlockRead(); }
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }

    // 写锁
    final class WriteLockView implements Lock {
        public void lock() { writeLock(); }
        public void lockInterruptibly() throws InterruptedException {
            writeLockInterruptibly();
        }
        public boolean tryLock() { return tryWriteLock() != 0L; } // 不可重入
        public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
            return tryWriteLock(time, unit) != 0L;
        }
        public void unlock() { unstampedUnlockWrite(); }
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }

    // 读写锁
    final class ReadWriteLockView implements ReadWriteLock {
        public Lock readLock() { return asReadLock(); }
        public Lock writeLock() { return asWriteLock(); }
    }

5. 常用方法

1) 写锁相关

    public long writeLock() {
        long s, next;  
        // state == 0 说明没有线程占据
        // CAS 修改 state ++
        // 修改成功,则返回 state ++ 的值
        // 修改失败,执行 acquireWrite(false, 0L) 自旋入队
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }
    
    // 尝试获取 写锁
    // 获取成功,返回 state ++ 的值;获取失败,返回 0
    public long tryWriteLock() {
        long s, next;
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : 0L);
    }
    
    // 指定时间内尝试获取写锁
    public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(time);
        if (!Thread.interrupted()) {
            long next, deadline;
            if ((next = tryWriteLock()) != 0L)
                return next; // 尝试获取成功,返回
            if (nanos <= 0L)
                return 0L; // 等待时间到,返回
            if ((deadline = System.nanoTime() + nanos) == 0L)
                deadline = 1L; // 获取当前剩余等待时间
            if ((next = acquireWrite(true, deadline)) != INTERRUPTED) // 指定时间内获锁
                return next;
        }
        throw new InterruptedException();
    }
    
    // 独占获取锁,必要时阻塞,直到可用或当前线程被中断。
    public long writeLockInterruptibly() throws InterruptedException {
        long next;
        if (!Thread.interrupted() &&
            (next = acquireWrite(true, 0L)) != INTERRUPTED)
            return next;
        throw new InterruptedException();
    }
    
    // 释放写锁
    public void unlockWrite(long stamp) {
        WNode h;
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        if ((h = whead) != null && h.status != 0)
            release(h);
    }
    
    // 尝试释放写锁
    public boolean tryUnlockWrite() {
        long s; WNode h;
        if (((s = state) & WBIT) != 0L) {
            state = (s += WBIT) == 0L ? ORIGIN : s;
            if ((h = whead) != null && h.status != 0)
                release(h);
            return true;
        }
        return false;
    }

2) 读锁相关

    public long readLock() {
        long s = state, next;  
        return ((whead == wtail && (s & ABITS) < RFULL &&
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }
    
    public long tryReadLock() {
        for (;;) {
            long s, m, next;
            if ((m = (s = state) & ABITS) == WBIT) // 超出读锁最大占用数
                return 0L;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            } // m == RFULL,用 readerOverflow 记录超出范围的读锁数量
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
    }
    
    public long readLockInterruptibly() throws InterruptedException {
        long next;
        if (!Thread.interrupted() &&
            (next = acquireRead(true, 0L)) != INTERRUPTED)
            return next;
        throw new InterruptedException();
    }
    
    // 释放读锁
    public void unlockRead(long stamp) {
        long s, m; WNode h;
        for (;;) {
            if (((s = state) & SBITS) != (stamp & SBITS) ||
                (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                throw new IllegalMonitorStateException();
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    break;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                break;
        }
    }
    
    // 尝试释放读锁
    public boolean tryUnlockRead() {
        long s, m; WNode h;
        while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return true;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                return true;
        }
        return false;
    }

3) 乐观锁相关

    public long tryOptimisticRead() {
        long s;
        // 没有被写锁占据,返回 一个标记戳,用于后续的 validate;否则,返回 0
        // 没有加锁,没有 CAS 操作,之间返回当前生成的 标记戳,需要和 validate 搭配使用
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }
    
    // 根据给定 标记戳,和当前状态得到的 标记戳,对比,如果期间没有改变,则说明成功获取 乐观锁
    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }
    
    // 如果锁状态与给定的标记匹配,则释放相应的锁模式。
    public void unlock(long stamp) {
        long a = stamp & ABITS, m, s; WNode h;
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L)
                break;
            else if (m == WBIT) {
                if (a != m)
                    break;
                state = (s += WBIT) == 0L ? ORIGIN : s;
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
            else if (a == 0L || a >= WBIT)
                break;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                return;
        }
        throw new IllegalMonitorStateException();
    }
    
    // 如果锁定状态与给定标记匹配,则执行以下操作之一。
    // 1. 如果标记表示持有写锁,则返回它。
    // 2. 如果读锁,如果写锁可用,则释放读锁并返回写戳。
    // 3. 如果是乐观读取,则仅在立即可用时才返回写入戳记。
    // 4. 在所有其他情况下,此方法返回零。
    public long tryConvertToWriteLock(long stamp) {
        long a = stamp & ABITS, m, s, next;
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            }
            else if (m == WBIT) {
                if (a != m)
                    break;
                return stamp;
            }
            else if (m == RUNIT && a != 0L) {
                if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
                    return next;
            }
            else
                break;
        }
        return 0L;
    }
    
    // 如果锁定状态与给定标记匹配,则执行以下操作之一。
    // 1. 如果标记表示持有写锁,则释放它并获得读锁。
    // 2. 如果是读锁,则返回它。
    // 3. 如果是乐观读取,则获取读取锁并仅在立即可用时返回读取标记。
    // 4. 在所有其他情况下,此方法返回零。
    public long tryConvertToReadLock(long stamp) {
        long a = stamp & ABITS, m, s, next; WNode h;
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                else if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                        return next;
                }
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            else if (m == WBIT) {
                if (a != m)
                    break;
                state = next = s + (WBIT + RUNIT);
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            else if (a != 0L && a < WBIT)
                return stamp;
            else
                break;
        }
        return 0L;
    }
    
    // 如果锁定状态与给定标记匹配,则
    // 1. 如果标记表示持有锁,则释放它并返回观察标记。
    // 2. 如果是乐观读取,则在验证后返回。
    // 3. 此方法在所有其他情况下都返回零,因此可用作“tryUnlock”的一种形式。
    public long tryConvertToOptimisticRead(long stamp) {
        long a = stamp & ABITS, m, s, next; WNode h;
        U.loadFence();
        for (;;) {
            if (((s = state) & SBITS) != (stamp & SBITS))
                break;
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                return s;
            }
            else if (m == WBIT) {
                if (a != m)
                    break;
                state = next = (s += WBIT) == 0L ? ORIGIN : s;
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            else if (a == 0L || a >= WBIT)
                break;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return next & SBITS;
                }
            }
            else if ((next = tryDecReaderOverflow(s)) != 0L)
                return next & SBITS;
        }
        return 0L;
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou