(juc系列)延迟队列DelayQueue

本文源码基于: JDK13

DelayQueue 延迟队列

官方注释翻译

用于延迟元素的一个无界的阻塞队列实现. 延迟元素只有在他的延迟过期之后,才可以被获取.

队头的元素,是队列中过期最早的元素。如果没有元素过期,那么将没有队头元素,poll方法将会返回一个null.

过期操作只有元素的getDelay方法返回一个小于等于0的数值时才会起作用.

尽管没有过期的元素,不能通过take或者poll来获取, 其他方面和正常的元素是一样的.

比如,size()返回过期和未过期的元素的计数,同时,这个队列也是不接受空元素.

这个类和他的迭代器实现了CollectionIterator接口的所有可选方法.

这个类也是Java集合框架的一部分噢。

源码

定义

1
2
3
4

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

首先是一个普通队列, 且还是阻塞队列. 拥有他们的所有属性,同时,还要求放入的元素,是实现了Delayed接口的. 该接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13

public interface Delayed extends Comparable<Delayed> {

/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}

根据给定的时间单位,返回剩余的延迟时间.

属性

1
2
3
4
5
6
7
8
9
10
11

// 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 正在等待队头元素的线程
private Thread leader;

// 有元素可用的等待条件
private final Condition available = lock.newCondition();

使用优先级队列来保存元素,同时记录等待队首元素的线程.

这个优先级队列,是java.util包里的,暂不做详细解释,相信大家都懂哈.

提供了等待条件available来负责阻塞线程与唤醒.

构造方法

1
2
3
4
5
6
public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

提供两个构造方法,分别构造一个空的延迟队列和一个加载给定集合的阻塞队列.

入队系列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public boolean add(E e) {
return offer(e);
}

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入元素
q.offer(e);
// 队头元素是刚才插入的元素,说明有可用元素,唤醒等待线程们
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

public void put(E e) {
offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}

4个入队系列的方法,本质上都是调用了offer. 直接调用内部优先级队列的offer,无脑写入即可.

可以看到,该方法永远返回ture. 因为这个延迟队列也是无界的,因此不需要阻塞,不会插入失败.

插入只有两种可能:

  1. 成功
  2. 内存爆了,程序死掉.

出队系列

poll 没有元素返回Null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取第一个元素
E first = q.peek();
// 第一个元素为空,或者第一个元素的延迟时间没有到期,返回null.
// 否则返回该元素
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
lock.unlock();
}
}

首先查看第一个元素,如果不为空且已经过期了,那就弹出进行返回. 否则就返回null.

take 阻塞等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 查看第一个元素
E first = q.peek();
// 第一个元素为空,直接等待
if (first == null)
available.await();
else {
// 第一个元素已经超时,可用了,就进行弹出
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
// 等待
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
// 如果当前是第一个等待队首元素的线程,记录一下当前线程,且只阻塞剩余的时间,就苏醒来检查一下是否可用了
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 拿到元素后,协助唤醒一下等待线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

这个阻塞版本的获取元素复杂一点.

  1. 如果第一个元素为空, 就让当前线程阻塞等待.
  2. 不为空,且已经过期,直接弹出,进行返回,此时获取元素成功.
  3. 不为空,且没有过期,如果当前线程,是第一个等待队首元素的线程, 就阻塞第一个元素剩余的延迟时间, 到期后苏醒来检查队首元素的状态.
  4. 不是第一个等待的线程,直接阻塞,等待第一个线程来唤醒.
  5. 获取元素成功后,如果还有可用元素,协助唤醒一下其余的等待线程.
poll(time,unit) 超时阻塞版本

和上面的take代码很像,只是在每一个线程的阻塞时都加上了时间限制,就不重复讲了.

查看系列

size 查看元素数量

这个简单的方法为啥要写呢,因为要注意: 返回的size,是所有过期的,未过期的总数.

1
2
3
4
5
6
7
8
9
10

public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}

直接调用了内部的优先级队列的size()方法,没有判断是否过期.

peek() 查看队首元素,不弹出

由于在延迟队列中,总是需要看一下,队首元素,如果已经过期,就弹出,没过期,就不处理. 因此也简单看一下peek()方法.

1
2
3
4
5
6
7
8
9
10

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}

没啥,加锁,然后调用优先级队列的peek完事了。

总结

延迟队列,本质上是一个带有优先级的阻塞队列,且根据延迟限制队首元素的出队.

  • 优先级队列的实验,使用了java.util.PriorityQueue,本质上实现应该也是一个堆实现的.
  • 阻塞队列的实现,使用Condition条件. 由于是无界队列,入队操作不会阻塞. 出队行为在条件上等待,当有符合条件的元素时,唤醒所有等待线程.
  • 延迟属性的实现,在出队时,对队首元素进行额外的过期判断,如果过期,就弹出,没有过期,就返回null.
  • 线程安全方面,由于java.util.PriorityQueue不是线程安全的,因此使用额外的一个ReentrantLock来限制对数据的读写访问.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十