65.9K
CodeProject 正在变化。 阅读更多。
Home

在邮件中继中使用 MSMQ

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (13投票s)

2012年2月6日

Eclipse

8分钟阅读

viewsIcon

71871

downloadIcon

2235

一个完整的解决方案,介绍如何使用 MSMQ 构建一个简单的邮件中继应用程序。

引言

我一直在做一个 .NET 项目,需要通过 SMTP 服务器发送电子邮件通知。它运行得很好,但有时通知就是没有送达。原因很简单,但也很烦人:当 SmtpClient 因任何原因无法连接到服务器时,没有任何方法可以推迟操作并稍后重试发送。因此,会抛出异常,消息被丢弃。

然后我在网上搜索了一个合适的解决方案,但只找到我设想的片段。其中之一是 Amol Rawke Pune 的文章:《使用 MSMQ 发送批量邮件》[1]。

好吧,听起来不错,但有很多限制。但其中,三个主要限制是:

  1. MailMessage 对象并非所有字段都已传递
  2. 队列使用者“服务”是一个简单的控制台应用程序
  3. 在发送邮件时,它没有处理错误情况的解决方案

它主要是一个概念验证,但我必须承认,这篇文章是我的起点。经过一番谷歌搜索,我找到了一个关于第一个问题的 很棒的解决方案[2] - 在进行了一些小改动后,我将其集成到了我的解决方案中。我还使用了 Mark Gravell 的想法 [3],让服务安装程序不依赖于 installutil.exe

我还找到了 Rami Vemula 的作品 [4],这是一个关于如何用丰富内容填充 MailMessage 的绝佳演示。因此,它非常适合我来测试序列化能力和解决方案本身。

所以,感谢这些好人做出的贡献。其他所有事情我都是自己想出来的……但这很有趣,我希望它能对很多人有所帮助。

背景

消息队列是 Windows 的一个非常基本概念。所谓的 MSMQ(Microsoft Message Queuing,微软消息队列)技术并不新,它是一个非常有用且非常稳定的 IPC(Inter-Process Communication,进程间通信)工具——甚至可以在网络上的主机之间进行事务性通信。

它有自己的限制,首先,消息大小限制为 4MB。考虑到序列化、Unicode 等的开销,我们需要处理更小的消息。好吧,我必须接受这一点——但我从未打算发送大文件。

消息队列在哪里?

应该很简单就能找到它们

  1. 右键单击“我的电脑
  2. 单击“管理
  3. 展开“服务和应用程序”
  4. 现在您可以看到“消息队列
  5. 展开“专用队列”,单击“新建专用队列

嗯,在 Windows 2008 Server 上就是这么简单。但您可能在 Windows 7 上找不到它。您需要安装消息队列服务。它是(一个标准)Windows 组件,所以不是什么大问题。

与 Windows NT 以来的任何其他对象一样,消息队列也有 ACL(访问控制列表)。我稍后会回来讨论这个问题,因为它可能很棘手。

所以我的代码由四个部分组成:

  1. 一个测试应用程序 :)
  2. 用于将消息发布到队列的库
  3. 一个用于消耗队列并发送电子邮件的服务
  4. 一个 MailMessage 序列化器-反序列化器

我将很快从最后一个开始。

错误情况呢?

嗯,我的方法是将队列视为一个非标准的 FIFO 存储——MSMQ 提供了实现此功能的工具。我正在使用消息对象的属性来设置一个类似 TTL(Time To Live,生存时间)的计数器,以及一个用于存储计划时间戳的属性。当一条消息被发布时,TTL 计数器被设置为用户定义的值,时间戳被设置为当前值。该服务将使用时间戳来仅选择“已过期”的消息——每条消息在其生命周期开始时都已过期。如果在发送邮件过程中出现可能可恢复的错误,消息将被重新发布到队列并在预定的等待时间后重新处理。每次重新发布消息时,TTL 都会递减。如果 TTL 被消耗完,则消息被丢弃。当一条消息等待重新处理时,其他消息可以被处理。

Using the Code

Mailmessage 序列化器-反序列化器

如果我们想创建分布式应用程序,序列化非常重要。我认为实际上任何类都应该是可序列化的——但它们不是。嗯,System.Net.Mail.MailMessage 默认是不可序列化的。

实际上,SmtpClient 使用某种序列化方式将其消息保存为 .elm 格式(请参阅 这篇文章),但要从这样一个符合 RFC 标准的文件中重新创建 MailMessage 将是一项艰巨的任务——而且开销也很大。

[1] 的作者创建了一个可序列化的类,该类封装了 MailMessage 属性的一个小子集。但在博文 [2] 中,我们可以看到一个完整的二进制可序列化替换(SerializeableMailMessage)原始类。

由于 MailMessage 包含许多不同不可序列化类型的属性,作者实现了 SerializeableMailAddressSerializeableAttachmentSerializeableAlternateViewSerializeableLinkedResourceSerializeableContentDispositionSerializeableContentTypeSerializeableCollection 类。

最后,将 MailMessage 的可序列化版本放入 MSMQ 消息中非常简单:

public void QueueMessage(MailMessage emailMessage)
{
  //...
  Message message = new Message(); 
  message.Body = new SerializeableMailMessage(emailMessage);
  //...
}

并将其取回:

Message message = msgQueue.Receive(); 
MailMessage mailMessage = (message.Body as SerializeableMailMessage).GetMailMessage();

我的版本是将其适配到 .NET 4.0,我改用了泛型集合等等。总的来说,这是很棒的东西,但如果您发现有什么缺失,那他(作者 [2])是要负责的 :)。不,实际上,由于我打算在实际应用程序中使用它,任何评论都值得赞赏。

消息发送器库

这部分代码 intended to be called by the application willing to send a mail. It contains a MailSender class. When constructed, it tries to attach to the MSMQ specified in the constructor parameter. If it does not exist, the construction is aborted.

if (!MessageQueue.Exists(queueName))
{
    throw new ArgumentException("Cannot instantiate. MSM queue does not exist.");
}
// This class will only post to the queue     
    msgQueue = new MessageQueue(queueName, QueueAccessMode.Send);
// Will use binary formatter    
    msgQueue.Formatter = new BinaryMessageFormatter();
// Messages will be by default recoverable    
    msgQueue.DefaultPropertiesToSend.Recoverable = true;

哦,对了,代码周围到处都是日志记录:如果您想编译代码,需要安装 NLog。日志记录即使在生产环境中也是必需的,使用 NLog,您只需更改一个配置文件即可将日志记录到您想要的位置:一个记录器适用于所有目的。真的。

logger.Info("Successfully attached to MSM queue: '{0}'", queueName);

但我会在文章中尽量省略日志记录语句。

如果构造成功,您可以调用 QueueMessage 方法。

// the integer parameter is the TTL value I was talking before
public void QueueMessage(MailMessage emailMessage, int deliveryAttempts = 3)
{
   // some checks here...

   try
   {
      Message message = new Message();
      // we copy the mail in the body of the message
      message.Body = new SerializeableMailMessage(emailMessage);
      // just to be sure...
      message.Recoverable = true;
      // wi will use binary serialization to use as much as we can from that 4MB
      message.Formatter = new BinaryMessageFormatter();
      // here we sore the TTL value
      message.AppSpecific = deliveryAttempts;
      // scheduled delivery time, 'now' for the start
      message.Extension = System.BitConverter.GetBytes(DateTime.UtcNow.ToBinary()); 
      // unique, app-specific id
      message.Label = Guid.NewGuid().ToString();
      // ...see below...
      message.UseAuthentication = useAuthentication;
      // post the message
      msgQueue.Send(message);
   }
   catch (Exception ex)
   {
      throw ex;
    }
}

Extension 属性是一个字节数组,所以我不得不将当前时间戳转换为字节。我使用 UTC 来确保生产者和消费者(即使不在同一主机上)都能获得相同的时间。

队列可以设置为需要身份验证才能访问。只有在这种情况下,队列的 ACL 才有效。我还没有测试过,但似乎身份验证仅在 AD 环境中有效。嗯,代码已经准备好了。

服务

让我们先来谈谈服务安装程序。我之前写过,我采纳了一个解决方案 [3],摆脱了 installutil.exe,并有可能将服务应用程序作为普通控制台应用程序运行。这个小技巧在调试时非常有用。

// custom service executable installer class 
[RunInstaller(true)]
public sealed class MSMQ_MailRelyServiceProcessInstaller : ServiceProcessInstaller
{
    public MSMQ_MailRelyServiceProcessInstaller()
    {
        // run service as network service... remember the ACL-s!
        this.Account = ServiceAccount.NetworkService;
        this.Username = null;
        this.Password = null;
    }
}
// custom service installer class    
[RunInstaller(true)]
public class MSMQ_MailRelyServiceInstaller : ServiceInstaller
{
    public MSMQ_MailRelyServiceInstaller()
    {
        // set up basic parameters of the service
        this.DisplayName = "MSMQ Mail processor service";
        this.StartType = ServiceStartMode.Automatic;
        this.DelayedAutoStart = true;
        this.Description = 
          "Service is designed to send email messages posted in a messaging queue.";
        // this service depends on the Microsoft Messaging Queue service
        this.ServicesDependedOn = new string[] { "MSMQ" };
        this.ServiceName = "MSMQ Mail Rely";
    }
}
// This is the application itself
class Program
{
    // Install or uninstall the service
    static void Install(bool undo, string[] args) 
    { 
        try 
        { 
            Console.WriteLine(undo ? "uninstalling" : "installing");
            // The service is in this assembly, thus install the executable itself 
            using (AssemblyInstaller inst = new AssemblyInstaller(typeof(Program).Assembly, args)) 
            { 
                // Installer will return some messages
                IDictionary state = new Hashtable(); 
                inst.UseNewContext = true;
                // try to install or uninstall service, and rollback process if something fails
                try 
                { 
                    if (undo) 
                    { 
                        inst.Uninstall(state); 
                    } 
                    else 
                    { 
                        inst.Install(state); 
                        inst.Commit(state); 
                    } 
                } 
                catch 
                { 
                    try 
                    { 
                        inst.Rollback(state); 
                    } 
                    catch { } 
                    throw; 
                } 
            } 
        } 
        catch (Exception ex) 
        { 
            Console.Error.WriteLine(ex.Message); 
        } 
    }
    // entry point
    static int Main(string[] args)
    {
        bool install = false, uninstall = false, console = false, rethrow = false; 
        try 
        { 
            // let's parse arguments
            foreach (string arg in args) 
            { 
                switch (arg) 
                { 
                    case "-i": 
                    case "-install": 
                        install = true; break; 
                    case "-u": 
                    case "-uninstall": 
                        uninstall = true; break;
                    case "-c":
                    case "-console":
                        console = true; break; 
                    default: 
                        Console.Error.WriteLine("Argument not expected: " + arg); 
                        break; 
                } 
            } 
            // do the action
            if (uninstall) 
            { 
                Install(true, args); 
            } 
            if (install) 
            { 
                Install(false, args); 
            } 
            // this is the hack:
            if (console) 
            {
                // we construct the service class outside the service host
                MSMQ_MailRelyService service = new MSMQ_MailRelyService();
                Console.WriteLine("Starting...");
                // original event handlers are protected, so we need public methods to start up...
                service.StartUp(args);
                Console.WriteLine("Service '{0}' is running in console mode. " + 
                                  "Press any key to stop", service.ServiceName); 
                Console.ReadKey(true);
                // ...and to shut it down 
                service.ShutDown(); 
                Console.WriteLine("System stopped"); 
            } 
            // when loaded as real service we will create the host... 
            else if (!(install || uninstall)) 
            { 
                // so that windows sees error...
                rethrow = true;  
                ServiceBase[] services = { new MSMQ_MailRelyService() };
                // ...and run the service 
                ServiceBase.Run(services); 
                rethrow = false; 
            } 
            return 0; 
        } 
        catch (Exception ex) 
        { 
            if (rethrow) throw; 
            Console.Error.WriteLine(ex.Message); 
            return -1; 
        } 
    }
}

所以,如果您想安装服务,只需以管理员身份运行 MSMQ_MailRelyService.exe -i。要调试,只需在应用程序属性的“调试”页面中添加 -c 作为命令行参数,并在 IDE 中运行它。

接下来,我将讨论服务本身。有关完整的源代码,请在此处浏览代码或下载它。

protected override void OnStart(string[] args)
{         
  try
    {
      // If queue does not exist
      if (!MessageQueue.Exists(settings.QueueName))
      {
        // create the queue itself
        msgQueue = MessageQueue.Create(settings.QueueName);
        // use authentication in AD environment
        msgQueue.Authenticate = settings.UseAuthentication;
        // label the queue
        msgQueue.Label = "MSMQ Mail Rely message queue";
      }
      else
      {
        // attach to the queue
        msgQueue = new MessageQueue(settings.QueueName);
      }
      // we will use binary serialization
      msgQueue.Formatter = new BinaryMessageFormatter();
      // retrieve all properties
      msgQueue.MessageReadPropertyFilter.SetAll();
      // only this service can retrieve from the queue
      msgQueue.DenySharedReceive = true;
      // we start the message processor thread
      MSMQMessageProcessor = new Thread(new ThreadStart(this.ProcessMSMQMessages));
      MSMQMessageProcessor.Start();
    }
    catch (Exception ex)  
    {
      throw ex;
    }
}

请记住:您可以手动创建队列,但不要忘记设置 ACL。默认情况下,任何人都可以向队列中发布消息,但从中检索消息需要获得授权。创建者默认拥有此权限。因此,如果服务创建了它,它就能正常工作,但您需要拥有所有权才能通过 MMC 插件操作 ACL 或任何其他属性。如果您创建了队列,您必须授予运行服务的用户必要的权限。

由于扩展方法非常有用,我写了一个来获取要处理的下一条消息。该方法将遍历队列中的所有消息,并根据其计划时间戳获取所有符合条件的(eligible)消息。在这些符合条件的消息中,该方法将选择最旧的一条,并返回其 ID。如果没有找到符合条件的消息,则返回 null

public static String GetScheduledMessageID(this MessageQueue q)
{
   DateTime OldestTimestamp = DateTime.MaxValue;
   String OldestMessageID = null;

   using (MessageEnumerator messageEnumerator = q.GetMessageEnumerator2())
   {                
      while (messageEnumerator.MoveNext())
      {
         DateTime ScheduledTime = DateTime.FromBinary(
            BitConverter.ToInt64(messageEnumerator.Current.Extension, 0));
         if (ScheduledTime < DateTime.UtcNow) // Take only the proper ones 
         {
            if (ScheduledTime < OldestTimestamp)
            {
               OldestTimestamp = ScheduledTime;
               OldestMessageID = messageEnumerator.Current.Id;
            }
         }
      }
   }
   return OldestMessageID;
}

主线程将执行实际工作,所以让我们看看:

private void ProcessMSMQMessages()
{
   try
   {
      // this is a tread after all
      while (true)
      {
         // wait for available messages, thread is blocked while queue is empty
         Message message = msgQueue.Peek();
         // we look for the first scheduled message with the extension method  
         String ID = msgQueue.GetScheduledMessageID(); 
         // have we found a message to be processed?
         if (ID != null)
         {
            // retrieve the elected message by it's id
            message = msgQueue.ReceiveById(ID);
            // deserialize original email
            MailMessage mailMessage = 
              (message.Body as SerializeableMailMessage).GetMailMessage();
            // we will store the error condition for later 
            Exception CachedException = null;
            // by default we will not re-post if something fails
            RetryReason retry = RetryReason.NoRetry;
            try
            {
               using (var smtpClient = new SmtpClient())
               {
                  // try to send the mail
                  // (do not forget to set up SMTP parameters on app.config)
                  smtpClient.Send(mailMessage);
               }
            }
            // look for exceptions, if any
            catch (SmtpFailedRecipientsException ex)
            {
               // store exception
               CachedException = ex;
               // traverse inner exceptions...
               for (int i = 0; i < ex.InnerExceptions.Length; i++)
               {
                  // ...to see if worth retrying
                  SmtpStatusCode status = ex.InnerExceptions[i].StatusCode;
                  if (status == SmtpStatusCode.MailboxBusy ||
                     status == SmtpStatusCode.MailboxUnavailable ||
                     status == SmtpStatusCode.InsufficientStorage)
                  {
                     // store retry reason
                     retry = RetryReason.Postmaster;
                  }
               }
            }
            catch (SmtpException ex)
            {
               CachedException = ex;
               if (ex.InnerException != null)
               {
                  // this is the case of network errors 
                  WebExceptionStatus status = (ex.InnerException as WebException).Status;
                  // we look for possibly recoverable situations...
                  if (status == System.Net.WebExceptionStatus.NameResolutionFailure ||
                        status == System.Net.WebExceptionStatus.ConnectFailure)
                  {
                     // ...and store the reason
                     retry = RetryReason.Network;
                  }
               }
            }
            catch (Exception ex)
            {
               // nothing to do in other cases
               CachedException = ex;
            }
            // if error looks recoverable...
            if (CachedException != null)
            {
               if (retry != RetryReason.NoRetry)
               {
                  // ...and we have not consumed our chances
                  if (message.AppSpecific > 0)
                  {
                     // update schedule time
                     DateTime OriginalScheduledTime = 
                       DateTime.FromBinary(BitConverter.ToInt64(message.Extension, 0));
                     // determine wait time
                     int retryDelaySeconds;
                     if (retry == RetryReason.Network)
                     {
                        // network errors might recover sooner...
                        retryDelaySeconds = settings.NetworkRetryDelay_s;
                     }
                     else
                     {
                        // ...smtp errors can last longer
                        retryDelaySeconds = settings.PostmasterRetryDelay_s;
                     }
                     // calculate new schedule timestamp
                     message.Extension = System.BitConverter.GetBytes(
                       DateTime.UtcNow.ToUniversalTime().AddSeconds(retryDelaySeconds).ToBinary());
                     // update TTL
                     message.AppSpecific--;
                     // postpone message
                     msgQueue.Send(message);
                  }
                  else
                  {
                     logger.ErrorException("Failed to deliver, no more attempts.", CachedException);
                  }
               }
               else
               {
                  logger.ErrorException("Failed to deliver, but no use to retry", CachedException);
               }
            }
         }
         // wait only if there was nothing to process
         else
         {
            Thread.Sleep(settings.SleepInterval);
         }
      }
   }
   // Catch exception raised when thread is aborted
   catch (ThreadAbortException)
   {
      logger.Info("Thread aborted.");
   }
}

回顾应用重试的错误情况可能是有价值的。实际上,这取决于 SMTP 服务器和地址。

该服务在 app.config 中存储了几个设置。默认设置可能适合您,但您可以根据需要进行更改。

首先,我们有队列名称。由于这是服务,使用本地专用队列非常简单。技术上,也可以使用公共队列。

<applicationSettings>
    <MSMQ_MailRelyService.Properties.Settings>
        <setting name="QueueName" serializeAs="String">
            <value>.\Private$\EmailQueue</value>
        </setting>

这是线程在没有找到合适消息可供处理时休眠的时长(以毫秒为单位):

<setting name="SleepInterval" serializeAs="String">
    <value>5000</value>
</setting>

是否使用身份验证:

<setting name="UseAuthentication" serializeAs="String">
    <value>False</value>
</setting>

在发生网络错误时,推迟的投递尝试应延迟的时间(以秒为单位):

<setting name="NetworkRetryDelay_s" serializeAs="String">
    <value>120</value>
</setting>

在发生 SMTP 错误时延迟的时间(以秒为单位)。

         <setting name="PostmasterRetryDelay_s" serializeAs="String">
            <value>3600</value>
        </setting>
    </MSMQ_MailRelyService.Properties.Settings>
</applicationSettings>

不要忘记也在 app.config 中设置您的 SMTP 环境。使用 GMail 的设置方法如下:

<system.net>
    <mailSettings>
      <smtp deliveryMethod="Network" from="validatedsender@gmail.com">
        <network defaultCredentials="false" enableSsl="true" 
           host="smtp.gmail.com" port="587" 
           userName="username@gmail.com" password="password"/>
      </smtp>
    </mailSettings>
</system.net>

有关详细信息,请参阅 MSDN

测试应用程序

我不会对此多说,代码会自我解释。正如我之前提到的,我已经适配了 [4] 来获得一个近乎完整的特性测试。

MailSender sender = new MailSender(@".\Private$\EmailQueue"); 
class Program
{
   static void Main(string[] args)
   {
      // we construct the sender object with the queue name as parameter 
      MailSender sender = new MailSender(@".\Private$\EmailQueue"); 
      // this will be the mail to be sent
      MailMessage m = new MailMessage();
      // we populate the basic fields 
      m.From = new MailAddress("sender@mailserver.com", "Sender Display Name");
      m.To.Add(new MailAddress("to@mail.com", "To Display Name"));
      m.CC.Add(new MailAddress("cc@mail.com", "CC Display Name"));
      m.Subject = "Sample message";
      m.IsBodyHtml = true;
      m.Body = @"<h1>This is sample</h1><a " + 
        @"href=""http://https://codeproject.org.cn"">See this link</a>";
      // we add an attachment
      FileStream fs = new FileStream(@"C:\Windows\Microsoft.NET\Framework" + 
        @"\v4.0.30319\SetupCache\Client\SplashScreen.bmp", FileMode.Open, FileAccess.Read);
      Attachment a = new Attachment(fs, "image.bmp", MediaTypeNames.Application.Octet);
      m.Attachments.Add(a);
      // we add an alternate view...
      string str = "<html><body><h1>Picture</h1>" + 
         "<br/><img src=\"cid:image1\"></body></html>";
      AlternateView av = 
        AlternateView.CreateAlternateViewFromString(str, null, MediaTypeNames.Text.Html);
      // ...with an embedded image
      LinkedResource lr = new LinkedResource(@"C:\Windows\Microsoft.NET\Framework" + 
        @"\v4.0.30319\ASP.NETWebAdminFiles\Images\ASPdotNET_logo.jpg", MediaTypeNames.Image.Jpeg);
      lr.ContentId = "image1";
      av.LinkedResources.Add(lr);
      m.AlternateViews.Add(av);
      // and finally we try to pass the mail to the rest of the solution 
      sender.QueueMessage(m);
   }
}

就这样,各位。

关注点

我认为此解决方案可以直接用于实际应用程序。我已尽力处理了许多必须处理的情况。我认为 MSMQ 在此类应用程序中还有更多可以探索的可能性,例如使用报告类型的消息和管理队列,以便为发送者提供异步反馈,引入一些超时。由于我很快将在 AD 环境中进行测试,我将回来更新文章并提供我的发现。

历史

  • 2012.02.05:首次发布,但在撰写本文期间进行了一些改进
© . All rights reserved.