轻松实现 ASP.NET 多线程





2.00/5 (2投票s)
一个迷你库,可以将您的常规方法作为多线程运行。
引言
虽然 BackgroundWorker
使开发多线程桌面应用程序变得非常容易,但开发人员在开发 ASP.NET 应用程序时实际上需要编写自己的代码来完成相同的任务。
这个过程包括将数据输入分成更小的部分、创建线程、将任务分配给线程和从先前创建的线程收集结果。
为了提供一种简单快捷的方法来转换耗时的操作,我创建了一个小库,以便能够以多线程方式使用您自己的方法。 整个想法是编写一个执行任务的方法,并将其传递给库,以多线程方式而不是单线程方式运行它。
背景
该项目主要基于我之前的两个项目的组合。 其中一个是创建类似于 DotLiquid 的模板引擎,以提供可定制的报告输出。 另一个基本上是使用基于 Gold Parser 的动态公式来加速 3600 多名员工的工资计算。
我使用了多线程工资计算引擎,并结合了模板引擎中的代码对其进行了泛化。
使用代码
让我们从一个设计为单线程运行的代码开始。
private IList<Employee> m_employees
{
get
{
return Session["Employees"] as IList<Employee>;
}
set
{
Session["Employees"] = value;
}
}
protected void Page_Load(object sender, EventArgs e)
{
if (!IsPostBack)
{
IList<Employee> eList = DAL.SelectHQL<Employee>("select p from Personel p", 5000);
m_employees = eList;
}
}
protected void btRunSingleThreaded_Click(object sender, EventArgs e)
{
try
{
object[] results = new object[m_employees.Count];
int counter = 0;
foreach (Employee emp in m_employees)
{
results[counter] = GetLastPosition(emp);
counter++;
}
}
catch (Exception ex)
{
throw ex;
}
}
private Position GetLastPosition(Employee emp)
{
try
{
string hql = @"
from
Position as pos
inner join fetch pos.Sort as srt
where
pos.Employee.ID = ?
order by
srt.No asc";
Position lastPosition = DAL.SelectHQL<Position>(hql, 0, emp.ID).LastOrDefault();
return lastPosition;
}
catch (Exception ex)
{
throw ex;
}
}
其中 DAL
代表 数据访问层
,这是我们公司框架的一个类。
要将之前的 GetLastPosition()
方法转换为以多线程方式工作,我们只需使用 QuickParallelization
类。
protected void btRunMultiThreaded_Click(object sender, EventArgs e)
{
try
{
ParallelOperationsLibrary lib = new ParallelOperationsLibrary();
object[] lastPositons = lib.RunParallel(m_employees, 50, new Func<Employee, Position>(this.GetLastPosition));
}
catch (Exception ex)
{
throw ex;
}
}
为了更好地理解整个过程的工作原理,让我们检查一下该库本身。
public class ParallelOperationsLibrary
{
private static ManualResetEvent[] ThreadEvents; // Array to collect the notifications for one or more waiting threads
private Delegate ProcMethod; // Delegate that refers to the method which run with every single process
private object[] ProcArgs; // Array to pass required arguments the thread block
private List<object> ProcList; // Collection of data as input
object[] resultSet; // Array of data as output
public ParallelOperationsLibrary()
{}
}
主要入口点是 RunParallel
方法本身,它将整个输入分成更小的块进行处理,创建线程,运行线程并收集结果。 此外,创建的最大线程数限制为 64,因为这是允许的最大数量,超过此值将导致抛出异常。
public object[] RunParallel(
object list,
int threadCount,
Delegate method,
params object[] args)
{
try
{
if(threadCount > 64)
{
threadCount = 64;
}
List<object> oList = new List<object>();
if (list is IEnumerable)
{
var oq = ((IEnumerable)list).Cast<object>().ToList();
oList = oq.ToList();
}
resultSet = new object[oList.Count];
ProcMethod = method;
ProcArgs = args;
ProcList = oList;
ThreadEvents = new ManualResetEvent[threadCount];
int startIndex, endIndex = 0;
int partCount = (int)Math.Ceiling((decimal)oList.Count / threadCount);
for (int i = 0; i < threadCount; i++)
{
startIndex = partCount * i;
endIndex += partCount;
if (endIndex > oList.Count)
{
endIndex = oList.Count;
}
ThreadEvents[i] = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(new WaitCallback(this.CreateThreads),
new object[] { i, startIndex, endIndex });
}
WaitHandle.WaitAll(ThreadEvents);
return resultSet;
}
catch (Exception ex)
{
throw new Exception("An error occured while running " +
"the parallel processing library.", ex);
}
}
在将数据输入集分成更小的部分并创建线程后,我们只需处理每个数据块并使用 DynamicInvoke()
方法调用执行实际处理的原始方法。此过程发生在 CreateThreads()
方法中。
private void CreateThreads(object state)
{
try
{
object[] _state = state as object[];
int index = (int)_state[0];
int start = (int)_state[1];
int end = (int)_state[2];
for (int i = start; i < end; i++)
{
try
{
object[] newArgs = new object[ProcArgs.Length + 1];
newArgs[0] = (object)ProcList[i];
for (int j = 0; j < ProcArgs.Length; j++)
{
newArgs[j + 1] = ProcArgs[j];
}
resultSet[i] = ProcMethod.DynamicInvoke(newArgs);
}
catch (Exception ex)
{
resultSet[i] = ex;
}
}
ThreadEvents[index].Set();
}
catch (Exception ex)
{
throw new Exception("An error occured while processing threads.", ex);
}
}
结论
为了测试库的性能,我运行了几个测试,这些测试在输入数据集长度和线程数量上有所不同。 结果证明,多处理机制有效,并且随着处理的数据量增加,可以大大提高性能。
可以通过下表轻松监控提供的性能提升
员工人数 | 单线程 (秒) | 多线程 (秒) | 性能提升 (%) | |
10 个线程 | 50 个线程 | |||
1000 | 6.391 | 5.216 | 6.039 | 12.86 |
2000 | 22.419 | 14.604 | 13.376 | 40.33 |
5000 | 205.272 | 78.415 | 56.814 | 72.32 |