65.9K
CodeProject 正在变化。 阅读更多。
Home

易于使用的并行 foreach,其速度比 .NET 版本快几个数量级

starIconstarIconstarIconstarIconstarIcon

5.00/5 (18投票s)

2018年7月8日

CPOL

1分钟阅读

viewsIcon

34001

嵌套循环的并行 foreach 循环实现

引言

我想分享一个并行 foreach 循环的快速解决方案,它有可能提高当前使用 .NET 4.0 中引入的内置版本的某些应用程序的性能。此自定义版本适用于嵌套循环,其中外部循环需要按顺序处理。线程数需要手动设置,这可以提供对线程的额外控制。

背景

最近,我遇到一个问题,我有一个可以独立处理的项目集合,但我也有一个外部循环,我必须按顺序对所述集合进行后处理。重复创建内部 Parallel.Foreach 会产生很大的开销,导致性能远不如使用非并行循环。作为解决方案,我创建了一个类,可以实例化许多必要的对象在循环之外,从而减轻管理开销。

这个控制台应用程序是一个演示,用于演示潜在的性能提升。

我使用了一个质数查找算法 (FindPrimeNumber(int n)) 作为计算密集型函数。

        static void Main(string[] args)
        {
            int numberOfThreads = 7;
            int numberOfIterations = 10000;
            int startOfRange = 80;

            List<int> numbers = new List<int>();
            numbers = Enumerable.Range(startOfRange, 7).ToList();

            Console.WriteLine($"Parallel processing has started...");

            var stopwatch = Stopwatch.StartNew();
            stopwatch.Start();

            ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber);

            for (int j = 0; j < numberOfIterations; j++)
            {
                pp.ForEach(numbers);
            }

            stopwatch.Stop();
            Console.WriteLine($"Job's (custom) done over {stopwatch.ElapsedMilliseconds} ms");

            Console.WriteLine($"Process started: Built-in ForEach");

            stopwatch.Reset();
            stopwatch.Start();

            for (int j = 0; j < numberOfIterations; j++)
            {
                Parallel.ForEach(numbers, (currentNumber) =>
                {
                    FindPrimeNumber(currentNumber);
                });
            }

            Console.WriteLine($"Job's (built-in) done over {stopwatch.ElapsedMilliseconds} ms");
            Console.WriteLine($"Process started: sequential");
            stopwatch.Reset();
            stopwatch.Start();

            for (int j = 0; j < numberOfIterations; j++)
            {
                for (int i = 0; i < numbers.Count; i++)
                {
                    FindPrimeNumber(numbers[i]);
                }
            }

            Console.WriteLine($"Job's (sequential) done over {stopwatch.ElapsedMilliseconds} ms");
            Console.ReadKey();
        }

        static void FindPrimeNumber(int n)
        {
            int count = 0;
            long a = 2;
            while (count < n)
            {
                long b = 2;
                int prime = 1;
                while (b * b <= a)
                {
                    if (a % b == 0)
                    {
                        prime = 0;
                        break;
                    }
                    b++;
                }
                if (prime > 0)
                {
                    count++;
                }
                a++;
            }
        }

这个演示应用程序产生了以下输出

Parallel processing has started...
Job's (custom) done over 994 ms
Process started: Built-in ForEach
Job's (built-in) done over 3159 ms
Process started: sequential
Job's (sequential) done over 2262 ms

Using the Code

这个类本身非常简单

    public class ParallelProcessor<T>
    {
        SlicedList<T>[] listSlices;
        int numberOfThreads;
        Action<T> action;
        ManualResetEvent[] manualResetEvents;

        public ParallelProcessor(int NumberOfThreads, Action<T> Action)
        {
            this.numberOfThreads = NumberOfThreads;
            this.listSlices = new SlicedList<T>[numberOfThreads];
            this.action = Action;
            this.manualResetEvents = new ManualResetEvent[numberOfThreads];

            for (int i = 0; i < numberOfThreads; i++)
            {
                listSlices[i] = new SlicedList<T>();
                manualResetEvents[i] = new ManualResetEvent(false);
                listSlices[i].indexes = new LinkedList<int>();
                listSlices[i].manualResetEvent = manualResetEvents[i];
            }
        }

        public void ForEach(List<T> Items)
        {
            prepareListSlices(Items);
            for (int i = 0; i < numberOfThreads; i++)
            {
                manualResetEvents[i].Reset();
                ThreadPool.QueueUserWorkItem(new WaitCallback(
                    DoWork), listSlices[i]);
            }
            WaitHandle.WaitAll(manualResetEvents);
        }

        private void prepareListSlices(List<T> items)
        {
            for (int i = 0; i < numberOfThreads; i++)
            {
                listSlices[i].items = items;
                listSlices[i].indexes.Clear();
            }
            for (int i = 0; i < items.Count; i++)
            {
                listSlices[i % numberOfThreads].indexes.AddLast(i);
            }
        }

        private void DoWork(object o)
        {
            SlicedList<T> slicedList = (SlicedList<T>)o;

            foreach (int i in slicedList.indexes)
            {
                action(slicedList.items[i]);
            }
            slicedList.manualResetEvent.Set();
        }
    }

    public class SlicedList<T>
    {
        public List<T> items;
        public LinkedList<int> indexes;
        public ManualResetEvent manualResetEvent;
    }

要使用该函数,需要创建类的实例,并使用集合作为参数调用 ForEach() 方法,如演示应用程序所示

 ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber); 
 pp.ForEach(numbers);
© . All rights reserved.