Spring的任务调度

Spring的任务调度

1.概述

Spring框架为任务调度提供了专门的解决方案。在Spring框架的org.springframework.scheduling包中,通过对JDK 的ScheduledExecutorService接口的实例进行封装,对外提供了一些注解和接口,为开发者处理定时任务提供了统一的配置方式。          

Spring框架借助了JDK的能力实现了任务调度,但是开发者通过Spring来实现定时任务调度是非常简单和便捷的。如果需要单机环境下要实现任务调度的功能,使用Spring 的任务调度方式无疑是首选。

2.Spring任务调度的使用

我们使用Spring来进行定时任务调度的话,主要是两种方式。1. @Scheduled注解的方式。2. 基于SchedulingConfigurer接口的方式。这两种方式都需要使用@EnableScheduling注解来开启Spring框架的定时任务调度功能。

2.1 @Scheduled注解的方式

@Scheduled注解的方式比较简单,我们只要在需要调度方法上加上@Scheduled注解,然后通过配置@Scheduled注解的元素值,来控制任务调度的频率。这种配置任务调度的频率方式有点硬编码的味道。若后期若需要调整任务调度的频率,则需要修改代码后重启服务才能生效。

2.1.1 @Scheduled注解

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;String cron() default "";String zone() default "";long fixedDelay() default -1;String fixedDelayString() default "";long fixedRate() default -1;String fixedRateString() default "";long initialDelay() default -1;String initialDelayString() default "";
}

@Scheduled注解的元素值说明:

cron

一个类似cron的任务执行表达式

zone

cron表达时解析使用的时区,默认为服务器的本地时区。

fixedDelay

上一次任务执行结束到下一次任务执行开始的固定时间延迟,单位为ms。

fixedDelayString

同fixedDelay,返回的是延迟的字符串形式

fixedRate

以固定的时间间隔来执行Scheduled注释的任务,单位为ms。使用fixedRate执行任务调度时,若上一次任务还未执行完毕,则将下一次任务加入worker队列,等上一次任务执行完成后,才能执行下一次任务。

fixedRateString

同fixedRate,返回的是时间间隔的字符串形式。

initialDelay

首次执行Scheduled注释任务的延迟时间,单位ms。

initialDelayString

同initialDelay,返回的是延迟的字符串形式。

TimeUnit

执行任务的时间单位,默认是ms。

要使用@Scheduled注解方式进行任务调度的话,我们只要在需要进行调度的方法上加上@Scheduled注解,再通过注解的属性值对任务调度的频率进行设置。最后增加一个@EnableScheduling注解来启用Spring定时任务的执行功能。在Spring中只需要通过以上的几步设置,就可以完成一个任务调度的功能。

2.1.2 示例代码

@Component
@EnableScheduling
public class MyTask {@Scheduled(cron = "0/5 * * * * ?")public void cronFun(){System.out.println("Thread "+Thread.currentThread().getName()+" execute  cronFun! "+DateUtil.DateToString(new Date()));}  @Scheduled(fixedDelay = 1000 * 3,initialDelay = 1000 * 5)public void fixedDelayFun(){System.out.println("Thread "+Thread.currentThread().getName()+" execute fixedDelayFun! "+DateUtil.DateToString(new Date()));}@Scheduled(fixedRate = 1000 * 6, initialDelay = 1000 * 5)public void fixedRateFun(){System.out.println("Thread "+Thread.currentThread().getName()+" execute fixedRateFun! "+DateUtil.DateToString(new Date()));}
}

在以上的示例代码中,分别使用了三种方式来模拟进行了任务的调度。

  • cron任务执行表达式的方式:我们定义了一个每5秒钟一次频率的cron表达式。cron表达式是由若干数字、空格、符号按一定的规则,组成的一组字符串,用来表达时间的信息。该字符串由6个空格分为7个域,每一个域代表一个时间含义。

  • fixedDelay是以固定的延迟来执行下一个任务。在示例代码中设置了每延迟3秒钟来执行下一个任务。初次执行调度任务会延迟5秒。在每一个任务执行完成后,都会停顿3秒钟,才会再执行下一个任务。

                     

  •  fixedRate以固定的时间间隔来执行调度任务。在示例代码中设置了每6秒钟的时间间隔频率来执行下一个任务。初次执行调度任务会延迟5秒。

  • 使用fixedRate进行任务调度时需要注意的是,若任务的执行时间超过了时间间隔,也就是上一次任务还未执行完毕,而又有执行下一次任务的调度。此时程序会将下一次任务暂时加入worker队列进行等待,直到上一次任务执行完成后,再执行队列中任务。在示例代码中,我们使用线程休眠的方式来模拟任务的执行时间,我们设置任务的执行时间为10秒,超过了调度的时间间隔6秒。那么此时任务调度就会变成每次会过10秒后,才会再执行下次任务。

       

2.2  SchedulingConfigurer接口方式

在Spring中,虽然使用@Scheduled注解的方式来进行任务调度确实简单易用,但是这种相当于硬编码的方式,一旦设定了任务的执行频率,在任务的执行过程中就无法改变。

Spring还提供了通过实现SchedulingConfigurer接口的方式来配置定时任务。SchedulingConfigurer是一个函数式接口,通过实现SchedulingConfigurer接口来配置任务调度,我们可以在任务运行过程中动态的调整任务的执行时间或频率,而无需修改代码和重启服务。这种方式特别适合于需要根据不同环境和需求,来实现定时任务执行频率差异化的场景。

SchedulingConfigurer接口

@FunctionalInterface
public interface SchedulingConfigurer {void configureTasks(ScheduledTaskRegistrar taskRegistrar);
}

示例代码:

@Component
public class CronTriggerSchedulingConfigurer implements SchedulingConfigurer {@Value("${schedul.config.trigger.cron}")private String cron;@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.addTriggerTask(() ->{System.out.println("this is cron triggerTask! "+DateUtil.DateToString(new Date()));          },trigger ->{          return new CronTrigger(cron).nextExecutionTime(trigger);});}
}

上述的示例代码中CronTriggerSchedulingConfigurer类实现了SchedulingConfigurer接口,在重写接口的configureTasks方法时,我们向ScheduledTaskRegistrar实例注册了一个TriggerTask类型的任务。TriggerTask类的构造函数需要两个参数,一个是代表任务业务逻辑的Runnable实例。另一个是触发任务执行频率的Trigger实例。这里Trigger我们使用了CronTrigger,这就意味着任务调度将以cron表达式定义的方式来执行。

代码中使用了@Value注解来注入cron表达式,在实际开发中,我们就可以使用Apollo配置或者从数据库读取的方式来获取cron表达式。这样当我们修改了cron表达式,程序在下次执行任务时,就读到新的cron表达式,那么任务调度就会以新的频率来执行任务。在示例中cron表达式配置为schedul.config.trigger.cron = 0/6 * * * * ?,表示定时任务会以6秒钟一次的频率来执行。

在configureTasks方法中,如果触发任务执行频率使用了PeriodicTrigger的实例,那么任务调度将会以周期形式来触发执行。PeriodicTrigger是可以用周期时间间隔fixedRate或者周期时间延迟fixedDelay这两种具体形式来触发任务。我们可以通过设置PeriodicTrigger实例的属性fixedRate来具体决定使用哪种方式。

period

long类型

表示执行任务的周期时长,fixedRate模式下表示执行任务的时间间隔,fixedDelay模式下表示延迟多长时间后才执行下一个任务。

timeUnit

TimeUnit类型

表示period周期的时间单位,默认是毫秒。

initialDelay

long类型

表示延迟多长时间后,才开始执行第一次任务。

fixedRate

boolean类型

表示是以fixedRate固定时间间隔模式,还是fixedDelay固定延迟模式来执行任务,默认为false也就是fixedDelay模式。

@Component
public class PeriodicTriggerSchedulingConfigurer implements SchedulingConfigurer {@Value("${schedul.config.trigger.periodic}")private long periodic; @Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {System.out.println("Periodic triggerTask start at "+DateUtil.DateToString(new Date()));taskRegistrar.addTriggerTask(() ->{ConcurrentUtil.Sleep(5);System.out.println("Thread "+Thread.currentThread().getName()+" execute periodic triggerTask! "+DateUtil.DateToString(new Date()));          },trigger ->{          PeriodicTrigger periodicTrigger = new PeriodicTrigger(periodic);periodicTrigger.setInitialDelay(6000);return periodicTrigger.nextExecutionTime(trigger);});    }
}

上述代码中的PeriodicTriggerSchedulingConfigurer使用了PeriodicTrigger这个周期性任务的执行触发器。触发器的initialDelay属性设置为6秒,表示第一次任务会延迟6秒后才执行。由于fixedRate属性默认为false, 那么任务将以fixedDelay固定延迟的模式来执行。schedul.config.trigger.periodic设置是的3000毫秒,所以任务在执行完成后,会延迟个3秒钟才能执行下一个的任务。任务执行时间是5秒加上延迟的3秒,所以每执行一次任务的周期是8秒钟。

当我们把PeriodicTrigger触发器的fixedRate属性默认为true时, 那么任务调度就会以fixedRate固定时间间隔的模式来执行。由于任务的周期periodic设置是的3秒,而任务的执行时间是5秒,任务执行时间比周期时间长,最终每次执行任务的时间周期就是5秒钟。

@Component
public class PeriodicTriggerSchedulingConfigurer implements SchedulingConfigurer {@Value("${schedul.config.trigger.periodic}")private long periodic; @Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {System.out.println("Periodic triggerTask start at "+DateUtil.DateToString(new Date()));taskRegistrar.addTriggerTask(() ->{System.out.println("Thread "+Thread.currentThread().getName()+" execute periodic triggerTask! "+DateUtil.DateToString(new Date()));          ConcurrentUtil.Sleep(5);},trigger ->{          PeriodicTrigger periodicTrigger = new PeriodicTrigger(periodic);periodicTrigger.setInitialDelay(6000);periodicTrigger.setFixedRate(true);return periodicTrigger.nextExecutionTime(trigger);});    }
}

3.Spring任务调度的原理

3.1 @EnableScheduling注解

我们在使用Spring进行定时任务调度的时候,需要使用@EnableScheduling注解来开启任务调度的功能。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

在@EnableScheduling注解的源码中,我们发现@EnableScheduling注解又通过@Import注解引入了SchedulingConfiguration这个配置类。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}
}

在SchedulingConfiguration配置类中会使用Bean注解的方式来注入一个ScheduledAnnotationBeanPostProcessor类型的实例。ScheduledAnnotationBeanPostProcessor是一个Spring中典型的后置处理器,它是任务调度的核心类。Spring定时任务调度的功能主要是在这个类中实现的。

其实Spring中有许多类似的@EnableXXX+@Import的组合,这种组合可以实现以可插拔的方式来开启和关闭某项功能,可拓展性极强。

3.2 ScheduledAnnotationBeanPostProcessor

ScheduledAnnotationBeanPostProcessor类间接继承了BeanPostProcessor接口,这个后置处理器实现了BeanPostProcessor接口的postProcessAfterInitialization方法。postProcessAfterInitialization方法是在Bean实例化以后会执行的回调方法。

在ScheduledAnnotationBeanPostProcessor类的postProcessAfterInitialization方法中,Spring会找出所有带有@Scheduled注解的方法,然后再根据@Scheduled注解中配置的任务调度信息,将这些方法注册成一个个的定时任务。

3.2.1调度任务的注册

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {//对一些特殊类型的Bean不做处理,直接返回。
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}//获取bean的最终目标类Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);//判断目标类中是否含有@Scheduled注解
if (!this.nonAnnotatedClasses.contains(targetClass) &&AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {//获取目标类标记了Scheduled注解方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledMethods.isEmpty() ? scheduledMethods : null);});//如果目标类中没有@Scheduled注解方法//就把这个目标类缓存到nonAnnotatedClasses的Set集合中
if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);if (logger.isTraceEnabled()) {logger.trace("No @Scheduled annotations found on bean class: " + targetClass);}}else {// Non-empty set of methods
//遍历所有Scheduled注解方法,通过processScheduled方法把@Scheduled注解
//方法注册成定时任务annotatedMethods.forEach((method, scheduledMethods) ->scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));if (logger.isTraceEnabled()) {logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +"': " + annotatedMethods);}}}return bean;
}

在postProcessAfterInitialization方法中,通过反射的方式获取到所有@Scheduled注解的方法,然后再遍历这些方法,把这些注解和方法作为参数来调用processScheduled方法,processScheduled方法中会根据注解信息的不同,把方法注册成不同类型的定时任务。

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {//通过参数bean和method来创建一个Runnable的实例
Runnable runnable = createRunnable(bean, method);boolean processedSchedule = false;String errorMessage ="Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";Set<ScheduledTask> tasks = new LinkedHashSet<>(4);//从Scheduled注解解析获取注解中initialDelay属性值
//initialDelay有数值和字符串两种配置方式,这两方式只能取其一
long initialDelay = scheduled.initialDelay();String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}if (StringUtils.hasLength(initialDelayString)) {try {initialDelay = parseDelayAsLong(initialDelayString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");}}}//从Scheduled注解中获取cron的任务执行表达式
//如果存在cron表达式,则向ScheduledTaskRegistrar注册一个CronTask类型的任务
//注意Scheduled注解中使用了cron表达式,就不能使用initialDelayString cron = scheduled.cron();if (StringUtils.hasText(cron)) {String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// At this point we don't need to differentiate between initial delay set or not anymoreif (initialDelay < 0) {initialDelay = 0;}//从Scheduled注解中获取fixedDelay属性值
//如果配置了fixedDelay属性值,
//则向ScheduledTaskRegistrar注册一个FixedDelayTask类型的任务long fixedDelay = scheduled.fixedDelay();if (fixedDelay >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}String fixedDelayString = scheduled.fixedDelayString();if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedDelay = parseDelayAsLong(fixedDelayString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}//从Scheduled注解中获取fixedRate属性值
//如果配置了fixedRate属性值,
//则向ScheduledTaskRegistrar注册一个FixedRateTask类型的任务long fixedRate = scheduled.fixedRate();if (fixedRate >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = parseDelayAsLong(fixedRateString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}// Check whether we had any attribute setAssert.isTrue(processedSchedule, errorMessage);// Finally register the scheduled taskssynchronized (this.scheduledTasks) {Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));regTasks.addAll(tasks);}}catch (IllegalArgumentException ex) {throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());}
}

processScheduled方法共有三个参数:1. Scheduled注解。2.带有Scheduled注解method方法。3.包含了调度任务method方法Bean。

processScheduled方法中会对Scheduled注解中的属性值进行解析,根据注解中属性值的不同,则向ScheduledTaskRegistrar实例注册不同类型的任务。

  1. 若Scheduled注解中配置了cron的任务执行表达式,则向ScheduledTaskRegistrar注册一个CronTask类型的任务。
  2. 若Scheduled注解配置了fixedDelay属性,则向ScheduledTaskRegistrar注册一个FixedDelayTask类型的任务。
  3. 若Scheduled注解配置了fixedRate属性值,则向ScheduledTaskRegistrar注册一个FixedRateTask类型的任务
  4. 若Scheduled注解中配置了initialDelay属性,那么FixedDelayTask和FixedRateTask的任务首次执行会进行延迟。

在配置Scheduled注解的属性值时,需要注意的是:

  1. cron表达式和initialDelay属性不能同时配置。
  2. cron表达式,fixedDelay,fixedRate这三种任务类型每次只能选择其中的一种类型进行配置。

3.2.2调度任务的执行

程序中把Scheduled注解的方法注册成调度任务后,还需要对这些注册的任务进行执行的操作。

ScheduledAnnotationBeanPostProcessor类继承了ApplicationListener<ContextRefreshed-

Event>接口,实现了接口的onApplicationEvent(E event)方法。所以这个后置处理器类也是一个事件监听器,该监听器主要用于监听ContextRefreshedEvent类型的事件。ContextRefreshedEvent事件类是Spring内部为我们提供的一个事件。该事件类的一个参数就是ApplicationContext对象,即当前的容器。Spring是通过容器的refresh()刷新方法来创建一个IOC容器时,容器创建过程的最后一步finishRefresh()方法会调用publishEvent(new ContextRefreshedEvent(this))方法发布了ContextRefreshedEvent事件。此时Spring容器中的bean都已经创建完成,容器也已经初始化完成了。publishEvent方法会触发ScheduledAnnotationBeanPostProcessor监听器所实现的onApplicationEvent方法。在onApplicationEvent方法中又调用了finishRegistration()方法。在finishRegistration()方法中,就会把已经解析注册任务给调度执行起来。

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}
}
private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}//这里主要是对SchedulingConfigurer接口方式配置的调度任务进行处理if (this.beanFactory instanceof ListableBeanFactory) {//获取容器中所有SchedulingConfigurer类型的实例
Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);//遍历容器中的每个SchedulingConfigurer实例,依次调用实例的configureTasks方法
//通过configureTasks方法,就把用户自定义的任务注册到了ScheduledTaskRegistrar实例中
for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}//当ScheduledTaskRegistrar中没有任务调度的线程池时, 
//就会在容器中去查找线程池,如果容器中存在线程池的话,就通过setTaskScheduler方法
//把这个线程池用于执行要进行调度的任务
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");try {// Search for TaskScheduler bean...
//通过resolveSchedulerBean方法按类型在容器中查找TaskScheduler类型的Bean实例this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {logger.trace("Could not find unique TaskScheduler bean", ex);
//如果容器中存在多个TaskScheduler类型的Bean实例
//就再通过resolveSchedulerBean方法按名称的方式
//来确定一个TaskScheduler类型的Bean实例try {this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskScheduler bean exists within the context, and " +"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.trace("Could not find default TaskScheduler bean", ex);// Search for ScheduledExecutorService bean next...
//若容器中不存在TaskScheduler类型的实例
//就再去查找ScheduledExecutorService类型的实例。
//也是先按类型再容器中查找ScheduledExecutorService类型的实例try {this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
//同样当容器中存在多个ScheduledExecutorService类型的实例的话
//就会再按名称的方式来确定一个try {this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {if (logger.isInfoEnabled()) {logger.info("More than one ScheduledExecutorService bean exists within the context, and " +"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +ex2.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex2) {logger.trace("Could not find default ScheduledExecutorService bean", ex2);// Giving up -> falling back to default scheduler within the registrar...logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");}}}//最后调用ScheduledTaskRegistrar实例的afterPropertiesSet方法
//在该方法中,程序会通过线程池把注册的调度任务给执行起来this.registrar.afterPropertiesSet();
}

通过分析以上finishRegistration()方法的源码,发现finishRegistration()方法主要做了三件事情:

  1. 把SchedulingConfigurer接口类型调度任务注册到ScheduledTaskRegistrar实例。
  2. 给ScheduledTaskRegistrar实例设置一个用于执行调度任务的线程池。
  3. 调用ScheduledTaskRegistrar实例的afterPropertiesSet()方法,把注册的调度任务给执行起来。
3.2.1.1 SchedulingConfigurer接口调度任务的注册

SchedulingConfigurer接口中就声明了一个方法configureTasks(ScheduledTaskRegistrar taskRegistrar),方法中需要传入一个ScheduledTaskRegistrar类型的实例参数。

从代码中可以看出,Spring会首先从容器中查找所有SchedulingConfigurer类型的Bean实例,对这些Bean实例进行排序,然后依次遍历每个实例的并调用其configureTasks方法。在configureTasks方法中就可以把用户自定义的任务,注册到ScheduledTaskRegistrar实例中。

3.2.1.2 设置线程池

Spring首先将@Scheduled注解的调度任务解析出来后,就会注册到ScheduledTaskRegistrar的实例。接下来就是对这些调度的任务的执行,对于任务的执行,Spring会把这些任务交给线程池来执行,所以到目前为止我们就需要一个线程池。那么执行任务的调度线程池从何而来,Spring是做了如下的处理:

1. Spring会首先在容器中取查找TaskScheduler类型的实例,如果容器中有且只存在一个TaskScheduler类型的实例,就把这个TaskScheduler类型实例的线程池设置到ScheduledTaskRegistrar实例中,以后调度任务的执行就使用这个线程池。

2. 如果容器中存在多个TaskScheduler类型的实例,程序就会抛出NoUniqueBeanDefinitionException异常。程序会捕捉NoUniqueBeanDefinitionException异常,在对NoUniqueBeanDefinitionException异常进行处理时,会再按名称来确定一个TaskScheduler实例。Bean的名称会默认为taskScheduler。

3.如果容器中不存在TaskScheduler类型的实例,程序会抛出NoSuchBeanDefinitionException异常。程序在捕捉到NoSuchBeanDefinitionException异常进行处理的时候,又会在容器中取查找ScheduledExecutorService类型的线程池。

4. 程序查找ScheduledExecutorService类型线程池的步骤和以上查询TaskScheduler线程池步骤大致相似。如果容器中有且只存在一个ScheduledExecutorService实例,就直接返回这个实例。如果存在多个也是抛出NoUniqueBeanDefinitionException异常,在处理NoUniqueBeanDefinitionException这个异常时,同样是再按名称taskScheduler确定一个。若容器不存在ScheduledExecutorService类型的实例,在处理NoSuchBeanDefinitionException异常的时候,只是记录一下日志不做其他任务的处理,表明此时容器中用户没有配置线程池。

 在以上对线程池查找的每个步骤中,其实都调用了resolveSchedulerBean的方法。resolveSchedulerBean的方法是通过布尔参数的byName,来确定是按类型查找还是按名称来查找。

private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {if (byName) {T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType);if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory) {((ConfigurableBeanFactory) this.beanFactory).registerDependentBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName);}return scheduler;}else if (beanFactory instanceof AutowireCapableBeanFactory) {NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType);if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) {((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName);}return holder.getBeanInstance();}else {return beanFactory.getBean(schedulerType);}
}

线程池设置的流程图:

3.2.1.3 执行任务

现在是Spring已经将@Scheduled注解的调度任务解析出来并注册了,执行调度任务线程池也有了(如果用户没有配置线程池,Spring 接下来会自己默认创建一个),接下来就是任务的执行了。

调度任务的执行Spring是调用了ScheduledTaskRegistrar类的afterPropertiesSet方法。在afterPropertiesSet方法中又调用了scheduleTasks方法进行任务的执行。

this.registrar.afterPropertiesSet();public void afterPropertiesSet() {scheduleTasks();
}
protected void scheduleTasks() {//如果程序中没有用户自定义配置线程池,//程序就会默认创建一个单线程的线程池if (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}//对TriggerTask 触发类型的任务进行执行if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}//对CronTask Cron表达式类型的任务进行执行if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}//对固定时间间隔的任务进行执行if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}//对固定延迟任务进行执行if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}
}

从scheduleTasks方法的源码可以看出,Spring会先判断用户有没有配置自定义的线程池,如果没有的话,就会默认创建一个单线程的线程池来执行调度任务。

Spring在解析调度任务的时候,是按照cron表达式,fixedDelay,fixedRate,TriggerTask这几种不同的任务类型进行注册。所以在调度任务执行的时,也是分别遍历各种类型任务的集合,依次来执行各种的调度任务。

3.2.1.3.1 CronTask和TriggerTask任务执行

从scheduleTasks方法的代码可以看出,Spring是通过调用scheduleTriggerTask方法和scheduleCronTask方法来分别执行CronTask和TriggerTask类型的任务。

public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}
public ScheduledTask scheduleTriggerTask(TriggerTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addTriggerTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}

TaskScheduler线程池实例调用了schedule方法来执行CronTask和TriggerTask任务。在Spring的TaskScheduler线程池类中组合封装了JDK的ScheduledExecutorService实例。TaskScheduler线程池对任务的执行最终都是委托给了ScheduledExecutorService实例来执行的。

我们知道ScheduledExecutorService实例执行schedule方法是一次性的,执行一次后就不会再次执行了,但是实际中的CronTask和TriggerTask任务都是周期循环执行的。这里面其实是Spring自己做了一些工作来支持循环周期执行。原理很简单,简单概括就是Spring在进行schedule任务调度时,每次会根据当前调度的时间信息,计算出下一次任务的执行时间,然后不断的重复进行任务调度。下面我们通过源码来具体分析实现的细节。

public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {//获取JDK调度线程池的实例用于执行任务ScheduledExecutorService executor = getScheduledExecutor();try {ErrorHandler errorHandler = this.errorHandler;if (errorHandler == null) {errorHandler = TaskUtils.getDefaultErrorHandler(true);}//实例化一个ReschedulingRunnable的实例,并调用实例schedule方法来调度任务return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);}
}

我们以TaskScheduler接口其中的一个实现类ThreadPoolTaskScheduler为例来进行分析。在schedule方法中,有一行非常关键的代码是用任务,触发器,线程池等参数构建了一个ReschedulingRunnable的对象,然后再调用这个对象的schedule方法。

public ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {//计算出下一次任务的执行时间this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {return null;}//现在下次任务的执行时间减去当前的时间
//计算出距离下次任务的时间间隔long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();//将ReschedulingRunnable自身对象作为任务提交给线程池执行this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}
}

在ReschedulingRunnable类中的schedule方法中,Spring会根据CronTask或者TriggerTask任务的触发机制计算出下一次任务的执行时间。然后用计算出的下一次任务的执行时间减去系统当前时间得到距离下一次任务的时间间隔。源码中用这个时间间隔作为任务延迟的参数,把ReschedulingRunnable对象本身this作为任务,调用了ScheduledExecutorService实例的schedule方法来提交任务进行执行。ReschedulingRunnable之所以能够作为任务,是因为这个类实现了Runnable接口。

Runnable接口中定义了一个run方法,当Runnable接口的实例提交到线程池后,其定义的run方法就会被执行。ReschedulingRunnable作为Runnable接口的实现类,自然也给run方法给出了自己的实现。

public void run() {//记录调度任务实际开始执行的时间Date actualExecutionTime = new Date();//这里是执行用户自定义的调度任务super.run();//记录调度任务实际执行结束的时间Date completionTime = new Date();synchronized (this.triggerContextMonitor) {Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");//在TriggerContext的上下文中更新任务执行的时间信息this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);if (!obtainCurrentFuture().isCancelled()) {//再一次调用schedule方法,触发下一次的任务调度schedule();}}
}

在run方法中,首先调用父类的run方法,这里通过super.run()方法来执行用户自定义的调度任务,也就是@Scheduled注解的方法,并会记录下执行这些任务实际开始和结束时间。然后用任务的调度时间,执行任务实际开始和结束时间,在TriggerContext的上下文中更新任务执行的时间信息。在做完这一切以后,Spring 会再次调用schedule方法触发下一次的任务调度。而这个schedule方法又是把ReschedulingRunnable对象本身作为任务提交,那么ReschedulingRunnable对象作为任务时,其自身的run()方法又会被调用,这样ReschedulingRunnable类的schedule()方法会触发run()方法,而run()方法又会schedule()方法,这样就达到了循环调用的目的。

3.2.1.3.2 FixedRate任务执行

在ScheduledTaskRegistrar实例中,通过调用scheduleFixedRateTask方法来执行FixedRate类型的调度任务。

public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());}}else {addFixedRateTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}

在scheduleFixedRateTask方法中,对FixedRate类型调度任务的执行其实又是调用了TaskScheduler线程池实例的scheduleWithFixedDelay方法。

本例中我们以TaskScheduler线程池实例之一的ThreadPoolTaskScheduler类来进行分析。ThreadPoolTaskScheduler类的scheduleWithFixedDelay方法的源码如下。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {//获取JDK的ScheduledExecutorService实例线程池 ScheduledExecutorService executor = getScheduledExecutor();long initialDelay = startTime.getTime() - System.currentTimeMillis();try {//调用JDK线程池的scheduleAtFixedRate方法来执行固定间隔的任务return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);}
}

从以上源码中可以出,FixedRate类型的任务执行比较简单,由于JDK中的ScheduledExecutorService线程池本身就支持执行FixedRate这种固定时间间隔的任务。所以ThreadPoolTaskScheduler类中会先实例化一个ScheduledExecutorService类型的线程池,把任务封装成了一个DelegatingErrorHandlingRunnable类型的任务,然后程序就直接调用ScheduledExecutorService线程池实例的scheduleAtFixedRate方法来执行调度任务。

3.2.1.3.2 FixedDelay任务执行

FixedDelay和FixedRate任务的实现原理十分相似,由于JDK中的ScheduledExecutorService线程池也支持执行FixedDelay这种固定时间延迟的任务。所以程序中对FixedDelay任务,最终是委托给了ScheduledExecutorService线程池实例来执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146219.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

STM32F407单片机编程入门(十二) FreeRTOS实时操作系统详解及实战含源码

文章目录 一.概要二.什么是实时操作系统三.FreeRTOS的特性四.FreeRTOS的任务详解1.任务函数定义2.任务的创建3.任务的调度原理 五.CubeMX配置一个FreeRTOS例程1.硬件准备2.创建工程3.调试FreeRTOS任务调度 六.CubeMX工程源代码下载七.小结 一.概要 FreeRTOS是一个迷你的实时操…

联想(lenovo) 小新Pro13锐龙版(新机整理、查看硬件配置和系统版本、无线网络问题、windows可选功能)

新机整理 小新pro13win10新机整理 查看硬件配置和系统版本 设置-》系统-》系统信息 无线网络问题 部分热点可以&#xff0c;部分不可以 问题&#xff1a;是因为自己修改了WLAN的IP分配方式为手动分配&#xff0c;导致只能在连接家里无线网的时候可以&#xff0c;连接其他…

统信UOS的「端侧模型」

统信UOS早有布局的「端侧模型」的相关信息。 文章目录 前言一、究其原因1. 从需求侧来看2. 从技术侧来看二、产品特色1. 更流畅的使用体验2. 更智能的加速框架3. 更包容的硬件策略前言 在苹果2024秋季新品发布会上,苹果发布了有史以来最大的iPhone。而同一天开发布会的华为,…

【Linux】POSIX信号量、基于环形队列实现的生产者消费者模型

目录 一、POSIX信号量概述 信号量的基本概念 信号量在临界区的作用 与互斥锁的比较 信号量的原理 信号量的优势 二、信号量的操作 1、初始化信号量&#xff1a;sem_init 2、信号量申请&#xff08;P操作&#xff09;&#xff1a;sem_wait 3、信号量的释放&#xff08…

机器之心 | 阿里云Qwen2.5发布!再登开源大模型王座,Qwen-Max性能逼近GPT-4o

本文来源公众号“机器之心”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;阿里云Qwen2.5发布&#xff01;再登开源大模型王座&#xff0c;Qwen-Max性能逼近GPT-4o 人工智能领域再度迎来重磅消息&#xff01; 2023 年 8 月&#…

尚品汇-H5移动端整合系统(五十五)

目录&#xff1a; &#xff08;1&#xff09;运行前端页面 &#xff08;2&#xff09;启动前端页面 &#xff08;3&#xff09;添加搜索分类接口 &#xff08;4&#xff09;购物车模块修改 &#xff08;5&#xff09;登录模块 &#xff08;6&#xff09;订单模块 &#…

【巧用ddddocr破解算术运算验证码的经典示范】

计算型验证码 算术验证码&#xff0c;也叫计算型验证码, 计算型验证码其实是一种特殊的字符型验证码&#xff0c;只不过在它的基础上增加了数字运算。   计算型验证码在将人类视觉和计算机视觉的差异作为区分用户和电脑的依据的同时&#xff0c;还加上了逻辑运算&#xff0c…

基于SpringBoot+Vue的在线酒店预订系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSpringBootVueMySQL的…

人工智能开发实战常用分类算法归纳与解析

内容导读 决策树贝叶斯分类器最近邻分类器支持向量机神经网络 一、决策树 决策树(Decision Tree)是用于决策的一棵树&#xff0c;从根节点出发&#xff0c;通过决策节点对样本的不同特征属性进行划分&#xff0c;按照结果进入不同的分支&#xff0c;最终达到某一叶子节点&am…

计算机毕业设计 基于Python的校园个人闲置物品换购平台 闲置物品交易平台 Python+Django+Vue 前后端分离 附源码 讲解 文档

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

深耕电通二十年,崔光荣升电通中国首席执行官

电通今日宣布&#xff0c;任命拥有二十年深厚电通工作经验的杰出行业领袖崔光(Guang Cui)为电通中国首席执行官&#xff0c;该任命自2024年9月27日起生效。崔光自2004年加入电通以来&#xff0c;从策略规划岗位逐步成长为公司的核心领导者&#xff0c;这也是他职业生涯中的第9次…

篮球运动场景物体检测系统源码分享

篮球运动场景物体检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Comp…

Linux基础---13三剑客及正则表达式

一.划水阶段 首先我们先来一个三剑客与正则表达式混合使用的简单示例&#xff0c;大致了解是个啥玩意儿。下面我来演示一下如何查询登录失败的ip地址及次数。 1.首先&#xff0c;进入到 /var/log目录下 cd /var/log效果如下 2.最后&#xff0c;输入如下指令即可查看&#xf…

OpenGL渲染管线(Rendering Pipeline)介绍

渲染管线 计算机图形学中&#xff0c;计算机图形管线&#xff08;渲染管线 或简称 图形管线、流水线&#xff09;是一个概念模型&#xff0c;它描述了t图像系统将 3D场景渲染到2D屏幕所需执行的一系列步骤。渲染管线大的可以分为三个阶段。 &#xff08;一&#xff09;应用阶段…

[UTCTF2020]sstv

用goldwave和010editor打开均未发现线索&#xff0c; 网上搜索sstv&#xff0c;豆包回答如下&#xff1a; 慢扫描电视&#xff08;Slow Scan Television&#xff0c;简称 SSTV&#xff09;是一种通过无线电传输和接收静态图像的技术。 一、工作原理 SSTV 通过将图像逐行扫描并…

【GMNER】Grounded Multimodal Named Entity Recognition on Social Media

Grounded Multimodal Named Entity Recognition on Social Media 动机解决方法特征抽取多模态索引设计索引生成框架EncoderDecoder 实体定位、实体-类型-区域三元组重建 出处&#xff1a;ACL2023 论文链接&#xff1a;https://aclanthology.org/2023.acl-long.508.pdf code链接…

[Linux] Linux操作系统 进程的状态

标题&#xff1a;[Linux] Linux操作系统 进程的状态 个人主页&#xff1a;水墨不写bug &#xff08;图片来源于网络&#xff09; 目录 一、前置概念的理解 1.并行和并发 2.时间片 3.进程间具有独立性 4.等待的本质 正文开始&#xff1a; 在校的时候&#xff0c;你一定学过《…

10 张手绘图详解Java 优先级队列PriorityQueue

PriorityQueue 是 Java 中的一个基于优先级堆的优先队列实现&#xff0c;它能够在 O(log n) 的时间复杂度内实现元素的插入和删除操作&#xff0c;并且能够自动维护队列中元素的优先级顺序。 通俗来说&#xff0c;PriorityQueue 就是一个队列&#xff0c;但是它不是先进先出的…

【速成Redis】04 Redis 概念扫盲:事务、持久化、主从复制、哨兵模式

前言&#xff1a; 前三篇如下&#xff1a; 【速成Redis】01 Redis简介及windows上如何安装redis-CSDN博客 【速成Redis】02 Redis 五大基本数据类型常用命令-CSDN博客 【速成Redis】03 Redis 五大高级数据结构介绍及其常用命令 | 消息队列、地理空间、HyperLogLog、BitMap、…

带你0到1之QT编程:十五、探索QSplitter和QDockWidget的简单应用技巧

此为QT编程的第十五谈&#xff01;关注我&#xff0c;带你快速学习QT编程的学习路线&#xff01; 每一篇的技术点都是很很重要&#xff01;很重要&#xff01;很重要&#xff01;但不冗余&#xff01; 我们通常采取总-分-总和生活化的讲解方式来阐述一个知识点&#xff01; …