65.9K
CodeProject 正在变化。 阅读更多。
Home

具有远程控制 GUI 的 Windows 服务多线程

starIconstarIconstarIconstarIconstarIcon

5.00/5 (25投票s)

2013年10月16日

CPOL

8分钟阅读

viewsIcon

83517

downloadIcon

5366

一个 Windows 服务, 可以处理不同线程中的不同操作, 并且可以通过小型 GUI 程序进行控制。

引言

我被要求创建一个 Windows 服务,该服务在独立的线程中处理不同的任务。这个案例的背景是,我们有几个 SQL-Server-Agents 中的作业(复杂的数据选择和传输),有时可能需要 8 小时以上。

有时 IIS 会终止一个作业,我们找不到合理的解释,但我们猜测是某种超时。

因此,解决方案是将这些任务放入 Windows 服务中,并让任务以无限超时运行。

这是本文的扩展链接,如果您希望您的服务在不同的服务器上 AlwaysOn 运行:AlwaysOn Windows 服务与远程控制

背景

我对这个服务感到满意,但困扰我的是它在后台运行,没有人知道发生了什么。

所以我想到了制作一个小型应用程序,它能显示服务的状态、当前任务,并且还能让我在安全关闭的同时存储任务的当前状态。

GUI program

1. 使用代码

源代码基于 Visual Studio 2012 和 .NET 4.5。对于 GUI,您还需要这个库:http://wpfanimatedgif.codeplex.com/

首先在 Visual Studio 中创建一个新的 Windows 服务项目(有关详细的演练,我发现这篇 MSDN 文章非常有帮助:http://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx)。

  1. 创建类型为“Windows 服务”的新项目。
  2. Service1.cs 重命名为 MyService.cs(包括其依赖对象)。
  3. 您可以选择添加一个 EventLog,我发现它非常有用。我编写了一个小型 staticHelper\EventLogger.cs,它允许我快速写入日志(有关详细描述,请参阅 MSDN 文章)。

事件日志记录是源代码的一部分,但本文未作描述。

多线程服务有三种不同类型的线程

  • 简单:只能执行到最后,没有并行简单线程
  • 复杂:可以在几个步骤之间停止,并在下次启动时继续,也没有并行运行
  • 多线程:可以同时启动 N 个任务

现在我们有了基础,我将描述服务的不同部分。

2. Program.cs

这个自动生成的类初始化 MyService

3. MyService.cs

MyService 添加一个接口类,并将其命名为 IMyService.cs

interface IMyService
{
    void StartService();
    void StopService();
}

现在,打开 MyService 的代码库,包含接口并实现其成员并创建属性

private bool finalExit { get; set; }
 
public void StartService()
{
    OnStart(null);
}
 
public void StopService()
{
    finalExit = false;
    OnStop();
} 

创建一个名为 WCF 的新文件夹,并添加一个类 WCFProvider.cs,同时将一个类 ServiceExecution.cs 添加到项目的根目录。现在,我们修改 MyService.cs 的初始化器并添加另一个属性。

GetInstance 尚未实现,但将在步骤 6 中完成。

private WCFProvider wcfProvider { get; set; }
 
public MyService()
{    
    try
    {
        InitializeComponent();
 
        finalExit = true;
 
        ServiceExecution.GetInstance().myService = this;
 
        wcfProvider = new WCFProvider();
    }
    catch (Exception)
    { }
}  

4. ThreadHolder.cs

创建一个名为 Threading 的新文件夹和一个类 ThreadHolder.cs

class ThreadHolder
{
    public Thread thread { get; set; }
    public MyThread myThread { get; set; }
    public bool resuming { get; set; }
 
    public ThreadHolder()
    {
        resuming = false;
    }
 
    public void Process()
    {
        myThread.Process(resuming);
    }
} 

resume 属性用于具有多个步骤的复杂线程,以便在服务之前停止后,在某个点恢复执行。

MyThread 将在步骤 7 中可用。

5. LockHolder.cs

为了在多线程类上获得安全的锁,我发现 Bill Wagner 的文章非常有帮助,并将其技术用于我的服务:更有效的 C#:第 13 条:首选 lock() 进行同步

我将“LockHolder.cs”放在 Helper 文件夹中。

6. ServiceExecution.cs

此类是处理所有作业的主线程,将它们放入独立的线程中,并在作业完成时移除特定线程。

6.1 枚举

State 表示 ServiceExecution 线程当前运行的状态。当状态为 Stopped 时,Windows 服务本身仍在运行,可以通过 GUI 程序重新启动。

ThreadType 用于识别不同的作业。

public enum State
{
    Running,
    Shutting_Down,
    Stopped
}
 
public enum ThreadType
{
    SimpleThread,
    ComplexThread,
    MultiThread
} 

6.2 此类应为线程安全的单例

此类从以下位置访问

  • 图形用户界面
  • 服务的主类 MyService.cs
  • 每个作业线程在完成后都会回调

因此我们需要确保只存在一个实例

private ServiceExecution() { }
private static ServiceExecution instance;
private static readonly object myInstanceLock = new object();
 
public static ServiceExecution GetInstance()
{
    // DoubleLock for thread safety
    if (instance == null)
    {
        lock (myInstanceLock)
        {
            if (instance == null)
            {
                instance = new ServiceExecution();
            }
        }
    }
    return instance;
} 

6.3 其他对象

接口从 MyService.cs 传递,通过 WCF 服务调用 StartStop 函数

public IMyService myService { get; set; }

每个作为线程执行的作业都存储在 ThreadHolder 中,以 GUID 作为唯一标识符,以便在关闭线程或检查活动线程时轻松访问线程。

private readonly Dictionary<Guid, ThreadHolder> runningThreads = new Dictionary<Guid, ThreadHolder>(); 

以及一些属性

private State currentState = State.Stopped;
 
private DateTime nextRunSimple = DateTime.Now;
private DateTime nextRunComplex = DateTime.Now;
private DateTime nextRunMulti = DateTime.Now;
 
private bool isProcessingSimple;
private bool isProcessingComplex; 

6.4 初始化

ServiceExecution 线程运行时,它会定期检查是否应该启动新的子线程。逻辑将在 2.5 中描述。

当它收到关闭命令时,它会等待所有当前线程完成。

在复杂线程的情况下,发送中断当前执行的命令。在我的项目中,此线程由四个存储过程调用组成。当在步骤 2 发送中断操作命令时,线程完成第二步并将信息写入数据库,以便在服务再次启动时返回到步骤 3。

public void StartServiceExecution()
{
    try
    {
        currentState = State.Running;
 
        while (currentState == State.Running)
        {
            CheckForSimpleRun();
 
            CheckForComplexRun();
 
            CheckForMultipeRun();
 
            Thread.Sleep(10000);
        }
 
        while (currentState == State.Shutting_Down)
        {
            using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
                new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
            {
                if (lockObj.LockSuccessful)
                {
                    foreach (ThreadHolder currentThread in runningThreads.Values)
                    {
                        if (currentThread.myThread.GetThreadType() == ThreadType.ComplexThread)
                        {
                            currentThread.myThread.BreakOperation();
                        }
                    }
 
                    if (runningThreads.Count == 0)
                    {
                        currentState = State.Stopped;
                    }
                }
            }
        }
    }
    catch (Exception)
    { }
}  

6.5 检查新线程

我在这里只添加了简单线程,因此其他线程是相似的(您可以通过源代码获得它们)。如果没有简单线程正在运行且延迟时间合适,则将新线程添加到池中。线程类本身将在步骤 7 中描述。

private void CheckForSimpleRun()
{
    if (isProcessingSimple == false && nextRunSimple <= DateTime.Now)
    {
        const ThreadType threadType = ThreadType.SimpleThread;
        ThreadHolder exportThread = new ThreadHolder();
        Guid guid = Guid.NewGuid();
        exportThread.myThread = new MyThreadSimple(this, guid, threadType);
 
        if (CreateWorkerThread(exportThread, threadType, guid))
        {
            isProcessingSimple = true;
        }
    }
} 

6.6 创建新线程

runningThreads 属性应始终以线程安全的方式访问。如果该属性可以在合理的时间(1000 毫秒)内被声明,则新线程将被添加并立即启动。

private bool CreateWorkerThread(ThreadHolder exportThread, ThreadType threadType, Guid guid)
{
    using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
        new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
    {
        if (lockObj.LockSuccessful)
        {
            Thread thread = new Thread(exportThread.Process) { Name = threadType.ToString() };
            exportThread.thread = thread;
 
            runningThreads.Add(guid, exportThread);
 
            thread.Start();
 
            return true;
        }
    }
 
    return false;
}

6.7 清理已完成的线程

每个子线程都继承 IServiceExecution 接口,并在完成时调用此函数。相关线程类型的属性将被重置,并且线程将从池中移除

public void ThreadFinished(Guid threadId)
{
    using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
        new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
    {
        if (lockObj.LockSuccessful)
        {
            if (runningThreads[threadId].myThread.GetThreadType() == ThreadType.SimpleThread)
            {
                nextRunSimple = DateTime.Now.AddSeconds(Properties.Settings.Default.trigger_simple_thread);
                isProcessingSimple = false;
            }
 
            runningThreads.Remove(threadId);
        }
    }
} 

6.8 从运行线程获取信息

此函数遍历所有活动线程并将信息连接成一个 string。我使用这种方式,因为它是一种通过 WCF 服务传递数据的简单方法。

public string GetCurrentThreadInfo()
{
    using (LockHolder<Dictionary<Guid, ThreadHolder>> lockObj =
        new LockHolder<Dictionary<Guid, ThreadHolder>>(runningThreads, 1000))
    {
        if (lockObj.LockSuccessful)
        {
            int counter = 0;
            string threadInfo = "";
 
            foreach (KeyValuePair<Guid, ThreadHolder> currentThread in runningThreads)
            {
                DateTime tmpTime = currentThread.Value.myThread.GetStartTime();
 
                if (counter == 0)
                {
                    threadInfo = String.Format("{0:dd.MM.yyyy HH:mm:ss}", tmpTime) + "§" +
                        currentThread.Value.myThread.GetThreadType().ToString();
                }
                else
                {
                    threadInfo = threadInfo + "#" + 
                        String.Format("{0:dd.MM.yyyy HH:mm:ss}", tmpTime) + "§" +
                        currentThread.Value.myThread.GetThreadType().ToString();
                }
 
                counter++;
            }
 
            return threadInfo;
        }
    }
 
    return "";
} 

6.9 停止执行

目前,服务执行停止的实现必须完成。

public void StopServiceExecution()
{
    currentState = State.Shutting_Down;
} 

6.10 IServiceExecution.cs

此接口用于向子线程和 WCFProvider 提供必要的函数。

interface IServiceExecution
{
    void StartServiceExecution();
 
    void StopServiceExecution();
 
    void ThreadFinished(Guid threadId);
} 

7 线程类

7.1 基类“MyThread.cs”

此类提供每个线程类需要实现的函数。线程 ID 是每个线程的唯一标识符,ServiceExecution 使用它来在线程完成时删除线程对象。

startTime 用于 GUI

protected IServiceExecution serviceExecution;
protected Guid threadId;
protected ThreadType threadType;
protected DateTime startTime;
 
public MyThread(IServiceExecution serviceExecution, Guid threadId, ThreadType threadType)
{
    this.serviceExecution = serviceExecution;
    this.threadId = threadId;
    this.threadType = threadType;
} 

process 函数调用处理例程。根据您的线程类型,可以在继承自此基类的派生类中重写相应的函数。这些函数是 protected,因此其他类无需访问它们。

public void Process(bool resumeProcessing = false)
{
    startTime = DateTime.Now;
 
    try
    {
        if (resumeProcessing)
        {
            ResumeProcessingData();
        }
        else
        {
            ProcessingData();
        }
    }
    catch (Exception e)
    {
        //...
    }
    finally
    {
        serviceExecution.ThreadFinished(threadId);
    }
}

BreakOperation 用于复杂线程以停止操作并在稍后继续运行。

public virtual void BreakOperation() { } 

最后,该类包含用于线程类型和开始时间的 getter 函数。

7.2 MyThreadComplex.cs

在示例源代码中,我只包含了复杂线程的简短实现以展示中断操作,其他两个线程类只实现了基类而没有重写任何内容。

在此示例中,处理函数只是调用 worker 函数。

当调用 BreakOperation 时,worker 函数的当前步骤将完成,然后该函数将停止。在实际实现中,我将当前步骤存储在数据库中,当服务再次启动时,此保存的步骤将通过属性 resumeStep 传递。

private readonly int resumeStep;
private bool breakOperation;
 
public MyThreadComplex(IServiceExecution serviceExecution, 
       Guid threadId, ThreadType threadType, int resumeStep = 0)
       : base(serviceExecution, threadId, threadType)
{
    this.resumeStep = resumeStep;
}
 
public override void BreakOperation()
{
    breakOperation = true;
}
 
private void Worker()
{
    for (int i = resumeStep; i < 5; i++)
    {
        Thread.Sleep(7500);
 
        if (breakOperation)
        {
            return;
        }
    }
} 

8 WCF 服务

8.1 IServiceWCF.cs

该接口定义了 GUI 将用于与服务通信的函数。

对于 WCF 功能,我们需要添加对 System.ServiceModel 程序集的引用。

  • IsServiceActive 返回服务的状态
  • StartStop 使用 IsOneWay-Behaviour 实现,因为 GUI 不应等待响应,因为它是 void 调用
  • GetActiveThread 以连接的 string 形式返回所有运行线程的信息
using System.ServiceModel;
 
[ServiceContract]
public interface IServiceWCF
{
    [OperationContract]
    State IsServiceActive();
 
    [OperationContract(IsOneWay = true)]
    void StartImosExportService();
 
    [OperationContract(IsOneWay = true)]
    void StopImosExportService();
 
    [OperationContract]
    string GetActiveThreads();
} 

8.2 ServiceWCF.cs

此类包含接口类的实现

  • ConcurrencyMode 设置为 Multiple,这意味着可以同时发生多个调用。例如:可以在请求 ActiveThreads 的同时调用 StopService
  • SynchronizationContext 设置为 false,因为将访问不同的线程。
[ServiceBehavior(UseSynchronizationContext=false, ConcurrencyMode=ConcurrencyMode.Multiple)]
public class ServiceWCF : IServiceWCF
{
    public State IsServiceActive()
    {
        return ServiceExecution.GetInstance().CheckIfActive();
    }
 
    public void StartImosExportService()
    {
        ServiceExecution.GetInstance().myService.StartService();
    }
 
    public void StopImosExportService()
    {
        ServiceExecution.GetInstance().myService.StopService();
    }
 
    public string GetActiveThreads()
    {
        string threadInfo = ServiceExecution.GetInstance().GetCurrentThreadInfo();
        return threadInfo;
    }
} 

8.3 WCFProvider.cs

在此类中,WCF 服务被设置。由于我只在服务运行的同一服务器上使用 GUI,所以我使用了简单的命名管道端点,但您可以实现所有类型的 WCF 通信。

readonly ServiceHost serviceProvider;
 
public WCFProvider()
{
    serviceProvider = new ServiceHost(
        typeof(ServiceWCF), new[] { new Uri("net.pipe:///MyService/") });
 
    serviceProvider.AddServiceEndpoint(typeof(IServiceWCF),
        new NetNamedPipeBinding(), "PipeForMyService");
 
    serviceProvider.Open();
}
 
public void StopProvidingService()
{
    serviceProvider.Close();
}

9 GUI

现在,只需为 GUI 部分向解决方案添加第二个项目。我使用 WPF,但我不会过多地深入设计细节,因为只要您可以使用 WCF 服务,您就可以使用您喜欢的任何方式。

同样,我们需要引用 System.ServiceModel 程序集。

然后,我们可以将映射器添加到服务

  • enum 用于 service-state
  • WCF 函数的接口
public enum State
{
    Running,
    Shutting_Down,
    Stopped
}
 
[ServiceContract]
public interface IServiceWCF
{
    [OperationContract]
    State IsServiceActive();
 
    [OperationContract(IsOneWay = true)]
    void StartImosExportService();
 
    [OperationContract(IsOneWay = true)]
    void StopImosExportService();
 
    [OperationContract]
    string GetActiveThreads();
}

在主类中,我们现在需要接口的属性。

初始化后,我们将通过命名管道提供程序连接到服务 - 请参阅 8.3 作为参考。

public MainWindow()
{
    InitializeComponent();
 
    ChannelFactory<IServiceWCF> pipeFactory =
        new ChannelFactory<IServiceWCF>(
            new NetNamedPipeBinding(),
            new EndpointAddress("net.pipe:///MyService/PipeForMyService"));
 
    pipeProxy = pipeFactory.CreateChannel();
} 

现在,我们可以简单地使用 pipeProxy 调用服务函数。

pipeProxy.StartImosExportService();
 
...
 
string threadInfo = pipeProxy.GetActiveThreads();

未来改进

我们有一个可能的管道任务,我们应该一个接一个地传输 1..n 个文件。每个文件将由外部系统导入,并且在发送文件时触发导入。但是必须完成一个导入,然后才能发送下一个文件。

因此,我考虑集成 WCF WebService 和一种新的线程类型,它能够在一步之后暂停,并等待被触发以继续。然后外部应用程序将调用 webservice,线程将发送下一个文件。

历史

  • 2013年10月16日 - 初次发布
  • 2016年2月25日 - AlwaysOn 扩展链接
© . All rights reserved.