在 .NET 3.0 中并行任务






4.89/5 (9投票s)
提供一种机制,可以在多线程上并行执行任务列表,并将异常、超时和任务成功完成等有用状态传回调用线程。
引言
我目前的项目是构建一个实时分布式报价引擎。在线客户将输入他们的详细信息并提交一个请求给报价引擎。引擎会创建一个报价提供商列表,通过 Web 与各种后端系统进行 B2B 通信。每个提供商都有自定义实现,包含特定于提供商的映射和传输要求。
报价引擎的一个要求是处理过程不能超过 15 秒,因为基于 Web 的客户如果等待太久就会离开。
从每个提供商那里检索报价所需的时间估计为 3-7 秒,因此顺序处理是不可能的。我们还意识到,如果系统等待所有报价返回后再将结果返回给消费者,那么仅仅一个提供商的失败就会影响整体性能,例如,如果任何目标提供商系统宕机或运行缓慢。
解决问题
代码的核心是标准的 ThreadPool.QueueUserWorkItem
来运行我的提供商,以及 WaitHandle.WaitAll(waitHandles, timeOut)
,它等待所有提供商完成或在设定的时间内退出。
我添加的主要内容是一个任务包装器 ThreadTaskInfo
,它跟踪正在运行的提供商的状态,包括任何异常,以及一个静态管理器类 ThreadUtil
,用于在任何类的列表上并行运行一个动作委托,并通过关联的 ThreadTaskInfo
来管理每个动作的状态。
ThreadUtil
类旨在处理您希望并行运行的任何类。无需继承基类或实现接口。您只需要一个具有通用 Action 委托(命令模式)的类列表,并将此类列表及其 Action 委托提供给 ThreadUtil.RunAsynchronously<T>
方法。
本文提供的解决方案类似于 .NET 4 的 TPL 任务并行库。当我编写代码时,我并不知道并行扩展库,后来发现 .NET 3.5 的并行扩展链接似乎都已损坏,并且目前我们没有迁移到 .NET 4 的计划。
主代码
ThreadTaskInfo
包装您的执行类,并表示一个单一的任务,该任务存储线程的当前执行状态。它还会存储您的类抛出的任何异常,或代码未及时完成时的超时信息。
public class ThreadTaskInfo<t>
{
private readonly object _synchLock = new object();
/// <summary>
/// Initializes a new instance of the ThreadTaskInfo
// see cref="ThreadTaskInfo<T>"/> class.
/// </summary>
/// <param name="task">The task.</param>
public ThreadTaskInfo(T task)
{
Task = task;
// Task is created, but has not started running yet
State = ThreadTaskStateType.Created;
// Task is not set to completed
TaskComplete = new ManualResetEvent(false);
}
/// <summary>
/// Gets or sets the task that is running on the thread
/// </summary>
/// <value>The task.</value>
public T Task { get; private set; }
/// <summary>
/// Gets or sets the task complete signal (Wait Handle).
/// </summary>
/// <value>The task complete.</value>
public ManualResetEvent TaskComplete { get; private set; }
/// <summary>
/// Gets or sets an exception if it is thrown by the task.
/// </summary>
/// <value>The exception.</value>
public Exception Exception { get; set; }
/// <summary>
/// Gets or sets the state of currently running threaded task.
/// </summary>
/// <value>The state.</value>
public ThreadTaskStateType State { get; private set; }
/// <summary>
/// Set the state of the task.
/// </summary>
/// The state.
public void SetState(ThreadTaskStateType state)
{
// Lock the writing of the State property
lock (_synchLock)
{
switch (state)
{
case ThreadTaskStateType.Created:
throw new Exception("State cannot be set to created");
case ThreadTaskStateType.Started:
if (State != ThreadTaskStateType.Created)
{
return;
}
State = state;
return;
case ThreadTaskStateType.Completed:
if (State != ThreadTaskStateType.Started)
{
return;
}
State = state;
return;
case ThreadTaskStateType.Failed:
if (State == ThreadTaskStateType.Started || State ==
ThreadTaskStateType.Completed)
{
State = state;
}
return;
case ThreadTaskStateType.FailedTimeout:
if (State == ThreadTaskStateType.Started || State ==
ThreadTaskStateType.Completed)
{
State = state;
}
return;
}
}
}
}
ThreadTaskStateType
枚举表示每个运行任务的状态。
/// <summary>
/// State of the currently running threaded task
/// </summary>
public enum ThreadTaskStateType
{
/// <summary>
/// Created state, not yet put onto the thread pool
/// </summary>
Created,
/// <summary>
/// Thread is started, the task is ready to execute
/// </summary>
Started,
/// <summary>
/// Thread is complete, the task has finsihed executing
/// </summary>
Completed,
/// <summary>
/// Thread failed to complete
/// </summary>
Failed,
/// <summary>
/// Thread failed to complete in specified time
/// </summary>
FailedTimeout,
}
ThreadUtil
提供一个静态方法 RunAsynchronously<T>
,它将并行地对任务列表运行您的 Action 委托。此方法将处理状态管理、异常和超时,并将信息记录在每个 ThreadTaskInfo
对象中。
当所有任务都完成或达到超时时,将返回一个 ThreadTaskInfo
列表,您可以检查该列表以查看哪些任务成功且在规定时间内运行。
public static class ThreadUtil
{
/// <summary>
/// Execute a list of tasks asynchronously.
/// </summary>
/// <typeparam name="T">
/// <param name="tasks">The tasks to execute.</param>
/// <param name="action">The action on the object to run.</param>
/// <param name="timeOut">Timeout in millisecond.</param>
/// <returns>List of TaskInfo storing information about the execution
/// and a reference to the tasks themselves</returns>
public static List<ThreadTaskInfo<t>>
RunAsynchronously<t>(IList<t> tasks,
Action<t> action, int timeOut)
{
var result = new List<ThreadTaskInfo<t>>();
if (tasks == null || tasks.Count == 0 || action == null)
{
return result;
}
// Create a list of ThreadTaskInfo wrappers for each task to run
foreach (var taskInfo in tasks.Select(task => new ThreadTaskInfo<t>(task)))
{
result.Add(taskInfo);
var info = taskInfo;
// Wrap the execution method in a delegate with a thread complete
// signal to inform the main thread that the worker thread is complete
ThreadPool.QueueUserWorkItem(state =>
{
try
{
info.SetState(ThreadTaskStateType.Started);
// Execute task
action.Invoke(info.Task);
info.SetState(ThreadTaskStateType.Completed);
// Signal the main thread, that this execution thread is complete
info.TaskComplete.Set();
}
catch (Exception ex)
{
info.SetState(ThreadTaskStateType.Failed);
info.Exception = ex;
info.TaskComplete.Set();
}
});
}
// waitHandles is used to signal when a thread is complete
// Grab all wait handles asigned to the task list
var waitHandles = result.Select(taskInfo => taskInfo.TaskComplete).ToArray();
WaitHandle.WaitAll(waitHandles, timeOut);
// If any tasks are still running then mark them with Timeout Failure status
foreach (var taskInfo in result.Where(taskInfo =>
taskInfo.State == ThreadTaskStateType.Started))
{
taskInfo.SetState(ThreadTaskStateType.FailedTimeout);
}
return result;
}
}
测试代码
测试类演示了各种示例,并让管理器经历不同的场景。
BaseTestTask
-ThreadUtil.RunAsynchronously<T>(List<T> tasks, Action<T> action, int timeOut)
方法要求所有任务都为T
类型,因此在本示例中,每个不同的任务都将从BaseTestTask
派生并实现Execute
方法。PrintTask
- 向控制台打印一些文本。GoogleTask
- 对 Google 运行搜索。LongRunTask
- 使用Thread.Sleep
暂停一段时间。ExceptionTask
- 抛出异常。
单元测试演示了以下场景
- 成功 - 并行成功运行十个独立的任务。
- 超时 - 运行三个独立的任务,但其中一个任务耗时过长。
- 异常 - 运行三个独立的任务,但其中一个会抛出异常。
示例任务
public abstract class BaseTestTask
{
protected BaseTestTask(string taskName)
{
TaskName = taskName;
}
public string TaskName { get; set; }
public abstract void Execute();
}
public class PrintTask : BaseTestTask
{
public PrintTask(string taskName) : base(taskName) { }
public override void Execute()
{
_.P("Print Task: {0}", TaskName);
}
}
public class GoogleTask : BaseTestTask
{
public GoogleTask(string taskName, string query) : base(taskName)
{
Query = query;
}
public string Query { get; set; }
public override void Execute()
{
var query =
"http://www.google.com/search?q={0}".FormatWith(
HttpUtility.UrlEncode(Query));
_.P("Google Task: {0} : {1}", TaskName, query);
Content = (new WebClient()).DownloadString(query);
}
public string Content { get; set; }
}
public class LongRunTask : BaseTestTask
{
public LongRunTask(string taskName, int pauseFor) : base(taskName)
{
_.P("Long Task: {0}", TaskName);
PauseFor = pauseFor;
}
public int PauseFor { get; set; }
public override void Execute()
{
_.P("Long Task: {0} - Timeout: {1}", TaskName, PauseFor);
Thread.Sleep(PauseFor);
}
}
public class ExceptionTask : BaseTestTask
{
public ExceptionTask(string taskName,
string exceptionMessage) : base(taskName)
{
ExceptionMessage = exceptionMessage;
}
public string ExceptionMessage { get; set; }
public override void Execute()
{
_.P("Exception Task: {0}", TaskName);
throw new Exception(ExceptionMessage);
}
}
单元测试
// *********************************************************************************
// Test Cases
// *********************************************************************************
[Test]
public void RunParallelTasksSuccess()
{
var tasks = new List<BaseTestTask>
{
new PrintTask("David"),
new PrintTask("Cruwys"),
new GoogleTask("Query 1", "www.davidcruwys.com"),
new GoogleTask("Query 2", "www.infochoice.com"),
new GoogleTask("Query 3", "www.my-trading-journal.com"),
new GoogleTask("Query 4", "www.tradingtothemax.com"),
new GoogleTask("Query 5", "Cheap travel"),
new GoogleTask("Query 6", "Home loans"),
new GoogleTask("Query 7", "Credit cards"),
new LongRunTask("But within timeout", 8000),
};
var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 10000);
_.L("Print Info");
taskInfos.ForEach(Print);
Assert.That(taskInfos.Count, Is.EqualTo(10));
Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
Assert.That(taskInfos[3].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 4");
Assert.That(taskInfos[4].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 5");
Assert.That(taskInfos[5].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 6");
Assert.That(taskInfos[6].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 7");
Assert.That(taskInfos[7].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 8");
Assert.That(taskInfos[8].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 9");
Assert.That(taskInfos[9].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 10");
}
[Test]
public void RunParallelTasksTimeOut()
{
var tasks = new List<BaseTestTask>
{
new LongRunTask("Too Long", 3500),
new PrintTask("David"),
new PrintTask("Cruwys"),
};
_.L("Print Info");
var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 3000);
taskInfos.ForEach(Print);
Assert.That(taskInfos.Count, Is.EqualTo(3));
Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.FailedTimeout), "Task 1");
Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}
[Test]
public void RunParallelTasksException()
{
var tasks = new List<BaseTestTask>
{
new PrintTask("David"),
new ExceptionTask("Bad Task", "Failure in task"),
new PrintTask("Cruwys"),
};
_.L("Print Info");
var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 1000);
taskInfos.ForEach(Print);
Assert.That(taskInfos.Count, Is.EqualTo(3));
Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Failed), "Task 2");
Assert.That(taskInfos[1].Exception, Is.Not.Null, "Task 2");
Assert.That(taskInfos[1].Exception.Message, Is.EqualTo("Failure in task"), "Task 2");
Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}
业务测试用例
以下代码未包含在下载文件中。它只是我们报价引擎的一个摘录,测试了运行 15 个具有不同请求数据结构的独立客户请求到报价引擎,该引擎又调用 6 个报价提供商来完成请求。
此处仅将其作为并行运行任务的另一个示例。
[Test]
[QuoteRequestCleanup(CleanupAction)]
public void BuildProductQuotes99_SendMultipleMessages()
{
var tasks = new List<messagerunner>
{
new MessageRunner("01_Success.xml"),
new MessageRunner("02_Success_CarModified.xml"),
new MessageRunner("03_Success_CarDamaged.xml"),
new MessageRunner("10_RemoteServer_TimeoutA.xml"),
new MessageRunner("10_RemoteServer_TimeoutB.xml"),
new MessageRunner("11_RemoteServer_NullMessage.xml"),
new MessageRunner("12_RemoteServer_InvalidResultStructure.xml"),
new MessageRunner("13_RemoteServer_UnknownProducts.xml"),
new MessageRunner("14_RemoteServer_ZeroProducts.xml"),
new MessageRunner("15_RemoteServer_ProductKnockout.xml"),
new MessageRunner("20_QuoteProvider_BuildMessageException.xml"),
new MessageRunner("21_QuoteProvider_SendMessageException.xml"),
new MessageRunner("22_QuoteProvider_SendMessageTimeoutException.xml"),
new MessageRunner("23_QuoteProvider_SendMessageAuthorizationException.xml"),
new MessageRunner("24_QuoteProvider_ProcessMessageException.xml")
};
var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 20000);
_.L("Print Info");
taskInfos.ForEach(t => t.Task.Print());
}
public class MessageRunner
{
public MessageRunner(string dataFileName)
{
DataFileName = dataFileName;
QuoteRequestController = Ioc.GetInstance<iquoterequestcontroller>();
SecurityController = Ioc.GetInstance<isecuritycontroller>();
SampleUser = SecurityController.AuthenticateUser(
"TestPartnerName", "TestPassword");
}
private IQuoteRequestController QuoteRequestController { get; set; }
private ISecurityController SecurityController { get; set; }
private IUser SampleUser { get; set; }
private string DataFileName { get; set; }
public CarInsuranceQuoteRequestResultDto Result { get; set; }
public void Execute()
{
var quoteRequest =
T.TestData.QuoteRequestMessage.GetQuoteRequest(DataFileName);
Result = QuoteRequestController.QuoteRequest(SampleUser, quoteRequest);
}
public void Print()
{
_.SerializeToString(Result);
}
}
历史
- 2010 年 5 月 5 日 - 首次发布。
- 2010 年 5 月 6 日 - 根据评论更新。