65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 IObservable<T> 构建自己的调度器

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2013年5月16日

CPOL

4分钟阅读

viewsIcon

19856

本文将帮助您构建一个具有 Observable 设计模式风格的调度器。

引言

本文将帮助您构建一个可以轻松集成到您的 .NET 应用程序中的调度器。使用此调度器,您可以配置您的调度时间并以最小的努力管理模块。在这里,我将向您展示 Observer 设计模式如何帮助构建一个可管理的调度器。

背景

在阅读本文之前,所有人都必须对 Observer 设计模式 有基本的了解。这将非常有帮助,并使本文对您来说更加轻松有趣。顺便说一句,这里有一个典型的描述。

“定义对象之间的一对多依赖关系,以便当一个对象的状态发生变化时,其所有依赖项都会自动收到通知并更新。”

图:1.1 Observer 模式的类图。

简而言之,如果我想解释的话:

具体主题继承主题,观察者继承具体观察者。

如果具体主题发生变化,它会通知其他人。

其他人是谁?

其他人指的是附加到具体主题的对象。

所以最终,具体主题的变化会通知附加的模块。

Using the Code

假设我需要构建一个调度器,它将发送电子邮件、文本短信并将文件上传到 FTP 服务器。根据我之前的讨论,我可以说我需要创建一个调度任务(具体主题),其职责(附加)是发送这些邮件。在 .NET 4.0 中,我们有两个很好的接口:

  1. IObservable<T>
  2. IObserver<T>

如果您查看这些接口,您会注意到它们的方法与图 1.1 中的主题和观察者几乎相同。

IObservable<T> 只有一个方法:

Disposable Subscribe(IObserver<T> observer)  

与我们的附加方法相比,唯一的区别在于返回值。所以我们这里不需要 detach 方法。为什么?因为它返回一个 subscribe 对象的 disposable 对象。如果您仍然感到困惑,没关系,我将在稍后的讨论中进行解释。

IObserver<T> 有三个方法:

 void OnCompleted()
 void OnError(Exception error)
 void OnNext(T value)          

OnNext (T value) 是我们主要的关注点。它类似于图 1.1 中 ObserverUpdate 方法。

令我非常惊讶的是 IObservable<T> 的 Notify 方法在哪里?好的,让我们回到调度器任务。

所以我们需要一个类来实现 IObservable<T>。下面是这个类:

public class Observable<T> : IObservable<T>
    {
        List<IObserver<T>> observers = new List<IObserver<T>>();
 
        public Observable()
        {           
            observers = new List<IObserver<T>>();
        }
 
        protected void Notify(T obj)
        {
            foreach (IObserver<T> observer in observers)
            {
                observer.OnNext(obj);
            }
        }
 
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (!observers.Contains(observer))
            {
                observers.Add(observer);
            }
 
            return new Unsubscriber(observers, observer);
        } 
 
        private class Unsubscriber : IDisposable
        {
            private List<IObserver<T>> observers;
            private IObserver<T> observer;
 
            public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
            {
                this.observers = observers;
                this.observer = observer;
            }
 
            public void Dispose()
            {
                if (observer != null && observers.Contains(observer))
                {
                    observers.Remove(observer);
                }
            }
        }
    }

所以我们有了泛型类以及 Notify 方法。在这里,您可以看到如何获取 IDisposable,在订阅之后。现在我们可以将 Observable<T> 类视为 Subject。现在我们需要我们的具体主题。

public class SchedulerTask : Observable<SchedulerTask>
    {
        bool _switchOn;
  
        public bool SwitchOn
        {
            get
            {
                return _switchOn;
            }
            set
            {
                _switchOn = value;
                if (_switchOn)
                {
 
                    Notify(this);
                }
            }
        } 
 
        public SchedulerTask()
        {
            
        }
    }

因此,我们的 SchedulerTask 类设计方式是,当属性值 SwitchOn 设置为 true 时,它会调用基类的 Notify 方法,并且所有订阅的模块都将收到通知。

因此,对于三个任务,我们需要能够处理或执行这些任务的类。当 SchedularTask 类更新时,它们将被通知。为了接收通知 (OnNext(T)),它们需要实现 IObserver<T>

这是用于 MailSend

public class SendNotificationMail : IObserver<SchedulerTask>
    {
        public SendNotificationMail()
        {
 
        }
 
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }
 
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }
 
        public void OnNext(SchedulerTask value)
        {
            //Task that needs to be done on schedule time
            // ProcessSendNotificationMailMails(value);
        }        
    }

这是用于 TextSMS

public class SendTextMessaage : IObserver<SchedulerTask>
    {
        public SendTextMessaage()
        {
 
        }
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }
 
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }
 
        public void OnNext(SchedulerTask value)
        {
            //Task that needs to be done on schedule time
            // ProcessSendTextMessaage(value);           
        }        
    }   

这是用于上传文件的

public class UpLoadFileFromFtp :IObserver<SchedulerTask>
    {
        public UpLoadFileFromFtp()
        {
 
        }
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }
 
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }
 
        public void OnNext(SchedulerTask value)
        {
            //Task that needs to be done on schedule time
            // ProcessUpLoadFileFromFtp(value);
        }
    }

这里我没有展示发送邮件、短信或文件上传的完整实现。所以我们有了所有的模块。但仍有一件重要的事情遗漏了。你注意到了吗?没有,ScheduerTaskSendNotificationMailSendTextMessaageUpLoadFileFromFtp 的订阅或附加。

为了管理这些类,我们需要一个管理器类,它将它们集成在一起。

 public class ManageScheduleTasks
    {
        SchedulerTask objScheduler;
        Action<bool> TriggerScheduler;
 
        public ManageScheduleTasks()
        {
            Register();
        }
        protected  void Register()
        {            
            objScheduler = new SchedulerTask();
            GetSubsCribtionList().ForEach(p =>
            {
                objScheduler.Subscribe(p);
 
            });
            TriggerScheduler = InvokeScheduler;
        }
 
        protected List<IObserver<SchedulerTask>> GetSubsCribtionList()
        {
            return new List<IObserver<SchedulerTask>>()
            {                  
                new SendNotificationMail(),
                new SendTextMessaage(),
                new UpLoadFileFromFtp()                               
            }; 
        }
        public void StartScheduler()
        {
            double dblmilisec = ConfigurationManager.AppSettings["scheduleTimeinMinute"] != null ?
                Convert.ToDouble(ConfigurationManager.AppSettings["scheduleTimeinMinute"]) 
                * 60000 : -1;
            System.Timers.Timer t = new System.Timers.Timer();
            t.Elapsed += new System.Timers.ElapsedEventHandler(IsTimeToStartScheduler);
            t.Interval = dblmilisec;
            t.Enabled = true;
            t.AutoReset = true;
            t.Start();
        }
 
        private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
        {
            var start = ConfigurationManager.AppSettings["scheduleStartTime"] != null ?
                Convert.ToString(ConfigurationManager.AppSettings["scheduleStartTime"]) : "";
            var triger = start.Equals(DateTime.Now.ToString("hh:mm tt"));
            TriggerScheduler(triger);
        }
        private void InvokeScheduler(bool isStartTime)
        {
            objScheduler.SwitchOn = isStartTime;
        }
    }

别担心这个管理器类。我会一步一步地解释每个方法。

首先,看看 ManageScheduleTasks 类的构造函数。它做了什么?它调用 Register 方法。Register 方法实际上会订阅所有将由 SchedulerTask 通知(通过调用 GetSubsCribtionList 函数)的模块。TriggerScheduler 只是一个 Action<bool>。我也会解释它。到目前为止一切顺利,对吧?

public ManageScheduleTasks()
        {
            Register();
        }
        protected  void Register()
        {            
            objScheduler = new SchedulerTask();
            GetSubsCribtionList().ForEach(p =>
            {
                objScheduler.Subscribe(p); // Subscribe all the module

            });
            TriggerScheduler = InvoveScheduler;
        }
 
        protected List<IObserver<SchedulerTask>> GetSubsCribtionList()
        {
            return new List<IObserver<SchedulerTask>>()
            {                  
                new SendNotificationMail(),  // Return the list of modules
                new SendTextMessaage(),     // that will be notified during
                new UpLoadFileFromFtp()    //Scheduler process  running                         
            }; 
        }

现在,你可能会问一个问题:另外三个方法 StartSchedulerIsTimeToStartSchedulerInvoveScheduler 的用途是什么?以下是答案:

StartScheduler:此方法引入一个计时器,每 1 分钟调用一次 IsTimeToStartScheduler 方法。间隔值来自 *web.config* 中的 scheduleTimeinMinute。这里我设置为 1

<appSettings>
    <add  key="scheduleTimeinMinute" value="1"/>
    <add   key="scheduleStartTime" value="01:00 AM"/>
  </appSettings>
public void StartScheduler()
        {
            double dblmilisec = ConfigurationManager.AppSettings["scheduleTimeinMinute"] != null ?
                Convert.ToDouble(ConfigurationManager.AppSettings["scheduleTimeinMinute"]) 
                * 60000 : -1;
            System.Timers.Timer t = new System.Timers.Timer();
            t.Elapsed += new System.Timers.ElapsedEventHandler(IsTimeToStartScheduler);
            t.Interval = dblmilisec;
            t.Enabled = true;
            t.AutoReset = true;
            t.Start();
        }

IsTimeToStartScheduler:此方法将从 *web.config* 中的 scheduleStartTime 获取调度器确切的启动时间,并检查是否到了启动调度器的时间。在凌晨 1:00,它会将 var trigger 的值设置为 true凌晨 1:00 来自我的 *web.config* 中的 schedulerStartTime

<appSettings> 
<add key="scheduleTimeinMinute" value="1"/> 
<add key="scheduleStartTime" value="01:00 AM"/> 
</appSettings>  

如果 var trigger 的值为 true,则调度器启动。对如何启动感到困惑?以下是解释:

 private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
        {
            var start = ConfigurationManager.AppSettings["scheduleStartTime"] != null ?
                Convert.ToString(ConfigurationManager.AppSettings["scheduleStartTime"]) : "";
            var triger = start.Equals(DateTime.Now.ToString("hh:mm tt"));
            TriggerScheduler(triger);
        }

所以,正如我之前告诉你的,TriggerScheduler 是一个指向 InvokeScheduler 的操作,它实际上是在设置 SwitchOn 值。所以你还记得当它调用 notify 的属性吗?当它将值设置为 true 时。

    private void InvoveScheduler(bool isStartTime)
        {
            objScheduler.SwitchOn = isStartTime;
        }
 public bool SwitchOn
        {
            get
            {
                return _switchOn;
            }
            set
            {
                _switchOn = value;
                if (_switchOn)
                {
 
                    Notify(this);
                }
            }
        }  

关注点

那么,我们如何将 ManageScheduleTasks 类与 Http 挂钩呢?诀窍是使用你的 *Global.asax.cs* 类,并将以下代码放在 Application_Start 中:

public class Global : System.Web.HttpApplication
    {
 
        protected void Application_Start(object sender, EventArgs e)
        {
            var manageSchdule = new ManageScheduleTasks();
            manageSchdule.StartScheduler();
        }        
    }

就这样!你的调度器现在已附加到你的应用程序。

此调度器将在一天中的特定时间调用。但如果你想每 (1,2,3...n) 分钟调用一次,只需执行以下操作,并将 scheduleTimeinMinute 的值更改为你需要的任何分钟间隔。

private void IsTimeToStartScheduler(object sender, System.Timers.ElapsedEventArgs e)
        {
            
            TriggerScheduler(true);
        } 

所以我们最终开发了一个调度器。

© . All rights reserved.