65.9K
CodeProject 正在变化。 阅读更多。
Home

使用并发与协调运行时 (CCR) 的并行递归方法

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.40/5 (11投票s)

2008年10月11日

Ms-PL

2分钟阅读

viewsIcon

47914

downloadIcon

562

使用 CCR 实现多线程目录大小计算

引言

本文介绍如何使用并发与协调运行时 (CCR) 通过异步递归方法计算目录大小。

以下是计算网络驱动器大小的结果。网络驱动器的 I/O 延迟突出了并行执行的优势。

directorysize_networkdrive_results.png

递归

递归计算目录大小很简单

static long SerialRecursiveDirectorySize(string path)
{
   return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) +
          Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize);
}

LINQ 查询计算每个文件的大小和每个子目录的大小(递归),然后将所有结果加总。

并行递归

并发与协调运行时 (CCR) 通过声明方法之间的关系,允许在不同的线程中执行方法。更多信息和链接,请参见之前的文章 使用并发与协调运行时的管道和过滤器并发设计模式

long ParallelDirectorySize(string path)
{
   using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
   {   
      DispatcherQueue queue = new DispatcherQueue
				("Pipeline DispatcherQueue", dispatcher);
                
      var outputPort = new Port();
      ParallelDirectorySizeRecursive(path,queue,outputPort);

      return queue.WaitOnPort(outputSizePort);
    }
}

调度器是持有线程池的对象。调度器队列保存可以立即执行并等待线程可用的待处理委托列表。

ParallelDirectorySizeRecursive 方法通过在调度器队列中入队任务来计算目录的大小。计算完成后,它将结果发布到 outputPort
端口本质上是两个队列。第一个队列保存(数据)消息,第二个队列保存有兴趣处理这些消息的方法。WaitOnPort 是一个扩展方法(不是 CCR 的一部分),它阻塞直到收到消息并返回收到的值。

异步递归实现入队以下任务:

  • 计算目录中总文件大小的任务。
    此任务计划立即执行。完成后,它将结果发布到 inputPort
  • 每个递归调用都入队一个任务,将子目录大小发布到 inputPort。递归调用被调用 subDirectories.Length 次。
  • inputPort 中的所有结果相加并将其发布到 outputPort 的任务。
    此任务计划在 inputPort 中有 subDirectories.Length+1 个值时执行(这意味着文件大小和递归子目录大小计算已完成)。
void ParallelDirectorySizeRecursive(string directory,
                                    DispatcherQueue queue,
                                    Port outputPort)
{      
    var subDirectories = Directory.GetDirectories(directory);
    var inputPort = new Port();

    Arbiter.Activate(queue,
          Arbiter.FromHandler(            
                  delegate()
                  {
                       inputPort.Post(TotalFileSize(directory));               
                  }),
          Arbiter.MultipleItemReceive(
                       false,
                       inputPort,
                       subDirectories.Length + 1,
                       delegate(long[] subDirSize)
                       {
                             outputPort.Post(subDirSize.Sum());
                       }));
            
    foreach (string subDir in subDirectories)
    {    
          ParallelDirectorySizeRecursive(subDir,queue, inputPort);
    }             
}

long TotalFileSize(string path)
{
      return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length);
}

如果您不熟悉匿名方法(委托),请注意这些任务在单独的线程上运行,但它们仍然可以访问局部变量(例如 directorysubDirSize)。实际上发生的是它们收到一个可以在其线程中使用的副本。这使代码更短。

错误处理

我们希望递归能够从 UnauthorizedAccessException 异常中恢复。简单的递归实现接收一个 Collection 用于收集捕获的异常。所有递归调用都接收相同的集合对象。

long SerialRecursiveDirectorySizeErrorHandling
	(string path,Collection<Exception> errors)
{
   long fileSize = 0;
   var subDirs = new string[] {};
   try
   {
      fileSize = TotalFileSize(path);
      subDirs = Directory.GetDirectories(path);
   }
   catch (UnauthorizedAccessException ex)
   {
      errors.Add(ex);
   }

   return fileSize + subDirs.Sum
		(p => SerialRecursiveDirectorySizeErrorHandling(p,errors));
}

类似地,并行递归接收一个 Port 对象(见下面的 outputPort.P1)。相同的异常端口对象传递给所有递归调用。
这里有一个需要注意的地方 - 即使发生错误,该方法也必须将零结果发布到 outputPort(见下面的 outputPort.P0)。这是必要的,因为“Sum”任务没有监听异常端口,它一直等待正好 subDirectories.Length + 1 个结果。

void ParallelDirectorySizeWithErrorHandlerRecursive(
           string directory,
           DispatcherQueue queue,
           PortSet<long,Exception> outputPort)
{
   var subDirectories = new string[] {};
   try
   {
      subDirectories = Directory.GetDirectories(directory);
   }
   catch (UnauthorizedAccessException ex)
   {
      outputPort.P0.Post(0);
      outputPort.P1.Post(ex);                
      return;
   }
   var inputPort = new PortSet<long,Exception>(new Port<long>(), outputPort.P1);
   Arbiter.Activate(queue,
         Arbiter.FromHandler(
                   delegate()
                   {
                         long size = 0;
                          try
                          {
                             size = TotalFileSize(directory);
                          }
                          catch (UnauthorizedAccessException ex)
                          {
                             outputPort.P1.Post(ex);
                          }
                          finally
                          {
                             inputPort.P0.Post(size);
                          }
                    }),
         Arbiter.MultipleItemReceive(
                    false,
                    inputPort.P0,
                    subDirectories.Length + 1,
                    delegate(long[] subDirSize)
                    {
                          outputPort.P0.Post(subDirSize.Sum());
                    }));

   foreach (string subDir in subDirectories)
   {
      ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
   }
}

历史

  • 2008年10月11日:初始版本
© . All rights reserved.