起因

springboot定时任务异常引发的针对@Scheduled注解进行的原理分析

异常发生记录

由于服务器时间同步出现异常,导致时间设置为2000年,后排查发现定时任务在执行之后发生中断,无异常信息,推测定时任务内部执行逻辑的中断原因与服务器时间变更有关

源码分析

详细探究@Scheduled内部执行逻辑

首先看一下@Scheduled的注解定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

package org.springframework.scheduling.annotation;

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {

String cron() default "";

String zone() default "";

long fixedDelay() default -1;

String fixedDelayString() default "";

long fixedRate() default -1;

String fixedRateString() default "";

long initialDelay() default -1;

String initialDelayString() default "";
}

说明@Scheduled注解,支持三种调度方式:

cron表达式(”cron” expression)、固定频率(fixedRate)、固定延时(fixedDelay)

核心类摘要:

  1. ScheduledAnnotationBeanPostProcessor
  2. ScheduledTaskRegistrar
  3. TaskScheduler
  4. ReschedulingRunnable

ScheduledAnnotationBeanPostProcessor

ScheduledAnnotationBeanPostProcessor是@scheduled注解处理类,实现BeanPostProcessor接口(postProcessAfterInitialization方法实现注解扫描和类实例创建)、ApplicationContextAware接口(setApplicationContext方法设置当前ApplicationContext)、org.springframework.context.ApplicationListener(观察者模式,onApplicationEvent方法会被回调)。

该类的定义如下图:

1
2
3
4
5
6
7
8
9
10

package org.springframework.scheduling.annotation;

public class ScheduledAnnotationBeanPostProcessor
implements MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

}

会发现ScheduledAnnotationBeanPostProcessor足足实现了9个生命周期有关的接口类。

  1. ApplicationContextAware的setApplicationContext,EmbeddedValueResolverAware的setEmbeddedValueResolver
  2. MergedBeanDefinitionPostProcessor的postProcessMergedBeanDefinition
  3. BeanNameAware,BeanFactoryAware
  4. BeanPostProcessor的postProcessAfterInitialization
  5. SmartInitializingSingleton的afterSingletonsInstantiated
  6. DestructionAwareBeanPostProcessor的requiresDestruction
  7. ApplicationListener的事件
  8. DestructionAwareBeanPostProcessor的requiresDestruction
  9. DisposableBean的destroy

我们关注的重点便是:在bean初始化之后的@Scheduled以及@Schedules 注解的解析

先看看一个bean在初始化之后步骤。

postProcessAfterInitialization扫描所有@scheduled注解,区分cronTasks、fixedDelayTasks、fixedRateTasks。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}

Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}

这个方法的主要逻辑就是从已经初始化完毕之后的bean中寻找贴有@Scheduled注解的方法,然后迭代这些方法上的注解列表,并解析这些注解内部的属性并进一步的处理。处理的内部属性的方法在ScheduledAnnotationBeanPostProcessor的另外一个方法中。这里跟进继续分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

// Determine initial delay
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}

// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}

// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}

// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}

// Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}

// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);

// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}

上面的这个方法可以分为两个大的步骤:

  1. 解析@Scheduled注解中的内部属性,然后创建定时任务,然后加入到任务列表中;
  2. 将任务保存在内部的一个bean与其相关的定时任务方法集合scheduledTasks中。

而这里的第一步又分了4个属性的解析。

  1. initialDelay是否延迟执行任务的属性
  2. cron表达式的解析,这里需要注意的cron表达式不能跟延迟一起使用
  3. fixedDelay 在上一次调用完毕之后经过多久再次调用的属性
  4. fixedRate 方法调用的频率属性

这里需要重点关注的两个位置在创建定时任务的时候的三个方法。

  1. createRunnable 方法
  2. CronTrigger对象的创建
  3. CronTask对象的创建
  4. scheduleCronTask方法的调用。

接下来就是分析这个方法

1
2
3
4
5
6
7
8
9
10

protected Runnable createRunnable(Object target, Method method) {
//贴有@Scheduled注解的方法需要是无参方法
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
//从目标类(bean)中寻找一个对应的可以调用的方法
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
//创建一个ScheduledMethodRunnable实现了Runable接口
return new ScheduledMethodRunnable(target, invocableMethod);
}

发现就是获取需要定时调用的方法,然后创建一个ScheduledMethodRunnable对象。而这个对象又实现了Runnable接口,因此可以 定时的执行其内保存的调用方法。

根据表达式创建触发器CronTrigger

在创建CronTrigger对象的时候会把表达式跟时区这两个属性传入,在CronTrigger会创建一个CronSequenceGenerator对象,而这个对象在初始化的时候会解析对应的表达式以及对应的时区,然后保存对象的信息,在后面计算下次定时执行时间的时候用到

创建包含触发器的定时任务对象CronTask

CronTask对象中保存了触发器对象,用来在后面计算定时执行时间的时候用。然后将需要定时执行的ScheduledMethodRunnable对象保存了起来,在后面执行的时候使用。

接下来就到了执行定时任务,逻辑比较冗长

scheduleCronTask方法在ScheduledTaskRegistrar类中。

ScheduledTaskRegistrar

ScheduledTaskRegistrar类实现了InitializingBean接口的afterPropertiesSet方法,而在afterPropertiesSet方法中会有实例化ConcurrentTaskScheduler类,而实例化的过程也是一个重要的过程,因为这个里面涉及到了定时任务调度用的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

@Override
public void afterPropertiesSet() {
scheduleTasks();
}

@SuppressWarnings("deprecation")
protected void scheduleTasks() {
//TaskScheduler还没创建则进行创建
if (this.taskScheduler == null) {
//创建用于定时调用的执行器,使用的是jdk自带的ScheduledExecutorService
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
//创建ConcurrentTaskScheduler,用于执行任务的调度
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
//这里面放根据cron表达式创建的定时任务对象
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
//根据fixRate属性创建的定时任务对象
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
//根据fixDelay属性创建的定时任务对象
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}

到这里就可以知道,在Spring中用于进行定时任务调度用的默认执行器是ScheduledExecutorService类。我们同时看到对于根据@Schedule属性形成的不同的调度对象,会有不同的处理形式进行调度,但是在这些处理的方法中都会用到同一个对象进行调度,就是上面的ConcurrentTaskScheduler对象,他们的共同点如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public ScheduledTask scheduleCronTask(CronTask task) {
.......
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
.......
}

public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
......
scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
......
}
public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
......
scheduledTask.future =this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
......
}

可以看到这里主要就是设置调度定时任务用的执行器。现在看看上面在ScheduledTaskRegistrar中调用ConcurrentTaskScheduler中的方法,这里就列举一个schedule方法,其他的方法逻辑大同小异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
//创建一个ReschedulingRunnable然后进行调度,并返回一个ScheduledFuture
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}


这里就是创建一个调度的时候出现异常的时候的异常处理器,然后会创建一个ReschedulingRunnable对象,然后调用这个对象的调度方法,进行任务的调用

接下来看看ReschedulingRunnable具体信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

public ReschedulingRunnable(
Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {
//设置调度的任务跟错误处理器
super(delegate, errorHandler);
this.trigger = trigger;
this.executor = executor;
}

@Nullable
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
//计算出下一次的调用时间,这里会用到前面保存过的调用时间信息,然后进行计算。
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
//罪魁祸首!!!!
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
//进行任务的调度,这里调用的不是根据需要调用的方法创建出来的调度对象,是这个类本身
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}

public void run() {
//获取当前的饿时间
Date actualExecutionTime = new Date();
//调用父类的run方法,父类的fun方法会调用需要调用的方法创建出来的调度对象,也就是我们贴有注解的方法
super.run();
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
//更新调用时间
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
//检查这个方法是否被取消了
if (!obtainCurrentFuture().isCancelled()) {
//进行类本身的调用,这里主要是刷新下次的调用时间
schedule();
}
}
}

到这里整个调度的分析过程完成的差不多,这里会计算出来下一次的调度时间,然后同时会把ReschedulingRunnable对象本身给传到对应的ScheduledExecutorService中,而ReschedulingRunnable对象间接的实现了Runnable对象,因此这个对象的run方法对定时的被调用,而且这个run方法中必须包含对本身的schedule调用才能实现往复循环的调用。

同时我们也发现了罪魁祸首,

1
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();

在这里当服务器时间变更为2000的期间,计算得到的下次任务的执行推迟时间变更为了负值

流程图

最后补一个task的循环执行的流程图:

taskImg

结束!