在邮件中继中使用 MSMQ






4.93/5 (13投票s)
一个完整的解决方案,
引言
我一直在做一个 .NET 项目,需要通过 SMTP 服务器发送电子邮件通知。它运行得很好,但有时通知就是没有送达。原因很简单,但也很烦人:当 SmtpClient
因任何原因无法连接到服务器时,没有任何方法可以推迟操作并稍后重试发送。因此,会抛出异常,消息被丢弃。
然后我在网上搜索了一个合适的解决方案,但只找到我设想的片段。其中之一是 Amol Rawke Pune 的文章:《使用 MSMQ 发送批量邮件》[1]。
好吧,听起来不错,但有很多限制。但其中,三个主要限制是:
MailMessage
对象并非所有字段都已传递- 队列使用者“服务”是一个简单的控制台应用程序
- 在发送邮件时,它没有处理错误情况的解决方案
它主要是一个概念验证,但我必须承认,这篇文章是我的起点。经过一番谷歌搜索,我找到了一个关于第一个问题的 很棒的解决方案[2] - 在进行了一些小改动后,我将其集成到了我的解决方案中。我还使用了 Mark Gravell 的想法 [3],让服务安装程序不依赖于 installutil.exe。
我还找到了 Rami Vemula 的作品 [4],这是一个关于如何用丰富内容填充 MailMessage
的绝佳演示。因此,它非常适合我来测试序列化能力和解决方案本身。
所以,感谢这些好人做出的贡献。其他所有事情我都是自己想出来的……但这很有趣,我希望它能对很多人有所帮助。
背景
消息队列是 Windows 的一个非常基本概念。所谓的 MSMQ(Microsoft Message Queuing,微软消息队列)技术并不新,它是一个非常有用且非常稳定的 IPC(Inter-Process Communication,进程间通信)工具——甚至可以在网络上的主机之间进行事务性通信。
它有自己的限制,首先,消息大小限制为 4MB。考虑到序列化、Unicode 等的开销,我们需要处理更小的消息。好吧,我必须接受这一点——但我从未打算发送大文件。
消息队列在哪里?
应该很简单就能找到它们
- 右键单击“我的电脑”
- 单击“管理”
- 展开“服务和应用程序”
- 现在您可以看到“消息队列”
- 展开“专用队列”,单击“新建专用队列”
嗯,在 Windows 2008 Server 上就是这么简单。但您可能在 Windows 7 上找不到它。您需要安装消息队列服务。它是(一个标准)Windows 组件,所以不是什么大问题。
与 Windows NT 以来的任何其他对象一样,消息队列也有 ACL(访问控制列表)。我稍后会回来讨论这个问题,因为它可能很棘手。
所以我的代码由四个部分组成:
- 一个测试应用程序 :)
- 用于将消息发布到队列的库
- 一个用于消耗队列并发送电子邮件的服务
- 一个
MailMessage
序列化器-反序列化器
我将很快从最后一个开始。
错误情况呢?
嗯,我的方法是将队列视为一个非标准的 FIFO 存储——MSMQ 提供了实现此功能的工具。我正在使用消息对象的属性来设置一个类似 TTL(Time To Live,生存时间)的计数器,以及一个用于存储计划时间戳的属性。当一条消息被发布时,TTL 计数器被设置为用户定义的值,时间戳被设置为当前值。该服务将使用时间戳来仅选择“已过期”的消息——每条消息在其生命周期开始时都已过期。如果在发送邮件过程中出现可能可恢复的错误,消息将被重新发布到队列并在预定的等待时间后重新处理。每次重新发布消息时,TTL 都会递减。如果 TTL 被消耗完,则消息被丢弃。当一条消息等待重新处理时,其他消息可以被处理。
Using the Code
Mailmessage 序列化器-反序列化器
如果我们想创建分布式应用程序,序列化非常重要。我认为实际上任何类都应该是可序列化的——但它们不是。嗯,System.Net.Mail.MailMessage
默认是不可序列化的。
实际上,SmtpClient
使用某种序列化方式将其消息保存为 .elm 格式(请参阅 这篇文章),但要从这样一个符合 RFC 标准的文件中重新创建 MailMessage
将是一项艰巨的任务——而且开销也很大。
[1] 的作者创建了一个可序列化的类,该类封装了 MailMessage
属性的一个小子集。但在博文 [2] 中,我们可以看到一个完整的二进制可序列化替换(SerializeableMailMessage
)原始类。
由于 MailMessage
包含许多不同不可序列化类型的属性,作者实现了 SerializeableMailAddress
、SerializeableAttachment
、SerializeableAlternateView
、SerializeableLinkedResource
、SerializeableContentDisposition
、SerializeableContentType
和 SerializeableCollection
类。
最后,将 MailMessage
的可序列化版本放入 MSMQ 消息中非常简单:
public void QueueMessage(MailMessage emailMessage)
{
//...
Message message = new Message();
message.Body = new SerializeableMailMessage(emailMessage);
//...
}
并将其取回:
Message message = msgQueue.Receive();
MailMessage mailMessage = (message.Body as SerializeableMailMessage).GetMailMessage();
我的版本是将其适配到 .NET 4.0,我改用了泛型集合等等。总的来说,这是很棒的东西,但如果您发现有什么缺失,那他(作者 [2])是要负责的 :)。不,实际上,由于我打算在实际应用程序中使用它,任何评论都值得赞赏。
消息发送器库
这部分代码 intended to be called by the application willing to send a mail. It contains a MailSender
class. When constructed, it tries to attach to the MSMQ specified in the constructor parameter. If it does not exist, the construction is aborted.
if (!MessageQueue.Exists(queueName))
{
throw new ArgumentException("Cannot instantiate. MSM queue does not exist.");
}
// This class will only post to the queue
msgQueue = new MessageQueue(queueName, QueueAccessMode.Send);
// Will use binary formatter
msgQueue.Formatter = new BinaryMessageFormatter();
// Messages will be by default recoverable
msgQueue.DefaultPropertiesToSend.Recoverable = true;
哦,对了,代码周围到处都是日志记录:如果您想编译代码,需要安装 NLog。日志记录即使在生产环境中也是必需的,使用 NLog,您只需更改一个配置文件即可将日志记录到您想要的位置:一个记录器适用于所有目的。真的。
logger.Info("Successfully attached to MSM queue: '{0}'", queueName);
但我会在文章中尽量省略日志记录语句。
如果构造成功,您可以调用 QueueMessage
方法。
// the integer parameter is the TTL value I was talking before
public void QueueMessage(MailMessage emailMessage, int deliveryAttempts = 3)
{
// some checks here...
try
{
Message message = new Message();
// we copy the mail in the body of the message
message.Body = new SerializeableMailMessage(emailMessage);
// just to be sure...
message.Recoverable = true;
// wi will use binary serialization to use as much as we can from that 4MB
message.Formatter = new BinaryMessageFormatter();
// here we sore the TTL value
message.AppSpecific = deliveryAttempts;
// scheduled delivery time, 'now' for the start
message.Extension = System.BitConverter.GetBytes(DateTime.UtcNow.ToBinary());
// unique, app-specific id
message.Label = Guid.NewGuid().ToString();
// ...see below...
message.UseAuthentication = useAuthentication;
// post the message
msgQueue.Send(message);
}
catch (Exception ex)
{
throw ex;
}
}
Extension
属性是一个字节数组,所以我不得不将当前时间戳转换为字节。我使用 UTC 来确保生产者和消费者(即使不在同一主机上)都能获得相同的时间。
队列可以设置为需要身份验证才能访问。只有在这种情况下,队列的 ACL 才有效。我还没有测试过,但似乎身份验证仅在 AD 环境中有效。嗯,代码已经准备好了。
服务
让我们先来谈谈服务安装程序。我之前写过,我采纳了一个解决方案 [3],摆脱了 installutil.exe,并有可能将服务应用程序作为普通控制台应用程序运行。这个小技巧在调试时非常有用。
// custom service executable installer class
[RunInstaller(true)]
public sealed class MSMQ_MailRelyServiceProcessInstaller : ServiceProcessInstaller
{
public MSMQ_MailRelyServiceProcessInstaller()
{
// run service as network service... remember the ACL-s!
this.Account = ServiceAccount.NetworkService;
this.Username = null;
this.Password = null;
}
}
// custom service installer class
[RunInstaller(true)]
public class MSMQ_MailRelyServiceInstaller : ServiceInstaller
{
public MSMQ_MailRelyServiceInstaller()
{
// set up basic parameters of the service
this.DisplayName = "MSMQ Mail processor service";
this.StartType = ServiceStartMode.Automatic;
this.DelayedAutoStart = true;
this.Description =
"Service is designed to send email messages posted in a messaging queue.";
// this service depends on the Microsoft Messaging Queue service
this.ServicesDependedOn = new string[] { "MSMQ" };
this.ServiceName = "MSMQ Mail Rely";
}
}
// This is the application itself
class Program
{
// Install or uninstall the service
static void Install(bool undo, string[] args)
{
try
{
Console.WriteLine(undo ? "uninstalling" : "installing");
// The service is in this assembly, thus install the executable itself
using (AssemblyInstaller inst = new AssemblyInstaller(typeof(Program).Assembly, args))
{
// Installer will return some messages
IDictionary state = new Hashtable();
inst.UseNewContext = true;
// try to install or uninstall service, and rollback process if something fails
try
{
if (undo)
{
inst.Uninstall(state);
}
else
{
inst.Install(state);
inst.Commit(state);
}
}
catch
{
try
{
inst.Rollback(state);
}
catch { }
throw;
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.Message);
}
}
// entry point
static int Main(string[] args)
{
bool install = false, uninstall = false, console = false, rethrow = false;
try
{
// let's parse arguments
foreach (string arg in args)
{
switch (arg)
{
case "-i":
case "-install":
install = true; break;
case "-u":
case "-uninstall":
uninstall = true; break;
case "-c":
case "-console":
console = true; break;
default:
Console.Error.WriteLine("Argument not expected: " + arg);
break;
}
}
// do the action
if (uninstall)
{
Install(true, args);
}
if (install)
{
Install(false, args);
}
// this is the hack:
if (console)
{
// we construct the service class outside the service host
MSMQ_MailRelyService service = new MSMQ_MailRelyService();
Console.WriteLine("Starting...");
// original event handlers are protected, so we need public methods to start up...
service.StartUp(args);
Console.WriteLine("Service '{0}' is running in console mode. " +
"Press any key to stop", service.ServiceName);
Console.ReadKey(true);
// ...and to shut it down
service.ShutDown();
Console.WriteLine("System stopped");
}
// when loaded as real service we will create the host...
else if (!(install || uninstall))
{
// so that windows sees error...
rethrow = true;
ServiceBase[] services = { new MSMQ_MailRelyService() };
// ...and run the service
ServiceBase.Run(services);
rethrow = false;
}
return 0;
}
catch (Exception ex)
{
if (rethrow) throw;
Console.Error.WriteLine(ex.Message);
return -1;
}
}
}
所以,如果您想安装服务,只需以管理员身份运行 MSMQ_MailRelyService.exe -i。要调试,只需在应用程序属性的“调试”页面中添加 -c 作为命令行参数,并在 IDE 中运行它。
接下来,我将讨论服务本身。有关完整的源代码,请在此处浏览代码或下载它。
protected override void OnStart(string[] args)
{
try
{
// If queue does not exist
if (!MessageQueue.Exists(settings.QueueName))
{
// create the queue itself
msgQueue = MessageQueue.Create(settings.QueueName);
// use authentication in AD environment
msgQueue.Authenticate = settings.UseAuthentication;
// label the queue
msgQueue.Label = "MSMQ Mail Rely message queue";
}
else
{
// attach to the queue
msgQueue = new MessageQueue(settings.QueueName);
}
// we will use binary serialization
msgQueue.Formatter = new BinaryMessageFormatter();
// retrieve all properties
msgQueue.MessageReadPropertyFilter.SetAll();
// only this service can retrieve from the queue
msgQueue.DenySharedReceive = true;
// we start the message processor thread
MSMQMessageProcessor = new Thread(new ThreadStart(this.ProcessMSMQMessages));
MSMQMessageProcessor.Start();
}
catch (Exception ex)
{
throw ex;
}
}
请记住:您可以手动创建队列,但不要忘记设置 ACL。默认情况下,任何人都可以向队列中发布消息,但从中检索消息需要获得授权。创建者默认拥有此权限。因此,如果服务创建了它,它就能正常工作,但您需要拥有所有权才能通过 MMC 插件操作 ACL 或任何其他属性。如果您创建了队列,您必须授予运行服务的用户必要的权限。
由于扩展方法非常有用,我写了一个来获取要处理的下一条消息。该方法将遍历队列中的所有消息,并根据其计划时间戳获取所有符合条件的(eligible)消息。在这些符合条件的消息中,该方法将选择最旧的一条,并返回其 ID。如果没有找到符合条件的消息,则返回 null
。
public static String GetScheduledMessageID(this MessageQueue q)
{
DateTime OldestTimestamp = DateTime.MaxValue;
String OldestMessageID = null;
using (MessageEnumerator messageEnumerator = q.GetMessageEnumerator2())
{
while (messageEnumerator.MoveNext())
{
DateTime ScheduledTime = DateTime.FromBinary(
BitConverter.ToInt64(messageEnumerator.Current.Extension, 0));
if (ScheduledTime < DateTime.UtcNow) // Take only the proper ones
{
if (ScheduledTime < OldestTimestamp)
{
OldestTimestamp = ScheduledTime;
OldestMessageID = messageEnumerator.Current.Id;
}
}
}
}
return OldestMessageID;
}
主线程将执行实际工作,所以让我们看看:
private void ProcessMSMQMessages()
{
try
{
// this is a tread after all
while (true)
{
// wait for available messages, thread is blocked while queue is empty
Message message = msgQueue.Peek();
// we look for the first scheduled message with the extension method
String ID = msgQueue.GetScheduledMessageID();
// have we found a message to be processed?
if (ID != null)
{
// retrieve the elected message by it's id
message = msgQueue.ReceiveById(ID);
// deserialize original email
MailMessage mailMessage =
(message.Body as SerializeableMailMessage).GetMailMessage();
// we will store the error condition for later
Exception CachedException = null;
// by default we will not re-post if something fails
RetryReason retry = RetryReason.NoRetry;
try
{
using (var smtpClient = new SmtpClient())
{
// try to send the mail
// (do not forget to set up SMTP parameters on app.config)
smtpClient.Send(mailMessage);
}
}
// look for exceptions, if any
catch (SmtpFailedRecipientsException ex)
{
// store exception
CachedException = ex;
// traverse inner exceptions...
for (int i = 0; i < ex.InnerExceptions.Length; i++)
{
// ...to see if worth retrying
SmtpStatusCode status = ex.InnerExceptions[i].StatusCode;
if (status == SmtpStatusCode.MailboxBusy ||
status == SmtpStatusCode.MailboxUnavailable ||
status == SmtpStatusCode.InsufficientStorage)
{
// store retry reason
retry = RetryReason.Postmaster;
}
}
}
catch (SmtpException ex)
{
CachedException = ex;
if (ex.InnerException != null)
{
// this is the case of network errors
WebExceptionStatus status = (ex.InnerException as WebException).Status;
// we look for possibly recoverable situations...
if (status == System.Net.WebExceptionStatus.NameResolutionFailure ||
status == System.Net.WebExceptionStatus.ConnectFailure)
{
// ...and store the reason
retry = RetryReason.Network;
}
}
}
catch (Exception ex)
{
// nothing to do in other cases
CachedException = ex;
}
// if error looks recoverable...
if (CachedException != null)
{
if (retry != RetryReason.NoRetry)
{
// ...and we have not consumed our chances
if (message.AppSpecific > 0)
{
// update schedule time
DateTime OriginalScheduledTime =
DateTime.FromBinary(BitConverter.ToInt64(message.Extension, 0));
// determine wait time
int retryDelaySeconds;
if (retry == RetryReason.Network)
{
// network errors might recover sooner...
retryDelaySeconds = settings.NetworkRetryDelay_s;
}
else
{
// ...smtp errors can last longer
retryDelaySeconds = settings.PostmasterRetryDelay_s;
}
// calculate new schedule timestamp
message.Extension = System.BitConverter.GetBytes(
DateTime.UtcNow.ToUniversalTime().AddSeconds(retryDelaySeconds).ToBinary());
// update TTL
message.AppSpecific--;
// postpone message
msgQueue.Send(message);
}
else
{
logger.ErrorException("Failed to deliver, no more attempts.", CachedException);
}
}
else
{
logger.ErrorException("Failed to deliver, but no use to retry", CachedException);
}
}
}
// wait only if there was nothing to process
else
{
Thread.Sleep(settings.SleepInterval);
}
}
}
// Catch exception raised when thread is aborted
catch (ThreadAbortException)
{
logger.Info("Thread aborted.");
}
}
回顾应用重试的错误情况可能是有价值的。实际上,这取决于 SMTP 服务器和地址。
该服务在 app.config 中存储了几个设置。默认设置可能适合您,但您可以根据需要进行更改。
首先,我们有队列名称。由于这是服务,使用本地专用队列非常简单。技术上,也可以使用公共队列。
<applicationSettings>
<MSMQ_MailRelyService.Properties.Settings>
<setting name="QueueName" serializeAs="String">
<value>.\Private$\EmailQueue</value>
</setting>
这是线程在没有找到合适消息可供处理时休眠的时长(以毫秒为单位):
<setting name="SleepInterval" serializeAs="String">
<value>5000</value>
</setting>
是否使用身份验证:
<setting name="UseAuthentication" serializeAs="String">
<value>False</value>
</setting>
在发生网络错误时,推迟的投递尝试应延迟的时间(以秒为单位):
<setting name="NetworkRetryDelay_s" serializeAs="String">
<value>120</value>
</setting>
在发生 SMTP 错误时延迟的时间(以秒为单位)。
<setting name="PostmasterRetryDelay_s" serializeAs="String">
<value>3600</value>
</setting>
</MSMQ_MailRelyService.Properties.Settings>
</applicationSettings>
不要忘记也在 app.config 中设置您的 SMTP 环境。使用 GMail 的设置方法如下:
<system.net>
<mailSettings>
<smtp deliveryMethod="Network" from="validatedsender@gmail.com">
<network defaultCredentials="false" enableSsl="true"
host="smtp.gmail.com" port="587"
userName="username@gmail.com" password="password"/>
</smtp>
</mailSettings>
</system.net>
有关详细信息,请参阅 MSDN。
测试应用程序
我不会对此多说,代码会自我解释。正如我之前提到的,我已经适配了 [4] 来获得一个近乎完整的特性测试。
MailSender sender = new MailSender(@".\Private$\EmailQueue");
class Program
{
static void Main(string[] args)
{
// we construct the sender object with the queue name as parameter
MailSender sender = new MailSender(@".\Private$\EmailQueue");
// this will be the mail to be sent
MailMessage m = new MailMessage();
// we populate the basic fields
m.From = new MailAddress("sender@mailserver.com", "Sender Display Name");
m.To.Add(new MailAddress("to@mail.com", "To Display Name"));
m.CC.Add(new MailAddress("cc@mail.com", "CC Display Name"));
m.Subject = "Sample message";
m.IsBodyHtml = true;
m.Body = @"<h1>This is sample</h1><a " +
@"href=""http://https://codeproject.org.cn"">See this link</a>";
// we add an attachment
FileStream fs = new FileStream(@"C:\Windows\Microsoft.NET\Framework" +
@"\v4.0.30319\SetupCache\Client\SplashScreen.bmp", FileMode.Open, FileAccess.Read);
Attachment a = new Attachment(fs, "image.bmp", MediaTypeNames.Application.Octet);
m.Attachments.Add(a);
// we add an alternate view...
string str = "<html><body><h1>Picture</h1>" +
"<br/><img src=\"cid:image1\"></body></html>";
AlternateView av =
AlternateView.CreateAlternateViewFromString(str, null, MediaTypeNames.Text.Html);
// ...with an embedded image
LinkedResource lr = new LinkedResource(@"C:\Windows\Microsoft.NET\Framework" +
@"\v4.0.30319\ASP.NETWebAdminFiles\Images\ASPdotNET_logo.jpg", MediaTypeNames.Image.Jpeg);
lr.ContentId = "image1";
av.LinkedResources.Add(lr);
m.AlternateViews.Add(av);
// and finally we try to pass the mail to the rest of the solution
sender.QueueMessage(m);
}
}
就这样,各位。
关注点
我认为此解决方案可以直接用于实际应用程序。我已尽力处理了许多必须处理的情况。我认为 MSMQ 在此类应用程序中还有更多可以探索的可能性,例如使用报告类型的消息和管理队列,以便为发送者提供异步反馈,引入一些超时。由于我很快将在 AD 环境中进行测试,我将回来更新文章并提供我的发现。
历史
- 2012.02.05:首次发布,但在撰写本文期间进行了一些改进