2025-05-08🌱上海: ☀️ 🌡️+19°C 🌬️↖19km/h
# Part011 动态定时任务 Job
# 1. 为什么(Why)
# 1.1 项目背景
part011
模块实现了一个基于 Spring Boot 的动态定时任务管理框架,解决了企业应用中定时任务管理的常见问题。在实际业务系统中,定时任务广泛应用于数据同步、报表生成、缓存更新、数据清理等场景。传统的定时任务实现方式通常依赖于 @Scheduled
注解或 Quartz 配置,这些方式在任务创建后难以动态调整,每次修改都需要重新编译部署应用,无法适应业务需求的快速变化。本模块设计了一套灵活、可动态调整的定时任务管理框架,支持在运行时通过 API 接口动态创建、更新、删除和控制定时任务,大大提高了系统的灵活性和运维效率。
# 1.2 解决的问题
-
静态配置问题:传统定时任务创建后无法动态调整,每次修改都需要重新部署应用。
-
集中管理困难:系统中的定时任务散落在各处,缺乏统一的管理和监控机制。
-
运行状态控制:无法实时控制任务的启停状态,难以应对临时需求。
-
参数动态调整:无法在运行时调整任务的执行频率和执行方法。
-
任务同步问题:多实例部署环境下的任务执行同步问题,容易导致重复执行。
# 2. 如何实现(How)
# 2.1 项目结构
part011/ | |
├── src/ | |
│ ├── main/ | |
│ │ ├── java/ | |
│ │ │ └── com/ | |
│ │ │ └── muzi/ | |
│ │ │ └── part11/ | |
│ │ │ ├── comm/ # 通用组件 | |
│ │ │ ├── controller/ # 控制层 | |
│ │ │ │ └── JobController.java # 任务控制器 | |
│ │ │ ├── dto/ # 数据传输对象 | |
│ │ │ │ ├── Job.java # 任务DTO | |
│ │ │ │ ├── JobCreateRequest.java # 任务创建请求 | |
│ │ │ │ └── JobUpdateRequest.java # 任务更新请求 | |
│ │ │ ├── enums/ # 枚举类 | |
│ │ │ │ └── JobStatusEnum.java # 任务状态枚举 | |
│ │ │ ├── job/ # 任务核心包 | |
│ │ │ │ ├── JobChange.java # 任务变更对象 | |
│ │ │ │ ├── SpringJobConfiguration.java # 线程池配置 | |
│ │ │ │ ├── SpringJobRunManager.java # 任务运行管理器 | |
│ │ │ │ └── SpringJobTask.java # 任务执行器 | |
│ │ │ ├── mapper/ # MyBatis映射 | |
│ │ │ │ └── JobMapper.java # 任务数据访问 | |
│ │ │ ├── po/ # 持久化对象 | |
│ │ │ │ └── JobPO.java # 任务持久化对象 | |
│ │ │ ├── service/ # 服务层 | |
│ │ │ │ ├── JobService.java # 任务服务接口 | |
│ │ │ │ └── JobServiceImpl.java # 任务服务实现 | |
│ │ │ ├── test/ # 测试任务 | |
│ │ │ │ ├── Job1.java # 测试任务1 | |
│ │ │ │ ├── Job2.java # 测试任务2 | |
│ │ │ │ └── Job3.java # 测试任务3 | |
│ │ │ └── utils/ # 工具类 | |
│ │ └── resources/ # 配置文件 | |
│ └── test/ # 测试类 | |
└── pom.xml # Maven配置文件 |
# 2.2 关键技术点
# 2.2.1 案例分析:动态定时任务管理器的设计与实现
技术实现: 本模块设计了一套动态定时任务管理系统,核心是 SpringJobRunManager
类,它实现了 CommandLineRunner
接口,在应用启动后自动初始化并监控定时任务的变化:
@Component | |
public class SpringJobRunManager implements CommandLineRunner { | |
private static Logger logger = LoggerFactory.getLogger(SpringJobRunManager.class); | |
@Autowired | |
private ApplicationContext applicationContext; | |
@Autowired | |
private ThreadPoolTaskScheduler threadPoolTaskScheduler; | |
@Autowired | |
private JobService jobService; | |
// 系统中正在运行的 job 列表 | |
private Map<String, SpringJobTask> runningJobMap = new ConcurrentHashMap<>(); | |
/** | |
* springboot 应用启动后会回调 | |
*/ | |
@Override | |
public void run(String… args) throws Exception { | |
// 1、启动 job | |
this.startAllJob(); | |
// 2、监控 db 中 job 的变化,同步给 job 执行器去执行 | |
this.monitorDbJobChange(); | |
} | |
private void startAllJob() { | |
List<Job> jobList = this.jobService.getStartJobList(); | |
for (Job job : jobList) { | |
this.startJob(job); | |
} | |
} | |
/** | |
* 启动 job | |
*/ | |
private void startJob(Job job) { | |
SpringJobTask springJobTask = new SpringJobTask(job, this.applicationContext); | |
CronTrigger trigger = new CronTrigger(job.getCron()); | |
ScheduledFuture<?> scheduledFuture = this.threadPoolTaskScheduler.schedule(springJobTask, trigger); | |
springJobTask.setScheduledFuture(scheduledFuture); | |
runningJobMap.put(job.getId(), springJobTask); | |
logger.info("启动 job 成功:{}", JSONUtil.toJsonStr(job)); | |
} | |
/** | |
* 监控 db 中 job 的变化,每 5 秒监控一次 | |
*/ | |
private void monitorDbJobChange() { | |
this.threadPoolTaskScheduler.scheduleWithFixedDelay(this::jobChangeDispose, Duration.ofSeconds(5)); | |
} | |
} |
原理分析:
-
任务初始化机制
-
通过实现
CommandLineRunner
接口,在 Spring Boot 应用启动完成后自动加载已启动的任务 -
使用
ThreadPoolTaskScheduler
进行任务调度,支持 cron 表达式配置执行频率 -
使用
ConcurrentHashMap
存储运行中的任务,保证线程安全
-
-
任务变更监控
-
定时检查数据库中的任务配置变化
-
实现增量式任务同步,只处理变更的任务,减少系统资源消耗
-
通过对比任务属性判断任务是否变更,包括 cron 表达式、目标 Bean 和方法
-
-
任务生命周期管理
-
支持任务的完整生命周期:创建、启动、更新、停止、删除
-
对于更新操作,先停止旧任务再启动新任务,确保状态一致性
-
通过
ScheduledFuture
控制任务的执行状态,支持优雅停止
-
# 2.2.2 案例分析:任务执行器的设计与实现
技术实现: 本模块通过 SpringJobTask
类实现了任务的具体执行逻辑,核心是通过反射机制动态调用 Spring Bean 的方法:
public class SpringJobTask implements Runnable { | |
private static Logger logger = LoggerFactory.getLogger(SpringJobTask.class); | |
private Job job; | |
private ApplicationContext applicationContext; | |
private ScheduledFuture<?> scheduledFuture; | |
public SpringJobTask(Job job, ApplicationContext applicationContext) { | |
this.job = job; | |
this.applicationContext = applicationContext; | |
} | |
@Override | |
public void run() { | |
try { | |
// 通过反射执行方法 | |
Object bean = applicationContext.getBean(job.getBeanName()); | |
Method method = bean.getClass().getMethod(job.getBeanMethod()); | |
method.invoke(bean); | |
} catch (Exception e) { | |
logger.error("job执行异常,jobId:{},jobName:{},Exception:{}", job.getId(), job.getName(), e.getMessage(), e); | |
} | |
} | |
public Job getJob() { | |
return job; | |
} | |
public ScheduledFuture<?> getScheduledFuture() { | |
return scheduledFuture; | |
} | |
public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) { | |
this.scheduledFuture = scheduledFuture; | |
} | |
} |
原理分析:
-
反射调用机制
-
通过
ApplicationContext
获取目标 Spring Bean -
使用 Java 反射 API 获取目标方法
-
通过
invoke
方法执行目标方法,实现动态调用
-
-
任务状态管理
-
持有
ScheduledFuture
引用,用于控制任务的执行状态 -
支持任务的取消和中断操作
-
封装了任务的执行上下文,包括目标 Bean、方法和参数
-
-
异常处理
-
捕获并记录任务执行过程中的异常
-
确保单个任务的异常不影响其他任务执行
-
提供详细的错误日志,便于问题排查
-
# 2.2.3 案例分析:任务变更检测与同步
技术实现: 本模块实现了任务变更的检测与同步机制,核心是通过对比内存中的任务和数据库中的任务来确定增加、删除和更新的任务:
private JobChange getJobChange() { | |
// 新增的 job | |
List<Job> addJobList = new ArrayList<>(); | |
// 删除的 job | |
List<Job> deleteJobList = new ArrayList<>(); | |
// 更新的 job | |
List<Job> updateJobList = new ArrayList<>(); | |
// 从 db 中拿到所有 job,和目前内存中正在运行的所有 job 对比 | |
List<Job> startJobList = this.jobService.getStartJobList(); | |
for (Job job : startJobList) { | |
SpringJobTask springJobTask = runningJobMap.get(job.getId()); | |
if (springJobTask == null) { | |
addJobList.add(job); | |
} else { | |
//job 的执行规则变了 | |
if (jobIsChange(job, springJobTask.getJob())) { | |
updateJobList.add(job); | |
} | |
} | |
} | |
// 获取被删除的 job | |
Set<String> startJobIdList = CollUtils.convertSet(startJobList, Job::getId); | |
for (Map.Entry<String, SpringJobTask> springJobTaskEntry : runningJobMap.entrySet()) { | |
if (!startJobIdList.contains(springJobTaskEntry.getKey())) { | |
deleteJobList.add(springJobTaskEntry.getValue().getJob()); | |
} | |
} | |
// 返回 job 变更结果 | |
JobChange jobChange = new JobChange(); | |
jobChange.setAddJobList(addJobList); | |
jobChange.setUpdateJobList(updateJobList); | |
jobChange.setDeleteJobList(deleteJobList); | |
return jobChange; | |
} | |
private boolean jobIsChange(Job job1, Job job2) { | |
return !(Objects.equals(job1.getCron(), job2.getCron()) && | |
Objects.equals(job1.getBeanName(), job2.getBeanName()) && | |
Objects.equals(job1.getBeanMethod(), job2.getBeanMethod())); | |
} |
原理分析:
-
增量同步算法
-
将数据库中启用状态的任务与内存中运行的任务进行对比
-
通过任务 ID 匹配,识别新增和删除的任务
-
通过任务关键属性比对,识别更新的任务
-
-
变更检测机制
-
定义明确的任务变更判断标准:cron 表达式、目标 Bean、目标方法
-
只同步真正发生变化的任务,避免无谓的重启
-
使用集合操作优化比对效率,支持大量任务的快速比对
-
-
状态同步处理
-
对新增任务执行启动操作
-
对删除任务执行停止和移除操作
-
对更新任务执行先停止再启动的操作,确保状态一致
-
# 2.2.4 案例分析:RESTful API 接口设计
技术实现: 本模块通过 RESTful API 提供任务管理接口:
@RestController | |
public class JobController { | |
private static Logger logger = LoggerFactory.getLogger(JobController.class); | |
@Autowired | |
private JobService jobService; | |
@GetMapping("") | |
public List<Job> jobList() { | |
return jobService.getJobList(); | |
} | |
@PostMapping("/jobCreate") | |
public Job jobCreate(@RequestBody JobCreateRequest request) { | |
logger.info("jobCreate请求:{}", JSONUtil.toJsonStr(request)); | |
Job job = jobService.createJob(request); | |
return job; | |
} | |
@PostMapping("/jobUpdate") | |
public Job jobUpdate(@RequestBody JobUpdateRequest request) { | |
logger.info("jobUpdate请求:{}", JSONUtil.toJsonStr(request)); | |
Job job = jobService.updateJob(request); | |
return job; | |
} | |
@PostMapping("/jobDelete") | |
public boolean jobDelete(@RequestParam String id) { | |
logger.info("jobDelete请求,id:{}", id); | |
return jobService.deleteJob(id); | |
} | |
@PostMapping("/jobStart") | |
public boolean jobStart(@RequestParam String id) { | |
logger.info("jobStart请求,id:{}", id); | |
return jobService.startJob(id); | |
} | |
@PostMapping("/jobStop") | |
public boolean jobStop(@RequestParam String id) { | |
logger.info("jobStop请求,id:{}", id); | |
return jobService.stopJob(id); | |
} | |
} |
原理分析:
-
接口设计
-
提供完整的 CRUD 操作接口,支持任务的创建、查询、更新和删除
-
增加任务启停控制接口,实现运行时任务状态切换
-
遵循 RESTful 设计理念,使用 HTTP 方法表达操作语义
-
-
参数处理
-
使用专门的请求 DTO 对象封装创建和更新参数
-
通过
@RequestBody
注解自动解析 JSON 请求体 -
通过
@RequestParam
注解处理简单参数
-
-
响应处理
-
返回统一格式的任务对象或操作结果
-
日志记录所有 API 请求,便于问题排查
-
异常情况返回适当的 HTTP 状态码和错误信息
-
# 3. 技术点详解(Detail)
# 3.1 Spring 定时任务调度机制
本模块基于 Spring 的定时任务调度机制实现了动态任务管理:
-
ThreadPoolTaskScheduler 特点
-
Spring 提供的线程池调度器,支持复杂的调度需求
-
支持 cron 表达式、固定延迟、固定频率等多种调度方式
-
基于 JDK 的
ScheduledExecutorService
实现,性能优良
-
-
CronTrigger 实现原理
-
基于 cron 表达式计算下次执行时间
-
支持复杂的时间表达式,能满足多样化的调度需求
-
自动处理时区、夏令时等时间相关问题
-
-
ScheduledFuture 使用
-
通过
ScheduledFuture
控制任务的执行状态 -
支持任务的取消和中断
-
可以查询任务的完成状态和结果
-
# 3.2 任务变更检测与同步机制
本模块实现的任务变更检测与同步机制基于以下原理:
-
定时扫描策略
-
采用定时扫描模式,周期性检查数据库中的任务变化
-
扫描频率可配置,默认为 5 秒一次
-
平衡实时性和系统资源消耗
-
-
增量同步算法
-
只处理变化的任务,避免对所有任务重新调度
-
通过比对关键属性判断任务是否变更
-
分类处理新增、删除和更新的任务
-
-
状态一致性保证
-
任务操作的原子性保证,避免中间状态
-
使用
ConcurrentHashMap
确保多线程环境下的数据一致性 -
异常处理机制确保系统在任务变更失败时的稳定性
-
# 3.3 反射机制在动态调用中的应用
本模块使用反射机制实现了 Spring Bean 方法的动态调用:
-
动态方法调用
-
通过
ApplicationContext
获取 Spring Bean 实例 -
使用
Class.getMethod
获取方法对象 -
通过
Method.invoke
执行目标方法
-
-
安全性考虑
-
参数校验确保目标 Bean 和方法存在
-
异常捕获处理反射调用中可能出现的问题
-
日志记录调用过程,便于问题排查
-
-
性能优化
-
反射操作性能较低,但在定时任务场景下影响有限
-
任务执行频率通常较低,反射性能消耗可接受
-
可考虑添加方法缓存减少重复反射操作
-
# 3.4 数据库与内存同步策略
本模块实现了数据库配置与内存执行状态的同步策略:
-
数据模型设计
-
任务实体包含 ID、名称、cron 表达式、目标 Bean、目标方法等核心属性
-
使用状态字段标识任务的启用 / 禁用状态
-
添加创建时间、更新时间等审计字段
-
-
数据同步流程
-
应用启动时从数据库加载所有启用状态的任务
-
定时扫描数据库变更,实现增量同步
-
任务操作先更新数据库,再同步到内存状态
-
-
持久化策略
-
使用 MyBatis-Plus 实现数据访问层
-
采用乐观锁机制处理并发更新
-
使用事务确保数据一致性
-
# 4. 使用示例(Usage)
# 4.1 基本使用
// 创建定时任务执行类 | |
@Component | |
public class MyTask { | |
private static final Logger logger = LoggerFactory.getLogger(MyTask.class); | |
public void execute() { | |
logger.info("执行定时任务:" + LocalDateTime.now()); | |
// 任务具体逻辑 | |
} | |
} | |
// 通过 API 创建定时任务 | |
JobCreateRequest request = new JobCreateRequest(); | |
request.setName("数据同步任务"); | |
request.setCron("0 0/10 * * * ?"); // 每 10 分钟执行一次 | |
request.setBeanName("myTask"); // Spring Bean 名称 | |
request.setBeanMethod("execute"); // 要执行的方法 | |
request.setStatus(JobStatusEnum.START.getStatus()); // 创建后立即启动 | |
// 调用服务创建任务 | |
Job job = jobService.createJob(request); |
# 4.2 任务管理示例
// 更新任务 | |
JobUpdateRequest updateRequest = new JobUpdateRequest(); | |
updateRequest.setId("1001"); | |
updateRequest.setName("更新后的任务名称"); | |
updateRequest.setCron("0 0 2 * * ?"); // 每天凌晨 2 点执行 | |
updateRequest.setBeanName("myTask"); | |
updateRequest.setBeanMethod("execute"); | |
updateRequest.setStatus(JobStatusEnum.START.getStatus()); | |
Job updatedJob = jobService.updateJob(updateRequest); | |
// 停止任务 | |
boolean result = jobService.stopJob("1001"); | |
// 启动任务 | |
boolean result = jobService.startJob("1001"); | |
// 删除任务 | |
boolean result = jobService.deleteJob("1001"); |
# 4.3 API 调用示例
// 前端创建任务 | |
async function createJob() { | |
const response = await fetch('/jobCreate', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json' | |
}, | |
body: JSON.stringify({ | |
name: '数据同步任务', | |
cron: '0 0/10 * * * ?', | |
beanName: 'myTask', | |
beanMethod: 'execute', | |
status: 1 | |
}) | |
}); | |
const job = await response.json(); | |
console.log('创建的任务:', job); | |
} | |
// 前端获取任务列表 | |
async function getJobList() { | |
const response = await fetch('/job/list', { | |
method: 'GET', | |
headers: { | |
'Content-Type': 'application/json' | |
} | |
}); | |
const jobList = await response.json(); | |
console.log('任务列表:', jobList); | |
} | |
// 前端启动任务 | |
async function startJob(id) { | |
const response = await fetch(`/jobStart?id=${id}`, { | |
method: 'POST' | |
}); | |
const result = await response.json(); | |
console.log('启动结果:', result); | |
} |
# 4.4 自定义复杂任务示例
@Component | |
public class DataSyncTask { | |
@Autowired | |
private UserService userService; | |
@Autowired | |
private OrderService orderService; | |
// 用户数据同步任务 | |
public void syncUserData() { | |
// 复杂任务实现逻辑 | |
} | |
// 订单数据同步任务 | |
public void syncOrderData() { | |
// 复杂任务实现逻辑 | |
} | |
// 报表生成任务 | |
public void generateReport() { | |
// 复杂任务实现逻辑 | |
} | |
} | |
// 通过 API 创建多个不同的任务 | |
private void setupTasks() { | |
// 用户数据同步 - 每小时执行 | |
createTask("用户数据同步", "0 0 * * * ?", "dataSyncTask", "syncUserData"); | |
// 订单数据同步 - 每 10 分钟执行 | |
createTask("订单数据同步", "0 0/10 * * * ?", "dataSyncTask", "syncOrderData"); | |
// 报表生成 - 每天凌晨 2 点执行 | |
createTask("日报表生成", "0 0 2 * * ?", "dataSyncTask", "generateReport"); | |
} |
# 5. 总结与优化方向(Summary)
# 5.1 技术总结
本模块实现了一个灵活、功能完善的动态定时任务管理框架:
-
基于 Spring Boot 实现了定时任务的动态管理功能
-
支持任务的完整生命周期管理:创建、启动、更新、停止、删除
-
实现了数据库配置与运行状态的自动同步机制
-
通过 RESTful API 提供了任务管理接口,便于集成和使用
# 5.2 优化方向
-
分布式支持
-
增加分布式锁机制,避免集群环境下的任务重复执行
-
实现任务执行结果的分布式存储和共享
-
支持跨节点的任务调度和负载均衡
-
实现基于 Zookeeper 或 Redis 的任务协调机制
-
-
任务监控增强
-
实现任务执行历史记录和统计
-
添加任务执行耗时、成功率等性能指标
-
支持任务执行异常的告警机制
-
提供可视化的监控界面
-
-
参数支持
-
支持任务执行时传递参数
-
实现参数的动态配置和修改
-
支持更复杂的参数类型,如对象、集合等
-
增加参数验证和类型转换机制
-
-
安全性增强
-
添加任务访问权限控制
-
实现操作审计日志
-
增加敏感操作的多重验证
-
防止恶意任务的注入和执行
-
-
功能扩展
-
支持任务的依赖关系和执行链
-
实现任务的重试机制和失败处理策略
-
添加任务执行的超时控制
-
支持基于表达式的任务条件执行
-