易于使用的并行 foreach,其速度比 .NET 版本快几个数量级





5.00/5 (18投票s)
嵌套循环的并行 foreach 循环实现
引言
我想分享一个并行 foreach
循环的快速解决方案,它有可能提高当前使用 .NET 4.0 中引入的内置版本的某些应用程序的性能。此自定义版本适用于嵌套循环,其中外部循环需要按顺序处理。线程数需要手动设置,这可以提供对线程的额外控制。
背景
最近,我遇到一个问题,我有一个可以独立处理的项目集合,但我也有一个外部循环,我必须按顺序对所述集合进行后处理。重复创建内部 Parallel.Foreach
会产生很大的开销,导致性能远不如使用非并行循环。作为解决方案,我创建了一个类,可以实例化许多必要的对象在循环之外,从而减轻管理开销。
这个控制台应用程序是一个演示,用于演示潜在的性能提升。
我使用了一个质数查找算法 (FindPrimeNumber(int n)
) 作为计算密集型函数。
static void Main(string[] args)
{
int numberOfThreads = 7;
int numberOfIterations = 10000;
int startOfRange = 80;
List<int> numbers = new List<int>();
numbers = Enumerable.Range(startOfRange, 7).ToList();
Console.WriteLine($"Parallel processing has started...");
var stopwatch = Stopwatch.StartNew();
stopwatch.Start();
ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber);
for (int j = 0; j < numberOfIterations; j++)
{
pp.ForEach(numbers);
}
stopwatch.Stop();
Console.WriteLine($"Job's (custom) done over {stopwatch.ElapsedMilliseconds} ms");
Console.WriteLine($"Process started: Built-in ForEach");
stopwatch.Reset();
stopwatch.Start();
for (int j = 0; j < numberOfIterations; j++)
{
Parallel.ForEach(numbers, (currentNumber) =>
{
FindPrimeNumber(currentNumber);
});
}
Console.WriteLine($"Job's (built-in) done over {stopwatch.ElapsedMilliseconds} ms");
Console.WriteLine($"Process started: sequential");
stopwatch.Reset();
stopwatch.Start();
for (int j = 0; j < numberOfIterations; j++)
{
for (int i = 0; i < numbers.Count; i++)
{
FindPrimeNumber(numbers[i]);
}
}
Console.WriteLine($"Job's (sequential) done over {stopwatch.ElapsedMilliseconds} ms");
Console.ReadKey();
}
static void FindPrimeNumber(int n)
{
int count = 0;
long a = 2;
while (count < n)
{
long b = 2;
int prime = 1;
while (b * b <= a)
{
if (a % b == 0)
{
prime = 0;
break;
}
b++;
}
if (prime > 0)
{
count++;
}
a++;
}
}
这个演示应用程序产生了以下输出
Parallel processing has started...
Job's (custom) done over 994 ms
Process started: Built-in ForEach
Job's (built-in) done over 3159 ms
Process started: sequential
Job's (sequential) done over 2262 ms
Using the Code
这个类本身非常简单
public class ParallelProcessor<T>
{
SlicedList<T>[] listSlices;
int numberOfThreads;
Action<T> action;
ManualResetEvent[] manualResetEvents;
public ParallelProcessor(int NumberOfThreads, Action<T> Action)
{
this.numberOfThreads = NumberOfThreads;
this.listSlices = new SlicedList<T>[numberOfThreads];
this.action = Action;
this.manualResetEvents = new ManualResetEvent[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
listSlices[i] = new SlicedList<T>();
manualResetEvents[i] = new ManualResetEvent(false);
listSlices[i].indexes = new LinkedList<int>();
listSlices[i].manualResetEvent = manualResetEvents[i];
}
}
public void ForEach(List<T> Items)
{
prepareListSlices(Items);
for (int i = 0; i < numberOfThreads; i++)
{
manualResetEvents[i].Reset();
ThreadPool.QueueUserWorkItem(new WaitCallback(
DoWork), listSlices[i]);
}
WaitHandle.WaitAll(manualResetEvents);
}
private void prepareListSlices(List<T> items)
{
for (int i = 0; i < numberOfThreads; i++)
{
listSlices[i].items = items;
listSlices[i].indexes.Clear();
}
for (int i = 0; i < items.Count; i++)
{
listSlices[i % numberOfThreads].indexes.AddLast(i);
}
}
private void DoWork(object o)
{
SlicedList<T> slicedList = (SlicedList<T>)o;
foreach (int i in slicedList.indexes)
{
action(slicedList.items[i]);
}
slicedList.manualResetEvent.Set();
}
}
public class SlicedList<T>
{
public List<T> items;
public LinkedList<int> indexes;
public ManualResetEvent manualResetEvent;
}
要使用该函数,需要创建类的实例,并使用集合作为参数调用 ForEach()
方法,如演示应用程序所示
ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber);
pp.ForEach(numbers);