🥌 ArrayBlockingQueue

吞佛童子2022年10月10日
  • Java
  • Collection
大约 5 分钟

🥌 ArrayBlockingQueue

1. 类注释

  • 基于 数组 构成的阻塞队列
  • 队列元素 遵循 FIFO
  • 队头元素是最先加进队列里的元素,时间最长;队尾元素是新加进入的元素;新元素入队加入队尾,从队头取出元素

  • 有界队列
  • 队列一旦创建,容量不能改变
  • 若队列已满,再加入元素,会造成阻塞;若队列为空,从中取出元素,会阻塞

  • 该类对生产者线程 & 消费者线程 提供了 公平策略
  • 默认情况下,顺序是不公平的
  • 但在构造函数中可以设置 公平策略,这样保证了线程顺序的公平性,但会造成性能的下降

  • 基于 ReL + putIndex + takeIndex 实现
  • Iterator 通过 加锁 保证遍历的安全

2. 类图

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
                                   implements BlockingQueue<E>, java.io.Serializable {
        // ...
}

3. 属性

/**
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** take | poll | peek | remove 用到的下标 */
    int takeIndex;

    /** put | offer | add 用到的下标 */
    int putIndex;

    /** 队列当前元素个数 */
    int count;

    /** ReL */
    final ReentrantLock lock;

    /** 非空条件 - 此时可以取元素 */
    private final Condition notEmpty;

    /** 非满条件 - 此时可以放入元素 */
    private final Condition notFull;

    transient Itrs itrs = null;

4. 构造函数

     /**
     * 创建一个指定容量 & 非公平 的有界队列
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * 创建一个 指定容量 & 指定公平策略 的有界队列
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            // 若 cap 就是 C 集合的所有元素,那么下次添加元素,就从 0 开始添加;若未满,则从当前处继续添加
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

5. 常用方法

1) 元素入队

    /**
     * 若队列未满,添加元素到队尾,返回 true
     * 若队列已满,抛出 IllegalStateException 异常
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return super.add(e);
    }
    
    /**
     * 若队列未满,添加元素到队尾,返回 true
     * 若队列已满,返回 false
     * 比 add() 方法添加失败只会抛异常 比起来更友好一些
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e); // 说明 ArrayBQ 中不能添加 null 元素
        
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 在指定时间范围内,添加元素到队尾
     * 添加成功,返回 true
     * 添加失败,返回 false
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false; // 等待时间已到,仍未能成功添加元素,返回 false
                nanos = notFull.awaitNanos(nanos); // 等待 nanos 时间长度的 非满条件 出现
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 若元素为空,抛异常
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }
    
    /**
     * 在 putIndex 处放入元素 x,唤醒 非空条件
     * 只有在获得 ReL 锁的前提下,会调用该方法
     */
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        putIndex ++;
        if (putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal(); // 添加元素成功,说明当前条件非空
    }
    
    /**
     * 若队列未满,添加元素到队尾
     * 若队列已满,阻塞,直到队列非满,然后添加当前元素
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); 
        try {
            while (count == items.length)
                notFull.await(); // 元素已满,阻塞,等待 非满条件 被唤醒
            enqueue(e); // 队列非满,添加元素
        } finally {
            lock.unlock();
        }
    }

2) 元素出队

    // 立即取出队头元素
    // 若队列为空,返回 null
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex]; // 要取出的元素
        items[takeIndex] = null;
        takeIndex ++;
        if (takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued(); // Iterator 中清除掉该元素
        notFull.signal();
        return x;
    }
    
    // 从 队头取元素,若队列为空,阻塞
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

3) 查找元素

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Returns item at index i.
     */
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }
    
    /**
     * 求队列当前元素个数时,通过加锁保证 size 的准确性
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 判断是否存在某个元素,存在则返回 true;否则返回 false
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i]))
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

4) 删除元素

/**
     * 
     * 删除某个元素
     * 存在该元素,删除一个该元素后,返回 true;不存在该元素,则返回 false
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) { // 找到要删除的元素
                        removeAt(i);
                        return true;
                    }
                    i ++:
                    if (i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 删除指定下标元素
     */
    void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        
        if (removeIndex == takeIndex) { // 要删除的元素下标正是队头元素下标
            items[takeIndex] = null;
            takeIndex ++;
            if (takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued(); // 更新 Iterator
        } else {
            // 将 rvIndex 后面的元素整体前移
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou