亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java Spring定時任務Quartz執行過程是什么

發布時間:2021-11-18 10:17:32 來源:億速云 閱讀:213 作者:iii 欄目:編程語言

這篇文章主要介紹“java Spring定時任務Quartz執行過程是什么”,在日常操作中,相信很多人在java Spring定時任務Quartz執行過程是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java Spring定時任務Quartz執行過程是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

一、前言介紹

在日常開發中經常會用到定時任務,用來;庫表掃描發送MQ、T+n賬單結算、緩存數據更新、秒殺活動狀態變更,等等。因為有了Spring的Schedule極大的方便了我們對這類場景的使用。那么,除了應用你還了解它多少呢;

  1. 默認初始化多少個任務線程

  2. JobStore有幾種實現,你平時用的都是哪個

  3. 一個定時任務的執行流程簡述下

二、案例工程

為了更好的做源碼分析,我們將平時用的定時任務服務單獨抽離出來。

 1itstack-demo-code-schedule
2└── src
3    ├── main
4    │   ├── java
5    │   │   └── org.itstack.demo
6    │   │       ├── DemoTask.java
7    │   │       └── JobImpl.java   
8    │   └── resources    
9    │       ├── props    
10    │       │   └── config.properties
11    │       ├── spring
12    │       │   └── spring-config-schedule-task.xml
13    │       ├── logback.xml
14    │       └── spring-config.xml
15    └── test
16         └── java
17             └── org.itstack.demo.test
18                 ├── ApiTest.java
19                 ├── MyQuartz.java                
20                 └── MyTask.java

三、環境配置

  1. JDK 1.8

  2. IDEA 2019.3.1

  3. Spring 4.3.24.RELEASE

  4. quartz 2.3.2 {不同版本略有代碼差異}

四、源碼分析

1<dependency>
2    <groupId>org.quartz-scheduler</groupId>
3    <artifactId>quartz</artifactId>
4    <version>2.3.2</version>
5</dependency>

依賴于Spring版本升級quartz選擇2.3.2,同時如果你如本文案例中所示使用xml配置任務。那么會有如下更改;

Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean

1 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
2     <property name="jobDetail" ref="taskHandler"/>
3     <property name="cronExpression" value="0/5 * * * * ?"/>
4 </bean>

Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean

1 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
2     <property name="jobDetail" ref="taskHandler"/>
3     <property name="cronExpression" value="0/5 * * * * ?"/>
4 </bean>

在正式分析前,可以看下quartz的默認配置,很多初始化動作都要從這里取得參數,同樣你可以配置自己的配置文件。例如,當你的任務很多時,默認初始化的10個線程組不滿足你的業務需求,就可以按需調整。

quart.properties

 1# Default Properties file for use by StdSchedulerFactory
2# to create a Quartz Scheduler Instance, if a different
3# properties file is not explicitly specified.
4#
5
6org.quartz.scheduler.instanceName: DefaultQuartzScheduler
7org.quartz.scheduler.rmi.export: false
8org.quartz.scheduler.rmi.proxy: false
9org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
10
11org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
12org.quartz.threadPool.threadCount: 10
13org.quartz.threadPool.threadPriority: 5
14org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
15
16org.quartz.jobStore.misfireThreshold: 60000
17
18org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

1. 從一個簡單案例開始

平時我們使用Schedule基本都是注解或者xml配置文件,但是為了可以更簡單的分析代碼,我們從一個簡單的Demo入手,放到main函數中。

DemoTask.java & 定義一個等待被執行的任務

1public class DemoTask {
2
3    private Logger logger = LoggerFactory.getLogger(DemoTask.class);
4
5    public void execute() throws Exception{
6        logger.info("定時處理用戶信息任務:0/5 * * * * ?");
7    }
8
9}

MyTask.java & 測試類,將配置在xml中的代碼抽離出來

 1public class MyTask {
2
3    public static void main(String[] args) throws Exception {
4
5        DemoTask demoTask = new DemoTask();
6
7        // 定義了;執行的內容
8        MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
9        methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
10        methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
11        methodInvokingJobDetailFactoryBean.setConcurrent(true);
12        methodInvokingJobDetailFactoryBean.setName("demoTask");
13        methodInvokingJobDetailFactoryBean.afterPropertiesSet();
14
15        // 定義了;執行的計劃
16        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
17        cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
18        cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
19        cronTriggerFactoryBean.setName("demoTask");
20        cronTriggerFactoryBean.afterPropertiesSet();
21
22        // 實現了;執行的功能
23        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
24        schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
25        schedulerFactoryBean.setAutoStartup(true);
26        schedulerFactoryBean.afterPropertiesSet();
27
28        schedulerFactoryBean.start();
29
30        // 暫停住
31        System.in.read();
32
33    }
34
35}

如果一切順利,那么會有如下結果:

 12020-01-04 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
22020-01-04 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
32020-01-04 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
42020-01-04 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
52020-01-04 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
6  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
7  NOT STARTED.
8  Currently in standby mode.
9  Number of jobs executed: 0
10  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
11  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
12
132020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
142020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
152020-01-04 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
162020-01-04 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
17一月 04, 2020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
18信息: Starting Quartz Scheduler now
192020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11] - 定時處理用戶信息任務:0/5 * * * * ?
202020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11] - 定時處理用戶信息任務:0/5 * * * * ?
212020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11] - 定時處理用戶信息任務:0/5 * * * * ?
222020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11] - 定時處理用戶信息任務:0/5 * * * * ?
232020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11] - 定時處理用戶信息任務:0/5 * * * * ?
24
25Process finished with exit code -1
26

2. 定義執行內容(MethodInvokingJobDetailFactoryBean)

1// 定義了;執行的內容
2MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
3methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
4methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
5methodInvokingJobDetailFactoryBean.setConcurrent(true);
6methodInvokingJobDetailFactoryBean.setName("demoTask");
7methodInvokingJobDetailFactoryBean.afterPropertiesSet();

這塊內容主要將我們的任務體(即待執行任務DemoTask)交給MethodInvokingJobDetailFactoryBean管理,首先設置必要信息;

  • targetObject:目標對象bean,也就是demoTask

  • targetMethod:目標方法name,也就是execute

  • concurrent:是否并行執行,非并行執行任務,如果上一個任務沒有執行完,下一刻不會執行

  • name:xml配置非必傳,源碼中可以獲取beanName

最后我們通過手動調用 afterPropertiesSet() 來模擬初始化。如果我們的類是交給 Spring 管理的,那么在實現了 InitializingBean 接口的類,在類配置信息加載后會自動執行 afterPropertiesSet() 。一般實現了 InitializingBean 接口的類,同時也會去實現 FactoryBean 接口,因為這個接口實現后就可以通過 T getObject() 獲取自己自定義初始化的類。這也常常用在一些框架開發中。

MethodInvokingJobDetailFactoryBean.afterPropertiesSet()

 1public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
2    prepare();
3    // Use specific name if given, else fall back to bean name.
4    String name = (this.name != null ? this.name : this.beanName);
5    // Consider the concurrent flag to choose between stateful and stateless job.
6    Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
7    // Build JobDetail instance.
8    JobDetailImpl jdi = new JobDetailImpl();
9    jdi.setName(name);
10    jdi.setGroup(this.group);
11    jdi.setJobClass((Class) jobClass);
12    jdi.setDurability(true);
13    jdi.getJobDataMap().put("methodInvoker", this);
14    this.jobDetail = jdi;
15
16    postProcessJobDetail(this.jobDetail);
17}
  • 源碼168行: 根據是否并行執行選擇任務類,這兩個類都是MethodInvokingJobDetailFactoryBean的內部類,非并行執行的StatefulMethodInvokingJob只是繼承MethodInvokingJob添加了標記注解。

  • 源碼171行: 創建JobDetailImpl,添加任務明細信息,注意這類的jdi.setJobClass((Class) jobClass)實際就是MethodInvokingJob。MethodInvokingJob也是我們最終要反射調用執行的內容。

  • 源碼177行: 初始化任務后賦值給this.jobDetail = jdi,也就是最終的類對象

    MethodInvokingJobDetailFactoryBean.getObject()

    1@Override
    2public JobDetail getObject() {
    3    return this.jobDetail;
    4}
  • 源碼:220行: 獲取對象時返回 this.jobDetail,這也就解釋了為什么 MethodInvokingJobDetailFactoryBean 初始化后直接賦值給了一個 JobDetail ;

    java Spring定時任務Quartz執行過程是什么
    微信公眾號:bugstack蟲洞棧 & Schedule.xml

3. 定義執行計劃(CronTriggerFactoryBeann)

1// 定義了;執行的計劃
2CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
3cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
4cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
5cronTriggerFactoryBean.setName("demoTask");
6cronTriggerFactoryBean.afterPropertiesSet();

這一塊主要定義任務的執行計劃,并將任務執行內容交給 CronTriggerFactoryBean 管理,同時設置必要信息;

  • jobDetail:設置任務體,xml 中可以直接將對象賦值,硬編碼中設置執行的 JobDetail 對象信息。也就是我們上面設置的 JobDetailImpl ,通過 getObject() 獲取出來。

  • cronExpression:計劃表達式;秒、分、時、日、月、周、年

CronTriggerFactoryBean.afterPropertiesSet()

 1@Override
2public void afterPropertiesSet() throws ParseException {
3
4    // ... 校驗屬性信息
5
6    CronTriggerImpl cti = new CronTriggerImpl();
7    cti.setName(this.name);
8    cti.setGroup(this.group);
9    if (this.jobDetail != null) {
10        cti.setJobKey(this.jobDetail.getKey());
11    }
12    cti.setJobDataMap(this.jobDataMap);
13    cti.setStartTime(this.startTime);
14    cti.setCronExpression(this.cronExpression);
15    cti.setTimeZone(this.timeZone);
16    cti.setCalendarName(this.calendarName);
17    cti.setPriority(this.priority);
18    cti.setMisfireInstruction(this.misfireInstruction);
19    cti.setDescription(this.description);
20    this.cronTrigger = cti;
21}
  • 源碼237行: 創建觸發器 CronTriggerImpl 并設置相關屬性信息

  • 源碼245行: 生成執行計劃類 cti.setCronExpression(this.cronExpression);

    1public void setCronExpression(String cronExpression) throws ParseException {
    2    TimeZone origTz = getTimeZone();
    3    this.cronEx = new CronExpression(cronExpression);
    4    this.cronEx.setTimeZone(origTz);
    5}

     

    CronExpression.java & 解析Cron表達式

     1protected void buildExpression(String expression) throws ParseException {
    2    expressionParsed = true;
    3    try {
        // ... 初始化 TreeSet xxx = new TreeSet&lt;Integer&gt;();
    4
    5    int exprOn = SECOND;
    6    StringTokenizer exprsTok = new StringTokenizer(expression, " \t",
    7            false);
    8
    9    while (exprsTok.hasMoreTokens() &amp;&amp; exprOn &lt;= YEAR) {
    10        String expr = exprsTok.nextToken().trim();
    11
    12        // ... 校驗DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
    13
    14        StringTokenizer vTok = new StringTokenizer(expr, ",");
    15        while (vTok.hasMoreTokens()) {
    16            String v = vTok.nextToken();
    17            storeExpressionVals(0, v, exprOn);
    18        }
    19        exprOn++;
    20    }
    21
    22    // ... 校驗DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
    23
    24} catch (ParseException pe) {
    25    throw pe;
    26} catch (Exception e) {
    27    throw new ParseException("Illegal cron expression format ("
    28            + e.toString() + ")", 0);
    29}
    30
    }
    • Cron表達式有7個字段,CronExpression 把7個字段解析為7個 TreeSet 對象。

    • 填充TreeSet對象值的時候,表達式都會轉換為起始值、結束值和增量的計算模式,然后計算出匹配的值放進TreeSet對象

CronTriggerFactoryBean.getObject()

1@Override
2public CronTrigger getObject() {
3    return this.cronTrigger;
4}
  • 源碼257行: 獲取對象時返回 this.cronTrigger ,也就是 CronTriggerImpl 對象

4. 調度執行計劃(SchedulerFactoryBean)

1// 調度了;執行的計劃(scheduler)
2SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
3schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
4schedulerFactoryBean.setAutoStartup(true);
5schedulerFactoryBean.afterPropertiesSet();
6
7schedulerFactoryBean.start();

這一部分如名字一樣調度工廠,相當于一個指揮官,可以從全局做調度,比如監聽哪些trigger已經ready、分配線程等等,同樣也需要設置必要的屬性信息;

  • triggers:按需可以設置多個觸發器,本文設置了一個 cronTriggerFactoryBean.getObject() 也就是 CronTriggerImpl 對象

  • autoStartup:默認是否自動啟動任務,默認值為true

這個過程較長包括:調度工廠、線程池、注冊任務等等,整體核心加載流程如下;

java Spring定時任務Quartz執行過程是什么  
微信公眾號:bugstack蟲洞棧 & 調度工程初始化流程
  • 整個加載過程較長,抽取部分核心代碼塊進行分析,其中包括的類;

    • StdScheduler

    • StdSchedulerFactory

    • SimpleThreadPool

    • QuartzScheduler

    • QuartzSchedulerThread

    • RAMJobStore

    • CronTriggerImpl

    • CronExpression

SchedulerFactoryBean.afterPropertiesSet()

 1public void afterPropertiesSet() throws Exception {
2    if (this.dataSource == null && this.nonTransactionalDataSource != null) {
3        this.dataSource = this.nonTransactionalDataSource;
4    }
5    if (this.applicationContext != null && this.resourceLoader == null) {
6        this.resourceLoader = this.applicationContext;
7    }
8    // Initialize the Scheduler instance...
9    this.scheduler = prepareScheduler(prepareSchedulerFactory());
10    try {
11        registerListeners();
12        registerJobsAndTriggers();
13    }
14    catch (Exception ex) {
15        try {
16            this.scheduler.shutdown(true);
17        }
18        catch (Exception ex2) {
19            logger.debug("Scheduler shutdown exception after registration failure", ex2);
20        }
21        throw ex;
22    }
23}
  • 源碼474行: 為調度器做準備工作 prepareScheduler(prepareSchedulerFactory()) ,依次執行如下;

    11)初始化threadPool(線程池):開發者可以通過org.quartz.threadPool.class配置指定使用哪個線程池類,比如SimpleThreadPool。
    22)初始化jobStore(任務存儲方式):開發者可以通過org.quartz.jobStore.class配置指定使用哪個任務存儲類,比如RAMJobStore。
    33)初始化dataSource(數據源):開發者可以通過org.quartz.dataSource配置指定數據源詳情,比如哪個數據庫、賬號、密碼等。
    44)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
    55)初始化threadExecutor(線程執行器):默認為DefaultThreadExecutor;
    66)創建工作線程:根據配置創建N個工作thread,執行start()啟動thread,并將N個thread順序add進threadPool實例的空閑線程列表availWorkers中;
    77)創建調度器線程:創建QuartzSchedulerThread實例,并通過threadExecutor.execute(實例)啟動調度器線程;
    88)創建調度器:創建StdScheduler實例,將上面所有配置和引用組合進實例中,并將實例存入調度器池中
    1. SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)

    2. SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);

    3. SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)

    4. Scheduler newScheduler = schedulerFactory.getScheduler();

    5. StdSchedulerFactory.getScheduler();

    6. sched = instantiate(); 包括一系列核心操作;

  • 源碼477行: 調用父類 SchedulerAccessor.registerJobsAndTriggers() 注冊任務和觸發器

    1for (Trigger trigger : this.triggers) {
    2    addTriggerToScheduler(trigger);
    3}

SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor 是SchedulerFactoryBean的父類

 1private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
2    boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
3    if (triggerExists && !this.overwriteExistingJobs) {
4        return false;
5    }
6    // Check if the Trigger is aware of an associated JobDetail.
7    JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
8    if (triggerExists) {
9        if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
10            this.jobDetails.add(jobDetail);
11        }
12        try {
13            getScheduler().rescheduleJob(trigger.getKey(), trigger);
14        }
15        catch (ObjectAlreadyExistsException ex) {
16            if (logger.isDebugEnabled()) {
17                logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
18                        "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
19            }
20        }
21    }
22    else {
23        try {
24            if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
25                    (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
26                getScheduler().scheduleJob(jobDetail, trigger);
27                this.jobDetails.add(jobDetail);
28            }
29            else {
30                getScheduler().scheduleJob(trigger);
31            }
32        }
33        catch (ObjectAlreadyExistsException ex) {
34            if (logger.isDebugEnabled()) {
35                logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
36                        "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
37            }
38            if (this.overwriteExistingJobs) {
39                getScheduler().rescheduleJob(trigger.getKey(), trigger);
40            }
41        }
42    }
43    return true;
44}
  • 源碼299行: addJobToScheduler(jobDetail) 一直會調用到 RAMJobStore 進行存放任務信息到 HashMap(100)

     1public void storeJob(JobDetail newJob,
    2    boolean replaceExisting) throws ObjectAlreadyExistsException {
    3    JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
    4    boolean repl = false;
    5    synchronized (lock) {
    6        if (jobsByKey.get(jw.key) != null) {
    7            if (!replaceExisting) {
    8                throw new ObjectAlreadyExistsException(newJob);
    9            }
    10            repl = true;
    11        }
    12        if (!repl) {
    13            // get job group
    14            HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
    15            if (grpMap == null) {
    16                grpMap = new HashMap<JobKey, JobWrapper>(100);
    17                jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
    18            }
    19            // add to jobs by group
    20            grpMap.put(newJob.getKey(), jw);
    21            // add to jobs by FQN map
    22            jobsByKey.put(jw.key, jw);
    23        } else {
    24            // update job detail
    25            JobWrapper orig = jobsByKey.get(jw.key);
    26            orig.jobDetail = jw.jobDetail; // already cloned
    27        }
    28    }
    29}
  • 初始化線程組;

    SimpleThreadPool.initialize() & 這里的count是默認配置中的數量,可以更改

    1 // create the worker threads and start them
    2 Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
    3 while(workerThreads.hasNext()) {
    4     WorkerThread wt = workerThreads.next();
    5     wt.start();
    6     availWorkers.add(wt);
    7 }
    • prepareScheduler

    • createScheduler

    • schedulerFactory

    • StdSchedulerFactory.getScheduler()

    • getScheduler()->instantiate()

    • 源碼1323行: tp.initialize();

5. 啟動定時任務

案例中使用硬編碼方式調用 schedulerFactoryBean.start() 啟動線程服務。線程的協作通過Object sigLock來實現,關于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock喚醒的是只有線程QuartzSchedulerThread。核心流程如下;

java Spring定時任務Quartz執行過程是什么  
微信公眾號:bugstack蟲洞棧 & 調度啟動流程

這個啟動過程中,核心的代碼類,如下;

  • StdScheduler

  • QuartzScheduler

  • QuartzSchedulerThread

  • ThreadPool

  • RAMJobStore

  • CronTriggerImpl

  • JobRunShellFactory

QuartzScheduler.start() & 啟動

 1public void start() throws SchedulerException {
2
3    if (shuttingDown|| closed) {
4        throw new SchedulerException(
5                "The Scheduler cannot be restarted after shutdown() has been called.");
6    }
7
8    // QTZ-212 : calling new schedulerStarting() method on the listeners
9    // right after entering start()
10    notifySchedulerListenersStarting();
11
12    if (initialStart == null) {
13        initialStart = new Date();
14        this.resources.getJobStore().schedulerStarted();            
15        startPlugins();
16    } else {
17        resources.getJobStore().schedulerResumed();
18    }
19
20    // 喚醒線程
21    schedThread.togglePause(false);
22
23    getLog().info(
24            "Scheduler " + resources.getUniqueIdentifier() + " started.");
25
26    notifySchedulerListenersStarted();
27}

QuartzSchedulerThread.run() & 執行過程

 1@Override
2public void run() {
3    int acquiresFailed = 0;
4
5    // 只有調用了halt()方法,才會退出這個死循環
6    while (!halted.get()) {
7        try {
8
9            // 一、如果是暫停狀態,則循環超時等待1000毫秒
10
11            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
12
13            // 阻塞直到有空閑的線程可用并返回可用的數量
14            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
15            if(availThreadCount > 0) {
16
17                List<OperableTrigger> triggers;
18                long now = System.currentTimeMillis();
19                clearSignaledSchedulingChange();
20
21                try {
22                    // 二、獲取acquire狀態的Trigger列表,也就是即將執行的任務
23                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
24                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
25                    acquiresFailed = 0;
26                    if (log.isDebugEnabled())
27                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
28                } catch(){//...}
29
30                if (triggers != null && !triggers.isEmpty()) {
31
32                    // 三:獲取List第一個Trigger的下次觸發時刻
33                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
34
35                    // 四:獲取任務觸發集合
36                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
37
38                    // 五:設置Triggers為'executing'狀態
39                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
40
41                    // 六:創建JobRunShell
42                    qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
43
44                    // 七:執行Job
45                    qsRsrcs.getThreadPool().runInThread(shell)
46
47                    continue; // while (!halted)
48                }
49            } else { // if(availThreadCount > 0)
50                // should never happen, if threadPool.blockForAvailableThreads() follows con
51                continue; // while (!halted)
52            }
53
54
55        } catch(RuntimeException re) {
56            getLog().error("Runtime error occurred in main trigger firing loop.", re);
57        }
58    }
59
60    qs = null;
61    qsRsrcs = null;
62}
  • 源碼391行: 創建JobRunShell,JobRunShell實例在initialize()方法就會把包含業務邏輯類的JobDetailImpl設置為它的成員屬性,為后面執行業務邏輯代碼做準備。執行業務邏輯代碼在runInThread(shell)方法里面。

    QuartzSchedulerThread.run() & 部分代碼

    1JobRunShell shell = null;
    2try {
    3    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    4    shell.initialize(qs);
    5} catch (SchedulerException se) {
    6    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
    7    continue;
    8}
  • 源碼398行: qsRsrcs.getThreadPool().runInThread(shell)

    SimpleThreadPool.runInThread

     1// 保存所有WorkerThread的集合
    2private List<WorkerThread> workers;
    3// 空閑的WorkerThread集合
    4private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    5// 任務的WorkerThread集合
    6private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
    7
    8/**
    9 * 維護workers、availWorkers和busyWorkers三個列表數據
    10 * 有任務需要一個線程出來執行:availWorkers.removeFirst();busyWorkers.add()
    11 * 然后調用WorkThread.run(runnable)方法
    12 */
    13public boolean runInThread(Runnable runnable) {
    14    if (runnable == null) {
    15        return false;
    16    }
    synchronized (nextRunnableLock) {
    17
    18    handoffPending = true;
    19
    20    // Wait until a worker thread is available
    21    while ((availWorkers.size() &lt; 1) &amp;&amp; !isShutdown) {
    22        try {
    23            nextRunnableLock.wait(500);
    24        } catch (InterruptedException ignore) {
    25        }
    26    }
    27
    28    if (!isShutdown) {
    29        WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
    30        busyWorkers.add(wt);
    31        wt.run(runnable);
    32    } else {
    33        // If the thread pool is going down, execute the Runnable
    34        // within a new additional worker thread (no thread from the pool).
    35
    36        WorkerThread wt = new WorkerThread(this, threadGroup,
    37                "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
    38        busyWorkers.add(wt);
    39        workers.add(wt);
    40        wt.start();
    41    }
    42    nextRunnableLock.notifyAll();
    43    handoffPending = false;
    44}
    45
    46return true;
    47
    }
  • 源碼428行: WorkerThread ,是一個內部類,主要是賦值并喚醒lock對象的等待線程隊列

    WorkerThread.run(Runnable newRunnable)

    1public void run(Runnable newRunnable) {
    2    synchronized(lock) {
    3        if(runnable != null) {
    4            throw new IllegalStateException("Already running a Runnable!");
    5        }
    6        runnable = newRunnable;
    7        lock.notifyAll();
    8    }
    9}
  • 源碼561行: WorkerThread 的run方法,方法執行lock.notifyAll()后,對應的WorkerThread就會來到run()方法。到這!接近曙光了!終于來到了執行業務的execute()方法的倒數第二步,runnable對象是一個JobRunShell對象,下面在看JobRunShell.run()方法。

    WorkerThread.run()

     1@Override
    2public void run() {
    3    boolean ran = false;
    while (run.get()) {
    4    try {
    5        synchronized(lock) {
    6            while (runnable == null &amp;&amp; run.get()) {
    7                lock.wait(500);
    8            }
    9            if (runnable != null) {
    10                ran = true;
    11                // 啟動真正執行的內容,runnable就是JobRunShell
    12                runnable.run();
    13            }
    14        }
    15    } cache(){//...}
    16}
    17//if (log.isDebugEnabled())
    18try {
    19    getLog().debug("WorkerThread is shut down.");
    20} catch(Exception e) {
    21    // ignore to help with a tomcat glitch
    22}
    23
    }

JobRunShell.run() & 從上面WorkerThread.run(),調用到這里執行

 1public void run() {
2    qs.addInternalSchedulerListener(this);
3
4    try {
5        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
6        JobDetail jobDetail = jec.getJobDetail();
7
8        do {
9            // ...
10
11            long startTime = System.currentTimeMillis();
12            long endTime = startTime;
13
14            // execute the job
15            try {
16                log.debug("Calling execute on job " + jobDetail.getKey());
17
18                // 執行業務代碼,也就是我們的task
19                job.execute(jec);
20
21                endTime = System.currentTimeMillis();
22            } catch (JobExecutionException jee) {
23                endTime = System.currentTimeMillis();
24                jobExEx = jee;
25                getLog().info("Job " + jobDetail.getKey() +
26                        " threw a JobExecutionException: ", jobExEx);
27            } catch (Throwable e) {
28                endTime = System.currentTimeMillis();
29                getLog().error("Job " + jobDetail.getKey() +
30                        " threw an unhandled Exception: ", e);
31                SchedulerException se = new SchedulerException(
32                        "Job threw an unhandled exception.", e);
33                qs.notifySchedulerListenersError("Job ("
34                        + jec.getJobDetail().getKey()
35                        + " threw an exception.", se);
36                jobExEx = new JobExecutionException(se, false);
37            }
38
39            jec.setJobRunTime(endTime - startTime);
40
41            // 其他代碼
42        } while (true);
43
44    } finally {
45        qs.removeInternalSchedulerListener(this);
46    }
47}

QuartzJobBean.execte() & 繼續往下走

 1public final void execute(JobExecutionContext context) throws JobExecutionException {
2    try {
3        BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
4        MutablePropertyValues pvs = new MutablePropertyValues();
5        pvs.addPropertyValues(context.getScheduler().getContext());
6        pvs.addPropertyValues(context.getMergedJobDataMap());
7        bw.setPropertyValues(pvs, true);
8    }
9    catch (SchedulerException ex) {
10        throw new JobExecutionException(ex);
11    }
12    executeInternal(context);
13}

MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)

 1protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
2    try {
3        // 反射執行業務代碼
4        context.setResult(this.methodInvoker.invoke());
5    }
6    catch (InvocationTargetException ex) {
7        if (ex.getTargetException() instanceof JobExecutionException) {
8            // -> JobExecutionException, to be logged at info level by Quartz
9            throw (JobExecutionException) ex.getTargetException();
10        }
11        else {
12            // -> "unhandled exception", to be logged at error level by Quartz
13            throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
14        }
15    }
16    catch (Exception ex) {
17        // -> "unhandled exception", to be logged at error level by Quartz
18        throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
19    }
20}

到此,關于“java Spring定時任務Quartz執行過程是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

永泰县| 蓝田县| 古交市| 密云县| 花莲市| 洛宁县| 昭平县| 盐池县| 阿城市| 大悟县| 广饶县| 龙川县| 九寨沟县| 灵山县| 湘潭市| 濮阳县| 嘉荫县| 托克逊县| 饶平县| 双辽市| 青海省| 许昌市| 阜平县| 甘洛县| 黔东| 东丰县| 宁津县| 高陵县| 灌云县| 郯城县| 德庆县| 北流市| 原阳县| 南平市| 九江县| 安徽省| 嘉荫县| 黄浦区| 鸡泽县| 福海县| 西峡县|