65.9K
CodeProject 正在变化。 阅读更多。
Home

在 .NET 3.0 中并行任务

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.89/5 (9投票s)

2010年5月6日

CPOL

4分钟阅读

viewsIcon

34780

downloadIcon

483

提供一种机制,可以在多线程上并行执行任务列表,并将异常、超时和任务成功完成等有用状态传回调用线程。

引言

我目前的项目是构建一个实时分布式报价引擎。在线客户将输入他们的详细信息并提交一个请求给报价引擎。引擎会创建一个报价提供商列表,通过 Web 与各种后端系统进行 B2B 通信。每个提供商都有自定义实现,包含特定于提供商的映射和传输要求。

报价引擎的一个要求是处理过程不能超过 15 秒,因为基于 Web 的客户如果等待太久就会离开。

从每个提供商那里检索报价所需的时间估计为 3-7 秒,因此顺序处理是不可能的。我们还意识到,如果系统等待所有报价返回后再将结果返回给消费者,那么仅仅一个提供商的失败就会影响整体性能,例如,如果任何目标提供商系统宕机或运行缓慢。

解决问题

代码的核心是标准的 ThreadPool.QueueUserWorkItem 来运行我的提供商,以及 WaitHandle.WaitAll(waitHandles, timeOut),它等待所有提供商完成或在设定的时间内退出。

我添加的主要内容是一个任务包装器 ThreadTaskInfo,它跟踪正在运行的提供商的状态,包括任何异常,以及一个静态管理器类 ThreadUtil,用于在任何类的列表上并行运行一个动作委托,并通过关联的 ThreadTaskInfo 来管理每个动作的状态。

ThreadUtil 类旨在处理您希望并行运行的任何类。无需继承基类或实现接口。您只需要一个具有通用 Action 委托(命令模式)的类列表,并将此类列表及其 Action 委托提供给 ThreadUtil.RunAsynchronously<T> 方法。

本文提供的解决方案类似于 .NET 4 的 TPL 任务并行库。当我编写代码时,我并不知道并行扩展库,后来发现 .NET 3.5 的并行扩展链接似乎都已损坏,并且目前我们没有迁移到 .NET 4 的计划。

主代码

ThreadTaskInfo 包装您的执行类,并表示一个单一的任务,该任务存储线程的当前执行状态。它还会存储您的类抛出的任何异常,或代码未及时完成时的超时信息。

public class ThreadTaskInfo<t>
{
    private readonly object _synchLock = new object();

    /// <summary>
    /// Initializes a new instance of the ThreadTaskInfo
    // see cref="ThreadTaskInfo<T>"/> class.
    /// </summary>
    /// <param name="task">The task.</param>
    public ThreadTaskInfo(T task)
    {
        Task = task;

        // Task is created, but has not started running yet
        State = ThreadTaskStateType.Created;

        // Task is not set to completed
        TaskComplete = new ManualResetEvent(false);
    }

    /// <summary>
    /// Gets or sets the task that is running on the thread
    /// </summary>
    /// <value>The task.</value>
    public T Task { get; private set; }

    /// <summary>
    /// Gets or sets the task complete signal (Wait Handle).
    /// </summary>
    /// <value>The task complete.</value>
    public ManualResetEvent TaskComplete { get; private set; }

    /// <summary>
    /// Gets or sets an exception if it is thrown by the task.
    /// </summary>
    /// <value>The exception.</value>
    public Exception Exception { get; set; }

    /// <summary>
    /// Gets or sets the state of currently running threaded task.
    /// </summary>
    /// <value>The state.</value>
    public ThreadTaskStateType State { get; private set; }

    /// <summary>
    /// Set the state of the task.
    /// </summary>
    /// The state.
    public void SetState(ThreadTaskStateType state)
    {
        // Lock the writing of the State property
        lock (_synchLock)
        {
            switch (state)
            {
                case ThreadTaskStateType.Created:
                    throw new Exception("State cannot be set to created");

                case ThreadTaskStateType.Started:
                    if (State != ThreadTaskStateType.Created)
                    {
                        return;
                    }
                    State = state;
                    return;

                case ThreadTaskStateType.Completed:
                    if (State != ThreadTaskStateType.Started)
                    {
                        return;
                    }
                    State = state;
                    return;

                case ThreadTaskStateType.Failed:
                    if (State == ThreadTaskStateType.Started || State == 
                                 ThreadTaskStateType.Completed)
                    {
                        State = state;
                    }
                    return;

                case ThreadTaskStateType.FailedTimeout:
                    if (State == ThreadTaskStateType.Started || State == 
                                 ThreadTaskStateType.Completed)
                    {
                        State = state;
                    }
                    return;
            }
        }
    }
}

ThreadTaskStateType 枚举表示每个运行任务的状态。

/// <summary>
/// State of the currently running threaded task
/// </summary>
public enum ThreadTaskStateType
{
    /// <summary>
    /// Created state, not yet put onto the thread pool
    /// </summary>
    Created,

    /// <summary>
    /// Thread is started, the task is ready to execute
    /// </summary>
    Started,

    /// <summary>
    /// Thread is complete, the task has finsihed executing
    /// </summary>
    Completed,

    /// <summary>
    /// Thread failed to complete
    /// </summary>
    Failed,

    /// <summary>
    /// Thread failed to complete in specified time
    /// </summary>
    FailedTimeout,
}

ThreadUtil 提供一个静态方法 RunAsynchronously<T>,它将并行地对任务列表运行您的 Action 委托。此方法将处理状态管理、异常和超时,并将信息记录在每个 ThreadTaskInfo 对象中。

当所有任务都完成或达到超时时,将返回一个 ThreadTaskInfo 列表,您可以检查该列表以查看哪些任务成功且在规定时间内运行。

public static class ThreadUtil
{
    /// <summary>
    /// Execute a list of tasks asynchronously.
    /// </summary>
    /// <typeparam name="T">
    /// <param name="tasks">The tasks to execute.</param>
    /// <param name="action">The action on the object to run.</param>
    /// <param name="timeOut">Timeout in millisecond.</param>
    /// <returns>List of TaskInfo storing information about the execution
    /// and a reference to the tasks themselves</returns>
    public static List<ThreadTaskInfo<t>> 
           RunAsynchronously<t>(IList<t> tasks, 
                             Action<t> action, int timeOut)
    {
        var result = new List<ThreadTaskInfo<t>>();

        if (tasks == null || tasks.Count == 0 || action == null)
        {
            return result;
        }

        // Create a list of ThreadTaskInfo wrappers for each task to run
        foreach (var taskInfo in tasks.Select(task => new ThreadTaskInfo<t>(task)))
        {
            result.Add(taskInfo);

            var info = taskInfo;

            // Wrap the execution method in a delegate with a thread complete
            // signal to inform the main thread that the worker thread is complete
            ThreadPool.QueueUserWorkItem(state =>
            {
                try
                {
                    info.SetState(ThreadTaskStateType.Started);

                    // Execute task
                    action.Invoke(info.Task);

                    info.SetState(ThreadTaskStateType.Completed);

                    // Signal the main thread, that this execution thread is complete
                    info.TaskComplete.Set();
                }
                catch (Exception ex)
                {
                    info.SetState(ThreadTaskStateType.Failed);
                    info.Exception = ex;
                    info.TaskComplete.Set();
                }
            });
        }

        // waitHandles is used to signal when a thread is complete
        // Grab all wait handles asigned to the task list
        var waitHandles = result.Select(taskInfo => taskInfo.TaskComplete).ToArray();

        WaitHandle.WaitAll(waitHandles, timeOut);

        // If any tasks are still running then mark them with Timeout Failure status
        foreach (var taskInfo in result.Where(taskInfo => 
                 taskInfo.State == ThreadTaskStateType.Started))
        {
            taskInfo.SetState(ThreadTaskStateType.FailedTimeout);
        }

        return result;
    }
}

测试代码

测试类演示了各种示例,并让管理器经历不同的场景。

  • BaseTestTask - ThreadUtil.RunAsynchronously<T>(List<T> tasks, Action<T> action, int timeOut) 方法要求所有任务都为 T 类型,因此在本示例中,每个不同的任务都将从 BaseTestTask 派生并实现 Execute 方法。
  • PrintTask - 向控制台打印一些文本。
  • GoogleTask - 对 Google 运行搜索。
  • LongRunTask - 使用 Thread.Sleep 暂停一段时间。
  • ExceptionTask - 抛出异常。

单元测试演示了以下场景

  • 成功 - 并行成功运行十个独立的任务。
  • 超时 - 运行三个独立的任务,但其中一个任务耗时过长。
  • 异常 - 运行三个独立的任务,但其中一个会抛出异常。

示例任务

public abstract class BaseTestTask
{
    protected BaseTestTask(string taskName)
    {
        TaskName = taskName;
    }

    public string TaskName { get; set; }

    public abstract void Execute();
}

public class PrintTask : BaseTestTask
{
    public PrintTask(string taskName) : base(taskName) { }

    public override void Execute()
    {
        _.P("Print Task: {0}", TaskName);
    }
}

public class GoogleTask : BaseTestTask
{
    public GoogleTask(string taskName, string query) : base(taskName)
    {
        Query = query;
    }

    public string Query { get; set; }

    public override void Execute()
    {
        var query = 
            "http://www.google.com/search?q={0}".FormatWith(
            HttpUtility.UrlEncode(Query));

        _.P("Google Task: {0} : {1}", TaskName, query);

        Content = (new WebClient()).DownloadString(query);
    }

    public string Content { get; set; }
}

public class LongRunTask : BaseTestTask
{
    public LongRunTask(string taskName, int pauseFor) : base(taskName)
    {
        _.P("Long Task: {0}", TaskName);

        PauseFor = pauseFor;
    }

    public int PauseFor { get; set; }

    public override void Execute()
    {
        _.P("Long Task: {0} - Timeout: {1}", TaskName, PauseFor);

        Thread.Sleep(PauseFor);
    }
}

public class ExceptionTask : BaseTestTask
{
    public ExceptionTask(string taskName, 
           string exceptionMessage) : base(taskName)
    {
        ExceptionMessage = exceptionMessage;
    }

    public string ExceptionMessage { get; set; }

    public override void Execute()
    {
        _.P("Exception Task: {0}", TaskName);

        throw new Exception(ExceptionMessage);
    }
}

单元测试

// *********************************************************************************
// Test Cases
// *********************************************************************************
[Test]
public void RunParallelTasksSuccess()
{
    var tasks = new List<BaseTestTask>
                   {
                       new PrintTask("David"),
                       new PrintTask("Cruwys"),
                       new GoogleTask("Query 1", "www.davidcruwys.com"),
                       new GoogleTask("Query 2", "www.infochoice.com"),
                       new GoogleTask("Query 3", "www.my-trading-journal.com"),
                       new GoogleTask("Query 4", "www.tradingtothemax.com"),
                       new GoogleTask("Query 5", "Cheap travel"),
                       new GoogleTask("Query 6", "Home loans"),
                       new GoogleTask("Query 7", "Credit cards"),
                       new LongRunTask("But within timeout", 8000),
                   };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 10000);

    _.L("Print Info");
    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(10));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
    Assert.That(taskInfos[3].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 4");
    Assert.That(taskInfos[4].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 5");
    Assert.That(taskInfos[5].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 6");
    Assert.That(taskInfos[6].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 7");
    Assert.That(taskInfos[7].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 8");
    Assert.That(taskInfos[8].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 9");
    Assert.That(taskInfos[9].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 10");
}

[Test]
public void RunParallelTasksTimeOut()
{
    var tasks = new List<BaseTestTask>
                   {
                       new LongRunTask("Too Long", 3500),
                       new PrintTask("David"),
                       new PrintTask("Cruwys"),
                   };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 3000);

    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.FailedTimeout), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}

[Test]
public void RunParallelTasksException()
{
    var tasks = new List<BaseTestTask>
                   {
                       new PrintTask("David"),
                       new ExceptionTask("Bad Task", "Failure in task"),
                       new PrintTask("Cruwys"),
                   };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 1000);

    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Failed), "Task 2");
    Assert.That(taskInfos[1].Exception, Is.Not.Null, "Task 2");
    Assert.That(taskInfos[1].Exception.Message, Is.EqualTo("Failure in task"), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}

业务测试用例

以下代码未包含在下载文件中。它只是我们报价引擎的一个摘录,测试了运行 15 个具有不同请求数据结构的独立客户请求到报价引擎,该引擎又调用 6 个报价提供商来完成请求。

此处仅将其作为并行运行任务的另一个示例。

[Test]
[QuoteRequestCleanup(CleanupAction)]
public void BuildProductQuotes99_SendMultipleMessages()
{
    var tasks = new List<messagerunner>
        {
            new MessageRunner("01_Success.xml"),
            new MessageRunner("02_Success_CarModified.xml"),
            new MessageRunner("03_Success_CarDamaged.xml"),
            new MessageRunner("10_RemoteServer_TimeoutA.xml"),
            new MessageRunner("10_RemoteServer_TimeoutB.xml"),
            new MessageRunner("11_RemoteServer_NullMessage.xml"),
            new MessageRunner("12_RemoteServer_InvalidResultStructure.xml"),
            new MessageRunner("13_RemoteServer_UnknownProducts.xml"),
            new MessageRunner("14_RemoteServer_ZeroProducts.xml"),
            new MessageRunner("15_RemoteServer_ProductKnockout.xml"),
            new MessageRunner("20_QuoteProvider_BuildMessageException.xml"),
            new MessageRunner("21_QuoteProvider_SendMessageException.xml"),
            new MessageRunner("22_QuoteProvider_SendMessageTimeoutException.xml"),
            new MessageRunner("23_QuoteProvider_SendMessageAuthorizationException.xml"),
            new MessageRunner("24_QuoteProvider_ProcessMessageException.xml")
        };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 20000);

    _.L("Print Info");
    taskInfos.ForEach(t => t.Task.Print());
}

public class MessageRunner
{
    public MessageRunner(string dataFileName)
    {
        DataFileName = dataFileName;

        QuoteRequestController = Ioc.GetInstance<iquoterequestcontroller>();
        SecurityController = Ioc.GetInstance<isecuritycontroller>();
        SampleUser = SecurityController.AuthenticateUser(
                      "TestPartnerName", "TestPassword");
    }

    private IQuoteRequestController QuoteRequestController { get; set; }
    private ISecurityController SecurityController { get; set; }
    private IUser SampleUser { get; set; }

    private string DataFileName { get; set; }

    public CarInsuranceQuoteRequestResultDto Result { get; set; }

    public void Execute()
    {
        var quoteRequest = 
          T.TestData.QuoteRequestMessage.GetQuoteRequest(DataFileName);

        Result = QuoteRequestController.QuoteRequest(SampleUser, quoteRequest);
    }

    public void Print()
    {
        _.SerializeToString(Result);
    }
}

历史

  • 2010 年 5 月 5 日 - 首次发布。
  • 2010 年 5 月 6 日 - 根据评论更新。
© . All rights reserved.