使用并发与协调运行时 (CCR) 的并行递归方法






4.40/5 (11投票s)
使用 CCR 实现多线程目录大小计算
引言
本文介绍如何使用并发与协调运行时 (CCR) 通过异步递归方法计算目录大小。
以下是计算网络驱动器大小的结果。网络驱动器的 I/O 延迟突出了并行执行的优势。

递归
递归计算目录大小很简单
static long SerialRecursiveDirectorySize(string path)
{
return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) +
Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize);
}
LINQ 查询计算每个文件的大小和每个子目录的大小(递归),然后将所有结果加总。
并行递归
并发与协调运行时 (CCR) 通过声明方法之间的关系,允许在不同的线程中执行方法。更多信息和链接,请参见之前的文章 使用并发与协调运行时的管道和过滤器并发设计模式。
long ParallelDirectorySize(string path)
{
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
DispatcherQueue queue = new DispatcherQueue
("Pipeline DispatcherQueue", dispatcher);
var outputPort = new Port();
ParallelDirectorySizeRecursive(path,queue,outputPort);
return queue.WaitOnPort(outputSizePort);
}
}
调度器是持有线程池的对象。调度器队列保存可以立即执行并等待线程可用的待处理委托列表。
ParallelDirectorySizeRecursive
方法通过在调度器队列中入队任务来计算目录的大小。计算完成后,它将结果发布到 outputPort
。
端口本质上是两个队列。第一个队列保存(数据)消息,第二个队列保存有兴趣处理这些消息的方法。WaitOnPort
是一个扩展方法(不是 CCR 的一部分),它阻塞直到收到消息并返回收到的值。
异步递归实现入队以下任务:
- 计算目录中总文件大小的任务。
此任务计划立即执行。完成后,它将结果发布到inputPort
。 - 每个递归调用都入队一个任务,将子目录大小发布到
inputPort
。递归调用被调用subDirectories.Length
次。 - 将
inputPort
中的所有结果相加并将其发布到outputPort
的任务。
此任务计划在inputPort
中有subDirectories.Length+1
个值时执行(这意味着文件大小和递归子目录大小计算已完成)。
void ParallelDirectorySizeRecursive(string directory,
DispatcherQueue queue,
Port outputPort)
{
var subDirectories = Directory.GetDirectories(directory);
var inputPort = new Port();
Arbiter.Activate(queue,
Arbiter.FromHandler(
delegate()
{
inputPort.Post(TotalFileSize(directory));
}),
Arbiter.MultipleItemReceive(
false,
inputPort,
subDirectories.Length + 1,
delegate(long[] subDirSize)
{
outputPort.Post(subDirSize.Sum());
}));
foreach (string subDir in subDirectories)
{
ParallelDirectorySizeRecursive(subDir,queue, inputPort);
}
}
long TotalFileSize(string path)
{
return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length);
}
如果您不熟悉匿名方法(委托),请注意这些任务在单独的线程上运行,但它们仍然可以访问局部变量(例如 directory
和 subDirSize
)。实际上发生的是它们收到一个可以在其线程中使用的副本。这使代码更短。
错误处理
我们希望递归能够从 UnauthorizedAccessException
异常中恢复。简单的递归实现接收一个 Collection
用于收集捕获的异常。所有递归调用都接收相同的集合对象。
long SerialRecursiveDirectorySizeErrorHandling
(string path,Collection<Exception> errors)
{
long fileSize = 0;
var subDirs = new string[] {};
try
{
fileSize = TotalFileSize(path);
subDirs = Directory.GetDirectories(path);
}
catch (UnauthorizedAccessException ex)
{
errors.Add(ex);
}
return fileSize + subDirs.Sum
(p => SerialRecursiveDirectorySizeErrorHandling(p,errors));
}
类似地,并行递归接收一个 Port
对象(见下面的 outputPort.P1
)。相同的异常端口对象传递给所有递归调用。
这里有一个需要注意的地方 - 即使发生错误,该方法也必须将零结果发布到 outputPort
(见下面的 outputPort.P0
)。这是必要的,因为“Sum
”任务没有监听异常端口,它一直等待正好 subDirectories.Length + 1
个结果。
void ParallelDirectorySizeWithErrorHandlerRecursive(
string directory,
DispatcherQueue queue,
PortSet<long,Exception> outputPort)
{
var subDirectories = new string[] {};
try
{
subDirectories = Directory.GetDirectories(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P0.Post(0);
outputPort.P1.Post(ex);
return;
}
var inputPort = new PortSet<long,Exception>(new Port<long>(), outputPort.P1);
Arbiter.Activate(queue,
Arbiter.FromHandler(
delegate()
{
long size = 0;
try
{
size = TotalFileSize(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P1.Post(ex);
}
finally
{
inputPort.P0.Post(size);
}
}),
Arbiter.MultipleItemReceive(
false,
inputPort.P0,
subDirectories.Length + 1,
delegate(long[] subDirSize)
{
outputPort.P0.Post(subDirSize.Sum());
}));
foreach (string subDir in subDirectories)
{
ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
}
}
历史
- 2008年10月11日:初始版本