Quartz定时任务处理涉及不少线程,Quartz中由调度线程专门负责任务的调度和触发,集群状态下节点间检测和故障恢复由集群管理线程负责处理,对于那些错过触发的任务会由错过触发处理线程负责根据错误触发策略决定是否忽略还是执行等等。当然还有Quartz默认实现的简单线程池和工作线程。
调度线程
分析
调度线程由调度工厂StdSchedulerFactory
创建调度实例QuartzScheduler
时,创建调度线程QuartzSchedulerThread
单独的线程实例。调度线程会根据下次触发时间NextFireTime
小于(当前时间+30s),触发状态为等待状态(WAITING
)等条件获取触发器队列,等待到第一个触发器的下次触发时间NextFireTime
到达前2ms,并更新触发器状态,然后获取触发器绑定的任务线程,并交由线程池org.quartz.simpl.SimpleThreadPool
(默认配置)负责处理,它会获取空闲线程异步执行任务工作线程,任务工作线程初始化后更新触发器状态。
代码参见
1、创建调度线程SchedulerFactory->Scheduer->QuartzSchedulerThread
org.quartz.impl.StdSchedulerFactory.instantiate()
org.quartz.core.QuartzScheduler.QuartzScheduler(QuartzSchedulerResources, long, long)
2、调度线程运行方法
org.quartz.core.QuartzSchedulerThread.run()
3、获取待触发队列方法
org.quartz.impl.jdbcjobstore.JobStoreSupport.acquireNextTriggers(long, int, long)
4、查询待触发队列方法
org.quartz.impl.jdbcjobstore.StdJDBCDelegate.selectTriggerToAcquire(Connection, long, long, int)
其中查询语句:
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
5、触发待触发队列
org.quartz.impl.jdbcjobstore.JobStoreSupport.triggersFired(List<OperableTrigger>)
6、任务线程执行完成后通知触发器更新
org.quartz.core.QuartzScheduler.notifyJobStoreJobComplete(OperableTrigger, JobDetail, CompletedExecutionInstruction)
7、Cron触发器计算更新下一次触发时间
org.quartz.impl.triggers.CronTriggerImpl.triggered(Calendar)
8、任务运行线程执行方法,反射调用自定义Job实现
org.quartz.core.JobRunShell.run()
错过触发处理线程
分析
错过触发处理线程MisfireHandler
是由调度实例QuartzScheduler
开始启动调度start()
时创建,单独的线程实例。错过触发处理线程会休眠容忍错过触发阈值(配置文件org.quartz.jobStore.misfireThreshold
)范围内的时间,扫描错过触发的触发器,判断是否错过触发的条件:下次触发时间NextFireTime小于当前时间减去容忍错过触发阈值。如果存在错过触发的触发器,则根据错过触发策略更新下次触发时间(QRTZ_TRIGGER
表),并信号通知调度线程。任务的调度就转交由调度线程处理了。
代码参见
1、恢复错过触发任务
org.quartz.impl.jdbcjobstore.JobStoreSupport.recoverMisfiredJobs(Connection, boolean)
2、获取错过触发的触发器队列
org.quartz.impl.jdbcjobstore.DriverDelegate.hasMisfiredTriggersInState(Connection, String, long, int, List<TriggerKey>)
其中判断是否错过触发的SQL语句:
SELECT TRIGGER_NAME, TRIGGER_GROUP FROM QRTZ_TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ‘WAITING’ ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
3、更新错过触发的触发器
org.quartz.impl.jdbcjobstore.JobStoreSupport.doUpdateOfMisfiredTrigger(Connection, OperableTrigger, boolean, String, boolean)
4、信号通知调度线程
org.quartz.impl.jdbcjobstore.JobStoreSupport.signalSchedulingChangeImmediately(long)
线程间通信sigLock对象锁的wait和notifyAll,还做了加锁同步。
集群管理线程
分析
集群管理线程ClusterManager
是由调度实例StdSchedulerFactory
开始启动调度start()
时创建,也是单独的线程实例。集群管理线程休眠到下次检测周期(配置文件org.quartz.jobStore.clusterCheckinInterval
)到来,检测集群各兄弟节点的健康情况,检测是否存在故障节点。如果存在故障节点,则更新故障节点的触发器状态,并删除故障节点实例状态。这样集群节点间共享触发任务数据就可以进行故障切换,并信号通知调度线程。故障节点的任务的调度就交由调度处理线程处理了。
代码参见
1、查找集群兄弟节点存在故障节点的方法
org.quartz.impl.jdbcjobstore.JobStoreSupport.findFailedInstances(Connection)
2、恢复集群故障节点方法
org.quartz.impl.jdbcjobstore.JobStoreSupport.clusterRecover(Connection, List<SchedulerStateRecord>)
3、集群检测并更新当前检测节点最后检测时间LAST_CHECKIN_TIME
org.quartz.impl.jdbcjobstore.JobStoreSupport.clusterCheckIn(Connection)
Quartz定时任务主要由调度处理线程、集群管理线程和错过触发处理线程协调合作完成,多线程间通信。这里默认配置任务存储为JDBC方式,对任务调度最终都会体现为对数据库的操作。代码部分只是节选认为主要的片段,基于个人见解和能力难免疏漏或者流于偏见,还望自斟自琢,共同进步。
发表评论