如何动态修改spring中定时任务的调度策略(1)

news/2024/7/23 11:56:17 标签: spring, java, 后端, scheduled, 调度

在我们日常开发中经常会调度工具来处理一下需要定时执行的任务,比如定时导出报表数据给业务方发送邮件。你在工作中是如何这种定时调度

如何实现调度任务

使用java技术栈的老铁来说,现成定时调度的解决方案应该有很多,总结来说有三大类:

1.开源的定时调度框架,如xxlJob。

2.使用java中的定时任务api自己写,比如Timer.

3.使用spring提供的@Scheduled注解,简单方便。

上述三种方案中,开源调度框架是最好用的,而且有可视化管理面板。但是这种开源框架通常需要单独部署和维护,成本较高,如果业务中调度任务不多的话,没有必要大费周折去部署和维护这么一套系统,得不偿失。

使用jdk自带的Timer,或者 ScheduledThreadPoolExecutor来自己手动实现,这种方式灵活度高,可更具业务进行自定义开发,但是jdk自带的api相对较为底层,不能很好的使用 cron表达式,如果想要使用的话,还需要实现一套cron表达式解析器的开发,开发成本较高。

使用spring提供的@Scheduled注解,通过简单配置一下cron表达式即可,再加上现在java应用开发,基本上都在使用spring全家桶,引用@Scheduled的成本不高。

Sping中调度任务是如何实现的

在使用spring的项目中 ,在一个方法上添加一个 @Scheduled 注解

然后根据业务需要添加需要的cron表达式(其实@Scheduled不仅仅支持cron表达式,还支持其他很多调度策略,详细可以看@Scheduled注解的定义)

最后在启动类上添加 @EnableScheduling 即可。

是不是很简单,完整代码如下:

java">
@SpringBootApplication
@EnableScheduling
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

@Component
public class JobScheduler {

  @Scheduled(cron="0/1 * * * * ?")
  public void schedule(){
        System.out.println("scheduler" + new SimpleDateFormat("YYYY-MM-dd HH-mm:ss").format(new Date()) + "  :  "+Thread.currentThread().getName());
    }
}

这里可能就有老铁好奇了,为什么只添加了 @Scheduled 和 @EnableScheduling 两个注解,就可以实现调度的能力呢?

我们可以简单的看下 spring源码,探索一下它是如何实现的。

@EnableScheduling

当你在配置类上使用@EnableScheduling注解时,Spring会创建一个名为scheduledAnnotationProcessor的bean。这个过程是通过SchedulingConfiguration类实现的。SchedulingConfiguration类是@EnableScheduling注解导入的配置类,它负责创建ScheduledAnnotationBeanPostProcessor实例。

具体代码如下:

java">@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}


@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

}

在SchedulingConfiguration类中,定义了一个名为scheduledAnnotationProcessor的bean。这个bean是通过scheduledAnnotationProcessor()方法创建的,该方法返回一个新的ScheduledAnnotationBeanPostProcessor实例。

同时,@Role(BeanDefinition.ROLE_INFRASTRUCTURE)注解表示这个bean是基础设施bean,通常不会被应用程序代码直接使用。

ScheduledAnnotationBeanPostProcessor类主要负责解析@Scheduled注解并创建定时任务。

以下是解析@Scheduled注解并创建定时任务的详细过程(以cron任务做介绍,其他任务流程类似)。

1.处理bean

当Spring创建bean时,ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法会被调用。这个方法会检查bean中的所有方法,寻找带有@Scheduled注解的方法。

java">	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
				bean instanceof ScheduledExecutorService) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}

		Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
		if (!this.nonAnnotatedClasses.contains(targetClass) &&
				AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
			Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
						Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
								method, Scheduled.class, Schedules.class);
						return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
					});
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(targetClass);
				if (logger.isTraceEnabled()) {
					logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
				}
			}
			else {
				// Non-empty set of methods
				annotatedMethods.forEach((method, scheduledAnnotations) ->
						scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
				if (logger.isTraceEnabled()) {
					logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
							"': " + annotatedMethods);
				}
			}
		}
		return bean;
	}

2.创建定时任务

对于每个解析到的@Scheduled注解,ScheduledAnnotationBeanPostProcessor会创建一个定时任务。

它首先获取一个TaskScheduler实例(默认为 ThreadPoolTaskScheduler ),然后使用该实例为带有@Scheduled注解的方法创建一个定时任务,任务的执行策略根据注解中的属性确定。

这一步是在processScheduled方法的另一个重载版本中完成的。

java">	protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
		try {
			Runnable runnable = createRunnable(bean, method);
			boolean processedSchedule = false;
			String errorMessage =
					"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

			Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

			// Determine initial delay
			long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
			String initialDelayString = scheduled.initialDelayString();
			if (StringUtils.hasText(initialDelayString)) {
				Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
				if (this.embeddedValueResolver != null) {
					initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
				}
				if (StringUtils.hasLength(initialDelayString)) {
					try {
						initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
					}
					catch (RuntimeException ex) {
						throw new IllegalArgumentException(
								"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
					}
				}
			}

			// Check cron expression  // cron表达式处理
			String cron = scheduled.cron();
			if (StringUtils.hasText(cron)) {
				String zone = scheduled.zone();
				if (this.embeddedValueResolver != null) {
					cron = this.embeddedValueResolver.resolveStringValue(cron);
					zone = this.embeddedValueResolver.resolveStringValue(zone);
				}
				if (StringUtils.hasLength(cron)) {
					Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
					processedSchedule = true;
					if (!Scheduled.CRON_DISABLED.equals(cron)) {
						TimeZone timeZone;
						if (StringUtils.hasText(zone)) {
							timeZone = StringUtils.parseTimeZoneString(zone);
						}
						else {
							timeZone = TimeZone.getDefault();
						}
						tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
					}
				}
			}

			// At this point we don't need to differentiate between initial delay set or not anymore
			if (initialDelay < 0) {
				initialDelay = 0;
			}

			// Check fixed delay// 固定延迟
			......

			// Check fixed rate// 固定频次
			......

			// Check whether we had any attribute set
			Assert.isTrue(processedSchedule, errorMessage);

			// Finally register the scheduled tasks
			synchronized (this.scheduledTasks) {
				Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
				regTasks.addAll(tasks);
			}
		}
		catch (IllegalArgumentException ex) {
			throw new IllegalStateException(
					"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
		}
	}

3.创建Runnable
createRunnable方法用于创建一个Runnable实例,它封装了带有@Scheduled注解的方法的调用。这个Runnable实例将被传递给任务调度器。

java">	protected Runnable createRunnable(Object target, Method method) {
		Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
		Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
		return new ScheduledMethodRunnable(target, invocableMethod);
	}

4.创建Trigger
createTrigger方法根据@Scheduled注解的属性创建一个Trigger实例。Trigger实例决定了任务的执行策略,如固定速率、固定延迟或Cron表达式。

java">	public CronTrigger(String expression, ZoneId zoneId) {
		Assert.hasLength(expression, "Expression must not be empty");
		Assert.notNull(zoneId, "ZoneId must not be null");

		this.expression = CronExpression.parse(expression);
		this.zoneId = zoneId;
	}

5.配置调度任务

根据cron表达式对业务逻辑进行调度

java">public ScheduledTask scheduleCronTask(CronTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		boolean newTask = false;
		if (scheduledTask == null) {
			scheduledTask = new ScheduledTask(task);
			newTask = true;
		}
		if (this.taskScheduler != null) {
			scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
		}
		else {
			addCronTask(task);
			this.unresolvedTasks.put(task, scheduledTask);
		}
		return (newTask ? scheduledTask : null);
	}

从整个流程看下来,还是很复杂的,只不过这个复杂的流程,交给了spring自身来处理,只暴露给用户一个@Scheduled注解,简单的配置一下就可以。

到这里呢,spring的对@Scheduled 注解的解析和我们在日常开发中使用 @Scheduled的方式,想必你已经很清楚了,不过通过走读源码,大概也会发现一些问题:

1.cron表达式不能动态修改,如果想要调整调度策略的话,需要修改cron表达式,并重启项目才可以。

2.调度任务使用的 调度线程池是spring中默认的,线程池的大小,队里长度,拒绝策略都是什么?如何自定义个线程池提供给spring使用?

这两个问题,我们发到下一篇文中,进一步说明。


http://www.niftyadmin.cn/n/5448510.html

相关文章

【Docker】Airflow Scheduler 容器部署

Airflow Scheduler标准软件基于Bitnami airflow-scheduler 构建。当前版本为2.4.58 你可以通过轻云UC部署工具直接安装部署&#xff0c;也可以手动按如下文档操作&#xff0c;该项目已经全面开源&#xff0c;可以从如下环境获取 配置文件地址: https://gitee.com/qingplus/qin…

高架学习笔记之需求工程

目录 一、什么是软件需求 二、需求工程 2.1. 需求获取 2.2. 需求分析 2.3. 形成需求规格 2.4. 需求确认 2.5. 需求管理 2.5.1. 变更控制 2.5.2. 版本控制 2.5.3. 需求跟踪 2.5.4. 需求状态跟踪 一、什么是软件需求 软件需求目前没有统一的定义&#xff0c;一般是指用…

阿里云服务器价格购买价格表,2024新版报价查询

2024年腾讯云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新的云服务器优惠券…

Python|Pyppeteer实现启动Adspower并自动关闭多余的窗口页面(23)

前言 本文是该专栏的第23篇,结合优质项目案例持续分享Pyppeteer的干货知识,记得关注。 本文笔者将针对pyppeteer启动adspower浏览器的时候,出现多个浏览窗口的问题,详细介绍一个解决方法。这也是很多同学,比较关心的一个问题。正好借助此文,笔者对该问题结合实际案例代码…

php 函数五 日期时间相关扩展 一

一 DateTime 与DateTimeImmutable类行为类似&#xff0c;但是可以修改对象本身。 1.1 静态常量 静态常量值ATOM"Y-m-d\\TH:i:sP"COOKIE"l, d-M-Y H:i:s T"ISO8601"Y-m-d\\TH:i:sO"ISO8601_EXPANDED"X-m-d\\TH:i:sP"RFC822"D, …

Redis 教程系列之Redis 性能测试(七)

Redis 性能测试 Redis 性能测试是通过同时执行多个命令实现的。 语法 redis 性能测试的基本命令如下&#xff1a; redis-benchmark [option] [option value] 注意&#xff1a;该命令是在 redis 的目录下执行的&#xff0c;而不是 redis 客户端的内部指令。 实例 以下实例…

Linux Ncurses库部分函数使用说明

目录 1. initscr&#xff08;&#xff09;函数 2. endwin&#xff08;&#xff09;函数 3. curs_set()函数 4.noecho()函数 5. keypad()函数 6. start_color()函数 7.init_pair()函数 8.getch()函数 9.move()函数 10.addch()函数 11. refresh()函数 12.inch()函数…

CancellationToken:程序员必读的悬空指针灾难案例

在现代多线程和异步编程中&#xff0c;CancellationToken是一个常见的工具&#xff0c;用于及时、安全地取消长时间运行的任务或操作。然而&#xff0c;在处理此类对象时&#xff0c;如果不慎操作&#xff0c;也可能导致悬空指针这一严重问题&#xff0c;进而引发程序崩溃或者难…