使用 MSMQ 排队 Windows 服务工作项,并使用 WCF Net Named Pipe 进行实时进度显示






4.89/5 (8投票s)
如何使用 MSMQ 为 Windows 服务排队工作项以供执行,然后使用 WCF Net Named Pipe 向 WPF 应用程序报告进度
引言
本文将演示如何使用 MSMQ 排队工作项,通过 Windows 服务处理它们,然后使用 WCF NetNamedPipe 向 WPF 应用程序报告进度
背景
不久前,我需要定期(每天)解析 3GB 以上的关系数据。最初,我构建的解析器将所有内容都放在一个应用程序中。当我需要解析时,我只需启动应用程序,让它运行几个小时处理数据,然后关闭它。
这样做是有效的,但这种架构会长时间占用我的机器,而且如果解析器失败,我必须从头开始。重新启动失败的部分并从工作项列表中恢复进度会带来很多额外的工作。此外,我并不总是关心可视化进度;当我关心时,我不想在应用程序的其余部分中拥有一些宏大的命令行标志逻辑来仅用于切换 UI。
我想要的是在一个专用框上排队特定操作,并让它自动开始处理。我想能够随时单独排队操作,或者乱序排队。我还希望能够启动框上的专用应用程序来查看进度。
为了实现这一点,我需要 MSMQ 来处理操作,一个 Windows 服务来消耗队列,以及 WCF Net Named Pipe 来将进度进行进程间通信到另一个应用程序(如果它在监听的话)。
Outline
本文分为两部分
设置
首先,如果尚未启用 MSMQ,我们需要启用它。转到“控制面板”,然后选择“启用或关闭 Windows 功能”
然后,选择 MSMQ 以启用它。
接下来,我们将启动 compmgmt.msc
,并向下滚动到“专用队列”
右键单击,然后添加一个新队列。
如果深入查看队列的属性,您可以进行设置,以便在队列项被消耗时收到日志条目,以及一些其他 企业级功能。
如果您决定启用日志记录,这里有一个来之不易的建议: 您必须不时手动清空日志 -“将日志存储限制为 (KB)”并不意味着“达到此大小后刷新旧条目”,它的意思是“当日志达到此大小时停止写入日志”。
既然我们在这里,让我们记下这个队列的名称。我的队列是 win81-tqgmg4i5\private$\HardWorkingQueue
。稍后我们将需要它。
注意: 现在您应该花时间点击“安全”选项卡
您会注意到,默认情况下,“Everyone”被允许使用我们的队列。就本文而言,目前这样是可以的;但在生产环境中部署之前,您需要删除“Everyone”,并授予本地计算机的“System”帐户完全控制权限。我们不希望任何可以登录到此框的用户都能使用我们的队列。
更重要的是,我们需要添加我们的 Windows 服务将要运行的帐户(即 System),如果我们现在不这样做,将来会遇到问题。
代码
现在我们已经完成了这些,让我们开始编写一些代码!
代码包含以下部分
入队消息
让我们先将一个工作项放入队列。暂时保持简单,并添加一个控制台应用程序。
class Program
{
static void Main(string[] args)
{
GatherAndSend();
}
private static void GatherAndSend()
{
Console.WriteLine("type some stuff to queue up");
var input = Console.ReadLine();
using (var mq = new MessageQueue(ConfigurationManager.AppSettings["OperationMessageQueuePath"]))
{
var qItem = new QueuedWorkItem() { Name = input };
using (var msg = new System.Messaging.Message(qItem))
{
msg.Label = "Queued Item from the console";
msg.Formatter = new XmlMessageFormatter(new Type[] { typeof(QueuedWorkItem) });
mq.Send(msg);
Console.WriteLine("Message sent. Message ID is {0}", msg.Id);
}
}
Console.WriteLine("write another message to the queue? Y/N");
var decision = Console.ReadLine();
if(decision == "y")
{ GatherAndSend(); }
}
}
很简单,对吧?使用我们上面获得的名称实例化消息队列,创建一个消息,声明格式化程序,然后发送它。让我们运行我们的控制台,输入一些内容,然后查看消息队列!
这就是我们使用 XML 而不是二进制的原因 - 请看,这清楚易懂,可以告诉我们正在排队什么?
好的,继续。
WCF 命名管道
首先,让我们通过定义 WCF 服务来开始,该服务将充当两个进程之间的中介
所有 WCF 服务都以服务协定开始
[ServiceContract]
internal interface IPipeService
{
[OperationContract]
void RecieveMessage(String message);
}
然后是实现
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
internal class PipeService : IPipeService
{
public static string URI = "net.pipe:///Pipe";
public Action<String> MessageReceived = null;
public void ReceiveMessage(String message)
{
if (MessageReceived != null)
{ MessageReceived(items); }
}
}
请注意,我们的实现实际上并没有处理当 WCF 服务接收到数据时发生的业务逻辑 - 它依赖于委托来处理。我们稍后会回来讨论这个问题。
我选择了 `internal` 关键字。通常,我们只声明协定和实现就认为完成了 - 然而,发送方和接收方都需要进行一些小的设置和清理工作才能使用 WCF 服务 - 出于应用程序逻辑的考虑,我希望在每次发送或接收消息时 不 进行这些操作 - 因此,我们将提供一种更简单的方式来与管道交互,并向我们的开发人员公开 这个,而不是原始的 WCF 基础结构。
在发送方,我们将构建以下内容
public class Sender { private static Object _Lock = new Object(); private static EndpointAddress _endpointAddress = new EndpointAddress(String.Format("{0}/{1}", PipeService.URI, Receiver.DefaultPipeName)); /// <summary> /// Attempts to send the message to the proxy at the pre-configured endpoint /// </summary> /// <param name="message">The message to send</param> /// <returns>True, upon success</returns> public static Boolean SendMessage(String message) { var success = false; try { lock (_Lock) //ensure thread exclusivity when sending messages across the wire { var proxy = ChannelFactory<IPipeService>.CreateChannel(new NetNamedPipeBinding(), _endpointAddress); proxy.RecieveMessage(message); } success = true; } catch (Exception ex) //Most likely, there was nobody to send a message to. { } //TODO : Add some logging return success; } }
这使得交互非常简单 - 要通过管道发送消息到接收方,我们只需要调用 Sender.SendMessage
即可。- 简单得多。(而且,我们是线程安全的!)
请注意,我们每次尝试发送消息时都在重新创建代理 - 这是因为当 Windows 服务运行时(并且发送消息)时,服务的宿主可能会间歇性地上线和下线,因为我们会定期检查进度。我们希望它在我们有人观看时工作,而在无人观看时默默失败。如果我们将其设为单例,我们只有一次机会检查宿主是否存在;而通过这种方式,我们在发送之前都有机会检查。
现在,接收方有点不同
首先,我们将开始声明成员变量
public const String DefaultPipeName = "Pipe1";
private PipeService _ps = new PipeService();
private ServiceHost _host = null;
private Boolean _operational { get; set; }
#region PipeName
private String _PipeName = String.Empty;
/// <summary>
/// Gets the name of the pipe being used by this reciever
/// </summary>
public String PipeName
{
get { return _PipeName; }
}
#endregion
然后,我们将声明构造函数
public Receiver(Action<String> messageReceivedAction) : this(DefaultPipeName, messageReceivedAction) { }
public Receiver(String pipeName, Action<String> messageReceivedAction)
{
_PipeName = pipeName;
_ps.MessageReceived = messageReceivedAction;
}
在构造函数中,我们传入一个委托,该委托将在从发送方接收到数据时执行。我们默认使用预定义的名称,但也允许调用者根据需要用其他名称覆盖它。
这里是我们的魔法开始的地方 - 我们实际启动服务的地方
/// <summary>
/// Performs the act of starting the WCF host service
/// </summary>
/// <returns>true, upon success</returns>
public Boolean ServiceOn()
{
try
{
_host = new ServiceHost(_ps, new Uri(PipeService.URI));
_host.AddServiceEndpoint(typeof(IPipeService), new NetNamedPipeBinding(), _PipeName);
_host.Open();
_operational = true;
}
catch (Exception ex)
{
_operational = false;
}
return _operational;
}
我们正在启动一个新的 WCF 服务宿主,基于我们构造的 IPipeService
实例,然后使用 NetNamedPipeBinding
作为我们的绑定协议,将 IPipeService
服务协定的操作绑定到位于 _PipeName
的管道。如果一切成功启动,我们就存储 `true` 并返回它。
停止 WCF 服务
/// <summary>
/// Stops the hosting service
/// </summary>
public void ServiceOff()
{
if (_host == null)
{ return; } //already turned off
if (_host.State != CommunicationState.Closed)
{ _host.Close(); }
_operational = false;
}
Windows 服务
让我们通过向解决方案添加一个 Windows 服务来开始这一步。您可以随意命名。Windows 服务基本上是在后台一直运行且不提供 UI 的程序。如果您感兴趣,维基百科有一篇很好的文章 提供了更多关于它的详细信息。
我需要在此刻严厉批评一下微软,因为他们从默认项目中省略了大量必要的东西,而这些东西是 Windows 服务在安装后拥有满意用户所必需的。
- 他们省略了安装配置脚手架
- 他们省略了
System.Configuration.ConfigurationManager
的代理
我在这里稍作停顿,以对第二点提供进一步的解释。当您的 Windows 服务安装时,它不是在其自己的应用程序上下文中运行 - 它是在服务控制管理器(或更恰当地说,InstallUtil.exe)的上下文中运行。由于在安装时我们实际上不是在自己的执行上下文中运行,因此我们使用 app.config 以及通过代理使用 ConfigurationManager
的能力被移除了。
让我们首先声明我们的安装脚手架。添加对 System.Configuration.Install
的引用,并添加以下类
[RunInstaller(true)]
public partial class HardWorkingServiceInstaller : System.Configuration.Install.Installer
{
}
我们在这里做的是创建一个自定义安装程序。我们将使用它来设置 Windows 服务的属性,如名称、描述、帐户和启动类型。
接下来,我们需要一种在安装时访问 app.config 值的方法。添加以下类
internal static class InstallTimeConfigurationManager
{
public static string GetConfigurationValue(string key)
{
Assembly service = Assembly.GetAssembly(typeof(HardWorkingServiceInstaller));
var config = ConfigurationManager.OpenExeConfiguration(service.Location);
return config.AppSettings.Settings[key].Value;
}
}
这将加载 app.config,并在安装时提供对 app.config 值的编程访问。
现在,我们可以回到自定义安装程序,并充实这个类。
[RunInstaller(true)]
public partial class HardWorkingServiceInstaller : System.Configuration.Install.Installer
{
public HardWorkingServiceInstaller() : base()
{
var serviceProcessInstaller = new ServiceProcessInstaller();
var serviceInstaller = new ServiceInstaller();
//Service Account Information
serviceProcessInstaller.Account = ServiceAccount.LocalSystem;
serviceProcessInstaller.Username = null;
serviceProcessInstaller.Password = null;
//Service Information
serviceInstaller.DisplayName = InstallTimeConfigurationManager.GetConfigurationValue("ServiceDisplayName");
serviceInstaller.Description = InstallTimeConfigurationManager.GetConfigurationValue("ServiceDescription");
serviceInstaller.StartType = ServiceStartMode.Automatic;
serviceInstaller.DelayedAutoStart = true;
//This must be identical to the WindowsService.ServiceBase name
//set in the constructor of WindowsService.cs
serviceInstaller.ServiceName = InstallTimeConfigurationManager.GetConfigurationValue("SystemServiceName");
this.Installers.Add(serviceProcessInstaller);
this.Installers.Add(serviceInstaller);
this.Committed += Installer_Committed;
}
private void Installer_Committed(Object sender, InstallEventArgs e)
{
//auto start the service once the installation is finished
var controller = new ServiceController(InstallTimeConfigurationManager.GetConfigurationValue("SystemServiceName"));
controller.Start();
}
}
我们基本上告诉安装程序在本地计算机的 System 帐户下运行,为 Windows 服务管理器中的发现设置显示名称和描述,并将其设置为延迟自动启动。此外,一旦我们安装了它,我们将立即将其设置为启动(所以我们不会忘记这样做……像我可能……不止一次……)
现在,添加对 System.Messaging
的引用,然后转到继承自 ServiceBase
的 HardWorkingService
类
#region OperationMessageQueue
private MessageQueue _OperationMessageQueue;
protected MessageQueue OperationMessageQueue
{
get
{
if (_OperationMessageQueue == null)
{
_OperationMessageQueue = new MessageQueue(ConfigurationManager.AppSettings["OperationMessageQueuePath"]);
_OperationMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(QueuedWorkItem) });
_OperationMessageQueue.ReceiveCompleted += Mq_ReceiveCompleted;
}
return _OperationMessageQueue;
}
}
private void Mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
throw new NotImplementedException();
}
#endregion
我们正在设置一个委托,以便在项目通过队列传递时执行。诚然,XML 不是我们 MQ 选项中最有效的;我们可以使用二进制格式;但是,如果我们以后想分析队列的日志,XML 可以提供人类可读的队列传递记录。
花点时间填写 Start/Stop 操作
protected override void OnStart(string[] args)
{
OperationMessageQueue.BeginReceive();
}
protected override void OnStop()
{
OperationMessageQueue.ReceiveCompleted -= Mq_ReceiveCompleted;
OperationMessageQueue.Dispose();
}
protected override void OnPause()
{
OperationMessageQueue.ReceiveCompleted -= Mq_ReceiveCompleted;
}
protected override void OnContinue()
{
OperationMessageQueue.ReceiveCompleted += Mq_ReceiveCompleted;
OperationMessageQueue.BeginReceive();
}
没有什么特别的,需要记住的重要一点是,当 Windows 服务管理器暂停您的应用程序时,您实际上仍在运行 - 对象保留在内存中,进程、线程等继续执行。基本上,您收到了一个友好的请求,要求停止您正在做的事情并进入低功耗状态。对我们来说,这意味着在我们接收消息的时间到来时,取消我们的事件处理程序,并在继续时重新连接它。
现在回到那个消息接收事件处理程序
private void Mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
using (var msg = OperationMessageQueue.EndReceive(e.AsyncResult))
{
try
{
var qItem = msg.Body as QueuedWorkItem;
ProcessQueuedWorkItem(qItem);
}
catch (Exception ex)
{
//TODO : Write to the log we failed in some way
Environment.Exit(Environment.ExitCode);
}
}
OperationMessageQueue.BeginReceive();
}
MSMQ 的 API 在 .Net 的 基于事件的异步编程模型 下运行。也就是说,Begin() 方法调用异步操作,调用委托来处理完成,我们从 End() 方法中提取结果。这没问题 - 唯一的问题是我们需要再次启动该进程,以便收集队列中的下一条消息;否则,我们的 Windows 服务将只处理一条消息。
这不是唯一的做法。我们可以阻塞在一个 `while(true)` 循环中调用 .Recieve
方法 - 我是说,我们毕竟没有在 UI 线程上工作;然而,我更喜欢异步实现,因为它允许我们的应用程序在等待消息时将资源让还给系统。
现在是时候进行魔法操作了
private void ProcessQueuedWorkItem(QueuedWorkItem item)
{
NamedPipe.Sender.SendMessage(String.Format("Starting work on {0}", item.Name));
var delay = new TimeSpan(0,0,3);
for(var i = 0; i < 5; i++)
{
NamedPipe.Sender.SendMessage(String.Format("beginning work on item {0} of 5 for {1}", i, item.Name));
System.Threading.Thread.Sleep(delay); //yep, we're "working" really hard here :)
NamedPipe.Sender.SendMessage(String.Format("completed work on item {0} of 5 for {1}", i, item.Name));
}
NamedPipe.Sender.SendMessage(String.Format("Completed work on {0}", item.Name));
}
由于我们将所有 WCF 设置和错误检查逻辑都路由到了 Sender 类,因此将消息传递到另一个进程非常容易。
我认为这里真正的诀窍是它需要一种思维的转变 - 第一种想法可能是 Windows 服务将是 WCF 宿主,而 Windows 应用程序将是消费者。实际上,情况恰恰相反 - 我们的 Windows 服务将向一个固定地址广播消息,如果该地址没有管道宿主,那么我们将把消息路由到垃圾桶。
此时,我们就可以启动 Windows 服务,让它开始处理本地队列了。使用 InstallUtil.exe 安装服务(请确保您以管理员身份运行),并且 将 Visual Studio 调试器附加到正在运行的进程 如果您想单步调试代码。
完成后,您应该有两个结果
InstallUtil 的确认
以及您的服务现在已列在 Windows 服务列表中
我们配置了服务自动启动,看!它正在运行!另外,四处看看,名称、描述和其他变量都已存在!如果您查看 MSMQ,它现在也应该是空的,这意味着我们正在积极地从队列中消耗指令。
使用 WPF 显示消息
此时,我们已经实现了我们的三个目标中的两个——能够自动处理指令,并且能够以任何我们选择的顺序排队指令。现在,让我们来实现实时进度。
让我们添加一个 WPF 项目,并从 NuGet 添加 MVVM light。我认为他们的 NuGet PowerShell 脚本需要更新,因为每次添加它时,命名空间在 app.xaml 中都不会正确设置。我们来修复一下。前往 app.xaml,并像这样调整它
<Application
x:Class="ProgressObserver.App"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
StartupUri="MainWindow.xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
d1p1:Ignorable="d"
xmlns:d1p1="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:vm="clr-namespace:ProgressObserver.ViewModel">
<Application.Resources>
<vm:ViewModelLocator x:Key="Locator" d:IsDataSource="True" />
</Application.Resources>
</Application>
默认情况下,MVVM light 在各个资源级别声明 XML 命名空间。我只是将其提升到文档级别,因为对于这个项目,我们将使用此命名空间中的所有内容,并且不会有冲突的命名空间。
我们去窗口的视图
<Window x:Class="ProgressObserver.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:converter="clr-namespace:ProgressObserver.Converters"
Title="MainWindow" Height="350" Width="525"
DataContext="{Binding Source={StaticResource Locator}, Path=Main}">
<Window.Resources>
<ResourceDictionary>
<converter:LocalTimeConverter x:Key="localTime" />
</ResourceDictionary>
</Window.Resources>
<Grid>
<ItemsControl ItemsSource="{Binding Messages}">
<ItemsControl.ItemTemplate>
<DataTemplate>
<StackPanel Orientation="Horizontal">
<TextBlock Text="{Binding Recieved, Converter={StaticResource localTime}}" />
<TextBlock Text=":" />
<TextBlock Text="{Binding Message}" />
</StackPanel>
</DataTemplate>
</ItemsControl.ItemTemplate>
</ItemsControl>
</Grid>
</Window>
这里没有什么特别的,除了使用 IValueConverter
将 UTC 转换为本地时间以供用户使用。我将指出一点——当我刚接触 MVVM light 时,有一点我一开始没有真正理解,那就是在 XAML 中声明窗口的数据上下文,而不是在代码隐藏中。由于我们在 app.xaml 文件中将我们的定位器声明为静态资源,我们可以将应用程序的所有视图模型作为属性挂在它上面。在这里,我们的主窗口的视图模型只是我们定位器的属性之一。
快完成了,我们去 MainViewModel。
我们需要设置 WCF 服务宿主 - 这允许 Windows 服务与我们的应用程序进行进程间通信
#region Receiver Property
/// <summary>
/// Member-level variable backing <see cref="Receiver" />
/// </summary>
private Receiver _Receiver = null;
/// <summary>
/// Gets and sets the <see cref="Receiver" /> property.
/// </summary>
public Receiver Receiver
{
get {
if(_Receiver == null)
{ _Receiver = new Receiver(this.OnMessageReceived); }
return _Receiver;
}
set
{
Set(() => Receiver, ref _Receiver, value);
}
}
#endregion
注意我们如何将一个委托传递给构造函数,它告诉管道在报告来自我们 Windows 服务的进度时要执行哪个方法。
在构造函数中,我们将启动 WCF 宿主
/// <summary>
/// Initializes a new instance of the MainViewModel class.
/// </summary>
public MainViewModel()
{
Messages = new ObservableCollection<MessageItem>();
Receiver.ServiceOn();
}
最后,充实我们的消息处理委托
private void OnMessageReceived(String message)
{
Dispatcher.CurrentDispatcher.Invoke(() => {
Messages.Add(new MessageItem(message));
});
}
请注意,我们是如何将 add 操作包装在已调度的调用中的。当我们添加消息时,我们并不是在 UI 线程上执行。如果我们处理的是标量类型(int、string 等),WPF 会自动为我们处理 PropertyChanged
通知并将其发送到 UI 线程。由于我们正在修改集合类型,WPF 不会为我们进行路由,因此我们必须通过调度程序在 UI 线程上调用此代码。
现在,让我们启动所有内容。如果您将 Visual Studio 设置为同时运行我们的控制台和我们的可视化工具,而服务正在运行,那么您应该能够执行以下操作
加分项!
如果您的 MSMQ/工作机也是 Web 服务器,您可以通过 Web 页面通过 Web API 启动排队的任务!下载代码以查看如何做到这一点。
历史
2014-12-13 - 首次发布