(juc系列)优先级阻塞队列-PriotiryBlockingQueue

本文源码基于: JDK13

PriorityBlockingQueue 优先级阻塞队列

官方注释翻译

一个无界的阻塞队列,使用相同的排队规则PriorityQueue并且提供阻塞的操作. 因为这个队列逻辑上是误解的,尝试添加操作可能会失败,由于资环耗尽了(比如OOM).

这个类不接受null元素. 一个优先级队列依赖于自然序并且不保证non-comparable(不支持比较的元素)的元素顺序.

这个类和他的迭代器实现了CollectionIterator接口所有可选的方法,这个迭代器提供了iterator()spliterator(), 不保证遍历元素的顺序.

如果你需要排序的遍历,可以使用Arrays.sort(pq.toArray()). 另外,方法drainTo可以用来移除一些元素,并且把他们放到另外一个集合中.

这个类的操作,不保证相同优先级的元素的顺序. 如果你需要强制一个顺序,你可以定义定制化的类或者比较器,使用第二个key来打破第一个key相同的情况.

举个例子,这里有一个类提供了FIFO顺序去比较元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class FIFOEntry<E extends Comparable<? super E>>
implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
}
public E getEntry() { return entry; }

public int compareTo(FIFOEntry<E> other) {
// 首先调用`CompareTo`来获取优先级
int res = entry.compareTo(other.entry);
// 如果第一个优先级一样, 就根据seqNum再给定一个优先级.
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
}
}

实现了CompareTo,首先使用原始类的CompareTo,如果优先级相等,就是用内部自定义的seqNum来比较优先级.

这个类也是java集合框架的一个成员.

源码

定义

1
2
3
@SuppressWarnings("unchecked")
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

实现了队列的接口以及阻塞队列的接口.

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

// 实际保存数据的数组
private transient Object[] queue;

// 元素数量
private transient int size;

// 比较器,定义了元素的优先级
private transient Comparator<? super E> comparator;

// 锁
private final ReentrantLock lock = new ReentrantLock();

// 不为空的等待条件
private final Condition notEmpty = lock.newCondition();

// 锁
private transient volatile int allocationSpinLock;

// 用于帮助序列化的一个类,没啥用
private PriorityQueue<E> q;

使用数组来保存元素,保存了当前的数量,以及一个比较器,用于定义元素之间的优先级.

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] es = c.toArray();
int n = es.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (es.getClass() != Object[].class)
es = Arrays.copyOf(es, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (Object e : es)
if (e == null)
throw new NullPointerException();
}
this.queue = ensureNonEmpty(es);
this.size = n;
if (heapify)
heapify();
}

实现了四个构造方法,前三个都是对初始容量及比较器的赋值. 第四个构造函数支持将给定集合中的元素初始化到队列中.

入队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public boolean add(E e) {
return offer(e);
}

public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
// 扩容
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
// 根据是否有特定的比较器,将当前元素上浮到正确的优先级位置.
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
// 数量+1,通知不为空的等待线程
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}

public void put(E e) {
offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}

add, offer, put, offer(time,unit)四个方法,本质上都是调用的同一个offer,为啥呢?

因为这个优先级队列,本质上是无界的,也就是说,没有队列满了的情况,因此前面的等待条件,只有notEmpty而没有和其他队列一样的notFull

这个方法比较简单:

  1. 如果容量不够扩容
  2. 直接放进队列中,然后根据是否有特定的比较其,进行上浮,一直到自己的优先级应该在的位置
  3. 通知所有等待队列不为空的线程即可.

两个上浮操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
// 遍历
while (k > 0) {
// 父节点
int parent = (k - 1) >>> 1;
Object e = es[parent];
// 父节点和当前节点对比
if (key.compareTo((T) e) >= 0)
break;
es[k] = e;
k = parent;
}
// 找到的位置给新的节点
es[k] = key;
}

// 和上面的方法一样,只不过比较器是给定的,不是用元素本身的CompareTo。
private static <T> void siftUpUsingComparator(
int k, T x, Object[] es, Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = x;
}

因为队列中的元素,其实是一个平衡的二叉堆,因此在给定的元素,寻找优先级所在的位置时, 使用类似于堆的上浮操作即可.

出队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

// 如果为空,返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}

队列的几个出队方法,核心都是调用dequeue()方法,只是在获取元素为空时,处理策略不一致.

  • poll 返回null
  • take 永久阻塞
  • poll(time,unit) 阻塞给定时间.

核心的出队方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

private E dequeue() {
// assert lock.isHeldByCurrentThread();
final Object[] es;
final E result;

// 获取数组第一个,也就是堆顶的元素
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
// 最后一个元素
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
// 将他放在堆顶,然后下沉,使堆符合优先级
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}

也算是常见的堆的出堆代码了,首先获取堆顶元素,之后将堆的最后一个元素,放在堆顶,进行下沉,使整个堆符合优先级.

下沉代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
// assert n > 0;
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
// 堆顶元素的孩子节点
int child = (k << 1) + 1; // assume left child is least
Object c = es[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
if (key.compareTo((T) c) <= 0)
break;
es[k] = c;
k = child;
}
es[k] = key;
}

将给定节点与右边子节点进行比较,如果不符合优先级,交换位置. 递归执行.

总结

一个带有优先级的阻塞队列. 支持使用元素本身的CompareTo以及给定比较器Comparator.

优先级的实现,使用堆. 因此内部保存元素的载体是一个数组.

由于设计是无界的队列,因此入队方法永远不会阻塞,只会逐渐撑爆内存. put方法不会阻塞. 出队方法像其他阻塞队列一样,会阻塞.

对数组的读写使用ReentrantLock来保证线程安全性.

阻塞操作使用Condition来实现阻塞等待与唤醒.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十