使用 Revalee 和 MVC 安排任务






4.94/5 (9投票s)
一个关于从 MVC 控制器发送长时间延迟电子邮件通知的示例。
引言
您几乎完成了 MVC 项目的用户注册部分,但需要能够在使用户的 30 天试用期到期前 3 天向每位用户发送电子邮件。现在该怎么办?
嗯,您可以创建一个带外进程,该进程每天晚上将过期的用户帐户分批处理,并在凌晨 4:00 发送电子邮件通知。但这会带来很多麻烦。首先,您必须在 Web 应用程序之外创建一个作业。接下来,您必须安排该作业。然后,您必须处理夜间作业生成的任何错误,当然,您必须自己监视它,尤其是在 Ted(你知道那个总是在离开办公室前重启服务器的人)负责另一台服务器时。
还有另一种方法。
使用您的 Web 应用程序。没错:它已经可以访问数据库,可以渲染和发送出站电子邮件(您已经发送了欢迎消息,对吧?),并且它是您测试工具的一部分。现在,在您因为必须保持 Web 应用程序加载数天或任何其他牵强的计划(又名 hack)而惊慌失措之前,我再说一遍:还有另一种方法。
背景
我所在开发团队遇到的这个问题太多次了,以至于我们决定编写自己的解决方案来解决这个问题,并将其开源。它叫做 Revalee——就像 reveille(起床号)一样:一个起床的信号——它是一个 Windows 服务,它会将您的 Web 请求冻结在 碳酸氢盐(好吧,不是真的)中,直到它们可以被解冻。更具体地说,Revalee 会记录您最初的 Web 请求,并在预设的未来日期和时间调用您的 Web 应用程序。
免责声明:如果您在前两句话中错过了,Revalee 是一个免费的开源项目,由我所属的开发团队编写。它可在 GitHub 上获取,并受 MIT 许可证的保护。如果您有兴趣,请下载并查看。
如果您查看 GitHub 项目,您会看到该项目有三个部分:Revalee.Service
、Revalee.SampleSite
和 Revalee.Client
。服务本身就是一个 Windows 服务,它接收您原始的 Web 请求并处理您未来回调的调度。示例站点是一个迷你 MVC 项目,它说明了有人如何在自己的 MVC Web 应用程序中使用 Revalee。(本文将尝试涵盖相同的主题。)最后,项目的一部分是客户端库,它有助于自动化向服务发送请求。
首先。让我们安装服务。
安装 Revalee
无论您是下载 GitHub 项目并自己编译 C# 解决方案,还是仅从 项目网站下载预编译文件,安装 Revalee 都很简单。首先,将以下编译后的文件复制到您想要的文件夹(例如:C:\Program Files\Revalee\)
- Esent.Interop.dll
- License.txt
- Revalee.Service.exe
- Revalee.Service.exe.config
接下来,从命令提示符(自然需要管理员权限)安装 Windows 服务
Revalee.Service.exe -install
就是这样。您已完成安装。还有一件事:为方便本文的讨论,我们假设您的服务器 IP 地址是 172.31.46.200。(稍后我们需要用到它。)
顺带一提,Revalee 服务在端口 46200 上侦听调度请求。这是服务的默认端口号,但可以根据需要进行更改。但是,请牢记端口 46200(或您配置的任何端口),尤其是在 Revalee 安装所在的服务器与您的 Web 服务器之间存在内部防火墙时。
使用 Revalee
回到我们正在解决的问题,首先将 Revalee.Client.dll 文件复制到您的 MVC 项目的 bin 文件夹中。接下来,将对 Revalee.Client
程序集的引用添加到您的 MVC 项目中。
还有一个 Revalee.Client
NuGet 包,如果您更喜欢这种方式,可以简化此过程。
与之前一样,为方便本文的讨论,我们假设将处理您回调操作的控制器名为 ScheduledCallback
。在您的 MVC 应用程序的该控制器顶部,不要忘记添加命名空间引用
using Revalee.Client;
在您的 ScheduledCallback
控制器中,创建一个名为,例如,SendExpirationMessage
的新操作,该操作带有一个整数参数
public ActionResult SendExpirationMessage(int userId)
此控制器操作将是您未来的回调将请求的操作。使用传入的 userId
,您可以在格式化您出站的到期电子邮件消息之前,从数据库中查找有关指定用户的任何信息,例如:
[AllowAnonymous]
[HttpPost]
public ActionResult SendExpirationMessage(int userId)
{
// Validate the incoming request to ensure that this web app requested this callback
if (!RevaleeRegistrar.ValidateCallback(this.Request))
{
// Return a '401 Unauthorized' response to the Revalee service if the callback doesn't validate
return new HttpStatusCodeResult(HttpStatusCode.Unauthorized);
}
// Validate the incoming parameter
if (userId <= 0)
{
// Either throw an exception, log an invalid (or malformed) request,
// or track something else appropriate for your application.
}
// Look up the user in your database.
// Format the email message with user-specific information, like name, email address, etc.
// Send the expiration email message to the user.
// Return a '200 OK' response to the Revalee service after successfully processing the callback
return new HttpStatusCodeResult(HttpStatusCode.OK);
}
最后,创建一个将在需要调度未来回调(在本例中为 27 天后)时调用的辅助方法。这是当用户在您的网站上注册时您将调用的方法
private void ScheduleExpirationMessage(int userId)
{
// The server address where the Revalee service is installed
string revaleeServiceHost = "172.31.46.200";
// The callback will occur 27 days from now
DateTimeOffset callbackTime = DateTimeOffset.Now.AddDays(27.0);
// The url that will be called back, including userId
Uri callbackUrl = new Uri(
string.Format("http://mywebapp.com/ScheduledCallback/SendExpirationMessage/{0}", userId));
// Register the callback request with the Revalee service
RevaleeRegistrar.ScheduleCallback(revaleeServiceHost, callbackTime, callbackUrl);
}
万一您需要为您的 Web 应用程序自身的内部目的进行跟踪,ScheduleCallback()
会返回一个 Guid
。这样,您就可以使用该唯一标识符引用未来的回调,或者如果需要,可以简单地取消该回调(见下文)。
就是这样。您已安排了未来的到期电子邮件消息,而无需离开您的 Visual Studio 开发环境或您的 MVC 应用程序。这比 Boba Fett 还要酷。嗯,差不多吧。
那么,有什么大不了的?
等等,什么?哦,所以你就像:我才不用什么开源垃圾软件。好吧,让我们撸起袖子,深入了解一下 Revalee。
Revalee.Client
在RevaleeRegistrar
类中,有两个方法突出了您与服务交互的简单性private static string BuildScheduleRequestUrl(Uri serviceBaseUri, DateTime callbackUtcTime, Uri callbackUrl)
{
return string.Format(
"{0}://{1}/Schedule?CallbackTime={2:s}Z&CallbackUrl={3}",
serviceBaseUri.Scheme,
serviceBaseUri.Authority,
callbackUtcTime,
EscapeCallbackUrl(callbackUrl));
}
private static string BuildCancelRequestUrl(Uri serviceBaseUri, Guid callbackId, Uri callbackUrl)
{
return string.Format(
"{0}://{1}/Cancel?CallbackId={2:D}&CallbackUrl={3}",
serviceBaseUri.Scheme,
serviceBaseUri.Authority,
callbackId,
EscapeCallbackUrl(callbackUrl));
}
当然,代码还有很多,但归结起来,您可以安排一个回调或取消一个(使用您最初安排回调时收到的 Guid
)。生成的所有 URL 都用于调用 REST Web 服务。就是这样。简单,对吧?
好吧……
Revalee.Service
Revalee.Service 项目承担了真正的繁重工作。或者继续使用星球大战的引用,这是 Ugnaughts 工作的地方(哦,别假装不知道他们是谁)。更具体地说,Supervisor
类负责管理所有这些,就像 Lobot(嘘!)一样。
using System;
using System.Diagnostics;
using System.Runtime.ConstrainedExecution;
namespace Revalee.Service
{
internal sealed class Supervisor : CriticalFinalizerObject, IDisposable
{
private readonly ILoggingProvider _LoggingProvider;
private readonly ConfigurationManager _ConfigurationManager;
private readonly TelemetryManager _TelemetryManager;
private readonly StateManager _StateManager;
private readonly TimeManager _TimeManager;
private readonly RequestManager _RequestManager;
private readonly WorkManager _WorkManager;
private readonly object _SyncRoot = new object();
private bool _IsStarted;
private bool _IsPaused;
private Supervisor()
{
try
{
_LoggingProvider = new TraceListenerLoggingProvider();
try
{
_ConfigurationManager = new ConfigurationManager();
_TelemetryManager = new TelemetryManager();
_StateManager = new StateManager();
_TimeManager = new TimeManager();
_RequestManager = new RequestManager();
_WorkManager = new WorkManager();
}
catch (Exception ex2)
{
try
{
_LoggingProvider.WriteEntry(
string.Format("{0} [Critical startup error.]", ex2.Message),
TraceEventType.Critical);
}
catch (Exception ex3)
{
Console.WriteLine("Could not write to the error log.");
Console.WriteLine("* {0}", ex3.Message);
}
throw;
}
}
catch (Exception ex1)
{
Console.WriteLine("Could not initialize logging subsystem.");
Console.WriteLine("* {0}", ex1.Message);
throw;
}
}
// ...
// (Omitted for brevity; see source code for specifics)
// ...
}
}
如您所见,Supervisor
类实例化了许多子管理器类,每个类都负责处理自己的领域。它们是:
类 | 职责 |
ConfigurationManager |
从 Revalee.Service.exe.config 文件加载配置设置 |
TelemetryManager |
通过 Windows 性能监视器跟踪活动 |
StateManager |
存储回调请求 |
TimeManager |
唤醒服务以处理回调 |
RequestManager |
处理回调的传入请求 |
WorkManager |
执行回调到您的 Web 应用程序 |
是的,您看到了好的 _SyncRoot
来处理多线程部分的锁定。
可扩展存储引擎(又名 JET Blue)
关于持久性的一两句话。作为 Windows 服务,Revalee 需要能够管理数据持久性,因为服务器随时可能重启(感谢 Ted!)。
因此,Revalee 使用了久经考验的可扩展存储引擎 (ESE),也称为 JET Blue。根据维基百科,“ESE 运行时 (ESENT.DLL) 自 Windows 2000 起已包含在每个 Windows 版本中”。这很好,因为它意味着 Revalee 可以依赖数据库访问,而无需安装任何新东西。幸运的是,由于存在 ESENT Managed Interface 开源项目,从 .NET 访问 ESE 变得很容易。
Revalee 中的数据持久性在 EseTaskPersistenceProvider
类中处理。下面重点介绍了用于通过 Esent.Interop
与 ESE 交互的 AddTask()
和 ListTasksDueBetween()
方法。当然,此类相当长,但为了简洁起见,此处已进行了编辑。
using Microsoft.Isam.Esent.Interop;
using Microsoft.Isam.Esent.Interop.Windows7;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
namespace Revalee.Service.EsePersistence
{
internal class EseTaskPersistenceProvider : ITaskPersistenceProvider
{
private const string _DatabaseName = "RevaleeTasks";
private const string _StorageEngineBaseName = "edb";
private const int _ConnectionPoolSize = 10;
private const string _TableNameCallbacks = "Callbacks";
private const string _ColumnNameCallbackId = "CallbackId";
private const string _ColumnNameCreatedTime = "CreatedTime";
private const string _ColumnNameCallbackTime = "CallbackTime";
private const string _ColumnNameCallbackUrl = "CallbackUrl";
private const string _ColumnNameAttemptsRemaining = "AttemptsRemaining";
private const string _ColumnNameAuthorizationCipher = "AuthorizationCipher";
private Instance _EseInstance;
private EseConnectionPool _ConnectionPool;
private string _DatabasePath;
private sealed class EseConnection : EsentResource
{
// (Omitted for brevity; see source code for specifics)
}
private sealed class EseConnectionPool : IDisposable
{
// (Omitted for brevity; see source code for specifics)
}
public void Open(string connectionString)
{
// (Omitted for brevity; see source code for specifics)
}
public void Close()
{
// (Omitted for brevity; see source code for specifics)
}
public RevaleeTask GetTask(Guid callbackId)
{
// (Omitted for brevity; see source code for specifics)
}
public void AddTask(RevaleeTask task)
{
if (task == null)
{
throw new ArgumentNullException("task");
}
if (_EseInstance == null)
{
throw new InvalidOperationException("Storage provider has not been opened.");
}
EseConnection connection = _ConnectionPool.OpenConnection();
try
{
using (Table table = connection.GetTable(_TableNameCallbacks, OpenTableGrbit.Updatable))
{
IDictionary<string, JET_COLUMNID> columnIds = connection.GetSchema(_TableNameCallbacks);
using (var transaction = new Transaction(connection))
{
using (var update = new Update(connection, table, JET_prep.Insert))
{
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameCallbackId],
task.CallbackId);
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameCreatedTime],
task.CreatedTime);
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameCallbackTime],
task.CallbackTime);
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameCallbackUrl],
task.CallbackUrl.OriginalString,
Encoding.Unicode);
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameAttemptsRemaining],
task.AttemptsRemaining);
if (task.AuthorizationCipher != null)
{
Api.SetColumn(
connection,
table,
columnIds[_ColumnNameAuthorizationCipher],
task.AuthorizationCipher,
Encoding.Unicode);
}
update.Save();
}
transaction.Commit(CommitTransactionGrbit.None);
}
}
}
finally
{
_ConnectionPool.CloseConnection(connection);
}
}
public void RemoveTask(RevaleeTask task)
{
// (Omitted for brevity; see source code for specifics)
}
public IEnumerable<revaleetask> ListAllTasks()
{
// (Omitted for brevity; see source code for specifics)
}
public IEnumerable<revaleetask> ListTasksDueBetween(DateTime startTime, DateTime endTime)
{
if (_EseInstance == null)
{
throw new InvalidOperationException("Storage provider has not been opened.");
}
DateTime rangeStartTime = NormalizeDateTime(startTime);
DateTime rangeEndTime = NormalizeDateTime(endTime);
// Inclusive Upper Limit does not work properly for the CLR DateTime type.
// Add the smallest amount of time that the Esent engine will detect to include
// the ending range inclusively.
rangeEndTime = rangeEndTime.AddMilliseconds(1.0);
var taskList = new List<revaleetask>();
EseConnection connection = this._ConnectionPool.OpenConnection();
try
{
using (Table table = connection.GetTable(_TableNameCallbacks, OpenTableGrbit.DenyWrite
| OpenTableGrbit.Preread
| OpenTableGrbit.ReadOnly
| OpenTableGrbit.Sequential))
{
IDictionary<string,> columnIds = connection.GetSchema(_TableNameCallbacks);
Api.JetSetCurrentIndex(connection, table, "due");
Api.MakeKey(connection, table, rangeStartTime, MakeKeyGrbit.NewKey);
if (Api.TrySeek(connection, table, SeekGrbit.SeekGE))
{
Api.MakeKey(connection, table, rangeEndTime, MakeKeyGrbit.NewKey);
if (Api.TrySetIndexRange(connection, table, SetIndexRangeGrbit.RangeInclusive
| SetIndexRangeGrbit.RangeUpperLimit))
{
JET_SESID jetSession = connection;
JET_TABLEID jetTable = table;
JET_COLUMNID jetColumnCallbackId = columnIds[_ColumnNameCallbackId];
JET_COLUMNID jetColumnCreatedTime = columnIds[_ColumnNameCreatedTime];
JET_COLUMNID jetColumnCallbackTime = columnIds[_ColumnNameCallbackTime];
JET_COLUMNID jetColumnCallbackUrl = columnIds[_ColumnNameCallbackUrl];
JET_COLUMNID jetColumnAttemptsRemaining = columnIds[_ColumnNameAttemptsRemaining];
JET_COLUMNID jetColumnAuthorizationCipher = columnIds[_ColumnNameAuthorizationCipher];
do
{
Guid? callbackId = Api.RetrieveColumnAsGuid(
jetSession,
jetTable,
jetColumnCallbackId);
DateTime? createdTime = Api.RetrieveColumnAsDateTime(
jetSession,
jetTable,
jetColumnCreatedTime);
DateTime? callbackTime = Api.RetrieveColumnAsDateTime(
jetSession,
jetTable,
jetColumnCallbackTime);
string callbackUrl = Api.RetrieveColumnAsString(
jetSession,
jetTable,
jetColumnCallbackUrl);
int? attemptsRemainingColumn = Api.RetrieveColumnAsInt32(
jetSession,
jetTable,
jetColumnAttemptsRemaining);
string authorizationCipher = Api.RetrieveColumnAsString(
jetSession,
jetTable,
jetColumnAuthorizationCipher);
Uri callbackUri = null;
if (callbackTime.HasValue
&& Uri.TryCreate(callbackUrl, UriKind.Absolute, out callbackUri)
&& createdTime.HasValue
&& callbackId.HasValue
&& attemptsRemainingColumn.HasValue)
{
RevaleeTask revivedTask = RevaleeTask.Revive(
DateTime.SpecifyKind(callbackTime.Value, DateTimeKind.Utc),
callbackUri,
DateTime.SpecifyKind(createdTime.Value, DateTimeKind.Utc),
callbackId.Value,
attemptsRemainingColumn.Value,
string.IsNullOrEmpty(authorizationCipher) ? null : authorizationCipher);
taskList.Add(revivedTask);
}
} while (Api.TryMoveNext(jetSession, jetTable));
}
}
}
}
finally
{
_ConnectionPool.CloseConnection(connection);
}
return taskList;
}
private void CreateTaskTable(EseConnection connection)
{
// (Omitted for brevity; see source code for specifics)
}
private static DateTime NormalizeDateTime(DateTime time)
{
if (time.Kind == DateTimeKind.Local)
{
return time.ToUniversalTime();
}
else if (time.Kind == DateTimeKind.Utc)
{
return time;
}
else
{
return DateTime.SpecifyKind(time, DateTimeKind.Utc);
}
}
~EseTaskPersistenceProvider()
{
this.Dispose(false);
}
public void Dispose()
{
// Re-opening a disposed provider will cause an ObjectDisposedException,
// use Close() to re-open a provider.
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool isDisposing)
{
// (Omitted for brevity; see source code for specifics)
}
}
}
使用 ESE 进行持久性对 Revalee 来说效果很好,因为它紧凑且快速。
结论
如上所述,在 MVC 应用程序中使用 Revalee 调度未来任务非常简单。它使通常超出 Web 应用程序范围的操作成为 Web 应用程序的核心部分。Revalee 接收的 Web 回调将这些请求提升为与您的 MVC 应用程序执行的所有其他操作同等的首要地位。作为首要进程,您永远不必担心您的 Web 应用程序在未处理请求的操作时卸载。这使得处理未来的到期电子邮件消息等操作变得轻而易举。
现在去看 星球大战第五集:帝国反击战 吧,您也想看。
延伸阅读
历史
- [2014.Mar.04] 初始发布。
- [2014.Mar.05] 从代码块中删除了错误的文本:“
</string,>
”。 - [2014.May.19] 添加了“延伸阅读”部分。
- [2014.May.23] 修正了“延伸阅读”部分,添加了“UrlValidator,Revalee 使用的一个项目小部件”。