具有远程控制 GUI 的 Windows 服务多线程
一个 Windows 服务,
引言
我被要求创建一个 Windows 服务,该服务在独立的线程中处理不同的任务。这个案例的背景是,我们有几个 SQL-Server-Agents 中的作业(复杂的数据选择和传输),有时可能需要 8 小时以上。
有时 IIS 会终止一个作业,我们找不到合理的解释,但我们猜测是某种超时。
因此,解决方案是将这些任务放入 Windows 服务中,并让任务以无限超时运行。
这是本文的扩展链接,如果您希望您的服务在不同的服务器上 AlwaysOn 运行:AlwaysOn Windows 服务与远程控制
背景
我对这个服务感到满意,但困扰我的是它在后台运行,没有人知道发生了什么。
所以我想到了制作一个小型应用程序,它能显示服务的状态、当前任务,并且还能让我在安全关闭的同时存储任务的当前状态。
1. 使用代码
源代码基于 Visual Studio 2012 和 .NET 4.5。对于 GUI,您还需要这个库:http://wpfanimatedgif.codeplex.com/。
首先在 Visual Studio 中创建一个新的 Windows 服务项目(有关详细的演练,我发现这篇 MSDN 文章非常有帮助:http://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx)。
- 创建类型为“Windows 服务”的新项目。
- 将 Service1.cs 重命名为 MyService.cs(包括其依赖对象)。
- 您可以选择添加一个
EventLog
,我发现它非常有用。我编写了一个小型static
类 Helper\EventLogger.cs,它允许我快速写入日志(有关详细描述,请参阅 MSDN 文章)。
事件日志记录是源代码的一部分,但本文未作描述。
多线程服务有三种不同类型的线程
- 简单:只能执行到最后,没有并行简单线程
- 复杂:可以在几个步骤之间停止,并在下次启动时继续,也没有并行运行
- 多线程:可以同时启动 N 个任务
现在我们有了基础,我将描述服务的不同部分。
2. Program.cs
这个自动生成的类初始化 MyService
。
3. MyService.cs
为 MyService
添加一个接口类,并将其命名为 IMyService.cs。
interface IMyService
{
void StartService();
void StopService();
}
现在,打开 MyService
的代码库,包含接口并实现其成员并创建属性
private bool finalExit { get; set; }
public void StartService()
{
OnStart(null);
}
public void StopService()
{
finalExit = false;
OnStop();
}
创建一个名为 WCF 的新文件夹,并添加一个类 WCFProvider.cs,同时将一个类 ServiceExecution.cs 添加到项目的根目录。现在,我们修改 MyService.cs 的初始化器并添加另一个属性。
GetInstance
尚未实现,但将在步骤 6 中完成。
private WCFProvider wcfProvider { get; set; }
public MyService()
{
try
{
InitializeComponent();
finalExit = true;
ServiceExecution.GetInstance().myService = this;
wcfProvider = new WCFProvider();
}
catch (Exception)
{ }
}
4. ThreadHolder.cs
创建一个名为 Threading 的新文件夹和一个类 ThreadHolder.cs。
class ThreadHolder
{
public Thread thread { get; set; }
public MyThread myThread { get; set; }
public bool resuming { get; set; }
public ThreadHolder()
{
resuming = false;
}
public void Process()
{
myThread.Process(resuming);
}
}
resume 属性用于具有多个步骤的复杂线程,以便在服务之前停止后,在某个点恢复执行。
MyThread
将在步骤 7 中可用。
5. LockHolder.cs
为了在多线程类上获得安全的锁,我发现 Bill Wagner 的文章非常有帮助,并将其技术用于我的服务:更有效的 C#:第 13 条:首选 lock() 进行同步。
我将“LockHolder.cs”放在 Helper 文件夹中。
6. ServiceExecution.cs
此类是处理所有作业的主线程,将它们放入独立的线程中,并在作业完成时移除特定线程。
6.1 枚举
State
表示 ServiceExecution
线程当前运行的状态。当状态为 Stopped
时,Windows 服务本身仍在运行,可以通过 GUI 程序重新启动。
ThreadType
用于识别不同的作业。
public enum State
{
Running,
Shutting_Down,
Stopped
}
public enum ThreadType
{
SimpleThread,
ComplexThread,
MultiThread
}
6.2 此类应为线程安全的单例
此类从以下位置访问
- 图形用户界面
- 服务的主类 MyService.cs
- 每个作业线程在完成后都会回调
因此我们需要确保只存在一个实例
private ServiceExecution() { }
private static ServiceExecution instance;
private static readonly object myInstanceLock = new object();
public static ServiceExecution GetInstance()
{
// DoubleLock for thread safety
if (instance == null)
{
lock (myInstanceLock)
{
if (instance == null)
{
instance = new ServiceExecution();
}
}
}
return instance;
}
6.3 其他对象
接口从 MyService.cs 传递,通过 WCF 服务调用 Start
和 Stop
函数
public IMyService myService { get; set; }
每个作为线程执行的作业都存储在 ThreadHolder
中,以 GUID 作为唯一标识符,以便在关闭线程或检查活动线程时轻松访问线程。
private readonly Dictionary<Guid, ThreadHolder> runningThreads = new Dictionary<Guid, ThreadHolder>();
以及一些属性
private State currentState = State.Stopped;
private DateTime nextRunSimple = DateTime.Now;
private DateTime nextRunComplex = DateTime.Now;
private DateTime nextRunMulti = DateTime.Now;
private bool isProcessingSimple;
private bool isProcessingComplex;
6.4 初始化
当 ServiceExecution
线程运行时,它会定期检查是否应该启动新的子线程。逻辑将在 2.5 中描述。
当它收到关闭命令时,它会等待所有当前线程完成。
在复杂线程的情况下,发送中断当前执行的命令。在我的项目中,此线程由四个存储过程调用组成。当在步骤 2 发送中断操作命令时,线程完成第二步并将信息写入数据库,以便在服务再次启动时返回到步骤 3。
public void StartServiceExecution()
{
try
{
currentState = State.Running;
while (currentState == State.Running)
{
CheckForSimpleRun();
CheckForComplexRun();
CheckForMultipeRun();
Thread.Sleep(10000);
}
while (currentState == State.Shutting_Down)
{
using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
{
if (lockObj.LockSuccessful)
{
foreach (ThreadHolder currentThread in runningThreads.Values)
{
if (currentThread.myThread.GetThreadType() == ThreadType.ComplexThread)
{
currentThread.myThread.BreakOperation();
}
}
if (runningThreads.Count == 0)
{
currentState = State.Stopped;
}
}
}
}
}
catch (Exception)
{ }
}
6.5 检查新线程
我在这里只添加了简单线程,因此其他线程是相似的(您可以通过源代码获得它们)。如果没有简单线程正在运行且延迟时间合适,则将新线程添加到池中。线程类本身将在步骤 7 中描述。
private void CheckForSimpleRun()
{
if (isProcessingSimple == false && nextRunSimple <= DateTime.Now)
{
const ThreadType threadType = ThreadType.SimpleThread;
ThreadHolder exportThread = new ThreadHolder();
Guid guid = Guid.NewGuid();
exportThread.myThread = new MyThreadSimple(this, guid, threadType);
if (CreateWorkerThread(exportThread, threadType, guid))
{
isProcessingSimple = true;
}
}
}
6.6 创建新线程
runningThreads
属性应始终以线程安全的方式访问。如果该属性可以在合理的时间(1000 毫秒)内被声明,则新线程将被添加并立即启动。
private bool CreateWorkerThread(ThreadHolder exportThread, ThreadType threadType, Guid guid)
{
using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
{
if (lockObj.LockSuccessful)
{
Thread thread = new Thread(exportThread.Process) { Name = threadType.ToString() };
exportThread.thread = thread;
runningThreads.Add(guid, exportThread);
thread.Start();
return true;
}
}
return false;
}
6.7 清理已完成的线程
每个子线程都继承 IServiceExecution
接口,并在完成时调用此函数。相关线程类型的属性将被重置,并且线程将从池中移除
public void ThreadFinished(Guid threadId)
{
using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
{
if (lockObj.LockSuccessful)
{
if (runningThreads[threadId].myThread.GetThreadType() == ThreadType.SimpleThread)
{
nextRunSimple = DateTime.Now.AddSeconds(Properties.Settings.Default.trigger_simple_thread);
isProcessingSimple = false;
}
runningThreads.Remove(threadId);
}
}
}
6.8 从运行线程获取信息
此函数遍历所有活动线程并将信息连接成一个 string
。我使用这种方式,因为它是一种通过 WCF 服务传递数据的简单方法。
public string GetCurrentThreadInfo()
{
using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
{
if (lockObj.LockSuccessful)
{
int counter = 0;
string threadInfo = "";
foreach (KeyValuePair<Guid, ThreadHolder> currentThread in runningThreads)
{
DateTime tmpTime = currentThread.Value.myThread.GetStartTime();
if (counter == 0)
{
threadInfo = String.Format("{0:dd.MM.yyyy HH:mm:ss}", tmpTime) + "§" +
currentThread.Value.myThread.GetThreadType().ToString();
}
else
{
threadInfo = threadInfo + "#" +
String.Format("{0:dd.MM.yyyy HH:mm:ss}", tmpTime) + "§" +
currentThread.Value.myThread.GetThreadType().ToString();
}
counter++;
}
return threadInfo;
}
}
return "";
}
6.9 停止执行
目前,服务执行停止的实现必须完成。
public void StopServiceExecution()
{
currentState = State.Shutting_Down;
}
6.10 IServiceExecution.cs
此接口用于向子线程和 WCFProvider
提供必要的函数。
interface IServiceExecution
{
void StartServiceExecution();
void StopServiceExecution();
void ThreadFinished(Guid threadId);
}
7 线程类
7.1 基类“MyThread.cs”
此类提供每个线程类需要实现的函数。线程 ID 是每个线程的唯一标识符,ServiceExecution
使用它来在线程完成时删除线程对象。
startTime
用于 GUI
protected IServiceExecution serviceExecution;
protected Guid threadId;
protected ThreadType threadType;
protected DateTime startTime;
public MyThread(IServiceExecution serviceExecution, Guid threadId, ThreadType threadType)
{
this.serviceExecution = serviceExecution;
this.threadId = threadId;
this.threadType = threadType;
}
process 函数调用处理例程。根据您的线程类型,可以在继承自此基类的派生类中重写相应的函数。这些函数是 protected
,因此其他类无需访问它们。
public void Process(bool resumeProcessing = false)
{
startTime = DateTime.Now;
try
{
if (resumeProcessing)
{
ResumeProcessingData();
}
else
{
ProcessingData();
}
}
catch (Exception e)
{
//...
}
finally
{
serviceExecution.ThreadFinished(threadId);
}
}
BreakOperation
用于复杂线程以停止操作并在稍后继续运行。
public virtual void BreakOperation() { }
最后,该类包含用于线程类型和开始时间的 getter 函数。
7.2 MyThreadComplex.cs
在示例源代码中,我只包含了复杂线程的简短实现以展示中断操作,其他两个线程类只实现了基类而没有重写任何内容。
在此示例中,处理函数只是调用 worker 函数。
当调用 BreakOperation
时,worker 函数的当前步骤将完成,然后该函数将停止。在实际实现中,我将当前步骤存储在数据库中,当服务再次启动时,此保存的步骤将通过属性 resumeStep
传递。
private readonly int resumeStep;
private bool breakOperation;
public MyThreadComplex(IServiceExecution serviceExecution,
Guid threadId, ThreadType threadType, int resumeStep = 0)
: base(serviceExecution, threadId, threadType)
{
this.resumeStep = resumeStep;
}
public override void BreakOperation()
{
breakOperation = true;
}
private void Worker()
{
for (int i = resumeStep; i < 5; i++)
{
Thread.Sleep(7500);
if (breakOperation)
{
return;
}
}
}
8 WCF 服务
8.1 IServiceWCF.cs
该接口定义了 GUI 将用于与服务通信的函数。
对于 WCF 功能,我们需要添加对 System.ServiceModel
程序集的引用。
IsServiceActive
返回服务的状态Start
和Stop
使用IsOneWay-Behaviour
实现,因为 GUI 不应等待响应,因为它是void
调用GetActiveThread
以连接的string
形式返回所有运行线程的信息
using System.ServiceModel;
[ServiceContract]
public interface IServiceWCF
{
[OperationContract]
State IsServiceActive();
[OperationContract(IsOneWay = true)]
void StartImosExportService();
[OperationContract(IsOneWay = true)]
void StopImosExportService();
[OperationContract]
string GetActiveThreads();
}
8.2 ServiceWCF.cs
此类包含接口类的实现
ConcurrencyMode
设置为Multiple
,这意味着可以同时发生多个调用。例如:可以在请求ActiveThreads
的同时调用StopService
。-
SynchronizationContext
设置为false
,因为将访问不同的线程。
[ServiceBehavior(UseSynchronizationContext=false, ConcurrencyMode=ConcurrencyMode.Multiple)]
public class ServiceWCF : IServiceWCF
{
public State IsServiceActive()
{
return ServiceExecution.GetInstance().CheckIfActive();
}
public void StartImosExportService()
{
ServiceExecution.GetInstance().myService.StartService();
}
public void StopImosExportService()
{
ServiceExecution.GetInstance().myService.StopService();
}
public string GetActiveThreads()
{
string threadInfo = ServiceExecution.GetInstance().GetCurrentThreadInfo();
return threadInfo;
}
}
8.3 WCFProvider.cs
在此类中,WCF 服务被设置。由于我只在服务运行的同一服务器上使用 GUI,所以我使用了简单的命名管道端点,但您可以实现所有类型的 WCF 通信。
readonly ServiceHost serviceProvider;
public WCFProvider()
{
serviceProvider = new ServiceHost(
typeof(ServiceWCF), new[] { new Uri("net.pipe:///MyService/") });
serviceProvider.AddServiceEndpoint(typeof(IServiceWCF),
new NetNamedPipeBinding(), "PipeForMyService");
serviceProvider.Open();
}
public void StopProvidingService()
{
serviceProvider.Close();
}
9 GUI
现在,只需为 GUI 部分向解决方案添加第二个项目。我使用 WPF,但我不会过多地深入设计细节,因为只要您可以使用 WCF 服务,您就可以使用您喜欢的任何方式。
同样,我们需要引用 System.ServiceModel
程序集。
然后,我们可以将映射器添加到服务
enum
用于service-state
- WCF 函数的接口
public enum State
{
Running,
Shutting_Down,
Stopped
}
[ServiceContract]
public interface IServiceWCF
{
[OperationContract]
State IsServiceActive();
[OperationContract(IsOneWay = true)]
void StartImosExportService();
[OperationContract(IsOneWay = true)]
void StopImosExportService();
[OperationContract]
string GetActiveThreads();
}
在主类中,我们现在需要接口的属性。
初始化后,我们将通过命名管道提供程序连接到服务 - 请参阅 8.3 作为参考。
public MainWindow()
{
InitializeComponent();
ChannelFactory<IServiceWCF> pipeFactory =
new ChannelFactory<IServiceWCF>(
new NetNamedPipeBinding(),
new EndpointAddress("net.pipe:///MyService/PipeForMyService"));
pipeProxy = pipeFactory.CreateChannel();
}
现在,我们可以简单地使用 pipeProxy
调用服务函数。
pipeProxy.StartImosExportService();
...
string threadInfo = pipeProxy.GetActiveThreads();
未来改进
我们有一个可能的管道任务,我们应该一个接一个地传输 1..n 个文件。每个文件将由外部系统导入,并且在发送文件时触发导入。但是必须完成一个导入,然后才能发送下一个文件。
因此,我考虑集成 WCF WebService
和一种新的线程类型,它能够在一步之后暂停,并等待被触发以继续。然后外部应用程序将调用 webservice
,线程将发送下一个文件。
历史
- 2013年10月16日 - 初次发布
- 2016年2月25日 -
AlwaysOn
扩展链接