并行处理






4.11/5 (3投票s)
使用 Parallel Extensions (TPL) 利用 CPU 核心。
引言
正如你可能从我之前的文章中看出的那样,我对利用硬件资源(主要是 CPU 核心)进行优化情有独钟。本文也不例外,涉及新的 TPL 库。此示例使用 .NET 3.5 的 CTP Parallel Extensions,面向中级受众。
背景
此项目的目标是利用 TPL 来加速简单报表的计算性能。此示例使用 LINQ group by,然后使用 TPL 并行执行计算。该代码尝试一种相当典型的顺序计算,并报告所用的时间;紧接着以并行方式运行相同的计算并显示结果。
由于此项目面向中级开发人员,因此我不会详细介绍此代码的某些方面。读者应该对 LINQ、谓词以及 TPL 有一定的了解。
使用代码
该项目利用了一个抽象类。我不想重写计时器和计算代码两次,如果我有足够的冒险精神来尝试第三种理论,也许会重写三次。
需要重写的方法是 StartCalculations
;如果我选择创建更多方法,则其余代码将处理各种方法的比较计时器。
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
namespace ParallelClass
{
public abstract class ReportCalculations
{
private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();
private long _elapsedTime;
public long Elapsed_ms
{
get { return _elapsedTime; }
}
public void Begin(IEnumerable<IGrouping<int,
IGrouping<int, CompanyInfo>>> companyGroups)
{
var sw = new Stopwatch();
sw.Start();
StartCalculations(companyGroups);
sw.Stop();
_elapsedTime = sw.ElapsedMilliseconds;
}
public virtual string Name
{
get { return "Generic Report Class"; }
}
public virtual void StartCalculations(IEnumerable<IGrouping<int,
IGrouping<int, CompanyInfo>>> companyGroups)
{
}
}
}
其余代码非常简单。该类接受一个 int
,该 int
定义要在数据库中初始化的事务行数。在本例中,数据库只是 CompanyInfo
类的集合。
在抽象类中,你会注意到它传递了一个有趣的 IEnumerable
对象。这是在下面的 GetGrouping
方法中的嵌套 LINQ 查询的结果。GetGrouping
方法按 CompanyID,然后按 TransactionCode 对对象进行分组,因此通过 TPL 更容易处理多个计算。
PopulateCompanyTransactions
方法随机生成所有将用于排序和计算的事务。
在此示例中,我有两个派生自抽象类 ReportCalculations
的类。
它们是 MySeq
和 MyTPL
。MySeq
运行一个典型的顺序循环,该循环在单个线程上计算每个事务组。后者,MyTPL
,在可能的情况下利用所有可用的 CPU 来计算总和。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ParallelClass
{
public class CompanyInfo
{
public int CompanyId { get; set; }
public int TransactionCode { get; set; }
public decimal Amount { get; set; }
}
public class Process
{
public Process(int recordsToProcess)
{
var rec = PopulateCompanyTransactions(recordsToProcess);
var grouping = GetGrouping(rec);
var calcClasses = new List<ReportCalculations> { new MySeq(), new MyTPL() };
foreach(var calc in calcClasses)
{
calc.Begin(grouping);
Console.WriteLine("{0} : {1}", calc.Name, calc.Elapsed_ms);
}
Console.ReadLine();
}
//Group records by Company then by Transaction
private static IEnumerable<IGrouping<int, IGrouping<int,
CompanyInfo>>> GetGrouping(IEnumerable<CompanyInfo> companyInfos)
{
var query = from company in companyInfos
group company by company.CompanyId
into companyGroup
from transactionGroup in
(
from company in companyGroup
group company by company.TransactionCode
)
group transactionGroup by companyGroup.Key;
return query;
}
//Populate record values with random data
private static List<CompanyInfo> PopulateCompanyTransactions(int totalRecords)
{
var rnd = new Random();
var companyInfo = new List<CompanyInfo>();
for (int count = 0; count < totalRecords; count++)
companyInfo.Add(new CompanyInfo
{
Amount = (decimal) (rnd.Next(-50, 1000)*rnd.NextDouble()),
CompanyId = rnd.Next(0, 100),
TransactionCode = rnd.Next(100, 120)
});
return companyInfo;
}
}
public class MySeq : ReportCalculations
{
private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();
public override string Name { get { return "Sequential"; } }
public override void StartCalculations(IEnumerable<IGrouping<int,
IGrouping<int, CompanyInfo>>> companyGroups)
{
foreach (var firstGroup in companyGroups)
{
foreach (var secondGroup in firstGroup)
{
decimal total = 0;
foreach (var details in secondGroup)
total += details.Amount;
_totals.Add(new CompanyInfo { Amount = total,
CompanyId = firstGroup.Key, TransactionCode = secondGroup.Key });
}
}
}
}
public class MyTPL : ReportCalculations
{
private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();
public override string Name { get { return "TPL"; } }
public override void StartCalculations(IEnumerable<IGrouping<int,
IGrouping<int, CompanyInfo>>> companyGroups)
{
foreach (var firstGroup in companyGroups)
Parallel.ForEach(firstGroup, group => Calculate(group, firstGroup.Key));
}
//TPL Parallel method
private void Calculate(IGrouping<int, CompanyInfo> grouping, int companyID)
{
decimal total = 0;
Parallel.ForEach(grouping, g => { total += g.Amount; });
_totals.Add(new CompanyInfo { Amount = total,
CompanyId = companyID, TransactionCode = grouping.Key });
}
}
}
结论
最初,我预计计算结果不会有太大的差异,因为我的假设是,在底层线程处理程序进行分析时,最后一个线程已经完成,并且对于有限级别的事务来说,情况似乎确实如此。
我的经验表明,当事务组包含足够的行,以便线程在处理程序检查时仍然处于活动状态时,性能会得到显着提升。在本例中,这种提升大约从 100 万个事务开始。对于 1000 万个事务,我一直获得 23%-25% 的性能提升,但是对于低于 100 万个事务的任何事物,结果都慢得多。
该项目的影响应该很明显,即尽管并行模型在理论上更有效,但是在实施之前,有些情况需要更多的调查和基准测试。
此示例不应作为你自己的工作的权威基准测试指南,因为每种并行设计的情况都将是唯一的。
为了好玩,请尝试创建一个新的计算类,并在任何数量的事务上超越 MySeq
和 MyTPL
类。