SQLXAgent - SQL Express 的作业 - 第 4 部分(共 6 部分)
创建和运行类似 SQL Server Enterprise 的作业 - 作业调度代码。
第 1 部分 - 使用 SQLXAgent
第 2 部分 - 架构和设计决策
第 3 部分 - CSV 和 Excel 导入器代码
第 4 部分 - 作业调度代码 (本文)
第 5 部分 - 包如何运行
第 6 部分 - 有趣的编码
引言
本文是关于 SQLXAgent 工具的系列文章的第 4 部分。在本文中,我将介绍作业调度处理代码。毫无疑问,这是整个解决方案中最困难的部分。我想我不得不重构它六次才能做得对。
由于可用的调度组合几乎是无限的,测试调度部分最实用的方法是创建一个测试应用程序,允许您生成未来的作业执行日期/时间,而无需实际运行作业。
本文专门讨论 SQLXThreadManager 程序集中的代码,以及仅处理 SQLXAgentSvc 应用程序处理的作业的代码。在下面的代码片段中,为了简洁起见,大多数对 DebugMsgs
的调用都被删除了,并且格式经过修改,以尝试消除水平滚动条。
注意: 文章中提供的代码片段可能不完全反映最新、最出色的代码版本。如果它与实际代码不完全匹配,也会相当接近。这就是在撰写文章时发现问题的本质。
欢迎使用 System.Reactive(又名“Rx”)
想出一个合适的模型来运行计划好的作业很痛苦。起初,我尝试使用线程,很快就变得一团糟。接下来我尝试了 TPL 库,在某些方面,这甚至更糟。然后我偶然发现了 System.Reactive(本文中称为 Rx)库。使用 Rx,我能够显着减少跟踪计划和执行作业所需的工作量、复杂性以及 CPU 负载。
使用 Rx,我设置了一个计时器,然后订阅了“滴答”事件。我想让每个作业负责自己的计时器,因为作业调度不一定以固定的间隔运行。这是描述调度处理的基本流程图。
事实上,我使用 Rx 来执行以下任务:
- 保持服务运行。尽管
JobThreadManager
正在运行,但我希望服务能够在其他所有进程处于空闲状态并等待所需时间时保持自身运行。此计时器每秒运行一次。不执行任何实际工作。
- 监视作业管理器中的配置文件更改。作业管理器监视配置文件中的更改,以便知道何时作业定义已更改。当您使用 SQLXAgent 添加/删除或以其他方式更改作业和/或步骤时,该应用程序会保存配置文件。这会触发“作业刷新”事件。只有添加、删除或更改的作业会受到影响。
- 为给定的作业建立并等待下一次执行日期/时间。当“滴答”事件发生时,将执行作业。
本文远远超出了提供关于 Rx 的任何教程的范围,因此请使用 Google 查找相关信息。毕竟,我就是这样了解它的。
JobThreadManager 类
作业线程管理器类负责管理作业的加载和更新。它不执行任何其他功能。实例化作业线程管理器时,它会从数据库加载所有作业,然后创建一个可运行作业的列表(它们已启用并且未超过其结束日期)。为无法运行的作业浪费内存确实没有意义,对吧?
以下小节将讨论更重要和更有趣的单个方法。
Start() 方法
此方法启动作业管理器,我们在其中订阅计时器。
/// <summary>
/// Starts the job manager thread.
/// </summary>
public void Start()
{
if (this.isValidJobList)
{
// we always start on the next even minute (or if we're too close to that minute, the one after).
DateTime dueStart = DateTime.Now;
dueStart = ((dueStart.Second > 55)
? dueStart.AddMinutes(2).ZeroSeconds()
: dueStart.AddMinutes(1)).ZeroSeconds();
// we need to "do work" every second
this.Timer = Observable.Timer(dueStart - DateTime.Now, TimeSpan.FromSeconds(1));
this.JobsStarted = false;
//subscribe to the timer event
this.TimerSub = this.Timer.Subscribe(x => this.DoWork(), ex => this.WorkError());
}
else
{
DebugMsgs.Show(this,
"Found no jobs that were enabled, or no jobs with steps that were enabled.",
DebugLevel.Info);
}
}
DoWork() 方法
DowWork
方法由 Rx 计时器每秒调用一次。此方法将在尚未启动时启动作业,检查 SQLXAgent.exe.config
文件以查看是否需要更新作业,如果日期已更改,将删除过期的历史记录项。
/// <summary>
/// Do the work we need to do
/// </summary>
/// <returns></returns>
private IObserver<long> DoWork()
{
IObserver<long> result = null;
try
{
// if we haven't started the jobs yet, do that
if (!this.JobsStarted)
{
this.JobsStarted = true;
this.StartJobs();
}
// otherwise, check the config file to see if it's changed, and refresh
// the jobs if necessary.
else
{
// make sure the file still exists
if (File.Exists(this.configFileInfo.FullName))
{
// if it does, refresh the FileInfo object
this.configFileInfo.Refresh();
// if the file date is > the last file date
if (configFileInfo.LastWriteTimeUtc > this.lastConfigFileDate)
{
this.lastConfigFileDate = configFileInfo.LastWriteTimeUtc;
this.HistoryExpireDays = Convert.ToInt32(SQLXCommon.Globals.
AppConfig.AppSettings.
Settings["HistoryExpireDays"].
Value);
this.RefreshJobs();
}
}
}
// see if we need to delete history items
if (DateTime.Now.Date > this.LastHistoryDelete.Date)
{
this.RemoveExpiredHistoryItems();
}
}
catch (Exception ex)
{
AppLog.Error(ex);
}
finally
{
}
return result;
}
RefreshJobs() 方法
/// <summary>
/// Refreshes the jobs because of a config file change.
/// </summary>
public void RefreshJobs()
{
// Start out unrefreshed - as jobs are added/updated, the Refreshed
// flag will be set to true.
foreach (JobThread item in this)
{
item.Refreshed = false;
}
// get the jobs from the database
JobList newJobs = new JobList();
newJobs.GetFromDatabase();
// check each job to see if it is new or changed
foreach(JobItem newJob in newJobs)
{
// get the current instance of the job
JobItem oldJob = this.Jobs.FirstOrDefault(x => x.ID == newJob.ID);
// if we didn't find it, it must be a new job
if (oldJob == null)
{
// create a new JobThread object
JobThread job = new JobThread(newJob);
// add it to this manager
this.Add(job);
// start it if the other jobs have already started
if (this.JobsStarted)
{
job.Start();
}
}
else
{
// get the existing job thread that is handling this job
JobThread jobThread = this.FirstOrDefault(x => x.Job.ID == oldJob.ID);
// if we found it, update it
if (jobThread != null)
{
jobThread.UpdateJob(newJob);
DebugMsgs.Show(this,
string.Format("Job Manager - job [{0}] updated",
oldJob.Name),
DebugLevel.Info);
}
else
{
// error msg
}
}
}
// stop/remove jobs that were not refreshed.
var toRemove = this.Where(x => x.Refreshed == false);
foreach(JobThread oldItem in toRemove)
{
oldItem.Stop();
this.Remove(oldItem);
}
}
JobThread 类
作业线程负责处理关联作业的调度。不执行任何其他处理。

Start() 方法
Start() 方法创建/连接到 Observable.Timer 对象。
/// <summary>
/// Starts the job "thread" if possible.
/// </summary>
public void Start()
{
// we don't have an update pending
this.UpdatePending = false;
// if the job can start (is enabled and has at least one enabled step)
if (this.Job.CanRestart)
{
// set the value used to determine the next interval. A value of 0 is how
// we indicate to the RunSchedule method that we're just starting out.
this.NextTime = new DateTime(0);
// start the timer with an initial interval of 1 second. The RunSchedule
// method will determine the actual interval.
this.CreateAndSubscribe(TimeSpan.FromSeconds(1));
}
else
{
// the code that actually lives here merely supports the call to DebugMsgs
// and is omitted in the interest of brevity
}
}
RunSchedule() 方法
此方法是类中的实际工作者。它是响应计时器滴答事件的委托方法。
/// <summary>
/// Runs the scheduled job. The method is called by the timer when the interval
/// expires.
/// </summary>
public void RunSchedule()
{
bool isRunning = true;
// if the NextTime is 0 ticks, we're just starting out, so we don't have anything to do.
bool canWork = (this.NextTime.Ticks != 0);
// determine the next execution datetime (this will cause canWork to be true next time around)
DateTime now = DateTime.Now;
this.NextTime = (now + this.CalculateNextRunTime(now)).ZeroSeconds();
// if we can do the work
if (canWork)
{
try
{
// execute the job (method is in JobThreadBase class)
this.ExecuteJob();
}
catch (Exception ex)
{
AppLog.Error(ex);
}
finally
{
// if our next execution datetime is later than the schedule
// duration end date
if (this.NextTime >= this.Job.SchedDurEndDate)
{
// stop the timer
this.Stop();
isRunning = false;
}
}
}
else
{
// DebugMsgs.Show(...);
}
// if we have an update pending, do the update
if (this.UpdatePending)
{
this.UpdateJob(this.PendingJob);
this.PendingJob = null;
}
// otherwise, if we're still running, reset the timer for the new
// interval.
else if (isRunning)
{
this.ResetTimer();
}
}
计算下一次计划的作业执行
有几种方法涉及计算作业执行时间,具体使用哪种方法取决于作业的调度属性。由于我在家没有一个功能齐全的 SQL Server 实例,并且因为我无法在工作机器上安装(可以预测作业计划的)软件,所以我无法确定我的调度代码在多大程度上接近 SQL Server 的工作方式。如果有人有能力和意愿来检查这一点,我很想听听我为什么做得不对(我所有这些代码的目标是大致复制 SQL Server 的工作方式)。
计算下一次计划的作业执行日期/时间始于名为 CalculateNextRunTime
的方法。
/// <summary>
/// Determines how to calculate the next execution datetime, and calls
/// the appropriate methods.
/// </summary>
/// <param name="now">The current datetime</param>
/// <returns>A timespan that will be added to the current datetime to establish
/// the subsequent execution datetime.</returns>
protected TimeSpan CalculateNextRunTime(DateTime now)
{
TimeSpan result = new TimeSpan(0);
// Establish our time of day based on the frequency properties
TimeSpan time = CalculateDailyFrequencyTime(now);
// call the appropriate period method
switch (this.Job.SchedOccurPeriod)
{
case "Daily" :
result = this.CalcNextDailyDay(now, time);
break;
case "Weekly" :
result = this.CalcNextWeeklyDay(now, time);
break;
case "Monthly" :
result = this.CalcNextMonthlyDay(now, time);
break;
default :
result = new TimeSpan(0);
break;
}
return result;
}
从这个方法中,我们调用另外两个方法。第一个是 CalculateDailyFrequencyTime
。这
/// <summary>
/// Calculate the next time to execute based on the daily frequency. This method
/// only sets the time of the next run.
/// </summary>
/// <param name="now"></param>
/// <returns></returns>
protected TimeSpan CalculateDailyFrequencyTime(DateTime now)
{
DateTime result = now;
switch (this.Job.SchedDailyFreq)
{
case true :
{
// set the time
result = result.SetTime(this.Job.SchedFreqOnceAt.TimeOfDay);
// if the current datetime is greater than new datetime, add a day.
if (now >= result)
{
result = result.AddDays(1);
}
}
break;
case false :
{
// determine the number of seconds we're using (minutes * 60, or hours * 3600)
int periodSeconds = 0;
switch (this.Job.SchedFreqEveryPeriod)
{
case "minute(s)" : periodSeconds = this.Job.SchedFreqEveryN * 60; break;
case "hour(s)" : periodSeconds = this.Job.SchedFreqEveryN * 3600; break;
}
// set the new time
result = result.AddSeconds(periodSeconds);
// if the new start time is less than the specified start time
if (result.TimeOfDay < this.Job.SchedFreqStart.TimeOfDay)
{
// set the time to the specified start time
result = result.SetTime(this.Job.SchedFreqStart.TimeOfDay);
}
// if the new time is past the specified end time
else if (result.TimeOfDay > this.Job.SchedFreqEnd.TimeOfDay)
{
// add a day and set the time to the specified start time
result = result.AddDays(1).SetTime(this.Job.SchedFreqStart.TimeOfDay);
}
}
break;
}
// return the difference between the current datetime and the new datetime
return (result - now);
}
调用 CalculateDailyFrequencyTime
后,将调用以下方法之一来确定下一次执行的日期。
/// <summary>
/// Calculate the next appropriate date for a daily schedule
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextDailyDay(DateTime now, TimeSpan time)
{
DateTime result = now;
// add the time difference
result = now.AddSeconds(time.TotalSeconds);
// if the new day is not the current day
if (result.Day != now.Day)
{
// add the number of days - 1 (because we already added a day by
// changing the time)
result.AddDays(this.Job.SchedFreqEveryN-1);
}
return (result - now);
}
/// <summary>
/// Determines the next run date for weekly schedules
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextWeeklyDay(DateTime now, TimeSpan time)
{
// the number of weeks between runs
int weekInterval = this.Job.SchedOccurWeeklyInterval;
// the weekdays we can run on
List<dayofweek> weekDays = this.GetScheduledWeekdays();
// add the time
DateTime result = now.AddSeconds(time.TotalSeconds);
// if the new day is not the current day
if (result.Day != now.Day)
{
// determine if the new day is an selected day of the week
// if it's not
if (weekDays.IndexOf(result.DayOfWeek) < 0)
{
// get the index of the current day of week
int index = weekDays.IndexOf(now.DayOfWeek);
// if the current day of week is not in the list
if (index < 0)
{
// find the index of the next appropriate day of week
List<dayofweek> daysAvailable = weekDays.Where(x=>x > now.DayOfWeek).ToList();
// the index will be the nextDOW (if found) or 0
index = (daysAvailable != null && daysAvailable.Count >= 1)
? weekDays.IndexOf(daysAvailable[0])
: 0;
}
else
{
// if the new index (index + 1) is out of range, we use
// the first item, otherwise, we se the new index
index = (index+1 > weekDays.Count-1) ? 0 : index+1;
}
// get the actual new day of week based on our index value
DayOfWeek newDOW = weekDays[index];
// if the index is 0, we have to add the weekly interval
// (every n weeks) to the new date
if (index == 0)
{
// add the days (interval * 7)
result = result.AddDays(weekInterval * 7);
// set the date to the first day of the new week
result = result.FirstDayOfWeek();
// backup one so we can find the correct next actual day
result = result.AddDays(-1);
}
// this method starts at the current day, and adds one uti it gets to
// the appropriate day of wek.
result = result.GetNextDayOfWeek(newDOW);
} // if the new day is not the current day
}
return (result - now);
}
/// <summary>
/// Caluclates the next execution date for a monthly schedule.
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextMonthlyDay(DateTime now, TimeSpan time)
{
// add the time
DateTime result = now.AddSeconds(time.TotalSeconds);
if (result.Day != now.Day)
{
if (this.Job.SchedOccurMonthlyTypeIsDay)
{
// advance the date by the month interval
result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
// Reset the day to the lowest of the actual number of days in the
// month, and the preferred day. This allows for months with fewer
// days than the specified day number. For instance, if you specified
// the 31st in the schedule form, and a day has fewer days, the last
// day of the month will be used instead.
result = result.SetDay(Math.Min(this.Job.SchedOccurMonthlyDayNumber, result.DaysInMonth()));
}
else
{
// advance the date by the month interval
result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
// get the weekday we're intersted in
DayOfWeek dow = SQLXCommon.Globals.StringToEnum(this.Job.SchedOccurMonthlyTheNthDay, DayOfWeek.Sunday);
// get the ordinal occurrence of the specified weekday
result = result.GetDateByOrdinalDay(dow, this.GetOrdinalDay());
}
}
return (result - now);
}
ExecuteJob() 方法
此方法位于 JobTheadBase
类中。之所以在基类中,是因为 SQLXAgent 应用程序允许您按需手动运行作业,而不考虑其计划。
/// <summary>
/// Executes all of the steps for the job in (step.Position) sequential order.
/// </summary>
public virtual void ExecuteJob()
{
if (!this.IsWorking)
{
DateTime jobStart = DateTime.Now;
// send event (only used when job is run manually)
this.OnJobStart(jobStart);
this.IsWorking = true;
string status = "SUCCESS";
string reason = string.Empty;
long execID = 0;
string debugMsg = string.Empty;
foreach(StepItem step in this.Job.Steps)
{
if (step.StepIsEnabled)
{
DateTime start = DateTime.Now;
// send event (only used when job is run manually)
OnStepStart(start, step.ID, step.Name);
// this is just for easing typing later on
string stepName = string.Format("-Job {0}.[STEP {1}].[{2}].[{3}]",
this.DebugJobName,
step.Position,
step.StepType,
step.Name);
switch (step.StepType)
{
case "SQL" :
{
try
{
// this should never happen, but we check for it nonetheless.
if (string.IsNullOrEmpty(step.SqlQuery))
{
status = "FAIL";
reason = string.Format("{0}",
SQLXExceptionCodes.
Codes[(int)SQLXExceptionEnum.
QueryTextNullEmpty]);
}
else
{
// execute the batch query. Remember, SQL Server throws an
// exception when it encounters a "GO" statement in batch
// queries.
DBObject2.NonQuery(step.SqlQuery,
null,
CommandType.Text);
status = "SUCCESS";
reason = string.Empty;
}
}
catch (Exception ex)
{
status = "FAIL";
reason = ex.Message;
}
}
break;
case "PKG" :
{
try
{
// this should never happen, but we check nonetheless
if (string.IsNullOrEmpty(step.SsisFilePath))
{
status = "FAIL";
reason = SQLXExceptionCodes.Codes[(int)SQLXExceptionEnum.
PkgPathNullEmpty];
}
else
{
string pkgDLLFileName = step.SsisFilePath;
string path = System.IO.Path.Combine(Globals.AppPath,
"SQLXPkgRunner.exe");
string args = string.Format("-p\"{0}\" -s\"{1}\" -c\"{2}\"",
pkgDLLFileName,
step.ID,
step.ConnectionString);
Process app = new Process();
ProcessStartInfo info = new ProcessStartInfo()
{
Arguments = args,
CreateNoWindow = true,
FileName = path,
UseShellExecute = true,
};
app.StartInfo = info;
app.Start();
app.WaitForExit();
int result = app.ExitCode;
if (result > 0)
{
status = "FAIL";
SQLXExceptionEnum exception = Globals.IntToEnum(result,
SQLXExceptionEnum.Unknown);
switch (exception)
{
case SQLXExceptionEnum.PkgFileNotFound :
reason = string.Concat(SQLXExceptionCodes.Codes[(int)exception],
" - ",
pkgDLLFileName);
break;
default : reason = SQLXExceptionCodes.
Codes[(int)exception] ; break;
}
}
else
{
status = "SUCCESS";
reason = string.Empty;
}
}
}
catch (Exception ex)
{
status = "FAIL";
reason = ex.Message;
}
// DebugMsgs...
}
break;
}
DateTime finish = DateTime.Now;
// save the history item for this execution
this.SaveHistory(ref execID, step, start, finish, status, reason);
// send event (only used by manual job run form)
this.OnStepFinish(start, finish, step.ID, step.Name,
(status=="SUCCESS"), reason);
}
else
{
}
}
DateTime jobFinish = DateTime.Now;
// send event (only used by manual job run form)
this.OnJobFinish(jobStart, jobFinish, (status=="SUCCESS"));
this.IsWorking = false;
}
}
线程测试应用程序
在项目开发过程中,我创建了一个应用程序来测试调度和实际线程执行功能。该应用程序是一个普通的 WPF 工具,如下所示。

第一对按钮允许您可视化调度将如何执行。

左侧是当前配置的作业列表。此列表包括所有作业,无论它们是否已启用。
在顶部/右侧,有单选按钮允许您配置要创建的投影日期的数量。如果作业有结束日期,则“显示所有投影”单选按钮将自动为您选中。否则,“显示“n”个投影”单选按钮将自动为您选中。当您单击“生成”按钮时,按钮下方的列表框将填充。
“日期”列表框包含生成了一个或多个投影的唯一日期。单击列表中的日期将导致“时间”列表框填充所选日期生成的日期。
所有日期/时间都从当前日期和时间进行预测。鉴于它只是一个测试应用程序,我认为这是一种合理的方法。
第二组按钮根据指定的计划实际运行作业,就像 Windows 服务运行它们一样。消息会随着作业的处理发送到输出窗口。

历史
- 2017 年 9 月 29 日 - 首次发布。