使用 IObservable<T> 构建自己的调度器





5.00/5 (4投票s)
本文将帮助您构建一个具有 Observable 设计模式风格的调度器。
引言
本文将帮助您构建一个可以轻松集成到您的 .NET 应用程序中的调度器。使用此调度器,您可以配置您的调度时间并以最小的努力管理模块。在这里,我将向您展示 Observer 设计模式如何帮助构建一个可管理的调度器。
背景
在阅读本文之前,所有人都必须对 Observer 设计模式 有基本的了解。这将非常有帮助,并使本文对您来说更加轻松有趣。顺便说一句,这里有一个典型的描述。
“定义对象之间的一对多依赖关系,以便当一个对象的状态发生变化时,其所有依赖项都会自动收到通知并更新。”
简而言之,如果我想解释的话:
具体主题继承主题,观察者继承具体观察者。
如果具体主题发生变化,它会通知其他人。
其他人是谁?
其他人指的是附加到具体主题的对象。
所以最终,具体主题的变化会通知附加的模块。
Using the Code
假设我需要构建一个调度器,它将发送电子邮件、文本短信并将文件上传到 FTP 服务器。根据我之前的讨论,我可以说我需要创建一个调度任务(具体主题),其职责(附加)是发送这些邮件。在 .NET 4.0 中,我们有两个很好的接口:
如果您查看这些接口,您会注意到它们的方法与图 1.1 中的主题和观察者几乎相同。
IObservable<T> 只有一个方法:
Disposable Subscribe(IObserver<T> observer)
与我们的附加方法相比,唯一的区别在于返回值。所以我们这里不需要 detach 方法。为什么?因为它返回一个 subscribe 对象的 disposable 对象。如果您仍然感到困惑,没关系,我将在稍后的讨论中进行解释。
IObserver<T> 有三个方法:
void OnCompleted()
void OnError(Exception error)
void OnNext(T value)
OnNext (T value)
是我们主要的关注点。它类似于图 1.1 中 Observer
的 Update
方法。
令我非常惊讶的是 IObservable<T> 的 Notify 方法在哪里?好的,让我们回到调度器任务。
所以我们需要一个类来实现 IObservable<T>。下面是这个类:
public class Observable<T> : IObservable<T>
{
List<IObserver<T>> observers = new List<IObserver<T>>();
public Observable()
{
observers = new List<IObserver<T>>();
}
protected void Notify(T obj)
{
foreach (IObserver<T> observer in observers)
{
observer.OnNext(obj);
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (!observers.Contains(observer))
{
observers.Add(observer);
}
return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
private List<IObserver<T>> observers;
private IObserver<T> observer;
public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
{
this.observers = observers;
this.observer = observer;
}
public void Dispose()
{
if (observer != null && observers.Contains(observer))
{
observers.Remove(observer);
}
}
}
}
所以我们有了泛型类以及 Notify
方法。在这里,您可以看到如何获取 IDisposable
,在订阅之后。现在我们可以将 Observable<T>
类视为 Subject
。现在我们需要我们的具体主题。
public class SchedulerTask : Observable<SchedulerTask>
{
bool _switchOn;
public bool SwitchOn
{
get
{
return _switchOn;
}
set
{
_switchOn = value;
if (_switchOn)
{
Notify(this);
}
}
}
public SchedulerTask()
{
}
}
因此,我们的 SchedulerTask
类设计方式是,当属性值 SwitchOn
设置为 true
时,它会调用基类的 Notify
方法,并且所有订阅的模块都将收到通知。
因此,对于三个任务,我们需要能够处理或执行这些任务的类。当 SchedularTask
类更新时,它们将被通知。为了接收通知 (OnNext(T)
),它们需要实现 IObserver<T>
。
这是用于 MailSend
的
public class SendNotificationMail : IObserver<SchedulerTask>
{
public SendNotificationMail()
{
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnNext(SchedulerTask value)
{
//Task that needs to be done on schedule time
// ProcessSendNotificationMailMails(value);
}
}
这是用于 TextSMS
的
public class SendTextMessaage : IObserver<SchedulerTask>
{
public SendTextMessaage()
{
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnNext(SchedulerTask value)
{
//Task that needs to be done on schedule time
// ProcessSendTextMessaage(value);
}
}
这是用于上传文件的
public class UpLoadFileFromFtp :IObserver<SchedulerTask>
{
public UpLoadFileFromFtp()
{
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnNext(SchedulerTask value)
{
//Task that needs to be done on schedule time
// ProcessUpLoadFileFromFtp(value);
}
}
这里我没有展示发送邮件、短信或文件上传的完整实现。所以我们有了所有的模块。但仍有一件重要的事情遗漏了。你注意到了吗?没有,ScheduerTask
与 SendNotificationMail
、SendTextMessaage
和 UpLoadFileFromFtp
的订阅或附加。
为了管理这些类,我们需要一个管理器类,它将它们集成在一起。
public class ManageScheduleTasks
{
SchedulerTask objScheduler;
Action<bool> TriggerScheduler;
public ManageScheduleTasks()
{
Register();
}
protected void Register()
{
objScheduler = new SchedulerTask();
GetSubsCribtionList().ForEach(p =>
{
objScheduler.Subscribe(p);
});
TriggerScheduler = InvokeScheduler;
}
protected List<IObserver<SchedulerTask>> GetSubsCribtionList()
{
return new List<IObserver<SchedulerTask>>()
{
new SendNotificationMail(),
new SendTextMessaage(),
new UpLoadFileFromFtp()
};
}
public void StartScheduler()
{
double dblmilisec = ConfigurationManager.AppSettings["scheduleTimeinMinute"] != null ?
Convert.ToDouble(ConfigurationManager.AppSettings["scheduleTimeinMinute"])
* 60000 : -1;
System.Timers.Timer t = new System.Timers.Timer();
t.Elapsed += new System.Timers.ElapsedEventHandler(IsTimeToStartScheduler);
t.Interval = dblmilisec;
t.Enabled = true;
t.AutoReset = true;
t.Start();
}
private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
{
var start = ConfigurationManager.AppSettings["scheduleStartTime"] != null ?
Convert.ToString(ConfigurationManager.AppSettings["scheduleStartTime"]) : "";
var triger = start.Equals(DateTime.Now.ToString("hh:mm tt"));
TriggerScheduler(triger);
}
private void InvokeScheduler(bool isStartTime)
{
objScheduler.SwitchOn = isStartTime;
}
}
别担心这个管理器类。我会一步一步地解释每个方法。
首先,看看 ManageScheduleTasks
类的构造函数。它做了什么?它调用 Register
方法。Register
方法实际上会订阅所有将由 SchedulerTask
通知(通过调用 GetSubsCribtionList
函数)的模块。TriggerScheduler
只是一个 Action<bool>
。我也会解释它。到目前为止一切顺利,对吧?
public ManageScheduleTasks()
{
Register();
}
protected void Register()
{
objScheduler = new SchedulerTask();
GetSubsCribtionList().ForEach(p =>
{
objScheduler.Subscribe(p); // Subscribe all the module
});
TriggerScheduler = InvoveScheduler;
}
protected List<IObserver<SchedulerTask>> GetSubsCribtionList()
{
return new List<IObserver<SchedulerTask>>()
{
new SendNotificationMail(), // Return the list of modules
new SendTextMessaage(), // that will be notified during
new UpLoadFileFromFtp() //Scheduler process running
};
}
现在,你可能会问一个问题:另外三个方法 StartScheduler
、IsTimeToStartScheduler
、InvoveScheduler
的用途是什么?以下是答案:
StartScheduler
:此方法引入一个计时器,每 1 分钟调用一次 IsTimeToStartScheduler
方法。间隔值来自 *web.config* 中的 scheduleTimeinMinute
。这里我设置为 1
。
<appSettings>
<add key="scheduleTimeinMinute" value="1"/>
<add key="scheduleStartTime" value="01:00 AM"/>
</appSettings>
public void StartScheduler()
{
double dblmilisec = ConfigurationManager.AppSettings["scheduleTimeinMinute"] != null ?
Convert.ToDouble(ConfigurationManager.AppSettings["scheduleTimeinMinute"])
* 60000 : -1;
System.Timers.Timer t = new System.Timers.Timer();
t.Elapsed += new System.Timers.ElapsedEventHandler(IsTimeToStartScheduler);
t.Interval = dblmilisec;
t.Enabled = true;
t.AutoReset = true;
t.Start();
}
IsTimeToStartScheduler
:此方法将从 *web.config* 中的 scheduleStartTime
获取调度器确切的启动时间,并检查是否到了启动调度器的时间。在凌晨 1:00,它会将 var trigger
的值设置为 true
。凌晨 1:00 来自我的 *web.config* 中的 schedulerStartTime
。
<appSettings>
<add key="scheduleTimeinMinute" value="1"/>
<add key="scheduleStartTime" value="01:00 AM"/>
</appSettings>
如果 var trigger
的值为 true
,则调度器启动。对如何启动感到困惑?以下是解释:
private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
{
var start = ConfigurationManager.AppSettings["scheduleStartTime"] != null ?
Convert.ToString(ConfigurationManager.AppSettings["scheduleStartTime"]) : "";
var triger = start.Equals(DateTime.Now.ToString("hh:mm tt"));
TriggerScheduler(triger);
}
所以,正如我之前告诉你的,TriggerScheduler
是一个指向 InvokeScheduler
的操作,它实际上是在设置 SwitchOn
值。所以你还记得当它调用 notify
的属性吗?当它将值设置为 true
时。
private void InvoveScheduler(bool isStartTime)
{
objScheduler.SwitchOn = isStartTime;
}
public bool SwitchOn
{
get
{
return _switchOn;
}
set
{
_switchOn = value;
if (_switchOn)
{
Notify(this);
}
}
}
关注点
那么,我们如何将 ManageScheduleTasks
类与 Http
挂钩呢?诀窍是使用你的 *Global.asax.cs* 类,并将以下代码放在 Application_Start
中:
public class Global : System.Web.HttpApplication
{
protected void Application_Start(object sender, EventArgs e)
{
var manageSchdule = new ManageScheduleTasks();
manageSchdule.StartScheduler();
}
}
就这样!你的调度器现在已附加到你的应用程序。
此调度器将在一天中的特定时间调用。但如果你想每 (1,2,3...n)
分钟调用一次,只需执行以下操作,并将 scheduleTimeinMinute
的值更改为你需要的任何分钟间隔。
private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
{
TriggerScheduler(true);
}
所以我们最终开发了一个调度器。