Java 开发

SpringBoot 中 @Scheduled 的执行原理

言七墨 · 12月3日 · 2019年 · 948次已读

前面有篇文章是“线上定时任务全部罢工”,本文主要从代码的角度分析下原因。

注解 @Scheduled 的执行原理

1、加载使用 @Scheduled 注解的类及方法

Spring 首先会通过类 ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法去初始化 bean,待初始化完 bean 后,就会拦截所有用到“@Scheduled”注解的方法,进行调度处理,具体细节请看下面代码:

@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
  // 判断是否是代理类,如果是代理类,拿到真正的目标类
  Class<?> targetClass = AopUtils.getTargetClass(bean);
  // 判断是否已经处理过。nonAnnotatedClasses属性是个Class集合,用于存储bean对应的class是否有@Scheduled注解的方法,如果没有,则添加到这个集合中
  if (!this.nonAnnotatedClasses.contains(targetClass)) {
    // 找出class中带有@Scheduled注解的方法
    Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
        new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {
          @Override
          public Set<Scheduled> inspect(Method method) {
            Set<Scheduled> scheduledMethods =
                AnnotationUtils.getRepeatableAnnotations(method, Scheduled.class, Schedules.class);
            return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
          }
        });
    // 如果不存在@Scheduled注解的方法
    if (annotatedMethods.isEmpty()) {
      // 添加到nonAnnotatedClasses集合中。下次不用重复处理该类
      this.nonAnnotatedClasses.add(targetClass);
      if (logger.isTraceEnabled()) {
        logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
      }
    }
    else { // 如果存在@Scheduled注解的方法
      // 遍历这些@Scheduled注解的方法
      for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
        Method method = entry.getKey();
        for (Scheduled scheduled : entry.getValue()) {
          // 进行调度处理,(定时规则,方法,类)
          processScheduled(scheduled, method, bean);
        }
      }
      if (logger.isDebugEnabled()) {
        logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
            "': " + annotatedMethods);
      }
    }
  }
  return bean;
}

2、解析 @Scheduled 的内容,并将定时任务注册到 ScheduledTaskRegistrar 中

解析相应的的注解参数,放入“定时任务列表”等待后续处理;之后在“定时任务列表”中统一执行相应的定时任务(定时任务先执行 corn,判断定时任务的执行时间,计算出相应的下次执行时间,放入线程中,到了时间就执行。再执行按“频率”(fixedRate)执行的定时任务,直到所有任务执行结束)。

// 获取scheduled类参数,之后根据参数类型、相应的延时时间、对应的时区放入不同的任务列表中
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    // ...
    // 获取corn类型
    String cron = scheduled.cron();
    if (StringUtils.hasText(cron)) {
        Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
        processedSchedule = true;
        String zone = scheduled.zone();
        if (this.embeddedValueResolver != null) {
            cron = this.embeddedValueResolver.resolveStringValue(cron);
            zone = this.embeddedValueResolver.resolveStringValue(zone);
        }
        TimeZone timeZone;
        if (StringUtils.hasText(zone)) {
            timeZone = StringUtils.parseTimeZoneString(zone);
        } else {
            timeZone = TimeZone.getDefault();
        }
        // 放入cron任务列表中(不执行)
        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
    }
    // ...
    // 获取频率类型(long类型)
    long fixedRate = scheduled.fixedRate();
    if (fixedRate >= 0) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        // 放入FixedRate任务列表中(不执行)(registrar为ScheduledTaskRegistrar)
        tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
    }
    // 执行频率类型(字符串类型,不接收参数计算如:600*20)
    String fixedRateString = scheduled.fixedRateString();
    if (StringUtils.hasText(fixedRateString)) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        if (this.embeddedValueResolver != null) {
            fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
        }
        try {
            fixedRate = Long.parseLong(fixedRateString);
        } catch(NumberFormatException ex) {
            throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer");
        }
        // 放入FixedRate任务列表中(不执行)
        tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
    }
}

3、执行定时任务

 run 方法是调度 task 的核心,task 的执行实际上是 run 方法的执行。

public void run() {
    // 是否是周期性的
    boolean periodic = isPeriodic();
    // 线程池是shundown状态不支持处理新任务,直接取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果不是周期性任务,直接调用run方法,会设置执行结果,然后直接返回
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任务,调用runAndReset方法,不会设置执行结果,然后直接返回
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下一次执行该任务的时间
        setNextRunTime();
        // 重复执行该任务
        reExecutePeriodic(outerTask);
    }
}

下方图片显示的是定时任务执行时的参数,请注意 pool size = 1,active threads = 1(线程池中只会有一个生效的线程), queued tasks(所有的定时任务存在于队列中)。

总结

从上面代码可以看出,如果多个定时任务定义的是同一个时间,会根据程序加载标有 @Scheduled 方法的先后来执行。若某个定时任务一直无法执行完成,则无法设置下次任务执行时间,之后会导致此任务后面的所有定时任务无法继续执行,也就会出现所有的定时任务罢工的现象。所以应用 SpringBoot 的定时任务的方法中,一定不要出现“死循环”、“执行耗费大量时间”、“http持续等待无响应”的现象,否则会导致定时任务直接罢工。针对数据量、查询或者远程调用特别多的场景,推荐把定时任务分段处理。

0 条回应