🔵 AQS

吞佛童子2022年6月20日
  • Java
  • concurrency
大约 11 分钟

🔵 AQS

1. 类注释

  1. 概述

    • 为阻塞锁 & 同步器(举例:信号量)提供了一种框架,它依赖于一个 FIFO 等待队列
    • 该类借助一个原子类 int 来表示状态,可以成为多种同步器的基础
    • 该类的子类必须实现本类的 protected 方法来改变这个状态值
  2. 两种模式

    • exclusive 独占模式
      • 只能有一个线程获取
    • shared 共享模式
      • 可以被多个线程同时获取

2. 类图

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer 
    implements java.io.Serializable {
// .......
}

img_5.png


3. 内部类

    /**
     * 等待队列的节点类
     *
     * 该等待队列是 "CLH" (Craig, Landin, and Hagersten) 的变形
     * CLH 通常用来自旋,能确保无饥饿性,提供先来先服务的公平性
     * CLH 锁也是一种基于链表的可扩展、高性能、公平的自旋锁,
     * 申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
     *
     * 添加节点,这个节点将成为新的 tail,弹出节点,需要重新设置 head 
     *
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     *
     * 当一个节点被取消,那么它的继任者节点将被重新连接到一个没有被取消的前驱者身后
     * 虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系
     */
    static final class Node {
        static final Node SHARED = new Node(); //标识共享锁
        static final Node EXCLUSIVE = null; // 标识独占锁

        /** 表示当前结点已取消调度。当超时或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 */
        static final int CANCELLED =  1;
        /** 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。 */
        static final int SIGNAL    = -1;
        /** 表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 结点将从等待队列转移到同步队列中 */
        static final int CONDITION = -2;
        /** 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 */
        static final int PROPAGATE = -3;

        /**
         * 在Node节点中一般通过 waitStatus 获得节点不同的状态
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 将节点入队的线程,创建时被初始化,使用之后变为 null
         */
        volatile Thread thread;

        /**
         * 条件队列中等待的下一个节点,在共享模式下为 SHARED
         * 只有在独占模式下,才会有条件队列
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回当前节点的前驱节点,若前驱节点为空,抛异常
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

4. 属性

    /**
     * 等待队列队头,懒初始化
     * 只有 setHead 方法才会对其进行修改
     */
    private transient volatile Node head;

    /**
     * 等待队列队尾,懒初始化
     * 只有通过 enq 方法添加新等待节点时才会对其进行修改
     */
    private transient volatile Node tail;

    /**
     * 同步状态,在子类重写各种 try() 时会进行 state 的 get() & set()
     * 在不同子类中,可充当不同作用,例:
     * Rel 中表示 重入次数
     * Semaphore 中表示 还可以通行的令牌数
     */
    private volatile int state;

    /**
     * 返回当前状态
     */
    protected final int getState() {
        return state;
    }

    /**
     * 设置当前状态
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS 修改 state 值
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * 小于这个值,单位为纳秒,时,更倾向于自旋,大于这个值,更倾向于 park() 阻塞线程
     */
    static final long spinForTimeoutThreshold = 1000L;

5. 模版方法

模版方法被 final 修饰,不可被重写

1) acquire(int arg)

    /**
     * 独占模式下获取锁,不响应中断
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        // 1. 当前尝试获取锁失败
        // 2. 将当前线程封装成一个独占节点,并将其加入 CLH 队尾成功
        // 3. 自旋,只有 前置节点为头节点 && 当前尝试获锁成功 才返回,若返回 true,说明当前线程被中断
        // 满足以上条件后,中断当前线程
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
     * 将节点加入 CLH 队尾,CAS 保证同步
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        if (pred != null) { // 尾节点非空,在尾节点后添加 node 作为新的尾节点
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾节点为空,执行 enq,将 node 节点插入到队列
        enq(node);
        return node;
    }
    
    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    
    /**
     * 将 node 节点插入到队列
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 之前没有任何节点 - 初始化
                if (compareAndSetHead(new Node()))
                    tail = head; // head == tail == new Node()
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * 只有 前置节点为头节点 && 尝试获锁成功 才返回,若返回 true,说明当前线程被中断;若 false,说明未被中断
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // p 为前置节点
                // 前置节点为头节点 && 尝试获锁成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 前置节点状态为 signal && 阻塞当前线程 && 当前线程被中断
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 需要被子类重写的方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
     * 根据前置节点状态判断当前节点是否需要 park 阻塞,只有前置节点状态为 signal 时,才会返回 true
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 前置节点正等待被 signal 唤醒,因此当前节点同样需要阻塞
             */
            return true;
        if (ws > 0) {
            /*
             * 前置节点因为某些原本被 cancel,因此需要再往前继续查找,直到遇到非 cancel 掉的节点,将其后置节点设为当前节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 前置节点状态 == 0 | 3,此时需要设置前置节点状态为 signal,表示前置节点等待被唤醒
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
        
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 阻塞当前线程
        return Thread.interrupted(); // 返回线程是否被中断,同时清空中断标志位
    }
    
    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }
    
    /**
     * 取消当前节点的 acquire 过程
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
        while (pred.waitStatus > 0) // 前置节点处于 cancel 状态
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED; // 当前节点状态为 cancel 

        // 当前节点就是 tail 那么设置 tail 节点为上一个节点,CAS 删除当前节点
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
    
    /**
     * 中断当前线程
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

2) acquireInterruptibly(int arg)

    /**
     * 独占模式下获取锁,支持中断
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 当前线程被中断,同时清空中断标志位,抛出异常
        // 当前线程尝试获锁失败
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * 当前线程被中断时,立即抛出异常,因此为 支持中断模式
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3) acquireShared(int arg)

    /**
     * 共享模式下获取变量,忽略中断
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    // 等待子类重写
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); // 在 CLH 队列尾部添加 共享节点,返回当前节点
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 前置节点为 头节点
                if (p == head) {
                    // 当前线程再次尝试获锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 设置头节点,唤醒头节点的一个后继结点
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * 设置头节点,唤醒头节点的一个后继结点
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 暂存原头节点
        setHead(node); // 当前节点设为头节点

        // 满足以下任一条件,会尝试唤醒其后续节点
        // 1. propagate > 0
        // 2. 原 头节点为空
        // 3. 原 头节点状态 < 0 --> 处于 signal | condition | propagate 状态
        // 4. 当前 头节点为空
        // 5. 当前 头节点状态 < 0
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next; // 后续节点
            if (s == null || s.isShared())
                doReleaseShared(); // 唤醒头节点的一个后继结点
        }
    }
    
    /**
     * 设置头节点
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    /**
     * 唤醒头节点的一个后继结点
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 头节点状态为 signal
                if (ws == Node.SIGNAL) {
                    // 尝试将头节点状态设置为 0,设置失败,跳过当前循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;   
                    // 设置成功,唤醒一个头节点的后继结点         
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // 头节点状态 == 0 && 设置头节点状态为 PROPAGATE 失败,跳出当前循环,重试
            }
            // head == null || head == tail 的情况下
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    /**
     * 唤醒一个满足条件的 后继结点,如果能找到的话
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) // 要唤醒后继结点,那么当前节点状态必定 >= 0,若否,CAS 设置其状态为 0
            compareAndSetWaitStatus(node, ws, 0); 

        Node s = node.next;
        // 后继结点 == null || 后继结点状态为 cancel
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从后往前找到首个非 cancel 状态的节点
            // 这里不能从 s 往后找,因为 s == null,那么肯定无法执行 null.next 
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 找到满足条件的后继结点,解除其阻塞状态
        if (s != null)
            LockSupport.unpark(s.thread);
    }

4) acquireSharedInterruptibly(int arg)

    /**
     * 共享模式下的获锁,响应中断
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 当前线程发生中断,立即抛出异常
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

5) release(int arg)

    /**
     * 在独占模式下释放共享变量  
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        // 尝试释放锁成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒一个满足条件的 后继结点
            return true;
        }
        return false;
    }
    
    // 独占模式下,尝试释放锁,等待子类重写
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

6) releaseShared(int arg)

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    // 共享模式下的尝试释放锁,需要子类重写
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
     * 唤醒头节点的一个后继结点
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 头节点状态为 signal
                if (ws == Node.SIGNAL) {
                    // 尝试将头节点状态设置为 0,设置失败,跳过当前循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;   
                    // 设置成功,唤醒一个头节点的后继结点         
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // 头节点状态 == 0 && 设置头节点状态为 PROPAGATE 失败,跳出当前循环,重试
            }
            // head == null || head == tail 的情况下
            if (h == head)                   // loop if head changed
                break;
        }
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou