springBoot自定義cron表達式注冊定時任務
一、原理
- 1、使用Spring自帶的TaskScheduler注冊任務
- 2、注冊后返回:ScheduledFuture,用于取消定時任務
- 3、注冊任務后不會馬上取消任務,所以將任務緩存。在需要取消任務的時候調用取消接口取消
- 4、cron表達式可以由前端或者后端生成。實現中會校驗cron表達式
public class TestScheduled {
/**
* 1、使用Spring自帶的TaskScheduler注冊任務
* 2、注冊后返回:ScheduledFuture,用于取消定時任務
*/
@Resource
private TaskScheduler taskScheduler;
public void registrarTask() {
//具體的任務Runnable(一般使用類實現Runnable接口)
Runnable taskRunnable = new Runnable() {
@Override
public void run() {
}
};
//cron表達式觸發器
CronTrigger trigger = new CronTrigger("0/5 * * * * ?");
//開啟定時任務的真正方法
ScheduledFuture<?> future = this.taskScheduler.schedule(taskRunnable, trigger);
//取消定時任務
future.cancel(true);
}
}
二、具體實現
1、配置任務調度器
- 作用:設置:核心線程數:可同時執行任務數;設置線程名稱前綴
- 可以不配置。不配置就默認使用spring自帶的
package com.cc.ssd.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/** TaskScheduler任務調度器配置類
* @since 2023/4/21 0021
* @author CC
**/
@Configuration
public class CronTaskConfig {
/**
* 任務調度器自定義配置
*/
@Bean(name = "taskScheduler")
public TaskScheduler taskScheduler() {
// 任務調度線程池
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定時任務執行線程池核心線程數:可同時執行4個任務
taskScheduler.setPoolSize(4);
taskScheduler.setRemoveOnCancelPolicy(true);
// 線程名稱前綴
taskScheduler.setThreadNamePrefix("Cs-ThreadPool-");
return taskScheduler;
}
}
2、定時任務注冊類
- 作用:緩存、注冊定時任務;還可以查詢、刪除定時任務
package com.cc.ssd.registrar;
import com.cc.ssd.task.CronTaskFuture;
import com.cc.ssd.task.CronTaskRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** 注冊定時任務:緩存定時任務、注冊定時任務到調度中心
* @author CC
**/
@Component
public class CronTaskRegistrar implements DisposableBean {
private static final Logger log = LoggerFactory.getLogger(CronTaskRegistrar.class);
/**
* 緩存任務
* key:具體的任務
* value:注冊定時任務后返回的ScheduledFuture
*/
private final Map<Runnable, CronTaskFuture> scheduledTasks = new ConcurrentHashMap<>(16);
/**
* 使用自定義的任務調度配置
*/
@Resource(name = "taskScheduler")
private TaskScheduler taskScheduler;
/** 獲取任務調度配置
* @return 任務調度配置
*/
public TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
/** 新增定時任務1
* 存在任務:刪除此任務,重新新增這個任務
* @param taskRunnable 執行的具體任務定義:taskRunnable 實現Runnable
* @param cronExpression cron表達式
*/
public void addCronTask(Runnable taskRunnable, String cronExpression) {
//驗證cron表達式是否正確
boolean validExpression = CronExpression.isValidExpression(cronExpression);
if (!validExpression) {
throw new RuntimeException("cron表達式驗證失敗!");
}
//獲取下次執行時間
CronExpression parse = CronExpression.parse(cronExpression);
LocalDateTime next = parse.next(LocalDateTime.now());
if (Objects.nonNull(next)) {
//定時任務下次執行的時間
String format = next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
log.info("定時任務下次執行的時間:{}", format);
}
//封裝成 CronTask(cron任務)
CronTask cronTask = new CronTask(taskRunnable, cronExpression);
this.addCronTask(cronTask);
}
/** 新增定時任務2
* @param cronTask :<p>CronTask用于在指定時間間隔內執行定時任務。</p>
* <p>它是通過CronTrigger來實現的,CronTrigger是一個基于cron表達式的觸發器,</p>
* <p>可以在指定的時間間隔內觸發任務執行。</p>
* @since 2023/4/21 0021
* @author CC
**/
private void addCronTask(CronTask cronTask) {
if (Objects.nonNull(cronTask)) {
//1有這個任務,先刪除這個任務。再新增
Runnable task = cronTask.getRunnable();
String taskId = null;
if (task instanceof CronTaskRunnable) {
taskId = ((CronTaskRunnable) task).getTaskId();
}
//通過任務id獲取緩存的任務,如果包含則刪除,然后新增任務
Runnable taskCache = this.getTaskByTaskId(taskId);
if (Objects.nonNull(taskCache) && this.scheduledTasks.containsKey(taskCache)) {
this.removeCronTaskByTaskId(taskId);
}
//2注冊定時任務到調度中心
CronTaskFuture scheduledFutureTask = this.scheduleCronTask(cronTask);
//3緩存定時任務
this.scheduledTasks.put(task, scheduledFutureTask);
//todo cc 4可以將任務保存到數據庫中……重新啟動程序然后加載數據庫中的任務到緩存中……
}
}
/** 注冊 ScheduledTask 定時任務
* @param cronTask cronTask
* @return 注冊定時任務后返回的 ScheduledFutureTask
*/
private CronTaskFuture scheduleCronTask(CronTask cronTask) {
//注冊定時任務后記錄的Future
CronTaskFuture scheduledTask = new CronTaskFuture();
//開啟定時任務的真正方法
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
// scheduledTask.setThreadLocal(this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()));
return scheduledTask;
}
/** 獲取任務列表
* @return
*/
public List<CronTaskRunnable> getScheduledTasks() {
List<CronTaskRunnable> tasks = new ArrayList<>();
Set<Runnable> keySet = scheduledTasks.keySet();
keySet.forEach(key -> {
CronTaskRunnable task = new CronTaskRunnable();
if (key instanceof CronTaskRunnable) {
CronTaskRunnable taskParent = (CronTaskRunnable) key;
BeanUtils.copyProperties(taskParent, task);
}
tasks.add(task);
});
return tasks.stream()
.sorted(Comparator.comparing(CronTaskRunnable::getTaskId))
.collect(Collectors.toList());
}
/** 根據任務id刪除單個定時任務
* @param taskId 任務id
*/
public void removeCronTaskByTaskId(String taskId) {
//通過任務id獲取任務
Runnable task = this.getTaskByTaskId(taskId);
//需要通過任務id獲取任務,然后再移除
CronTaskFuture cronTaskFuture = this.scheduledTasks.remove(task);
if (Objects.nonNull(cronTaskFuture)) {
cronTaskFuture.cancel();
}
}
/** 通過任務id獲取任務。未查詢到返回null
* @param taskId 任務id
* @return java.lang.Runnable
* @since 2023/4/21 0021
* @author CC
**/
private Runnable getTaskByTaskId(String taskId) {
Assert.notNull(taskId, "任務id不能為空!");
Set<Map.Entry<Runnable, CronTaskFuture>> entries = scheduledTasks.entrySet();
//根據任務id獲取該任務緩存
Map.Entry<Runnable, CronTaskFuture> rcf = entries.stream().filter(rf -> {
Runnable key = rf.getKey();
String taskId1 = null;
if (key instanceof CronTaskRunnable) {
taskId1 = ((CronTaskRunnable) key).getTaskId();
}
return taskId.equals(taskId1);
}).findAny().orElse(null);
if (Objects.nonNull(rcf)) {
return rcf.getKey();
}
return null;
}
/** 刪除所有的定時任務
* DisposableBean是Spring框架中的一個接口,它定義了一個destroy()方法,
* 用于在Bean銷毀時執行清理工作。
* 當一個Bean實現了DisposableBean接口時,
* Spring容器會在該Bean銷毀時自動調用destroy()方法,
* 以便進行一些清理工作,例如釋放資源等。
* 如果您的Bean需要在銷毀時執行一些清理工作,
* 那么實現DisposableBean接口是一個很好的選擇。
*/
@Override
public void destroy() {
//關閉所有定時任務
for (CronTaskFuture task : this.scheduledTasks.values()) {
task.cancel();
}
//清空緩存
this.scheduledTasks.clear();
log.info("取消所有定時任務!");
//todo cc 修改或刪除數據庫的任務
}
}
3、定時任務的執行結果ScheduledFuture
- 作用:CronTaskFuture類中使用的是ScheduledFuture對象來表示定時任務的執行結果。
package com.cc.ssd.task;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
/** CronTaskFuture類中使用的是ScheduledFuture對象來表示定時任務的執行結果。
* ——最后ps:也可以不要這個記錄類,直接緩存ScheduledFuture對象。
* 用來記錄單獨的Future、定時任務注冊任務后產生的
* @author CC
**/
public final class CronTaskFuture {
/** 每個線程一個副本
* 經過測試這里不能使用ThreadLocal
*/
// private static final ThreadLocal<ScheduledFuture<?>> THREAD_LOCAL = new ThreadLocal<>();
/** 最后ps:由于ScheduledFuture是線程安全的。這里不用 volatile 或者 ThreadLocal
* 注冊任務后返回的:ScheduledFuture 用于記錄并取消任務
* 這兩個都可以不使用。直接給future賦值
* volatile:線程之間可見:volatile用于實現多線程之間的可見性和一致性,保證數據的正確性。
* ThreadLocal:用于實現線程封閉,保證線程安全
* 使用建議:
* CronTaskFuture類中使用的是ScheduledFuture對象來表示定時任務的執行結果。
* ScheduledFuture對象是線程安全的,因此不需要使用volatile關鍵字來保證多線程同步。
* 如果需要在多線程中使用線程本地變量,可以使用ThreadLocal。
* 因此,建議在CronTaskFuture類中使用ScheduledFuture對象,而不是使用volatile或ThreadLocal。
* 另外,如果需要在Spring容器銷毀時執行一些清理操作,可以實現DisposableBean接口,并在destroy()方法中進行清理操作。
*/
public ScheduledFuture<?> future;
// public volatile ScheduledFuture<?> future;
// public void setThreadLocal(ScheduledFuture<?> future){
// THREAD_LOCAL.set(future);
// }
/**
* 取消當前定時任務
*/
public void cancel() {
try {
// ScheduledFuture<?> future = THREAD_LOCAL.get();
ScheduledFuture<?> future = this.future;
if (Objects.nonNull(future)) {
future.cancel(true);
}
} catch (Exception e) {
throw new RuntimeException("銷毀定時任務失敗!");
} finally {
// THREAD_LOCAL.remove();
}
}
}
4、具體的任務。
- 實現Runable接口
- 任務處理的方式按照自己的需求去實現即可
package com.cc.ssd.task;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/** 具體任務實現
* @author CC
* @since 2023/4/21 0021
*/
@Data
public class CronTaskRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(CronTaskRunnable.class);
/**
* 任務id(必須唯一)
*/
private String taskId;
/**
* 任務類型:自定義
*/
private Integer taskType;
/**
* 任務名字
*/
private String taskName;
/**
* 任務參數
*/
private Object[] params;
public CronTaskRunnable() {
}
public CronTaskRunnable(String taskId, Integer taskType, String taskName, Object... params) {
this.taskId = taskId;
this.taskType = taskType;
this.taskName = taskName;
this.params = params;
}
/** 執行任務
* @since 2023/4/21 0021
* @author CC
**/
@Override
public void run() {
long start = System.currentTimeMillis();
//具體的任務。
log.info("\n\t {}號.定時任務開始執行 - taskId:{},taskName:{},taskType:{},params:{}",
taskType, taskId, taskName, taskType, params);
//任務處理的方式:
//todo cc 1就在這里執行:模擬任務
//todo cc 2開啟策略模式,根據任務類型 調度不同的任務
//todo cc 3使用反射:傳來bean名字,方法名字,調用不同的任務
//todo cc 4開啟隊列,把要執行的任務放到隊列中,然后執行 —— 使用場景:每個任務執行很耗時的情況下使用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("\n\t {}號.任務執行完成 - 耗時:{},taskId:{},taskType:{}",
taskType, System.currentTimeMillis() - start, taskId, taskType);
}
}
5、測試Controller
package com.cc.ssd.web.controller;
import com.cc.ssd.registrar.CronTaskRegistrar;
import com.cc.ssd.task.CronTaskRunnable;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
* @author CC
* @since 2023/4/21 0021
*/
@RestController
@RequestMapping("/scheduled")
public class TestScheduledController {
@Resource
private CronTaskRegistrar cronTaskRegistrar;
/** 獲取任務列表
* @return java.util.List<com.cc.ssd.task.SchedulingRunnableTask>
* @since 2023/4/21 0021
* @author CC
**/
@GetMapping
public List<CronTaskRunnable> getScheduledTasks() {
return cronTaskRegistrar.getScheduledTasks();
}
/** 添加任務
* @param param param
* @return java.lang.String
* @since 2023/4/21 0021
* @author CC
**/
@PostMapping
public String addCronTask(@RequestBody Map<String, Object> param) {
//自己拿任務參數的邏輯:可以把每個任務保存到數據庫,重新啟動任務的同時,加載這些任務到任務調度中心
String taskId = (String) param.get("taskId");
Integer taskType = (Integer) param.get("taskType");
String taskName = (String) param.get("taskName");
Object params = param.get("params");
//添加任務參數
CronTaskRunnable task = new CronTaskRunnable(taskId, taskType, taskName, params);
//注冊任務:cron表達式,可以從傳入不一樣的
cronTaskRegistrar.addCronTask(task, "0/5 * * * * ?");
return "ok";
}
/** 根據任務id刪除定時任務
* @param taskId 任務id
* @return java.lang.String
* @since 2023/4/21 0021
* @author CC
**/
@DeleteMapping
public String removeCronTaskByTaskId(@RequestParam String taskId) {
cronTaskRegistrar.removeCronTaskByTaskId(taskId);
return "ok";
}
/** 刪除全部任務
* @return java.lang.String
* @since 2023/4/21 0021
* @author CC
**/
@DeleteMapping("/removeAll")
public String removeCronTask() {
cronTaskRegistrar.destroy();
return "ok";
}
}
6、最后效果
