轻松实现 ASP.NET 多线程





2.00/5 (2投票s)
一个迷你库,可以将您的常规方法作为多线程运行。
引言
虽然 BackgroundWorker 使开发多线程桌面应用程序变得非常容易,但开发人员在开发 ASP.NET 应用程序时实际上需要编写自己的代码来完成相同的任务。 
这个过程包括将数据输入分成更小的部分、创建线程、将任务分配给线程和从先前创建的线程收集结果。
为了提供一种简单快捷的方法来转换耗时的操作,我创建了一个小库,以便能够以多线程方式使用您自己的方法。 整个想法是编写一个执行任务的方法,并将其传递给库,以多线程方式而不是单线程方式运行它。
背景
该项目主要基于我之前的两个项目的组合。 其中一个是创建类似于 DotLiquid 的模板引擎,以提供可定制的报告输出。 另一个基本上是使用基于 Gold Parser 的动态公式来加速 3600 多名员工的工资计算。
我使用了多线程工资计算引擎,并结合了模板引擎中的代码对其进行了泛化。
使用代码
让我们从一个设计为单线程运行的代码开始。
private IList<Employee> m_employees
{
    get
    {
        return Session["Employees"] as IList<Employee>;
    }
    set
    {
        Session["Employees"] = value;
    }
}
protected void Page_Load(object sender, EventArgs e)
{
    if (!IsPostBack)
    {
        IList<Employee> eList = DAL.SelectHQL<Employee>("select p from Personel p", 5000);
        m_employees = eList;
    }
}
protected void btRunSingleThreaded_Click(object sender, EventArgs e)
{
    try
    {
        object[] results = new object[m_employees.Count];
        int counter = 0;
        foreach (Employee emp in m_employees)
        {
            results[counter] = GetLastPosition(emp);
            counter++;
        }
    }
    catch (Exception ex)
    {
        throw ex;
    }
}
private Position GetLastPosition(Employee emp)
{
    try
    {
        string hql = @"
from
    Position as pos
    inner join fetch pos.Sort as srt
where
    pos.Employee.ID = ?
    order by
    srt.No asc";
        Position lastPosition = DAL.SelectHQL<Position>(hql, 0, emp.ID).LastOrDefault();
        return lastPosition;
    }
    catch (Exception ex)
    {
        throw ex;
    }
}
其中 DAL 代表 数据访问层,这是我们公司框架的一个类。
要将之前的 GetLastPosition() 方法转换为以多线程方式工作,我们只需使用 QuickParallelization 类。
protected void btRunMultiThreaded_Click(object sender, EventArgs e)
{
    try
    {
        ParallelOperationsLibrary lib = new ParallelOperationsLibrary();
        object[] lastPositons = lib.RunParallel(m_employees, 50, new Func<Employee, Position>(this.GetLastPosition));
    }
    catch (Exception ex)
    {
        throw ex;
    }
}
为了更好地理解整个过程的工作原理,让我们检查一下该库本身。
public class ParallelOperationsLibrary
{
private static ManualResetEvent[] ThreadEvents;  // Array to collect the notifications for one or more waiting threads
private Delegate ProcMethod;                     // Delegate that refers to the method which run with every single process
private object[] ProcArgs;                       // Array to pass required arguments the thread block
private List<object> ProcList;             // Collection of data as input
object[] resultSet;                              // Array of data as output
public ParallelOperationsLibrary()
{}
}
主要入口点是 RunParallel 方法本身,它将整个输入分成更小的块进行处理,创建线程,运行线程并收集结果。 此外,创建的最大线程数限制为 64,因为这是允许的最大数量,超过此值将导致抛出异常。 
public object[] RunParallel(
                            object list, 
                            int threadCount,
                            Delegate method,
                            params object[] args)
{
    try
    {
        if(threadCount > 64)
        {
            threadCount = 64;
        }
        List<object> oList = new List<object>();
        if (list is IEnumerable)
        {
            var oq = ((IEnumerable)list).Cast<object>().ToList();
            oList = oq.ToList();
        }
        resultSet = new object[oList.Count];
        ProcMethod = method;
        ProcArgs = args;
        ProcList = oList;
        ThreadEvents = new ManualResetEvent[threadCount];
        int startIndex, endIndex = 0;
        int partCount = (int)Math.Ceiling((decimal)oList.Count / threadCount);
        for (int i = 0; i < threadCount; i++)
        {
            startIndex = partCount * i;
            endIndex += partCount;
            if (endIndex > oList.Count)
            {
                endIndex = oList.Count;
            }
            ThreadEvents[i] = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(new WaitCallback(this.CreateThreads),
                       new object[] { i, startIndex, endIndex });
        }
        WaitHandle.WaitAll(ThreadEvents);
        return resultSet;
    }
    catch (Exception ex)
    {
        throw new Exception("An error occured while running " + 
                  "the parallel processing library.", ex);
    }
}
在将数据输入集分成更小的部分并创建线程后,我们只需处理每个数据块并使用 DynamicInvoke() 方法调用执行实际处理的原始方法。此过程发生在 CreateThreads() 方法中。
private void CreateThreads(object state)
{
    try
    {
        object[] _state = state as object[];
        int index = (int)_state[0];
        int start = (int)_state[1];
        int end = (int)_state[2];
        for (int i = start; i < end; i++)
        {
            try
            {
                object[] newArgs = new object[ProcArgs.Length + 1];
                newArgs[0] = (object)ProcList[i];
                for (int j = 0; j < ProcArgs.Length; j++)
                {
                    newArgs[j + 1] = ProcArgs[j];
                }
                resultSet[i] = ProcMethod.DynamicInvoke(newArgs);
            }
            catch (Exception ex)
            {
                resultSet[i] = ex;
            }
        }
        ThreadEvents[index].Set();
    }
    catch (Exception ex)
    {
    throw new Exception("An error occured while processing threads.", ex);
    }
}
结论
为了测试库的性能,我运行了几个测试,这些测试在输入数据集长度和线程数量上有所不同。 结果证明,多处理机制有效,并且随着处理的数据量增加,可以大大提高性能。
可以通过下表轻松监控提供的性能提升
| 员工人数 | 单线程 (秒) | 多线程 (秒) | 性能提升 (%) | |
| 10 个线程 | 50 个线程 | |||
| 1000 | 6.391 | 5.216 | 6.039 | 12.86 | 
| 2000 | 22.419 | 14.604 | 13.376 | 40.33 | 
| 5000 | 205.272 | 78.415 | 56.814 | 72.32 | 


