(juc系列)ScheduledThreadPoolExecutor源码阅读

前言

这是Java中常用的另外一个线程池,主要用于实现任务的延迟执行及周期性执行.

听起来与Timer很相似,但是比Timer更加健壮,灵活一些。

简介

官方注释翻译:

一个可以延迟执行命令,或者周期性执行命令的ThreadPoolExecutor.
如果需要多个工作线程,这个类就比Timer更加好用了,或者当你需要比Timer更加灵活,健壮的线程池时,也使用这个类.

延迟任务在他们可用之后很快被执行,但是不完全保证实时. 任务被严格按照FIFO的顺序进行调度。

当一个提交的任务在执行前被取消了,执行就不会进行了. 默认情况下,一个取消了的任务在他的延迟时间到达之前,不会从工作队列中移除.
因为没有启用一些检查和监控,这会导致保留了过多的已取消任务。为了避免这种情况,使用setRemoveOnCancelPolicy方法来让任务在取消时立即从工作队列中移除.

不同的执行,可能会使用不同的工作线程.

因为这个类继承了ThreadPoolExecutor. 一些继承的方法没有用.

ThreadPoolExecutor一样,如果没有特殊情况,这个类也使用Executors.defaultThreadFactory来创建线程,也使用ThreadPoolExecutor.AbortPolicy作为默认的拒绝策略.

注意事项: 这个类重写了executesubmit方法,去生成内部的ScheduledFuture对象,以此来控制每个任务的延时和调度.
所有重写了这两个方法的子类,必须要调用父类的方法。

简单地说,这个类继承自ThreadPoolExecutor,父类有的他都有。
除此之外.添加了对任务的延迟执行及周期性执行。

来看看实现~.

源码

ScheduledFutureTask 任务结构

为了实现延迟及周期性执行,实现了一个基于FutureTask的任务结构.

定义

1
2
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {

这个任务类,继承自FutureTask,同时实现了RunnableScheduleFuture接口.

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// 为了保证FIFO.设置的一个序列号
private final long sequenceNumber;

// 延迟执行的纳秒数字
private volatile long time;

// 周期性执行的任务,周期的纳秒数
private final long period;

// 实际执行的任务,也就是要被调度的任务
RunnableScheduledFuture<V> outerTask = this;

// 在延迟队列中的索引,方便用来快速的取消任务
int heapIndex;

可以看到,除了FutureTask父类的属性外,额外保存了延迟时间和周期时间的参数,而由于周期性任务是要被不断的执行的,因为使用outerTask来保存实际的任务.

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

ScheduledFutureTask(Runnable r, V result, long triggerTime,
long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}

ScheduledFutureTask(Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = period;
this.sequenceNumber = sequenceNumber;
}

ScheduledFutureTask(Callable<V> callable, long triggerTime,
long sequenceNumber) {
super(callable);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}

提供了三个构造方法,全是赋值型的方法重载,没啥说的.

compareTo 优先级

由于在线程池内部的优先级队列中,需要一个优先级. 这里通过重写compareTo方法,实现了优先级的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

优先级使用三个属性定义:

  1. 下一次执行时间
  2. 序列号
  3. 任务提交时间

所以说优先级队列的队首,放的永远是下一个要被执行的任务.

也许和第二名的执行时间一样,但是序列号晚,或者提交时间晚.

run 核心执行方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

public void run() {
// 当前不能执行,取消
if (!canRunInCurrentRunState(this))
cancel(false);
// 不是周期性任务,执行一次
else if (!isPeriodic())
super.run();
// 是周期性任务,执行一次,且将状态设置为最初的样子,也就是reset.
else if (super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}

这里重写了FutureTaskrun方法,为了支持周期性的一个属性设置.

有三个分支:

  1. 当前任务不能执行,直接取消任务
  2. 当前任务可以执行,不是周期性任务,调用run执行一次即可。
  3. 当前任务可以执行,是周期性任务.
    1. 执行一次run.且将当前Future的状态设置为最初的样子.
    2. 设置下一次运行的时间.
    3. 将下一次的任务放入工作队列.

其中涉及到另外两个方法:

  • setNextRunTime

设置下一次的执行时间,对于周期性任务来说,下一次执行时间就是当前时间+周期时间

1
2
3
4
5
6
7
8
9

private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

  • reExecutePeriodic

将当前任务再次入队,等待调度执行.

1
2
3
4
5
6
7
8
9
10
11

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(task)) {
super.getQueue().add(task);
if (canRunInCurrentRunState(task) || !remove(task)) {
ensurePrestart();
return;
}
}
task.cancel(false);
}

如果当前状态没问题,就将任务放入工作队列,然后确保有工作线程是活着的.

否则就取消任务.

取消 cancel

1
2
3
4
5
6
7
8
9
10

public boolean cancel(boolean mayInterruptIfRunning) {
// The racy read of heapIndex below is benign:
// if heapIndex < 0, then OOTA guarantees that we have surely
// been removed; else we recheck under lock in remove()
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}

调用父类cancel取消掉当前任务,如果需要立即移除掉工作队列中的任务,就移除.

ScheduleThreadPoolExecutor 构造方法

这个类没有什么属性,完全应用了父类的属性,因此构造方法也是对父类参数的一些赋值操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

提供了4个构造方法,对于线程工厂拒绝策略这两个值,与父类并没有什么不同。

值得注意的是,所有的工作队列,都使用了DelayedWorkQueue,这是一个特意实现的内部类.

DelayedWorkQueue 源码

定义

1
2
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {

作为一个工作队列,他实现了队列和阻塞队列的接口,这两个在本文不详细介绍,认为大家都有所了解.

作为一个队列,最重要的就是入队出队两个方法.

入队 系列方法

提供了三个入队的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13

public void put(Runnable e) {
offer(e);
}

public boolean add(Runnable e) {
return offer(e);
}

public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}

可以看到,三个方法调用的都是底层的offer实现.

需要注意:

由于所有任务的入队,都是ScheduledThreadPoolExecutor类中自己进行的,因此认为是可信的.
所以工作队列是一个无界的队列,所有入队操作,不会超时,不会阻塞,绝对会成功.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
int i = size;
// 需要扩容就扩容
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
// 如果当前队列为空,放在第一位.
queue[0] = e;
// 给这个任务记录一下自己所在的index
setIndex(e, 0);
} else {
// 上浮,根据compareTo给的优先级,将当前任务放在合适的位置上.
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
// 解锁
lock.unlock();
}
return true;
}

入队方法比较粗暴,主要分为两个分支:

  1. 如果队列为空,就直接放在第一位.
  2. 如果队列不为空,就要根据CompareTo的值,将任务放在合适的位置上,以符合优先级队列的特性.

出队系列

出队系列复杂一点,分开讲.

peek 获取队首
1
2
3
4
5
6
7
8
9
10
11

public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}

直接返回了队首的元素. 不判断是否到了执行时间,因为没有弹出,只是返回一下,让调用方自己看看.

poll 获取并弹出队首元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队首的元素,
RunnableScheduledFuture<?> first = queue[0];
// 如果队首的元素,没到执行时间,就返回null,拒绝弹出
// 到了执行时间就调用finishPoll,进行实际的弹出.
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: finishPoll(first);
} finally {
lock.unlock();
}
}

// 实际的弹出方法
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// size减1
int s = --size;
// 队尾的元素
RunnableScheduledFuture<?> x = queue[s];
// 队尾为空
queue[s] = null;
if (s != 0)
// 将队尾的元素,从队头开始下沉,以继续保证优先级队列的规则
siftDown(0, x);
// 返回队头元素
setIndex(f, -1);
return f;
}

poll方法是核心的弹出队首的方法.这里就要判断时间了。

  1. 如果队首的元素,还没到执行的时间,就返回Null.拒绝弹出.
  2. 队首的元素应该执行,就弹出队首元素,同时将队尾元素放在队首,进行下沉操作,以保证优先级队列的合理性.
take 阻塞等待弹出队首元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取一个可中断的锁
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 队首
RunnableScheduledFuture<?> first = queue[0];
// 如果队首为空,直接休眠
if (first == null)
available.await();
else {
// 队首还有多久可以执行
long delay = first.getDelay(NANOSECONDS);
// 如果已经可以执行,就弹出
if (delay <= 0L)
return finishPoll(first);
//
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
// 当前线程等待队首元素还差着的时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 解锁,并且唤醒等待的其他线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

带有超时时间的版本,以阻塞等待队首元素的成功弹出.

  1. 如果队首为空,直接不限期的进行休眠等待.
  2. 如果队首已经可以执行了,就弹出.
  3. 队首不可以执行,就休眠等待队首剩余的时间.
  4. 如果队首不为空,就唤醒其他的休眠等待的线程.

ScheduledThreadPoolExecutor 核心调度方法 schedule

分为两类,一种是只延迟执行,没有周期执行。

仅延迟执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 创建一个任务,调用延迟执行
RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
  1. 创建了一个ScheduleFutureTask. 具体的参数有.
    • 执行的命令
    • 触发的时间
    • 序列号
  2. 调用delayedExecute进行延迟执行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池终止了,就拒绝任务
if (isShutdown())
reject(task);
else {
// 添加到队列中
super.getQueue().add(task);
// 如果状态有问题,就取消任务
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
// 确保线程池有线程活着
ensurePrestart();
}
}

约等于直接将任务放进工作队列中.

延迟执行一次,之后周期执行

有两个版本,

  • 延迟执行一次,之后以固定比例周期执行,等待时间越来越长
  • 延迟执行一次,之后以固定的周期时间进行执行,每次等待时间一样

这里以第二个为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0L)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
-unit.toNanos(delay),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

也是首先计算参数,之后延迟执行一次,与上面的不进行周期执行的区别就是,这个方法会算出一个周期时间,然后传递给任务.

源码总结

有没有发现JDK这些代码的一个特点,很多基础设施,方法特别复杂,支持多种情况,而顶层的API很多都只是简单的转发调用等等. 在我理解,这其实是一种良好设计的体现,充分应对了可能出现需求扩展.

比如在上方的调度方法中,作为最核心的方法,却只是简单计算参数,传递给底层的队列,基本就完成了。

总结

本文并不算特别细致,对很多细节没有深究,这里尝试从原理上总结下ScheduledThreadPoolExecutor

首先,它是一个ThreadPoolExecutor。因此之前学习的,所有线程池的属性,他都有,什么线程工厂,拒绝策略,核心线程,最大线程,线程活跃时间等等特性,他都有。

他额外实现了延迟执行与周期性执行两个特点.依赖什么实现的呢?

独特的延迟优先级队列

众所周知,线程池内部有一个工作队列,这个类实现了自己DelayWorkQueue

  1. 确保队列中的顺序,是有优先级的,按照触发时间的顺序排列,这里需要ScheduledFutureTask配合实现,提供compareTo
  2. 延迟队列,实现了特殊的poll方法,在队首(最早触发的任务)还没有到达触发时间时, 无法从队列弹出任务去执行.

独特的任务结构封装

FutureTask有很多实现方法,对于调度来说,什么最重要. 这个类实现了自己的ScheduledFutureTask.

  1. 提供自己的触发时间,以及compareTo方法,两个任务之间要能够计算优先级。
  2. 可重复调用. FutureTask是一次性的,在done之后,内部的状态已经是完成,不能再次执行了.ScheduledFutureTask方法调用的是父类的runAndReset方法,可以执行完一次后,将状态重置,等待下一次的调用.

由以上两点,结合父类的ThreadPoolExecutor,实现了一个可以延迟执行及周期性执行的线程池.

接下来模拟一个任务的全生命周期。懒得画图,手写吧.

  1. 新建一个线程池.
  2. 调度一个任务,并让他初次延迟10分钟,之后每1分钟执行一次. (假设当前时间是0分钟)
  3. 调度开始,计算任务参数. 任务的time是十分钟后,period是一分钟.
  4. 在调度方法中,执行了一次延迟计算。向工作队列中放入了当前的任务.
  5. 时间来到第5分钟,线程池中的多个线程,尝试调用队列的poll方法获取任务,由于队列中只有一个任务,且没到时间,拿不到,继续等待.
  6. 时间来到第10分钟,终于有个线程拿到了队首的任务,执行了一次,执行后将状态重置,计算下一次的时间,10+1,下一次执行时间在11分钟. 再次将这个任务放入工作队列中.
  7. 时间来到第11分钟,又有一个线程拿到了队首的任务,重复上面的步骤.
  8. 之后每一个分钟都会执行一次了.

…太简单了,我们再加一个任务.

  1. 在第15分钟,向线程池调度一个任务,初次延时5分钟,之后每30s周期性执行一次.
  2. 计算参数,任务2的初次触发时间是20分钟. 将任务2放入队列中, 由于他比任务1执行的晚,因此他一直在队列尾部,拿不到他.
  3. 在15-19分钟,还是任务1优先级比较高,每次都拿到了任务1执行一次,再放入队列.
  4. 在第20分钟,由两个线程(看你配置,也可能是一个线程两次获取),分别获取到了任务1,任务2.
  5. 任务1还是老样子,执行一次,再放回去.
  6. 任务2执行一次后,计算下一次的时间.20+0.5=20.5, 新的任务的触发时间是20分30秒. 放入队列中,此时,任务2优先级高于任务1,放到了队列的第一个.
  7. 20分30秒,有线程拿到了任务2(他在队首),执行一次,计算下一次时间,放入队列….
  8. 21分,有两个线程拿到了任务1,任务2,分别执行一次,计算下一次时间,之后放入队列
  9. 21分30秒,有线程拿到了任务2……

之后,每个半分钟,也就是30秒,都会执行一次任务2.每个整分钟,都会任务1和任务2各自执行一次。

完美运行.

最后这块流程梳理,主要是为了方便自己理解,如果写的过于抽象,而屏幕前的你已经理解的比较透彻了,可以不用看~.


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十