Quartz框架汇总

目录

一.Quartz理论基础

(一)Timer

 二.线程池

(一)ScheduledThreadPoolExcutor

(二)SingleThreadScheduledExecutor

二.quartz使用

(一) Job:封装成JobDetail设置属性

(二)Trigger:触发器:指定执行时间,开始结束时间等

(三)Scheduler:调度器,基于trigger的设定执行job

(四)JobDetailMap:保存任务实例的状态信息

三.Quartz集群原理

注意问题

四.Quartz实现异步通知

1. 一个Job绑定多个触发器

五.创建一个任务

(一)JobDetailFactoryBean或者CronTriggerFactoryBean

(二)使用builder创建Job或者Trigger

六.监听机制

1.JobListener

 2.TriggerListener

3.SchedulerListener

七.基础代码实现

 八.数据库各表及其作用


一.Quartz理论基础

(一)Timer

        1.Quartz中的TaskQueue就是小顶堆的结构,用来存放timeTask

        2.TimerThread:任务执行线程,死循环不断检查是否有有任务开始执行了,有就还是在这个线程执行它

        3.单线程执行任务,有任务肯互相阻塞

                schedule:执行任务超时,会导致后面的任务往后推移,预想在这个间隔内存存在的任务执行就不会执行了,就没有了

                scheduleAtFixedRate:任务超时可能导致下一个任务马上开始执行

        4.运行时异常会导致timer线程终止

        5.任务调度是基于时间的,对系统时间敏感

 二.线程池

(一)ScheduledThreadPoolExcutor

1.使用多线程执行任务,不会互相阻塞

2.如果线程失活,会新建线程执行任务 

        线程抛异常,任务会被丢弃,需要做捕获处理

3.DelayedWorkQueue:小顶堆,无界队列

        在定时线程池当中,最大线程数是没有意义的

        执行时间距离当前时间越近的任务在队列的前面

        用于添加ScheduleFutureTask(继承于FutureTask,实现RunnableScheduledFuture接口)【提供异步执行的能力,并且可以返回执行】

        线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务

        实现了Delayed接口,可以通过getDelay方法来获取延迟时间

4.Leader-Follower模式:避免没必要的唤醒和阻塞操作,这样会更加有效,且节省资源

5.应用场景:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求二需要限制后台线程数量

(二)SingleThreadScheduledExecutor

1.单线程的ScheduleThreadPoolExector

2.应用场景:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行

二.quartz使用

(一) Job:封装成JobDetail设置属性

@DisallowConcurrentExecution
//禁止并发的执行同一个job的定义的多个实例(jobDetial定义的)

@PersistJobDataAfterExecution
//持久化JobDetail中的JobDetailMap(对Trigger中的dataMap无效)
//如果一个任务不是持久化的,则当没有触发器关联他的时候,Quartz会从scheduler中删除掉他
boolean recovering = jobExecutionContext.isRecovering();

//如果一个任务请求恢复,一般是该任务执行期间发生了系统崩溃或其他关闭进程的操作
//当服务再次启动的时候,会再次执行该任务,此时jobExecutionContext.isRecovering()会返回true
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TestJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {


    }
}

(二)Trigger:触发器:指定执行时间,开始结束时间等

1.优先级:同时触发的Trigger之间才会比较优先级

                  如果trigger是可恢复的,再恢复后再调度时,优先级不变

2.错过触发:

判断misfire的条件:

(1)job到达触发时间没有被执行

(2)被执行的延迟时间超过Quartz配置的misfireThreshold阈值

可能原因:

1.当job达到触发时间时,所有的线程都被其他job占用,没有可用线程

2.在job需要触发的时间点,schedule停止了(可能是意外停止的)

3.job使用了@DisallowConcurrentExecution注解,job不能并发执行,当达到下一个job执行点的时候,上一个任务还没没有完成

4.job指定了过去开始的执行时间,例如当前时间是8:00:00指定开始的时间为7:00:00

策略:

1.默认使用 MISFIRE_INSTRUCTION_SMART_POLICY策略

2.SimpleTrigger:

        1)now*相关的策略,会立即执行第一个misfire任务,同时修改startTime和repeatCount, 一次会重新计算finalFireTime,原计划时间会被打乱

         2)next*相关的策略,不会立即执行misfire的任务,也不会修改startTime和repeatCount, 因此finalFireTime也没有发生改变,发生了misfire也是按照原计划进行执行

3.CronTrigger:

1)MISFIRE_INSTRUCTION_SMART_POLICY: Quartz不会判断发生了misfire,立即执行所有发生了misfire的任务,然后按照原计划进行执行。 例如:10:15分立即执行9:00和10:O0的任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 2)MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: 立即执行第一个发生misfire的任务,忽略其他发生misfire的任务,然后按照原计划继续执行。 例如:在10:15立即执行9:00任务,忽略10:00任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 3)MISFIRE_INSTRUCTION_DO_NOTHING: 所有发生的misfire的任务都被忽略,只是按照原计划继续执行

calender: 用于排除时间段

SimpleTrigger:具体时间点,指定间隔重复执行

CronTrigger:cron表达式

(三)Scheduler:调度器,基于trigger的设定执行job

1.SchedulerFactory:

        (1)创建Scheduler

        (2)DirectSchedulerFactory:在代码里定制Schedule参数

        (3)StdSchedulerFactory:读取classpath下的quartz.properties配置来实例化Scheduler

2.JobStore:

        存储运行时信息的,包括Trigger,Scheduler,JobDetail,业务锁等

        RAMJobStore(内存实现)

        JobStoreTX(JDBC,事务由Quartz管理)

        JobStoreCMT(JDBC,使用容器事务)

        ClusteredJobStore(集群实现)

        TerracottaJobStore(Terracotta中间件)

3.ThreadPool:SimpleThreadPool 、自定义线程池

(四)JobDetailMap:保存任务实例的状态信息

   1.jobDetail:

        默认只在Job被添加到调度程序(任务执行计划表)schedule的时候,存储一次关于该任务的状态信息数据,可以使用注解@PersistJobDataAfterExecution标明在一个任务执行完毕后就存储一次

2.trigger

        任务被多个触发器引用的时候,根据不同的触发时机,可以输入不同的输入条件

所以,JobDataMap在Job和Trigger存储的数据并不一致。

三.Quartz集群原理

Quartz急群众每个节点是一个独立的Quartz任务应用,它又管理者其他节点。该集群需要分别对每个节点分别启动或停止,独立的Quartz节点并不与另一个节点或是管理节点通信。Quartz应用是通过共有相同数据库表来感知到另一应用。也就是说只有使用持久化JobStore存储Job和Trigger才能完成Quartz集群。

Quartz的集群部署方案是分布式的,没有负责集中管理的节点,而是利用数据库杭锁的方式来实现集群环境下的并发控制。

一个scheduler实例在集群模式下首先获取{0}LOCKS表中的行锁;

Quartz主要由两个行级锁。

lock_name desc
STATE_ACCESS 状态访问锁
TRIGGER_ACCESS 触发器访问锁

Quartz集群争用触发器行锁,锁被占用只能等待,获取触发器行锁之后,先获取需要等待触发的其他触发器信息。数据库更新触发器状态信息,及时是否触发器行锁,供其他调度实例获取,然后在进行触发器任务调度操作,对数据库操作就要先获取行锁。

同一集群下,instanceName必须相同,instanceId可自动生成,isClustered为true,持久化存储,指定数据库类型对应的驱动类和数据源连接。

注意问题

1. 时间同步问题

Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。

2. 节点争抢Job问题

因为Quartz使用了一个随机的负载均衡算法,Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。

四.Quartz实现异步通知

1. 一个Job绑定多个触发器

一个Trigger只能绑定一个Job,但是一个Job可以绑定多个Trigger。

那么,若是我们将一个Job上绑定多个触发器,且每个触发器只是触发一次的话,那么,实际上可以实现阶梯式的异步通知,下面的基础代码实现中监听器StartApplicationListener就使用了异步输出。

五.创建一个任务

(一)JobDetailFactoryBean或者CronTriggerFactoryBean

JobDetailFactoryBean或者CronTriggerFactoryBean都实现了FactoryBean接口和InitializingBean接口。虽然我们new JobDetailFactoryBean(),但是实际上是将JobDetail交由的IOC管理。而InitializingBean接口会在属性装载完毕之后,自动的回调afterPropertiesSet()方法,完成Bean对象的最终构建:

    @Bean
    public JobDetailFactoryBean jobDetail(){
        //查询数据库或者配置文件
        JobDetailFactoryBean jobDetailFactoryBean=new JobDetailFactoryBean();
        jobDetailFactoryBean.setName("");
        jobDetailFactoryBean.setBeanName("");
        jobDetailFactoryBean.setJobClass((Class<? extends Job>) aClass);
        jobDetailFactoryBean.setGroup("");
        jobDetailFactoryBean.setDurability(true);
        return jobDetailFactoryBean;
    }

或者最后直接进行

     JobDetail jobDetail= jobFactory.getObject();

(二)使用builder创建Job或者Trigger

// 定义任务调度实例, 并与TestJob绑定
JobDetail job = JobBuilder.newJob(TestJob.class)
    .withIdentity("testJob", "testJobGroup")//调度器的名字和组别
    .usingJobData("job","jobDetail")//给调度器当中定义key并赋值,相当于参数,之后可以在job中取到
    .storeDurably()  //表示当没有触发器与之关联时,仍然将job继续保存在Scheduler中
    .build();
// 定义触发器, 会马上执行一次, 接着5秒执行一次
Trigger trigger = TriggerBuilder.newTrigger()
    .withIdentity("testTrigger", "testTriggerGroup")//触发器的名字和分组
    .startNow()//定义好后马上就启动
    .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5))//按照一定的策略触发
    .usingJobData("job","jobDedfafafafafdsfal")//给触发器当中定义key并赋值,相当于参数,之后可        
以在job中取到
    //.repeatForever()表示一直执行
    .forJob(jobKey.getName(), jobKey.getGroup()) //制定Trigger和Job的关联关系
    .usingJobData(jobDataMap)  //具体执行的方法中可以拿到这个传进去的信息。
    .build();

六.监听机制

Quartz的监听器用于当任务调度中关注的事件发生时,能够及时获取这一事件的通知。

  1. Quartz监听的种类:
  • JobListener:任务监听;
  • TriggerListener:触发器监听;
  • SchedulerListener:调度器监听;
  1. 监听器的作用域
  • 全局监听器:能够接收到所有的Job/Trigger的事件通知;
  • 局部监听器:只能接收在其上注册Job或者Trigger的事件;

1.JobListener

public interface JobListener {
    //获取该JobListener的名称
    String getName();
    //Scheduler在JobDetail将要被执行时调用该方法
    void jobToBeExecuted(JobExecutionContext context);
    //Scheduler在JobDetail将要被执行时,但又被TriggerListener否决调用
    void jobExecutionVetoed(JobExecutionContext context);
    //任务执行完毕调用该方法
    void jobWasExecuted(JobExecutionContext context,
            JobExecutionException jobException);

}

将JobListener绑定到Scheduler中:

//监听所有的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), EverythingMatcher.allJobs());
//监听特定的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("HelloWorld1_Job", "HelloWorld1_Group")));
//监听同一任务组的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), GroupMatcher.jobGroupEquals("HelloWorld2_Group"));
//监听两个任务组的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), OrMatcher.or(GroupMatcher.jobGroupEquals("HelloWorld1_Group"), GroupMatcher.jobGroupEquals("HelloWorld2_Group")));

 2.TriggerListener

触发器监听,即在任务调度过程中,与触发器Trigger相关的事件:触发器触发、触发器未正常触发、触发器触发完成等。

public interface TriggerListener {
    //获取触发器的名字
    public String getName();
    //Job的execute()方法被调用时调用该方法。
    public void triggerFired(Trigger trigger, JobExecutionContext context);
    //Trigger触发后,TriggerListener给了一个选择否定Job的执行。假如该方法返回true,该Job将不会被触发
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context);
    //Trigger错过触发时间触发该方法,此方法不应该含有长时间的处理逻辑。
    public void triggerMisfired(Trigger trigger);
    //Trigger被触发并且完成Job后触发。
    public void triggerComplete(Trigger trigger, JobExecutionContext context,
            int triggerInstructionCode);
}

将TriggerListener绑定到Scheduler中:

//监听所有的Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), EverythingMatcher.allTriggers());
//监听特定的Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), KeyMatcher.keyEquals(TriggerKey.triggerKey("HelloWord1_Job", "HelloWorld1_Group")));
//监听一组Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), GroupMatcher.groupEquals("HelloWorld1_Group"));
//移除监听器
scheduler.getListenerManager().removeTriggerListener("SimpleTrigger");

3.SchedulerListener

SchedulerListener会在Scheduler的生命周期关键事件发生时调用。与Scheduler有关的事件包括:增加一个job/trigger,删除一个job/Trigger,scheduler发生严重错误,关闭Scheduler等。

public interface SchedulerListener {
     //用于部署JobDetail时调用
    public void jobScheduled(Trigger trigger);
    //用于卸载JobDetail时调用
    public void jobUnscheduled(String triggerName, String triggerGroup);
    //当一个Trigger没有触发次数时调用。
    public void triggerFinalized(Trigger trigger);

    public void triggersPaused(String triggerName, String triggerGroup);

    public void triggersResumed(String triggerName, String triggerGroup);
    //当一个或一组 JobDetail 暂停时调用这个方法。
    public void jobsPaused(String jobName, String jobGroup);
    //当一个或一组 Job 从暂停上恢复时调用这个方法。假如是一个 Job 组,jobName 参数将为 null。
    public void jobsResumed(String jobName, String jobGroup);
    //在 Scheduler 的正常运行期间产生一个严重错误时调用这个方法。
    public void schedulerError(String msg, SchedulerException cause);
    //当Scheduler 开启时,调用该方法
    public void schedulerStarted();
    //当Scheduler处于StandBy模式时,调用该方法
    public void schedulerInStandbyMode();
    //当Scheduler停止时,调用该方法
    public void schedulerShutdown();
    //当Scheduler中的数据被清除时,调用该方法。
    public void schedulingDataCleared();
}

将SchedulerListener绑定到Scheduler中:

//创建监听
scheduler.getListenerManager().addSchedulerListener(new SimpleSchedulerListener());
//移除监听
scheduler.getListenerManager().removeSchedulerListener(new SimpleSchedulerListener());

七.基础代码实现

Start.java

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        SpringApplication.run(Start.class,args);
    }
}
QuartzConfiguration.java

@Configuration
public class QuartzConfiguration {

    @Autowired
    private DataSource dataSource;
    @Bean
    public Scheduler scheduler() throws IOException {
        return schedulerFactoryBean().getScheduler();
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/application.yml"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    @Bean
    public Executor schedulerThreadPool(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
        return executor;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("cluster_scheduler");
        factory.setDataSource(dataSource);
        factory.setApplicationContextSchedulerContextKey("application");
        factory.setQuartzProperties(quartzProperties());
        factory.setTaskExecutor(schedulerThreadPool());
        factory.setStartupDelay(10);
        return factory;
    }
}
StartApplicationListener.java

@Component
//@Controller也可以
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private Scheduler scheduler;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            TriggerKey triggerKey1 = TriggerKey.triggerKey("trigger1","group1");
            Trigger trigger1 = scheduler.getTrigger(triggerKey1);
            if (trigger1 ==null){
                trigger1 = TriggerBuilder.newTrigger()
                        .withIdentity(triggerKey1)
                        .withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次
                        .build();
                JobDetail jobDetail = JobBuilder.newJob(TestJob.class)
                        .withIdentity("job1","group1")
                        .build();
                scheduler.scheduleJob(jobDetail,trigger1);

            }



            TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2","group2");
            Trigger trigger2 = scheduler.getTrigger(triggerKey2);
            if (trigger2 ==null) {
                trigger2 = TriggerBuilder.newTrigger()
                        .withIdentity(triggerKey2)
                        .withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次
                        .build();
                JobDetail jobDetail = JobBuilder.newJob(TestJob.class)
                        .withIdentity("job2", "group2")
                        .build();
                scheduler.scheduleJob(jobDetail, trigger2);
            }


            scheduler.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}

从Start.java程序入口进入程序,俩个trigger一起异步输出如下

//间隔3s输出一次

开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:36
开启时间 :2022-10-28 16:50:36

进程已结束,退出代码为 -1

 八.数据库各表及其作用

        

1.qrtz_job_details:记录每个任务的详细信息。
2.qrtz_triggers:记录每个触发器的详细信息。
3.qrtz_corn_triggers:记录cornTrigger的信息。
4.qrtz_scheduler_state:记录 调度器(每个机器节点)的生命状态。
5.qrtz_fired_triggers:记录每个正在执行的触发器。
6. qrtz_simple_triggers:存储简单的trigger,包括重复次数,间隔,以及触发次数。
7. qrtz_simprop_triggers:存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型的触发器,
8. qrtz_blob_triggers:自定义的triggers使用blog类型进行存储,非自定义的triggers不会存放在此表中,
Quartz提供的triggers包括:CronTrigger,CalendarIntervalTrigger,DailyTimeIntervalTrigger以及SimpleTrigger。
9. qrtz_calendars:以 Blob 类型存储 Quartz 的 Calendar 信息
10. qrtz_paused_trigger_grps:存储已暂停的 Trigger 组的信息
11.qrtz_locks:记录程序的悲观锁(防止多个节点同时执行同一个定时任务)。

 本笔记基于图灵的课堂和https://www.jianshu.com/p/ece144448134这位大佬的笔记

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>