65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Quartz.NET 进行依赖作业调度

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2017 年 8 月 19 日

CPOL

7分钟阅读

viewsIcon

26813

downloadIcon

968

使用 Quartz.net 按顺序调度依赖作业

引言

一种使用 Quartz.NET 调度相互依赖且需要特定顺序执行的作业的方法。

https://www.quartz-scheduler.net/

背景

有一些教程提供了顺序执行作业的方法,但它们没有清楚地说明 Quartz 中的持久性(Durable)与非持久性(Non-Durable)的概念。Quartz 将非持久性作业定义为寿命与其触发器(triggers)的存在相关的作业。在下面的顺序执行方式中,只有一个触发器存在于首次执行时,因此我遇到的问题是如何将其他依赖作业定义为持久性。这使得通过 Windows 服务实现的作业能够按顺序执行所有作业,而不仅仅是第一个计划的作业。

我希望解决一些在学习 Quartz 时遇到的问题:

  • 设置作业以在首次执行后继续执行
  • 在最后一个顺序执行的作业运行后,并发运行任意数量的作业。

我将在这里介绍的情况包括 3 个步骤。1) 你需要通过一个计划作业为大量待处理付款收费。2) 然后,一旦所有付款都已收费,你将生成一个已收费付款的报告并将其上传到某个地方。3) 最后,在付款收费、批处理报告已生成后,发送电子邮件和短信通知客户他们的卡已被收费。

我实际上不会包含为付款收费、生成报告或向他人发送电子邮件/短信的代码,而是介绍 2) 仅在 1) 完成后才能运行,然后启动 3) 中仅依赖于 2) 完成的多个作业的情况。

使用代码

此示例的各个部分可以分解为:

  • 作业类 (Job classes)
  • 作业监听器类 (JobListener classes)
  • 调度器本身 (The scheduler itself)

Quartz.net 可以通过其服务器/企业服务功能来实现,但就本示例而言,它只是一个控制台应用程序。你可以轻松地将相同的代码应用于 Windows 服务。

作业

声明你的应用程序中可能需要执行的作业。为了我们的目的,我们将创建 `ChargePaymentsJob`、`BatchFileUploadJob`、`TextMessageJob` 和 `EmailJob`。ChargePayments 必须首先运行,然后是 BatchFileUploadJob。一旦 BatchFileUploadJob 完成,我们就可以让 Quartz.NET 同时启动 TextMessageJob 和 EmailJob,因为这些作业中的任何一个都不依赖于另一个来完成。

你的所有作业都需要继承 `IJob` 接口,以便调度器知道它是一个调度器应该执行的对象。

public class EmailJob : IJob
{
    public void Execute(IJobExecutionContext context)
    {
        // Do something here to hit the database and get a list of payments that were charged, then loop over those payments to send an email to the customer
        //   indicating their payment was successfully charged for customers who want notifications via email.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine("Email sent to customer {0} payment was charged successfully", i);
        }
    }
}

public class TextMessageJob : IJob
{
    public void Execute(IJobExecutionContext context)
    {
        // Do something here to hit the database and get a list of payments that were charged, then loop over those payments to send a text message to the customer
        //   indicating their payment was successfully charged for customers who want notifications via text message.

        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine("Text mesasge sent to customer {0} payment was charged successfully", i);
        }
    }
}

public class BatchFileUploadJob : IJob
{
    public void Execute(IJobExecutionContext context)
    {
        // Idea here would be to hit query your table, get all payments that were successfull charged and generate a report
        Console.WriteLine("Report of payments processed successfully generated");
    }
}

public class ChargePaymentsJob : IJob
{
    public void Execute(IJobExecutionContext context)
    {
        // Query your table and get all payments that are due at this time and loop through those customers, get the required data and charge those payments.

        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine("Charging Customer {0}s payment", i);
        }
    }
}

作业监听器 (JobListeners)

作业监听器是响应调度器/已执行作业发生的事件的类。Quartz 也有触发器监听器(TriggerListeners)的概念,但我在这里不会深入探讨,因为使用触发器监听器对于此示例来说不是必需的。
 
在此示例中,我们将有一个 `DependentJobListener` 和一个 `NonDepedentJobListener`。你可以为两种类型的作业应用相同的监听器,但可能存在不同情况需要为依赖作业与非依赖作业执行不同的操作,因此最好将它们完全分开。
 
你的作业监听器必须继承 `IJobListener` 接口,该接口提供了三个方法:`JobToBeVetoed`、`JobToBeExecuted` 和 `JobWasExecuted`。在此示例中,我们关心的是 `JobWasExecuted`。然而,`JobToBeVetoed` 仅在作业被触发器监听器否决时调用,而 `JobToBeExecuted` 仅在作业未被否决时调用。否决的概念在这里是指作业是否基于触发器条件或实现的任何自定义触发器监听器而被拒绝。
 
在 `JobWasExecuted` 方法中,我们首先通过检查 `jobException` 参数是否为 null 来处理可能发生的任何错误。
 
var jobname = context.JobDetail.Key.Name;

if (jobException != null)
{
    // Do something here to respond to any issues that arise from your jobs that executed.
    Console.WriteLine("Job {0} exploded, unable to complete the tasks it required", jobname);
    return;
}
然后,如果一切正常,一旦 `BatchFileUploadJob` 作业执行完毕,我们将调度所有依赖于 `ChargePaymentsJob` 和 `BatchFileUploadJob` 已运行但彼此之间不依赖的作业,通过添加一个条件“如果当前执行的作业名称等于最后一个依赖作业”,这将提供触发器来调度所有进一步的非依赖作业立即执行。
 
if (jobname == "BatchFileUploadJob")
{
   // Code to come below
}
 
之后,我提供了一种通过反射实现这些作业的方法,而不是仅仅硬编码它们。这样做的目的是你可以将这些内容移到数据库的一个表中,随着你需求的不断变化,你可以简单地在你的表中添加一个非依赖作业(NonDependent Job),然后像下面一样循环遍历它,以便在所有依赖作业运行后立即执行。
 
这里需要注意的重要一点是,你需要存储作业的完全限定类名以及要执行的作业的唯一名称。
 
var nonDependentJobsFromDataStore = new Dictionary<string, string>();

nonDependentJobsFromDataStore.Add("RemindersJob", "QuartzDependentJobScheduling.TextMessageJob");
nonDependentJobsFromDataStore.Add("EmailJob", "QuartzDependentJobScheduling.EmailJob");

var jobs = new List<Tuple<JobKey, string>>();

foreach (var ndjob in nonDependentJobsFromDataStore)
{
    jobs.Add(Tuple.Create(new JobKey(ndjob.Key, "NonDependentJob"), ndjob.Value));
}
一旦所有作业都从外部源(或硬编码)加载完毕,你需要将该数据整理成调度器可以执行的作业(Jobs)和触发器(Triggers)。
 
创建 `JobDetails` 列表将允许你在第二个循环中为这些作业附加触发器。你必须为每个要添加的作业(无论是硬编码还是其他方式)创建新的触发器,否则 Quartz.NET 会抛出异常。我记不清确切的异常是什么了,但它基本上表示同一个触发器不能用于多个作业,每个作业都必须有自己的触发器。
var jobDetails = new List<IJobDetail>();

foreach (var item in jobs)
{
    IJobDetail job = JobBuilder.Create(Type.GetType(item.Item2))
    .WithIdentity(item.Item1)
    .Build();

    jobDetails.Add(job);
}

foreach (var detail in jobDetails)
{
    ITrigger trigger = TriggerBuilder.Create()
            .StartNow()
            .Build();

    context.Scheduler.ScheduleJob(detail, trigger);
}
完整的作业监听器类
public class DependentJobListener : IJobListener
{
    public void JobToBeExecuted(IJobExecutionContext context)
    {

    }

    public void JobExecutionVetoed(IJobExecutionContext context)
    {

    }

    public void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException)
    {
        var jobname = context.JobDetail.Key.Name;

        if (jobException != null)
        {
            // Do something here to respond to any issues that arise from your jobs that executed.
            Console.WriteLine("Job {0} exploded, unable to complete the tasks it required", jobname);
            return;
        }

        if (jobname == "BatchFileUploadJob")
        {
            var nonDependentJobsFromDataStore = new Dictionary<string, string>();

            nonDependentJobsFromDataStore.Add("RemindersJob", "QuartzDependentJobScheduling.TextMessageJob");
            nonDependentJobsFromDataStore.Add("EmailJob", "QuartzDependentJobScheduling.EmailJob");

            var jobs = new List<Tuple<JobKey, string>>();
            
            foreach (var ndjob in nonDependentJobsFromDataStore)
            {
                jobs.Add(Tuple.Create(new JobKey(ndjob.Key, "NonDependentJob"), ndjob.Value));
            }

            
            var jobDetails = new List<IJobDetail>();

            foreach (var item in jobs)
            {
                IJobDetail job = JobBuilder.Create(Type.GetType(item.Item2))
                .WithIdentity(item.Item1)
                .Build();

                jobDetails.Add(job);
            }

            foreach (var detail in jobDetails)
            {
                ITrigger trigger = TriggerBuilder.Create()
                        .StartNow()
                        .Build();

                context.Scheduler.ScheduleJob(detail, trigger);
            }
        }
    }

    public string Name
    {
        get { return "DependentJobListener"; }
    }
}

public class NonDependentJobListener : IJobListener
{
    public void JobToBeExecuted(IJobExecutionContext context)
    {

    }

    public void JobExecutionVetoed(IJobExecutionContext context)
    {

    }

    public void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException)
    {
        var jobname = context.JobDetail.Key.Name;

        if (jobException != null)
        {
            // Do something here to respond to any issues that arise from your jobs that executed.
            Console.WriteLine("NonDependent Job {0} exploded, unable to complete the tasks it required", jobname);
            return;
        }

        Console.WriteLine("NonDependent Job - {0} executed successfully", jobname);
    }

    public string Name
    {
        get { return "NonDependentJobListener"; }
    }
}

调度器

此示例中的调度器是通过控制台应用程序实现的,但如果你需要,可以轻松地将相同的代码放入 Windows 服务中。Quartz 提供了集群功能,这可能会变得相当复杂,更多信息请参阅:https://www.quartz-scheduler.net/documentation/quartz-2.x/tutorial/advanced-enterprise-features.html
 
首先要做的就是初始化调度器,然后声明需要运行的作业。让我感到困惑的是在声明每个作业时调用 `.StoreDurably(true)` 方法。如果你在这里不这样做,所有没有通过 `.ScheduleJob` 方法直接调用的作业将只运行一次。
 
你也可以从数据库调用你的依赖作业,并在此时通过反射声明作业。但为了方便演示作业链,所有作业都是硬编码的。
 
ISchedulerFactory schedFactory = new StdSchedulerFactory();
IScheduler scheduler = schedFactory.GetScheduler();
scheduler.Start();

var chargePaymentsJobKey = JobKey.Create("ChargePaymentsJob", "DependentJob");
var batchFileUploadJobKey = JobKey.Create("BatchFileUploadJob", "DependentJob");

IJobDetail chargePaymentsJob = JobBuilder.Create<ChargePaymentsJob>()
    .WithIdentity(chargePaymentsJobKey)
    .Build();

IJobDetail batchFileUploadJob = JobBuilder.Create<BatchFileUploadJob>()
    .WithIdentity(batchFileUploadJobKey)
    .StoreDurably(true)
    .Build();
下一步是声明启动你的依赖作业执行的触发器。你可以通过调用 `.StartNow()` 方法简单地做到这一点,但我选择了使用 `.WithCronSchedule()` 方法。它们还有其他更声明式的方式来调度你的作业,但这里有一个创建 Quartz cron 作业字符串的链接:https://www.quartz-scheduler.net/documentation/quartz-3.x/tutorial/crontrigger.html
 
这个示例 cron 字符串设置为每分钟运行一次作业。你可以将其更改为 `0/5` 表示每五分钟,或者遵循提供的教程并根据你的需求安排作业。
ITrigger dependentJobTrigger = TriggerBuilder.Create()
                .WithCronSchedule("0 0/1 * * * ?")
                .Build();
创建作业后,你应该添加你的作业监听器。需要注意的是,如果你选择使用多个作业监听器,你需要将每个作业监听器分配给与其对应的作业组。在这种情况下,我们的两个组是 `DependentJob` 和 `NonDependentJob`。
 
scheduler.ListenerManager.AddJobListener(new DependentJobListener(), GroupMatcher<JobKey>.GroupEquals("DependentJob"));
scheduler.ListenerManager.AddJobListener(new NonDependentJobListener(), GroupMatcher<JobKey>.GroupEquals("NonDependentJob"));
最后一步是链接和调度你的作业。在这个简单的示例中,只有一个级别的作业链接在进行,因此我包含了一个注释掉的行来指示如何链接其他作业。
 
JobChainingJobListener listener = new JobChainingJobListener("DependentJobChain");
listener.AddJobChainLink(chargePaymentsJobKey, batchFileUploadJobKey);
//listener.AddJobChainLink(batchFileUploadJobKey, nextDependentJobKey);

scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("DependentJob"));
在注释掉的行中,`nextDependentJobKey` 将是第三个(也是最后一个)要执行的依赖作业。多个依赖作业链接的格式将是
listener.AddJobChainLink(firstJobKey, secondJobKey);
listener.AddJobChainLink(secondJobKey, thirdJobKey);
listener.AddJobChainLink(thirdJobKey, fourthJobKey);
这将遵循上面提到的 `jobKey` 变量来将它们全部链接在一起。一旦你链接了所有必要的作业,你就通过 `AddJobListener` 方法将监听器变量添加到调度器的 `ListenerManager` 中,并指明它适用的组。
scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("DependentJob"));
 
最后,你使用第一个也是唯一的触发器 `dependentJobTrigger` 来调度第一个作业 `chargePaymentsJob`。然后对于之后的所有作业,你调用 `.AddJob()` 方法,因为每个附加作业的执行都依赖于第一个计划的作业。
scheduler.ScheduleJob(chargePaymentsJob, dependentJobTrigger);
scheduler.AddJob(batchFileUploadJob, false, false);

历史

2017 年 8 月 18 日 - 初始文章撰写

© . All rights reserved.