🥌 ArrayBlockingQueue
2022年10月10日
- Java
🥌 ArrayBlockingQueue
1. 类注释
- 基于 数组 构成的阻塞队列
- 队列元素 遵循 FIFO
- 队头元素是最先加进队列里的元素,时间最长;队尾元素是新加进入的元素;新元素入队加入队尾,从队头取出元素
- 有界队列
- 队列一旦创建,容量不能改变
- 若队列已满,再加入元素,会造成阻塞;若队列为空,从中取出元素,会阻塞
- 该类对生产者线程 & 消费者线程 提供了 公平策略
- 默认情况下,顺序是不公平的
- 但在构造函数中可以设置 公平策略,这样保证了线程顺序的公平性,但会造成性能的下降
- 基于
ReL
+putIndex
+takeIndex
实现 Iterator
通过 加锁 保证遍历的安全
2. 类图
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// ...
}
3. 属性
/**
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
final Object[] items;
/** take | poll | peek | remove 用到的下标 */
int takeIndex;
/** put | offer | add 用到的下标 */
int putIndex;
/** 队列当前元素个数 */
int count;
/** ReL */
final ReentrantLock lock;
/** 非空条件 - 此时可以取元素 */
private final Condition notEmpty;
/** 非满条件 - 此时可以放入元素 */
private final Condition notFull;
transient Itrs itrs = null;
4. 构造函数
/**
* 创建一个指定容量 & 非公平 的有界队列
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 创建一个 指定容量 & 指定公平策略 的有界队列
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
// 若 cap 就是 C 集合的所有元素,那么下次添加元素,就从 0 开始添加;若未满,则从当前处继续添加
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
5. 常用方法
1) 元素入队
/**
* 若队列未满,添加元素到队尾,返回 true
* 若队列已满,抛出 IllegalStateException 异常
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return super.add(e);
}
/**
* 若队列未满,添加元素到队尾,返回 true
* 若队列已满,返回 false
* 比 add() 方法添加失败只会抛异常 比起来更友好一些
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e); // 说明 ArrayBQ 中不能添加 null 元素
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* 在指定时间范围内,添加元素到队尾
* 添加成功,返回 true
* 添加失败,返回 false
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false; // 等待时间已到,仍未能成功添加元素,返回 false
nanos = notFull.awaitNanos(nanos); // 等待 nanos 时间长度的 非满条件 出现
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
/**
* 若元素为空,抛异常
*
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
/**
* 在 putIndex 处放入元素 x,唤醒 非空条件
* 只有在获得 ReL 锁的前提下,会调用该方法
*/
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
putIndex ++;
if (putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 添加元素成功,说明当前条件非空
}
/**
* 若队列未满,添加元素到队尾
* 若队列已满,阻塞,直到队列非满,然后添加当前元素
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 元素已满,阻塞,等待 非满条件 被唤醒
enqueue(e); // 队列非满,添加元素
} finally {
lock.unlock();
}
}
2) 元素出队
// 立即取出队头元素
// 若队列为空,返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 要取出的元素
items[takeIndex] = null;
takeIndex ++;
if (takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued(); // Iterator 中清除掉该元素
notFull.signal();
return x;
}
// 从 队头取元素,若队列为空,阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
3) 查找元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
/**
* Returns item at index i.
*/
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
/**
* 求队列当前元素个数时,通过加锁保证 size 的准确性
*
* @return the number of elements in this queue
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
/**
* 判断是否存在某个元素,存在则返回 true;否则返回 false
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
4) 删除元素
/**
*
* 删除某个元素
* 存在该元素,删除一个该元素后,返回 true;不存在该元素,则返回 false
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) { // 找到要删除的元素
removeAt(i);
return true;
}
i ++:
if (i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
/**
* 删除指定下标元素
*/
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) { // 要删除的元素下标正是队头元素下标
items[takeIndex] = null;
takeIndex ++;
if (takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued(); // 更新 Iterator
} else {
// 将 rvIndex 后面的元素整体前移
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}