您好,登錄后才能下訂單哦!
本篇內容介紹了“SpringBoot定時任務功能怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
項目中需要一個可以動態新增定時定時任務的功能,現在項目中使用的是xxl-job定時任務調度系統,但是經過一番對xxl-job功能的了解,發現xxl-job對項目動態新增定時任務,動態刪除定時任務的支持并不是那么好,所以需要自己手動實現一個定時任務的功能
1 技術選擇
Timer
or ScheduledExecutorService
這兩個都能實現定時任務調度,先看下Timer的定時任務調度
public class MyTimerTask extends TimerTask { private String name; public MyTimerTask(String name){ this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void run() { //task Calendar instance = Calendar.getInstance(); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime())); } } Timer timer = new Timer(); MyTimerTask timerTask = new MyTimerTask("NO.1"); //首次執行,在當前時間的1秒以后,之后每隔兩秒鐘執行一次 timer.schedule(timerTask,1000L,2000L);
在看下ScheduledThreadPoolExecutor的實現
//org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { //do something } },initialDelay,period, TimeUnit.HOURS);
兩個都能實現定時任務,那他們的區別呢,使用阿里p3c會給出建議和區別
多線程并行處理定時任務時,Timer運行多個TimeTask時,只要其中之一沒有捕獲拋出的異常,其它任務便會自動終止運行,使用ScheduledExecutorService則沒有這個問題。
從建議上來看,是一定要選擇ScheduledExecutorService
了,我們看看源碼看看為什么Timer
出現問題會終止執行
/** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); public Timer() { this("Timer-" + serialNumber()); } public Timer(String name) { thread.setName(name); thread.start(); }
新建對象時,我們看到開啟了一個線程,那么這個線程在做什么呢?一起看看
class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; /** * 每一件一個任務都是一個quene */ private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // 清除所有任務信息 } } } /** * The main timer loop. (See class comment.) */ private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } }
我們看到,執行了 mainLoop()
,里面是 while (true)
方法無限循環,獲取程序中任務對象中的時間和當前時間比對,相同就執行,但是一旦報錯,就會進入finally中清除掉所有任務信息。
這時候我們已經找到了答案,timer是在被實例化后,啟動一個線程,不間斷的循環匹配,來執行任務,他是單線程的,一旦報錯,線程就終止了,所以不會執行后續的任務,而ScheduledThreadPoolExecutor是多線程執行的,就算其中有一個任務報錯了,并不影響其他線程的執行。
2 使用ScheduledThreadPoolExecutor
從上面看,使用ScheduledThreadPoolExecutor
還是比較簡單的,但是我們要實現的更優雅一些,所以選擇 TaskScheduler
來實現
@Component public class CronTaskRegistrar implements DisposableBean { private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); @Autowired private TaskScheduler taskScheduler; public TaskScheduler getScheduler() { return this.taskScheduler; } public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } private void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { Set<Runnable> runnables = this.scheduledTasks.keySet(); Iterator it1 = runnables.iterator(); while (it1.hasNext()) { SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next(); Long taskId = schedulingRunnable.getTaskId(); SchedulingRunnable cancelRunnable = (SchedulingRunnable) task; if (taskId.equals(cancelRunnable.getTaskId())) { ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable); if (scheduledTask != null){ scheduledTask.cancel(); } } } } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() throws Exception { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } }
TaskScheduler
是本次功能實現的核心類,但是他是一個接口
public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
前面的代碼可以看到,我們在類中注入了這個類,但是他是接口,我們怎么知道是那個實現類呢,以往出現這種情況要在類上面加@Primany或者@Quality來執行實現的類,但是我們看到我的注入上并沒有標記,因為是通過另一種方式實現的
@Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定時任務執行線程池核心線程數 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
在spring初始化時就注冊了Bean TaskScheduler,而我們可以看到他的實現是ThreadPoolTaskScheduler,在網上的資料中有人說ThreadPoolTaskScheduler是TaskScheduler的默認實現類,其實不是,還是需要我們去指定,而這種方式,當我們想替換實現時,只需要修改配置類就行了,很靈活。
而為什么說他是更優雅的實現方式呢,因為他的核心也是通過ScheduledThreadPoolExecutor來實現的
public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized"); return this.scheduledExecutor; }
這次的實現過程中,我并沒有選擇xxl-job來進行實現,而是采用了TaskScheduler來實現,這也產生了一個問題,xxl-job是分布式的程序調度系統,當想要執行定時任務的應用使用xxl-job時,無論應用程序中部署多少個節點,xxl-job只會選擇其中一個節點作為定時任務執行的節點,從而不會產生定時任務在不同節點上同時執行,導致重復執行問題,而使用TaskScheduler來實現,就要考慮多節點重復執行問題。當然既然有問題,就有解決方案
· 方案一 將定時任務功能拆出來單獨部署,且只部署一個節點 · 方案二 使用redis setNx的形式,保證同一時間只有一個任務在執行
我選擇的是方案二來執行,當然還有一些方式也能保證不重復執行,這里就不多說了,一下是我的實現
public void executeTask(Long taskId) { if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) { log.info("已有執行中定時發送短信任務,本次不執行!"); return; }
“SpringBoot定時任務功能怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。