起因
springboot定时任务异常引发的针对@Scheduled注解进行的原理分析
异常发生记录
由于服务器时间同步出现异常,导致时间设置为2000年,后排查发现定时任务在执行之后发生中断,无异常信息,推测定时任务内部执行逻辑的中断原因与服务器时间变更有关
源码分析
详细探究@Scheduled内部执行逻辑
首先看一下@Scheduled的注解定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package org.springframework.scheduling.annotation;
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(Schedules.class) public @interface Scheduled {
String cron() default "";
String zone() default "";
long fixedDelay() default -1;
String fixedDelayString() default "";
long fixedRate() default -1;
String fixedRateString() default "";
long initialDelay() default -1;
String initialDelayString() default ""; }
|
说明@Scheduled注解,支持三种调度方式:
cron表达式(”cron” expression)、固定频率(fixedRate)、固定延时(fixedDelay)
核心类摘要:
- ScheduledAnnotationBeanPostProcessor
- ScheduledTaskRegistrar
- TaskScheduler
- ReschedulingRunnable
ScheduledAnnotationBeanPostProcessor
ScheduledAnnotationBeanPostProcessor是@scheduled注解处理类,实现BeanPostProcessor接口(postProcessAfterInitialization方法实现注解扫描和类实例创建)、ApplicationContextAware接口(setApplicationContext方法设置当前ApplicationContext)、org.springframework.context.ApplicationListener(观察者模式,onApplicationEvent方法会被回调)。
该类的定义如下图:
1 2 3 4 5 6 7 8 9 10
| package org.springframework.scheduling.annotation;
public class ScheduledAnnotationBeanPostProcessor implements MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
}
|
会发现ScheduledAnnotationBeanPostProcessor足足实现了9个生命周期有关的接口类。
- ApplicationContextAware的setApplicationContext,EmbeddedValueResolverAware的setEmbeddedValueResolver
- MergedBeanDefinitionPostProcessor的postProcessMergedBeanDefinition
- BeanNameAware,BeanFactoryAware
- BeanPostProcessor的postProcessAfterInitialization
- SmartInitializingSingleton的afterSingletonsInstantiated
- DestructionAwareBeanPostProcessor的requiresDestruction
- ApplicationListener的事件
- DestructionAwareBeanPostProcessor的requiresDestruction
- DisposableBean的destroy
我们关注的重点便是:在bean初始化之后的@Scheduled以及@Schedules 注解的解析
先看看一个bean在初始化之后步骤。
postProcessAfterInitialization扫描所有@scheduled注解,区分cronTasks、fixedDelayTasks、fixedRateTasks。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { return bean; }
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
|
这个方法的主要逻辑就是从已经初始化完毕之后的bean中寻找贴有@Scheduled注解的方法,然后迭代这些方法上的注解列表,并解析这些注解内部的属性并进一步的处理。处理的内部属性的方法在ScheduledAnnotationBeanPostProcessor的另外一个方法中。这里跟进继续分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } }
String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } }
if (initialDelay < 0) { initialDelay = 0; }
long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } }
long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } }
Assert.isTrue(processedSchedule, errorMessage);
synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
|
上面的这个方法可以分为两个大的步骤:
- 解析@Scheduled注解中的内部属性,然后创建定时任务,然后加入到任务列表中;
- 将任务保存在内部的一个bean与其相关的定时任务方法集合scheduledTasks中。
而这里的第一步又分了4个属性的解析。
- initialDelay是否延迟执行任务的属性
- cron表达式的解析,这里需要注意的cron表达式不能跟延迟一起使用
- fixedDelay 在上一次调用完毕之后经过多久再次调用的属性
- fixedRate 方法调用的频率属性
这里需要重点关注的两个位置在创建定时任务的时候的三个方法。
- createRunnable 方法
- CronTrigger对象的创建
- CronTask对象的创建
- scheduleCronTask方法的调用。
接下来就是分析这个方法
1 2 3 4 5 6 7 8 9 10
| protected Runnable createRunnable(Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); }
|
发现就是获取需要定时调用的方法,然后创建一个ScheduledMethodRunnable对象。而这个对象又实现了Runnable接口,因此可以 定时的执行其内保存的调用方法。
根据表达式创建触发器CronTrigger
在创建CronTrigger对象的时候会把表达式跟时区这两个属性传入,在CronTrigger会创建一个CronSequenceGenerator对象,而这个对象在初始化的时候会解析对应的表达式以及对应的时区,然后保存对象的信息,在后面计算下次定时执行时间的时候用到
创建包含触发器的定时任务对象CronTask
CronTask对象中保存了触发器对象,用来在后面计算定时执行时间的时候用。然后将需要定时执行的ScheduledMethodRunnable对象保存了起来,在后面执行的时候使用。
接下来就到了执行定时任务,逻辑比较冗长
scheduleCronTask方法在ScheduledTaskRegistrar类中。
ScheduledTaskRegistrar
ScheduledTaskRegistrar类实现了InitializingBean接口的afterPropertiesSet方法,而在afterPropertiesSet方法中会有实例化ConcurrentTaskScheduler类,而实例化的过程也是一个重要的过程,因为这个里面涉及到了定时任务调度用的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Override public void afterPropertiesSet() { scheduleTasks(); }
@SuppressWarnings("deprecation") protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
|
到这里就可以知道,在Spring中用于进行定时任务调度用的默认执行器是ScheduledExecutorService类。我们同时看到对于根据@Schedule属性形成的不同的调度对象,会有不同的处理形式进行调度,但是在这些处理的方法中都会用到同一个对象进行调度,就是上面的ConcurrentTaskScheduler对象,他们的共同点如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public ScheduledTask scheduleCronTask(CronTask task) { ....... scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); ....... }
public ScheduledTask scheduleFixedRateTask(FixedRateTask task) { ...... scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval()); ...... } public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) { ...... scheduledTask.future =this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval()); ...... }
|
可以看到这里主要就是设置调度定时任务用的执行器。现在看看上面在ScheduledTaskRegistrar中调用ConcurrentTaskScheduler中的方法,这里就列举一个schedule方法,其他的方法逻辑大同小异。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { try { if (this.enterpriseConcurrentScheduler) { return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger); } else { ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } }
|
这里就是创建一个调度的时候出现异常的时候的异常处理器,然后会创建一个ReschedulingRunnable对象,然后调用这个对象的调度方法,进行任务的调用
接下来看看ReschedulingRunnable具体信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public ReschedulingRunnable( Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) { super(delegate, errorHandler); this.trigger = trigger; this.executor = executor; }
@Nullable public ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } }
public void run() { Date actualExecutionTime = new Date(); super.run(); Date completionTime = new Date(); synchronized (this.triggerContextMonitor) { Assert.state(this.scheduledExecutionTime != null, "No scheduled execution"); this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) { schedule(); } } }
|
到这里整个调度的分析过程完成的差不多,这里会计算出来下一次的调度时间,然后同时会把ReschedulingRunnable对象本身给传到对应的ScheduledExecutorService中,而ReschedulingRunnable对象间接的实现了Runnable对象,因此这个对象的run方法对定时的被调用,而且这个run方法中必须包含对本身的schedule调用才能实现往复循环的调用。
同时我们也发现了罪魁祸首,
1
| long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
|
在这里当服务器时间变更为2000的期间,计算得到的下次任务的执行推迟时间变更为了负值
流程图
最后补一个task的循环执行的流程图:

结束!