线程池之ScheduledExecutorService

一直以来,我都没用过这个线程池,都是使用那个简单的newFixedThreadPool来做线程池,定时任务就是使用quartz或者Spring-scheduled或者xxl-job来做,几乎没有使用jdk自带的定时任务,当我在读eureka源码的时候,发现使用了这个定时任务线程池来完成,对这个ScheduledExecutorService做了一些了解。

线程池

有大概好几种线程池,算是四种吧

1
2
3
4
5
6
7
8
// 单线程池,永远只有一个线程在跑
Executors.newSingleThreadExecutor()
// 回收型线程池,可以重复使用原来旧的已经空闲下来的线程,不够就创建新的
Executors.newCachedThreadPool()
// 固定大小线程池,也可以用重复使用原来旧的已经空闲下来的线程,可以限制同时运行的线程数据
Executors.newFixedThreadPool()
// 定时任务线程池,可以循环跑任务,也可以定时跑任务
Executors.newScheduledThreadPool()

使用一个定时任务线程池可以用如下的方法,这段代码是大佬Doug Lea写在ScheduledExecutorService注释中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = new Runnable() {
public void run() { System.out.println("beep"); }
};
final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(new Runnable() {
public void run() { beeperHandle.cancel(true); }
}, 60 * 60, SECONDS);
}
}

通过future的方式来对任务进行调度,上面实现的效果就是主线程开启schedule之后的10s,开始每隔10s打印一次beep,一直打印一小时后停掉这个任务。这个定时任务的线程池中,永远只有一个可以复用的线程,任务运行完之后线程空闲,等下一次任务来临,就可以复用这个线程。这样的话在1个小时后的那一时刻,10s打印先进行,还是任务停止先进行,取决于他俩代码的执行先后,毕竟只有一个活动的线程。

使用ScheduledExecutorService

在ScheduledExecutorService中,只有三种方法

1
2
3
4
5
6
7
8
9
10

// 1. 一次性任务,在delay时间之后执行,执行完就完
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
// 1. 一次性任务,在delay时间之后执行,可以获取返回值,这个可以和上一个认为是一样的
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
// 2. 多次循环任务,按照上一次任务的发起时间延时指定时长执行下一次任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
// 3. 多次循环任务,按照上一次任务的结束时间延时指定时长执行下一次任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

以下是一个针对scheduleAtFixedRate 和 scheduleWithFixedDelay的简单例子,简单改造下 Doug Lea的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static class BeeperControl {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
public void beepForAnHour() {
final Runnable beeper = () -> {
System.out.println(DateUtil.now() + " ===> beep ===> start");
// 这里模拟缓慢操作,经测试我的PC执行耗时大概为2-3s,不使用Thread.Sleep避免引入其他线程问题
for (long i = 0; i < 9999999999L; i++) {

}
System.out.println(DateUtil.now() + " ===> beep ===> stop");
};
// final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 0, 1, SECONDS);
final ScheduledFuture<?> beeperHandle = scheduler.scheduleWithFixedDelay(beeper, 0, 1, SECONDS);
scheduler.schedule(() -> { beeperHandle.cancel(true); }, 60 * 60, SECONDS);
}
}

public static void main(String[] args) {
BeeperControl beeperControl = new BeeperControl();
beeperControl.beepForAnHour();
}

线程池的数据是够的,但是scheduleWithFixedDelay按照执行结束时间来做开启下一次任务,因此出现了这样的输入日志

1
2
3
4
5
6
7
8
2021-07-04 16:57:16 ===> beep ===> start
2021-07-04 16:57:18 ===> beep ===> stop
2021-07-04 16:57:19 ===> beep ===> start
2021-07-04 16:57:22 ===> beep ===> stop
2021-07-04 16:57:23 ===> beep ===> start
2021-07-04 16:57:25 ===> beep ===> stop
2021-07-04 16:57:26 ===> beep ===> start
2021-07-04 16:57:28 ===> beep ===> stop

可以发现,虽然规定了一秒执行一次,但是他是根据线程结束的时候来计算下一次的执行时间,所以当stop打印之后,下一次beep的时间才确定,而且是比较精准的一秒。

程序没按照我朴素的思路运行

那么,在这种任务线程比较慢的(比任务间隔时间还长)极端情况下,scheduleAtFixedRate又是什么样的效果呢,按照我自己的理解应该是如下的输出结果。

1
2
3
4
5
6
7
8
9
2021-07-04 16:57:16 ===> beep ===> start  // 第1次任务执行
2021-07-04 16:57:17 ===> beep ===> start // 第1次1秒钟之后第2次任务执行
2021-07-04 16:57:18 ===> beep ===> stop // 第1次任务执行结束
2021-07-04 16:57:18 ===> beep ===> start // 第2次1秒钟之后第3次任务执行
2021-07-04 16:57:19 ===> beep ===> stop // 第2次任务执行结束
2021-07-04 16:57:19 ===> beep ===> start // 第3次1秒钟之后第4次任务执行
2021-07-04 16:57:20 ===> beep ===> stop // 第3次任务执行结束
2021-07-04 16:57:20 ===> beep ===> start // 第4次1秒钟之后第5次任务执行
2021-07-04 16:57:21 ===> beep ===> stop // 第4次任务执行结束

这是我自己按照代码意思理解出来的结果,但是实际执行的时候却没有发生这个,反而是下面这种状态

1
2
3
4
5
6
7
8
2021-07-04 17:02:44 ===> beep ===> start
2021-07-04 17:02:47 ===> beep ===> stop
2021-07-04 17:02:47 ===> beep ===> start
2021-07-04 17:02:49 ===> beep ===> stop
2021-07-04 17:02:49 ===> beep ===> start
2021-07-04 17:02:51 ===> beep ===> stop
2021-07-04 17:02:51 ===> beep ===> start
2021-07-04 17:02:54 ===> beep ===> stop

没有按照我朴素的思维取走,就感觉很奇怪,完全是按照一个任务无论有多长,它执行完之后,下一个任务调度才会去执行,看去来好像ScheduledExecutorService就只能执行一个任务,在执行过程中,其余的任务都进入等待状态(进队列),执行完从队列里取任务,看取到的任务是否已经到达执行时间(此时已经错过了最开始设定的执行时间)。

延时任务的原理和源码

这是为什么呢,我点开了ScheduledExecutorService的实现 ScheduledThreadPoolExecutor读了它的源码,希望能够找到原理,但我非常自信就是我上面猜测的那个原理。

看到构造方法没什么奇特的,就调用了父类的ThreadPoolExecutor的构造方法

1
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());

看起了也没什么特殊的,还蛮常规,就看到了一个DelayedWorkQueue,看名字猜测和延时队列或者优先队列有类似功能。

根据ThreadPoolExecutor的工作原理,肯定是从任务队列中拿出队头,我看了DelayedWorkQueue的take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 已经超时了,限制就要弹出,拿去执行
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

/**
* 时间减去当前时间,小于等于0代表已经过了当前时间
*/
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

这里为啥写了个新的队列不用优先队列PriorityQueue大概是因为优先队列线程不安全并且没办法阻塞读写吧,这是我的猜测。

至于怎么完成的周期性执行,写在了ScheduledFutureTask对象的run方法里,scheduleAtFixedRate方法只是把对应的属性和任务放到队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 无间隔执行,立马运行run方法
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 这一行就是能够周期性执行的逻辑
setNextRunTime();
// 从新把任务丢到队列里
reExecutePeriodic(outerTask);
}
}
/**
* 这个period 在scheduleWithFixedDelay执行的时候是负值,就在被调用的时候,延时p才执行
* 在scheduleAtFixedRate执行的时候是正值,在被调用的时候,把上次执行的时间加上个p作为新的执行时间
* setNextRunTime执行时机都是队头take之后,修改执行时间,再次放到队列中
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

就是单纯的多维护了一个DelayedWorkQueue,其余逻辑还是靠ThreadPoolExecutor。

执行逻辑的流程图

ScheduledExecutorService的流程图

Ps. 唉。这段时间真的超级废啊,在公司忙着和那几个自大的人吵架,忙着一堆无法推进的事情。这博客五月份写了个开头,7月份才提交。