65.9K
CodeProject 正在变化。 阅读更多。
Home

轻松实现 ASP.NET 多线程

starIconstarIconemptyStarIconemptyStarIconemptyStarIcon

2.00/5 (2投票s)

2011年6月8日

CPOL

2分钟阅读

viewsIcon

31421

downloadIcon

580

一个迷你库,可以将您的常规方法作为多线程运行。

引言

虽然 BackgroundWorker 使开发多线程桌面应用程序变得非常容易,但开发人员在开发 ASP.NET 应用程序时实际上需要编写自己的代码来完成相同的任务。 

这个过程包括将数据输入分成更小的部分创建线程将任务分配给线程从先前创建的线程收集结果

为了提供一种简单快捷的方法来转换耗时的操作,我创建了一个小库,以便能够以多线程方式使用您自己的方法。 整个想法是编写一个执行任务的方法,并将其传递给库,以多线程方式而不是单线程方式运行它。 

背景 

该项目主要基于我之前的两个项目的组合。 其中一个是创建类似于 DotLiquid 的模板引擎,以提供可定制的报告输出。 另一个基本上是使用基于 Gold Parser 的动态公式来加速 3600 多名员工的工资计算。

我使用了多线程工资计算引擎,并结合了模板引擎中的代码对其进行了泛化。    

使用代码 

让我们从一个设计为单线程运行的代码开始。  

private IList<Employee> m_employees
{
    get
    {
        return Session["Employees"] as IList<Employee>;
    }
    set
    {
        Session["Employees"] = value;
    }
}

protected void Page_Load(object sender, EventArgs e)
{
    if (!IsPostBack)
    {
        IList<Employee> eList = DAL.SelectHQL<Employee>("select p from Personel p", 5000);
        m_employees = eList;
    }
}
protected void btRunSingleThreaded_Click(object sender, EventArgs e)
{
    try
    {
        object[] results = new object[m_employees.Count];
        int counter = 0;
        foreach (Employee emp in m_employees)
        {
            results[counter] = GetLastPosition(emp);
            counter++;
        }
    }
    catch (Exception ex)
    {
        throw ex;
    }
}
private Position GetLastPosition(Employee emp)
{
    try
    {
        string hql = @"
from
    Position as pos
    inner join fetch pos.Sort as srt
where
    pos.Employee.ID = ?
    order by
    srt.No asc";

        Position lastPosition = DAL.SelectHQL<Position>(hql, 0, emp.ID).LastOrDefault();
        return lastPosition;
    }
    catch (Exception ex)
    {
        throw ex;
    }
}

其中 DAL 代表 数据访问层,这是我们公司框架的一个类。

要将之前的 GetLastPosition() 方法转换为以多线程方式工作,我们只需使用 QuickParallelization 类。

protected void btRunMultiThreaded_Click(object sender, EventArgs e)
{
    try
    {
        ParallelOperationsLibrary lib = new ParallelOperationsLibrary();
        object[] lastPositons = lib.RunParallel(m_employees, 50, new Func<Employee, Position>(this.GetLastPosition));
    }
    catch (Exception ex)
    {
        throw ex;
    }
}

为了更好地理解整个过程的工作原理,让我们检查一下该库本身。 

public class ParallelOperationsLibrary
{
private static ManualResetEvent[] ThreadEvents;  // Array to collect the notifications for one or more waiting threads
private Delegate ProcMethod;                     // Delegate that refers to the method which run with every single process
private object[] ProcArgs;                       // Array to pass required arguments the thread block
private List<object> ProcList;             // Collection of data as input
object[] resultSet;                              // Array of data as output

public ParallelOperationsLibrary()
{}
}

主要入口点是 RunParallel 方法本身,它将整个输入分成更小的块进行处理,创建线程,运行线程并收集结果。 此外,创建的最大线程数限制为 64,因为这是允许的最大数量,超过此值将导致抛出异常。 

public object[] RunParallel(
                            object list, 
                            int threadCount,
                            Delegate method,
                            params object[] args)
{
    try
    {
        if(threadCount > 64)
        {
            threadCount = 64;
        }
        List<object> oList = new List<object>();
        if (list is IEnumerable)
        {
            var oq = ((IEnumerable)list).Cast<object>().ToList();
            oList = oq.ToList();
        }
        resultSet = new object[oList.Count];
        ProcMethod = method;
        ProcArgs = args;
        ProcList = oList;
        ThreadEvents = new ManualResetEvent[threadCount];
        int startIndex, endIndex = 0;
        int partCount = (int)Math.Ceiling((decimal)oList.Count / threadCount);
        for (int i = 0; i < threadCount; i++)
        {
            startIndex = partCount * i;
            endIndex += partCount;
            if (endIndex > oList.Count)
            {
                endIndex = oList.Count;
            }
            ThreadEvents[i] = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(new WaitCallback(this.CreateThreads),
                       new object[] { i, startIndex, endIndex });
        }
        WaitHandle.WaitAll(ThreadEvents);
        return resultSet;
    }
    catch (Exception ex)
    {
        throw new Exception("An error occured while running " + 
                  "the parallel processing library.", ex);
    }
}

在将数据输入集分成更小的部分并创建线程后,我们只需处理每个数据块并使用 DynamicInvoke() 方法调用执行实际处理的原始方法。此过程发生在 CreateThreads() 方法中。

private void CreateThreads(object state)
{
    try
    {
        object[] _state = state as object[];
        int index = (int)_state[0];
        int start = (int)_state[1];
        int end = (int)_state[2];
        for (int i = start; i < end; i++)
        {
            try
            {
                object[] newArgs = new object[ProcArgs.Length + 1];
                newArgs[0] = (object)ProcList[i];
                for (int j = 0; j < ProcArgs.Length; j++)
                {
                    newArgs[j + 1] = ProcArgs[j];
                }
                resultSet[i] = ProcMethod.DynamicInvoke(newArgs);
            }
            catch (Exception ex)
            {
                resultSet[i] = ex;
            }
        }
        ThreadEvents[index].Set();
    }
    catch (Exception ex)
    {
    throw new Exception("An error occured while processing threads.", ex);
    }
}

结论

为了测试库的性能,我运行了几个测试,这些测试在输入数据集长度和线程数量上有所不同。 结果证明,多处理机制有效,并且随着处理的数据量增加,可以大大提高性能。

可以通过下表轻松监控提供的性能提升

员工人数 单线程 (秒) 多线程 (秒) 性能提升 (%)
10 个线程 50 个线程
1000 6.391 5.216 6.039 12.86
2000 22.419 14.604 13.376 40.33
5000 205.272 78.415 56.814 72.32
© . All rights reserved.