温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

java Spring定时任务Quartz执行过程是什么

发布时间:2021-11-18 10:17:32 阅读:215 作者:iii 栏目:编程语言
Java开发者专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要介绍“java Spring定时任务Quartz执行过程是什么”,在日常操作中,相信很多人在java Spring定时任务Quartz执行过程是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java Spring定时任务Quartz执行过程是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

一、前言介绍

在日常开发中经常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。因为有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;

  1. 默认初始化多少个任务线程

  2. JobStore有几种实现,你平时用的都是哪个

  3. 一个定时任务的执行流程简述下

二、案例工程

为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。

 1itstack-demo-code-schedule 2└── src 3    ├── main 4    │   ├── java 5    │   │   └── org.itstack.demo 6    │   │       ├── DemoTask.java 7    │   │       └── JobImpl.java    8    │   └── resources     9    │       ├── props    10    │       │   └── config.properties11    │       ├── spring12    │       │   └── spring-config-schedule-task.xml13    │       ├── logback.xml14    │       └── spring-config.xml15    └── test16         └── java17             └── org.itstack.demo.test18                 ├── ApiTest.java19                 ├── MyQuartz.java                20                 └── MyTask.java

三、环境配置

  1. JDK 1.8

  2. IDEA 2019.3.1

  3. Spring 4.3.24.RELEASE

  4. quartz 2.3.2 {不同版本略有代码差异}

四、源码分析

1<dependency>2    <groupId>org.quartz-scheduler</groupId>3    <artifactId>quartz</artifactId>4    <version>2.3.2</version>5</dependency>

依赖于Spring版本升级quartz选择2.3.2,同时如果你如本文案例中所示使用xml配置任务。那么会有如下更改;

Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean

<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">2     <property name="jobDetail" ref="taskHandler"/>3     <property name="cronExpression" value="0/5 * * * * ?"/></bean>

Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean

<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">2     <property name="jobDetail" ref="taskHandler"/>3     <property name="cronExpression" value="0/5 * * * * ?"/></bean>

在正式分析前,可以看下quartz的默认配置,很多初始化动作都要从这里取得参数,同样你可以配置自己的配置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。

quart.properties

 1Default Properties file for use by StdSchedulerFactory 2to create a Quartz Scheduler Instance, if a different 3# properties file is not explicitly specified. 4# 5 6org.quartz.scheduler.instanceName: DefaultQuartzScheduler 7org.quartz.scheduler.rmi.export: false 8org.quartz.scheduler.rmi.proxy: false 9org.quartz.scheduler.wrapJobExecutionInUserTransaction: false1011org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool12org.quartz.threadPool.threadCount: 1013org.quartz.threadPool.threadPriority: 514org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true1516org.quartz.jobStore.misfireThreshold: 600001718org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

1. 从一个简单案例开始

平时我们使用Schedule基本都是注解或者xml配置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。

DemoTask.java & 定义一个等待被执行的任务

1public class DemoTask {23    private Logger logger = LoggerFactory.getLogger(DemoTask.class);45    public void execute() throws Exception{6        logger.info("定时处理用户信息任务:0/5 * * * * ?");7    }89}

MyTask.java & 测试类,将配置在xml中的代码抽离出来

 1public class MyTask { 2 3    public static void main(String[] args) throws Exception { 4 5        DemoTask demoTask = new DemoTask(); 6 7        // 定义了;执行的内容 8        MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean(); 9        methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);10        methodInvokingJobDetailFactoryBean.setTargetMethod("execute");11        methodInvokingJobDetailFactoryBean.setConcurrent(true);12        methodInvokingJobDetailFactoryBean.setName("demoTask");13        methodInvokingJobDetailFactoryBean.afterPropertiesSet();1415        // 定义了;执行的计划16        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();17        cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());18        cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");19        cronTriggerFactoryBean.setName("demoTask");20        cronTriggerFactoryBean.afterPropertiesSet();2122        // 实现了;执行的功能23        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();24        schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());25        schedulerFactoryBean.setAutoStartup(true);26        schedulerFactoryBean.afterPropertiesSet();2728        schedulerFactoryBean.start();2930        // 暂停住31        System.in.read();3233    }3435}

如果一切顺利,那么会有如下结果:

 12020-01-04 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220- Using default implementation for ThreadExecutor 22020-01-04 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61- Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl 32020-01-04 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229- Quartz Scheduler v.2.3.2 created. 42020-01-04 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155- RAMJobStore initialized. 52020-01-04 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294- Scheduler meta-data: Quartz Scheduler (v2.3.2'QuartzScheduler' with instanceId 'NON_CLUSTERED' 6  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally. 7  NOT STARTED. 8  Currently in standby mode. 9  Number of jobs executed: 010  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.11  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.12132020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374- Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.142020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378- Quartz scheduler version: 2.3.2152020-01-04 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293- JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010162020-01-04 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547- Scheduler QuartzScheduler_$_NON_CLUSTERED started.17一月 042020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler18信息: Starting Quartz Scheduler now192020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11- 定时处理用户信息任务:0/5 * * * * ?202020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11- 定时处理用户信息任务:0/5 * * * * ?212020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11- 定时处理用户信息任务:0/5 * * * * ?222020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11- 定时处理用户信息任务:0/5 * * * * ?232020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11- 定时处理用户信息任务:0/5 * * * * ?2425Process finished with exit code -126

2. 定义执行内容(MethodInvokingJobDetailFactoryBean)

1// 定义了;执行的内容2MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();3methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);4methodInvokingJobDetailFactoryBean.setTargetMethod("execute");5methodInvokingJobDetailFactoryBean.setConcurrent(true);6methodInvokingJobDetailFactoryBean.setName("demoTask");7methodInvokingJobDetailFactoryBean.afterPropertiesSet();

这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置必要信息;

  • targetObject:目标对象bean,也就是demoTask

  • targetMethod:目标方法name,也就是execute

  • concurrent:是否并行执行,非并行执行任务,如果上一个任务没有执行完,下一刻不会执行

  • name:xml配置非必传,源码中可以获取beanName

最后我们通过手动调用 afterPropertiesSet() 来模拟初始化。如果我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类配置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean 接口,因为这个接口实现后就可以通过 T getObject() 获取自己自定义初始化的类。这也常常用在一些框架开发中。

MethodInvokingJobDetailFactoryBean.afterPropertiesSet()

 1public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException { 2    prepare(); 3    // Use specific name if given, else fall back to bean name. 4    String name = (this.name != null ? this.name : this.beanName); 5    // Consider the concurrent flag to choose between stateful and stateless job. 6    Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class); 7    // Build JobDetail instance. 8    JobDetailImpl jdi = new JobDetailImpl(); 9    jdi.setName(name);10    jdi.setGroup(this.group);11    jdi.setJobClass((Class) jobClass);12    jdi.setDurability(true);13    jdi.getJobDataMap().put("methodInvoker", this);14    this.jobDetail = jdi;1516    postProcessJobDetail(this.jobDetail);17}
  • 源码168行: 根据是否并行执行选择任务类,这两个类都是MethodInvokingJobDetailFactoryBean的内部类,非并行执行的StatefulMethodInvokingJob只是继承MethodInvokingJob添加了标记注解。

  • 源码171行: 创建JobDetailImpl,添加任务明细信息,注意这类的jdi.setJobClass((Class) jobClass)实际就是MethodInvokingJob。MethodInvokingJob也是我们最终要反射调用执行的内容。

  • 源码177行: 初始化任务后赋值给this.jobDetail = jdi,也就是最终的类对象

    MethodInvokingJobDetailFactoryBean.getObject()

    1@Override2public JobDetail getObject() {3    return this.jobDetail;4}
  • 源码:220行: 获取对象时返回 this.jobDetail,这也就解释了为什么 MethodInvokingJobDetailFactoryBean 初始化后直接赋值给了一个 JobDetail ;

    java Spring定时任务Quartz执行过程是什么
    微信公众号:bugstack虫洞栈 & Schedule.xml

3. 定义执行计划(CronTriggerFactoryBeann)

1// 定义了;执行的计划2CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();3cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());4cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");5cronTriggerFactoryBean.setName("demoTask");6cronTriggerFactoryBean.afterPropertiesSet();

这一块主要定义任务的执行计划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置必要信息;

  • jobDetail:设置任务体,xml 中可以直接将对象赋值,硬编码中设置执行的 JobDetail 对象信息。也就是我们上面设置的 JobDetailImpl ,通过 getObject() 获取出来。

  • cronExpression:计划表达式;秒、分、时、日、月、周、年

CronTriggerFactoryBean.afterPropertiesSet()

 1@Override 2public void afterPropertiesSet() throws ParseException { 3 4    // ... 校验属性信息 5 6    CronTriggerImpl cti = new CronTriggerImpl(); 7    cti.setName(this.name); 8    cti.setGroup(this.group); 9    if (this.jobDetail != null) {10        cti.setJobKey(this.jobDetail.getKey());11    }12    cti.setJobDataMap(this.jobDataMap);13    cti.setStartTime(this.startTime);14    cti.setCronExpression(this.cronExpression);15    cti.setTimeZone(this.timeZone);16    cti.setCalendarName(this.calendarName);17    cti.setPriority(this.priority);18    cti.setMisfireInstruction(this.misfireInstruction);19    cti.setDescription(this.description);20    this.cronTrigger = cti;21}
  • 源码237行: 创建触发器 CronTriggerImpl 并设置相关属性信息

  • 源码245行: 生成执行计划类 cti.setCronExpression(this.cronExpression);

    1public void setCronExpression(String cronExpression) throws ParseException {2    TimeZone origTz = getTimeZone();3    this.cronEx = new CronExpression(cronExpression);4    this.cronEx.setTimeZone(origTz);5}

    CronExpression.java & 解析Cron表达式

     1protected void buildExpression(String expression) throws ParseException { 2    expressionParsed = true; 3    try {    // ... 初始化 TreeSet xxx = new TreeSet&lt;Integer&gt;(); 4 5    int exprOn = SECOND; 6    StringTokenizer exprsTok = new StringTokenizer(expression, " \t", 7            false); 8 9    while (exprsTok.hasMoreTokens() &amp;&amp; exprOn &lt;= YEAR) {10        String expr = exprsTok.nextToken().trim();1112        // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符1314        StringTokenizer vTok = new StringTokenizer(expr, ",");15        while (vTok.hasMoreTokens()) {16            String v = vTok.nextToken();17            storeExpressionVals(0, v, exprOn);18        }19        exprOn++;20    }2122    // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符2324} catch (ParseException pe) {25    throw pe;26} catch (Exception e) {27    throw new ParseException("Illegal cron expression format ("28            + e.toString() + ")"0);29}30AI代码助手复制代码}
    • Cron表达式有7个字段,CronExpression 把7个字段解析为7个 TreeSet 对象。

    • 填充TreeSet对象值的时候,表达式都会转换为起始值、结束值和增量的计算模式,然后计算出匹配的值放进TreeSet对象

CronTriggerFactoryBean.getObject()

1@Override2public CronTrigger getObject() {3    return this.cronTrigger;4}
  • 源码257行: 获取对象时返回 this.cronTrigger ,也就是 CronTriggerImpl 对象

4. 调度执行计划(SchedulerFactoryBean)

1// 调度了;执行的计划(scheduler)2SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();3schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());4schedulerFactoryBean.setAutoStartup(true);5schedulerFactoryBean.afterPropertiesSet();67schedulerFactoryBean.start();

这一部分如名字一样调度工厂,相当于一个指挥官,可以从全局做调度,比如监听哪些trigger已经ready、分配线程等等,同样也需要设置必要的属性信息;

  • triggers:按需可以设置多个触发器,本文设置了一个 cronTriggerFactoryBean.getObject() 也就是 CronTriggerImpl 对象

  • autoStartup:默认是否自动启动任务,默认值为true

这个过程较长包括:调度工厂、线程池、注册任务等等,整体核心加载流程如下;

java Spring定时任务Quartz执行过程是什么  
微信公众号:bugstack虫洞栈 & 调度工程初始化流程
  • 整个加载过程较长,抽取部分核心代码块进行分析,其中包括的类;

    • StdScheduler

    • StdSchedulerFactory

    • SimpleThreadPool

    • QuartzScheduler

    • QuartzSchedulerThread

    • RAMJobStore

    • CronTriggerImpl

    • CronExpression

SchedulerFactoryBean.afterPropertiesSet()

 1public void afterPropertiesSet() throws Exception { 2    if (this.dataSource == null && this.nonTransactionalDataSource != null) { 3        this.dataSource = this.nonTransactionalDataSource; 4    } 5    if (this.applicationContext != null && this.resourceLoader == null) { 6        this.resourceLoader = this.applicationContext; 7    } 8    // Initialize the Scheduler instance... 9    this.scheduler = prepareScheduler(prepareSchedulerFactory());10    try {11        registerListeners();12        registerJobsAndTriggers();13    }14    catch (Exception ex) {15        try {16            this.scheduler.shutdown(true);17        }18        catch (Exception ex2) {19            logger.debug("Scheduler shutdown exception after registration failure", ex2);20        }21        throw ex;22    }23}
  • 源码474行: 为调度器做准备工作 prepareScheduler(prepareSchedulerFactory()) ,依次执行如下;

    11)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。22)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。33)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。44)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;55)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;66)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;77)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;88)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
    1. SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)

    2. SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);

    3. SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)

    4. Scheduler newScheduler = schedulerFactory.getScheduler();

    5. StdSchedulerFactory.getScheduler();

    6. sched = instantiate(); 包括一系列核心操作;

  • 源码477行: 调用父类 SchedulerAccessor.registerJobsAndTriggers() 注册任务和触发器

    1for (Trigger trigger : this.triggers) {2    addTriggerToScheduler(trigger);3}

SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor 是SchedulerFactoryBean的父类

 1private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException { 2    boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null); 3    if (triggerExists && !this.overwriteExistingJobs) { 4        return false; 5    } 6    // Check if the Trigger is aware of an associated JobDetail. 7    JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail"); 8    if (triggerExists) { 9        if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {10            this.jobDetails.add(jobDetail);11        }12        try {13            getScheduler().rescheduleJob(trigger.getKey(), trigger);14        }15        catch (ObjectAlreadyExistsException ex) {16            if (logger.isDebugEnabled()) {17                logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +18                        "cluster race condition: " + ex.getMessage() + " - can safely be ignored");19            }20        }21    }22    else {23        try {24            if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&25                    (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {26                getScheduler().scheduleJob(jobDetail, trigger);27                this.jobDetails.add(jobDetail);28            }29            else {30                getScheduler().scheduleJob(trigger);31            }32        }33        catch (ObjectAlreadyExistsException ex) {34            if (logger.isDebugEnabled()) {35                logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +36                        "cluster race condition: " + ex.getMessage() + " - can safely be ignored");37            }38            if (this.overwriteExistingJobs) {39                getScheduler().rescheduleJob(trigger.getKey(), trigger);40            }41        }42    }43    return true;44}
  • 源码299行: addJobToScheduler(jobDetail) 一直会调用到 RAMJobStore 进行存放任务信息到 HashMap(100)

     1public void storeJob(JobDetail newJob, 2    boolean replaceExisting) throws ObjectAlreadyExistsException { 3    JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); 4    boolean repl = false; 5    synchronized (lock) { 6        if (jobsByKey.get(jw.key) != null) { 7            if (!replaceExisting) { 8                throw new ObjectAlreadyExistsException(newJob); 9            }10            repl = true;11        }12        if (!repl) {13            // get job group14            HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());15            if (grpMap == null) {16                grpMap = new HashMap<JobKey, JobWrapper>(100);17                jobsByGroup.put(newJob.getKey().getGroup(), grpMap);18            }19            // add to jobs by group20            grpMap.put(newJob.getKey(), jw);21            // add to jobs by FQN map22            jobsByKey.put(jw.key, jw);23        } else {24            // update job detail25            JobWrapper orig = jobsByKey.get(jw.key);26            orig.jobDetail = jw.jobDetail; // already cloned27        }28    }29}
  • 初始化线程组;

    SimpleThreadPool.initialize() & 这里的count是默认配置中的数量,可以更改

    1 // create the worker threads and start them2 Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();3 while(workerThreads.hasNext()) {4     WorkerThread wt = workerThreads.next();5     wt.start();6     availWorkers.add(wt);7 }
    • prepareScheduler

    • createScheduler

    • schedulerFactory

    • StdSchedulerFactory.getScheduler()

    • getScheduler()->instantiate()

    • 源码1323行: tp.initialize();

5. 启动定时任务

案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。核心流程如下;

java Spring定时任务Quartz执行过程是什么  
微信公众号:bugstack虫洞栈 & 调度启动流程

这个启动过程中,核心的代码类,如下;

  • StdScheduler

  • QuartzScheduler

  • QuartzSchedulerThread

  • ThreadPool

  • RAMJobStore

  • CronTriggerImpl

  • JobRunShellFactory

QuartzScheduler.start() & 启动

 1public void start() throws SchedulerException { 2 3    if (shuttingDown|| closed) { 4        throw new SchedulerException( 5                "The Scheduler cannot be restarted after shutdown() has been called."); 6    } 7 8    // QTZ-212 : calling new schedulerStarting() method on the listeners 9    // right after entering start()10    notifySchedulerListenersStarting();1112    if (initialStart == null) {13        initialStart = new Date();14        this.resources.getJobStore().schedulerStarted();            15        startPlugins();16    } else {17        resources.getJobStore().schedulerResumed();18    }1920    // 唤醒线程21    schedThread.togglePause(false);2223    getLog().info(24            "Scheduler " + resources.getUniqueIdentifier() + " started.");2526    notifySchedulerListenersStarted();27}

QuartzSchedulerThread.run() & 执行过程

 1@Override 2public void run() { 3    int acquiresFailed = 0; 4 5    // 只有调用了halt()方法,才会退出这个死循环 6    while (!halted.get()) { 7        try { 8 9            // 一、如果是暂停状态,则循环超时等待1000毫秒1011            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..1213            // 阻塞直到有空闲的线程可用并返回可用的数量14            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();15            if(availThreadCount > 0) {1617                List<OperableTrigger> triggers;18                long now = System.currentTimeMillis();19                clearSignaledSchedulingChange();2021                try {22                    // 二、获取acquire状态的Trigger列表,也就是即将执行的任务23                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(24                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat25                    acquiresFailed = 0;26                    if (log.isDebugEnabled())27                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers28                } catch(){//...}2930                if (triggers != null && !triggers.isEmpty()) {3132                    // 三:获取List第一个Trigger的下次触发时刻33                    long triggerTime = triggers.get(0).getNextFireTime().getTime();3435                    // 四:获取任务触发集合36                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);3738                    // 五:设置Triggers为'executing'状态39                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));4041                    // 六:创建JobRunShell42                    qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);4344                    // 七:执行Job45                    qsRsrcs.getThreadPool().runInThread(shell)4647                    continue// while (!halted)48                }49            } else { // if(availThreadCount > 0)50                // should never happen, if threadPool.blockForAvailableThreads() follows con51                continue// while (!halted)52            }535455        } catch(RuntimeException re) {56            getLog().error("Runtime error occurred in main trigger firing loop.", re);57        }58    }5960    qs = null;61    qsRsrcs = null;62}
  • 源码391行: 创建JobRunShell,JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做准备。执行业务逻辑代码在runInThread(shell)方法里面。

    QuartzSchedulerThread.run() & 部分代码

    1JobRunShell shell = null;2try {3    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);4    shell.initialize(qs);5catch (SchedulerException se) {6    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);7    continue;8}
  • 源码398行: qsRsrcs.getThreadPool().runInThread(shell)

    SimpleThreadPool.runInThread

     1// 保存所有WorkerThread的集合 2private List<WorkerThread> workers; 3// 空闲的WorkerThread集合 4private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); 5// 任务的WorkerThread集合 6private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); 7 8/** 9 * 维护workers、availWorkers和busyWorkers三个列表数据10 * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()11 * 然后调用WorkThread.run(runnable)方法12 */13public boolean runInThread(Runnable runnable) {14    if (runnable == null) {15        return false;16    }synchronized (nextRunnableLock) {1718    handoffPending = true;1920    // Wait until a worker thread is available21    while ((availWorkers.size() &lt; 1) &amp;&amp; !isShutdown) {22        try {23            nextRunnableLock.wait(500);24        } catch (InterruptedException ignore) {25        }26    }2728    if (!isShutdown) {29        WorkerThread wt = (WorkerThread)availWorkers.removeFirst();30        busyWorkers.add(wt);31        wt.run(runnable);32    } else {33        // If the thread pool is going down, execute the Runnable34        // within a new additional worker thread (no thread from the pool).3536        WorkerThread wt = new WorkerThread(this, threadGroup,37                "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);38        busyWorkers.add(wt);39        workers.add(wt);40        wt.start();41    }42    nextRunnableLock.notifyAll();43    handoffPending = false;44}4546return true;47AI代码助手复制代码}
  • 源码428行: WorkerThread ,是一个内部类,主要是赋值并唤醒lock对象的等待线程队列

    WorkerThread.run(Runnable newRunnable)

    1public void run(Runnable newRunnable) {2    synchronized(lock) {3        if(runnable != null) {4            throw new IllegalStateException("Already running a Runnable!");5        }6        runnable = newRunnable;7        lock.notifyAll();8    }9}
  • 源码561行: WorkerThread 的run方法,方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法。到这!接近曙光了!终于来到了执行业务的execute()方法的倒数第二步,runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。

    WorkerThread.run()

     1@Override 2public void run() { 3    boolean ran = false;while (run.get()) { 4    try { 5        synchronized(lock) { 6            while (runnable == null &amp;&amp; run.get()) { 7                lock.wait(500); 8            } 9            if (runnable != null) {10                ran = true;11                // 启动真正执行的内容,runnable就是JobRunShell12                runnable.run();13            }14        }15    } cache(){//...}16}17//if (log.isDebugEnabled())18try {19    getLog().debug("WorkerThread is shut down.");20} catch(Exception e) {21    // ignore to help with a tomcat glitch22}23AI代码助手复制代码}

JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行

 1public void run() { 2    qs.addInternalSchedulerListener(this); 3 4    try { 5        OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); 6        JobDetail jobDetail = jec.getJobDetail(); 7 8        do { 9            // ...1011            long startTime = System.currentTimeMillis();12            long endTime = startTime;1314            // execute the job15            try {16                log.debug("Calling execute on job " + jobDetail.getKey());1718                // 执行业务代码,也就是我们的task19                job.execute(jec);2021                endTime = System.currentTimeMillis();22            } catch (JobExecutionException jee) {23                endTime = System.currentTimeMillis();24                jobExEx = jee;25                getLog().info("Job " + jobDetail.getKey() +26                        " threw a JobExecutionException: ", jobExEx);27            } catch (Throwable e) {28                endTime = System.currentTimeMillis();29                getLog().error("Job " + jobDetail.getKey() +30                        " threw an unhandled Exception: ", e);31                SchedulerException se = new SchedulerException(32                        "Job threw an unhandled exception.", e);33                qs.notifySchedulerListenersError("Job ("34                        + jec.getJobDetail().getKey()35                        + " threw an exception.", se);36                jobExEx = new JobExecutionException(se, false);37            }3839            jec.setJobRunTime(endTime - startTime);4041            // 其他代码42        } while (true);4344    } finally {45        qs.removeInternalSchedulerListener(this);46    }47}

QuartzJobBean.execte() & 继续往下走

 1public final void execute(JobExecutionContext context) throws JobExecutionException { 2    try { 3        BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this); 4        MutablePropertyValues pvs = new MutablePropertyValues(); 5        pvs.addPropertyValues(context.getScheduler().getContext()); 6        pvs.addPropertyValues(context.getMergedJobDataMap()); 7        bw.setPropertyValues(pvs, true); 8    } 9    catch (SchedulerException ex) {10        throw new JobExecutionException(ex);11    }12    executeInternal(context);13}

MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)

 1protected void executeInternal(JobExecutionContext context) throws JobExecutionException { 2    try { 3        // 反射执行业务代码 4        context.setResult(this.methodInvoker.invoke()); 5    } 6    catch (InvocationTargetException ex) { 7        if (ex.getTargetException() instanceof JobExecutionException) { 8            // -> JobExecutionException, to be logged at info level by Quartz 9            throw (JobExecutionException) ex.getTargetException();10        }11        else {12            // -> "unhandled exception", to be logged at error level by Quartz13            throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());14        }15    }16    catch (Exception ex) {17        // -> "unhandled exception", to be logged at error level by Quartz18        throw new JobMethodInvocationFailedException(this.methodInvoker, ex);19    }20}

到此,关于“java Spring定时任务Quartz执行过程是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/itstack/blog/4409556

AI

开发者交流群×