使用 Quartz.NET 进行依赖作业调度





5.00/5 (6投票s)
使用 Quartz.net 按顺序调度依赖作业
引言
一种使用 Quartz.NET 调度相互依赖且需要特定顺序执行的作业的方法。
https://www.quartz-scheduler.net/
背景
有一些教程提供了顺序执行作业的方法,但它们没有清楚地说明 Quartz 中的持久性(Durable)与非持久性(Non-Durable)的概念。Quartz 将非持久性作业定义为寿命与其触发器(triggers)的存在相关的作业。在下面的顺序执行方式中,只有一个触发器存在于首次执行时,因此我遇到的问题是如何将其他依赖作业定义为持久性。这使得通过 Windows 服务实现的作业能够按顺序执行所有作业,而不仅仅是第一个计划的作业。
我希望解决一些在学习 Quartz 时遇到的问题:
- 设置作业以在首次执行后继续执行
- 在最后一个顺序执行的作业运行后,并发运行任意数量的作业。
我将在这里介绍的情况包括 3 个步骤。1) 你需要通过一个计划作业为大量待处理付款收费。2) 然后,一旦所有付款都已收费,你将生成一个已收费付款的报告并将其上传到某个地方。3) 最后,在付款收费、批处理报告已生成后,发送电子邮件和短信通知客户他们的卡已被收费。
我实际上不会包含为付款收费、生成报告或向他人发送电子邮件/短信的代码,而是介绍 2) 仅在 1) 完成后才能运行,然后启动 3) 中仅依赖于 2) 完成的多个作业的情况。
使用代码
此示例的各个部分可以分解为:
- 作业类 (Job classes)
- 作业监听器类 (JobListener classes)
- 调度器本身 (The scheduler itself)
Quartz.net 可以通过其服务器/企业服务功能来实现,但就本示例而言,它只是一个控制台应用程序。你可以轻松地将相同的代码应用于 Windows 服务。
作业
声明你的应用程序中可能需要执行的作业。为了我们的目的,我们将创建 `ChargePaymentsJob`、`BatchFileUploadJob`、`TextMessageJob` 和 `EmailJob`。ChargePayments 必须首先运行,然后是 BatchFileUploadJob。一旦 BatchFileUploadJob 完成,我们就可以让 Quartz.NET 同时启动 TextMessageJob 和 EmailJob,因为这些作业中的任何一个都不依赖于另一个来完成。
你的所有作业都需要继承 `IJob` 接口,以便调度器知道它是一个调度器应该执行的对象。
public class EmailJob : IJob { public void Execute(IJobExecutionContext context) { // Do something here to hit the database and get a list of payments that were charged, then loop over those payments to send an email to the customer // indicating their payment was successfully charged for customers who want notifications via email. for (int i = 0; i < 3; i++) { Console.WriteLine("Email sent to customer {0} payment was charged successfully", i); } } } public class TextMessageJob : IJob { public void Execute(IJobExecutionContext context) { // Do something here to hit the database and get a list of payments that were charged, then loop over those payments to send a text message to the customer // indicating their payment was successfully charged for customers who want notifications via text message. for (int i = 0; i < 3; i++) { Console.WriteLine("Text mesasge sent to customer {0} payment was charged successfully", i); } } } public class BatchFileUploadJob : IJob { public void Execute(IJobExecutionContext context) { // Idea here would be to hit query your table, get all payments that were successfull charged and generate a report Console.WriteLine("Report of payments processed successfully generated"); } } public class ChargePaymentsJob : IJob { public void Execute(IJobExecutionContext context) { // Query your table and get all payments that are due at this time and loop through those customers, get the required data and charge those payments. for (int i = 0; i < 5; i++) { Console.WriteLine("Charging Customer {0}s payment", i); } } }
作业监听器 (JobListeners)
var jobname = context.JobDetail.Key.Name; if (jobException != null) { // Do something here to respond to any issues that arise from your jobs that executed. Console.WriteLine("Job {0} exploded, unable to complete the tasks it required", jobname); return; }
if (jobname == "BatchFileUploadJob") { // Code to come below }
var nonDependentJobsFromDataStore = new Dictionary<string, string>(); nonDependentJobsFromDataStore.Add("RemindersJob", "QuartzDependentJobScheduling.TextMessageJob"); nonDependentJobsFromDataStore.Add("EmailJob", "QuartzDependentJobScheduling.EmailJob"); var jobs = new List<Tuple<JobKey, string>>(); foreach (var ndjob in nonDependentJobsFromDataStore) { jobs.Add(Tuple.Create(new JobKey(ndjob.Key, "NonDependentJob"), ndjob.Value)); }
var jobDetails = new List<IJobDetail>(); foreach (var item in jobs) { IJobDetail job = JobBuilder.Create(Type.GetType(item.Item2)) .WithIdentity(item.Item1) .Build(); jobDetails.Add(job); } foreach (var detail in jobDetails) { ITrigger trigger = TriggerBuilder.Create() .StartNow() .Build(); context.Scheduler.ScheduleJob(detail, trigger); }
完整的作业监听器类
public class DependentJobListener : IJobListener { public void JobToBeExecuted(IJobExecutionContext context) { } public void JobExecutionVetoed(IJobExecutionContext context) { } public void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException) { var jobname = context.JobDetail.Key.Name; if (jobException != null) { // Do something here to respond to any issues that arise from your jobs that executed. Console.WriteLine("Job {0} exploded, unable to complete the tasks it required", jobname); return; } if (jobname == "BatchFileUploadJob") { var nonDependentJobsFromDataStore = new Dictionary<string, string>(); nonDependentJobsFromDataStore.Add("RemindersJob", "QuartzDependentJobScheduling.TextMessageJob"); nonDependentJobsFromDataStore.Add("EmailJob", "QuartzDependentJobScheduling.EmailJob"); var jobs = new List<Tuple<JobKey, string>>(); foreach (var ndjob in nonDependentJobsFromDataStore) { jobs.Add(Tuple.Create(new JobKey(ndjob.Key, "NonDependentJob"), ndjob.Value)); } var jobDetails = new List<IJobDetail>(); foreach (var item in jobs) { IJobDetail job = JobBuilder.Create(Type.GetType(item.Item2)) .WithIdentity(item.Item1) .Build(); jobDetails.Add(job); } foreach (var detail in jobDetails) { ITrigger trigger = TriggerBuilder.Create() .StartNow() .Build(); context.Scheduler.ScheduleJob(detail, trigger); } } } public string Name { get { return "DependentJobListener"; } } } public class NonDependentJobListener : IJobListener { public void JobToBeExecuted(IJobExecutionContext context) { } public void JobExecutionVetoed(IJobExecutionContext context) { } public void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException) { var jobname = context.JobDetail.Key.Name; if (jobException != null) { // Do something here to respond to any issues that arise from your jobs that executed. Console.WriteLine("NonDependent Job {0} exploded, unable to complete the tasks it required", jobname); return; } Console.WriteLine("NonDependent Job - {0} executed successfully", jobname); } public string Name { get { return "NonDependentJobListener"; } } }
调度器
ISchedulerFactory schedFactory = new StdSchedulerFactory(); IScheduler scheduler = schedFactory.GetScheduler(); scheduler.Start(); var chargePaymentsJobKey = JobKey.Create("ChargePaymentsJob", "DependentJob"); var batchFileUploadJobKey = JobKey.Create("BatchFileUploadJob", "DependentJob"); IJobDetail chargePaymentsJob = JobBuilder.Create<ChargePaymentsJob>() .WithIdentity(chargePaymentsJobKey) .Build(); IJobDetail batchFileUploadJob = JobBuilder.Create<BatchFileUploadJob>() .WithIdentity(batchFileUploadJobKey) .StoreDurably(true) .Build();
ITrigger dependentJobTrigger = TriggerBuilder.Create() .WithCronSchedule("0 0/1 * * * ?") .Build();
scheduler.ListenerManager.AddJobListener(new DependentJobListener(), GroupMatcher<JobKey>.GroupEquals("DependentJob")); scheduler.ListenerManager.AddJobListener(new NonDependentJobListener(), GroupMatcher<JobKey>.GroupEquals("NonDependentJob"));
JobChainingJobListener listener = new JobChainingJobListener("DependentJobChain"); listener.AddJobChainLink(chargePaymentsJobKey, batchFileUploadJobKey); //listener.AddJobChainLink(batchFileUploadJobKey, nextDependentJobKey); scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("DependentJob"));
listener.AddJobChainLink(firstJobKey, secondJobKey); listener.AddJobChainLink(secondJobKey, thirdJobKey); listener.AddJobChainLink(thirdJobKey, fourthJobKey);
scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("DependentJob"));
scheduler.ScheduleJob(chargePaymentsJob, dependentJobTrigger); scheduler.AddJob(batchFileUploadJob, false, false);
历史
2017 年 8 月 18 日 - 初始文章撰写