65.9K
CodeProject 正在变化。 阅读更多。
Home

并行处理

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.11/5 (3投票s)

2009年9月21日

CPOL

3分钟阅读

viewsIcon

198781

downloadIcon

1

使用 Parallel Extensions (TPL) 利用 CPU 核心。

引言

正如你可能从我之前的文章中看出的那样,我对利用硬件资源(主要是 CPU 核心)进行优化情有独钟。本文也不例外,涉及新的 TPL 库。此示例使用 .NET 3.5 的 CTP Parallel Extensions,面向中级受众。

背景

此项目的目标是利用 TPL 来加速简单报表的计算性能。此示例使用 LINQ group by,然后使用 TPL 并行执行计算。该代码尝试一种相当典型的顺序计算,并报告所用的时间;紧接着以并行方式运行相同的计算并显示结果。

由于此项目面向中级开发人员,因此我不会详细介绍此代码的某些方面。读者应该对 LINQ、谓词以及 TPL 有一定的了解。

使用代码

该项目利用了一个抽象类。我不想重写计时器和计算代码两次,如果我有足够的冒险精神来尝试第三种理论,也许会重写三次。

需要重写的方法是 StartCalculations;如果我选择创建更多方法,则其余代码将处理各种方法的比较计时器。

using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

namespace ParallelClass
{
    public abstract class ReportCalculations
    {
        private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();
        private long _elapsedTime;

        public long Elapsed_ms
        {
            get { return _elapsedTime; }
        }


        public void Begin(IEnumerable<IGrouping<int, 
               IGrouping<int, CompanyInfo>>> companyGroups)
        {
            var sw = new Stopwatch();
            sw.Start();

            StartCalculations(companyGroups);

            sw.Stop();
            _elapsedTime = sw.ElapsedMilliseconds;
        }

        public virtual string Name
        {
            get { return "Generic Report Class"; }
        }

        public virtual void StartCalculations(IEnumerable<IGrouping<int, 
               IGrouping<int, CompanyInfo>>> companyGroups)
        {
            
        }
    }
}

其余代码非常简单。该类接受一个 int,该 int 定义要在数据库中初始化的事务行数。在本例中,数据库只是 CompanyInfo 类的集合。

在抽象类中,你会注意到它传递了一个有趣的 IEnumerable 对象。这是在下面的 GetGrouping 方法中的嵌套 LINQ 查询的结果。GetGrouping 方法按 CompanyID,然后按 TransactionCode 对对象进行分组,因此通过 TPL 更容易处理多个计算。

PopulateCompanyTransactions 方法随机生成所有将用于排序和计算的事务。

在此示例中,我有两个派生自抽象类 ReportCalculations 的类。

它们是 MySeqMyTPLMySeq 运行一个典型的顺序循环,该循环在单个线程上计算每个事务组。后者,MyTPL,在可能的情况下利用所有可用的 CPU 来计算总和。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ParallelClass
{
    public class CompanyInfo
    {
        public int CompanyId { get; set; }
        public int TransactionCode { get; set; }
        public decimal Amount { get; set; }
    }

    public class Process
    {
        
       public Process(int recordsToProcess)
        {
            var rec = PopulateCompanyTransactions(recordsToProcess);
            var grouping = GetGrouping(rec);

            var calcClasses = new List<ReportCalculations> { new MySeq(), new MyTPL() };
            
            foreach(var calc in calcClasses)
            {
                calc.Begin(grouping);
                Console.WriteLine("{0} : {1}", calc.Name, calc.Elapsed_ms);
            }
            
            Console.ReadLine();
        }
       
        //Group records by Company then by Transaction
        private static IEnumerable<IGrouping<int, IGrouping<int, 
                CompanyInfo>>> GetGrouping(IEnumerable<CompanyInfo> companyInfos)
        {
            var query = from company in companyInfos
                        group company by company.CompanyId
                        into companyGroup
                            from transactionGroup in
                            (
                                from company in companyGroup
                                group company by company.TransactionCode
                            )
                            group transactionGroup by companyGroup.Key;

            return query;
        }


        //Populate record values with random data
        private static List<CompanyInfo> PopulateCompanyTransactions(int totalRecords)
        {
            var rnd = new Random();
            var companyInfo = new List<CompanyInfo>();

            for (int count = 0; count < totalRecords; count++)
                companyInfo.Add(new CompanyInfo
                            {
                                Amount = (decimal) (rnd.Next(-50, 1000)*rnd.NextDouble()),
                                CompanyId = rnd.Next(0, 100),
                                TransactionCode = rnd.Next(100, 120)
                            });
            return companyInfo;
        }
    }

    public class MySeq : ReportCalculations
    {
        private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();
        public override string Name { get { return "Sequential"; } }

        public override void StartCalculations(IEnumerable<IGrouping<int, 
               IGrouping<int, CompanyInfo>>> companyGroups)
        {
            foreach (var firstGroup in companyGroups)
            {
                foreach (var secondGroup in firstGroup)
                {
                    decimal total = 0;
                    foreach (var details in secondGroup)
                        total += details.Amount;

                    _totals.Add(new CompanyInfo { Amount = total, 
                       CompanyId = firstGroup.Key, TransactionCode = secondGroup.Key });
                }
            }

        }
    }

    public class MyTPL : ReportCalculations
    {
        private readonly List<CompanyInfo> _totals = new List<CompanyInfo>();

        public override string Name { get { return "TPL"; } }

        public override void StartCalculations(IEnumerable<IGrouping<int, 
               IGrouping<int, CompanyInfo>>> companyGroups)
        {
            
            foreach (var firstGroup in companyGroups)
                Parallel.ForEach(firstGroup, group => Calculate(group, firstGroup.Key));
        }

        //TPL Parallel method
        private void Calculate(IGrouping<int, CompanyInfo> grouping, int companyID)
        {
            decimal total = 0;
            Parallel.ForEach(grouping, g => { total += g.Amount; });
            _totals.Add(new CompanyInfo { Amount = total, 
               CompanyId = companyID, TransactionCode = grouping.Key });
        }   
    }
}

结论

最初,我预计计算结果不会有太大的差异,因为我的假设是,在底层线程处理程序进行分析时,最后一个线程已经完成,并且对于有限级别的事务来说,情况似乎确实如此。

我的经验表明,当事务组包含足够的行,以便线程在处理程序检查时仍然处于活动状态时,性能会得到显着提升。在本例中,这种提升大约从 100 万个事务开始。对于 1000 万个事务,我一直获得 23%-25% 的性能提升,但是对于低于 100 万个事务的任何事物,结果都慢得多。

该项目的影响应该很明显,即尽管并行模型在理论上更有效,但是在实施之前,有些情况需要更多的调查和基准测试。

此示例不应作为你自己的工作的权威基准测试指南,因为每种并行设计的情况都将是唯一的。

为了好玩,请尝试创建一个新的计算类,并在任何数量的事务上超越 MySeqMyTPL 类。

© . All rights reserved.