65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Directory.EnumerateFiles 进行批量处理

starIconstarIconstarIconstarIconstarIcon

5.00/5 (10投票s)

2021年3月28日

CPOL

3分钟阅读

viewsIcon

13558

批量处理是一种很好的技术,可以优雅地处理大量数据。Directory.EnumerateFiles 是允许你为包含大量文件的目录组织批量处理的 API。

引言

如果想要从目录中检索文件,Directory.GetFiles 是一个简单的答案,足以满足大多数场景。但是,当处理大量数据时,你可能需要更高级的技术。

示例

假设你有一个大数据解决方案,你需要处理包含 200000 个文件的目录。对于每个文件,你提取一些基本信息。

public record FileProcessingDto
{
    public string FullPath { get; set; }
    public long Size { get; set; }
    public string FileNameWithoutExtension { get; set; }
    public string Hash { get; internal set; }
}

请注意,我们在这里方便地使用了新颖的 C# 9 记录类型 作为我们的 DTO。

之后,我们将提取的信息发送到进一步处理。让我们用以下代码片段来模拟它。

public class FileProcessingService
{
    public Task Process(IReadOnlyCollection<FileProcessingDto> files, 
                        CancellationToken cancellationToken = default)
    {
        files.Select(p =>
        {
            Console.WriteLine($"Processing {p.FileNameWithoutExtension} 
                              located at {p.FullPath} of size {p.Size} bytes");
            return p;
        });

        return Task.Delay(TimeSpan.FromMilliseconds(20), cancellationToken);
    }
}

现在,最后一步是提取信息并调用服务。

public class Worker
{
    public const string Path = @"path to 200k files";
    private readonly FileProcessingService _processingService;

    public Worker()
    {
        _processingService = new FileProcessingService();
    }

    private string CalculateHash(string file)
    {
        using (var md5Instance = MD5.Create())
        {
            using (var stream = File.OpenRead(file))
            {
                var hashResult = md5Instance.ComputeHash(stream);
                return BitConverter.ToString(hashResult)
                    .Replace("-", "", StringComparison.OrdinalIgnoreCase)
                    .ToLowerInvariant();
            }
        }
    }

    private FileProcessingDto MapToDto(string file)
    {
        var fileInfo = new FileInfo(file);
        return new FileProcessingDto()
        {
            FullPath = file,
            Size = fileInfo.Length,
            FileNameWithoutExtension = fileInfo.Name,
            Hash = CalculateHash(file)
        };
    }

    public Task DoWork()
    {
        var files = Directory.GetFiles(Path)
            .Select(p => MapToDto(p))
            .ToList();

        return _processingService.Process(files);
    }
}

请注意,在这里,我们以一种简单的方式操作,并通过一次性调用Directory.GetFiles(Path)提取所有文件。

但是,一旦你通过以下方式运行此代码:

await new Worker().DoWork()

你会注意到结果远不令人满意,并且应用程序正在大量消耗内存。

Directory.EnumerateFiles 来救援

Directory.EnumerateFiles 的特点是它返回 IEnumerable<string>,从而允许我们逐个获取集合项。反过来,这可以防止我们在一次加载大量数据时过度使用内存。

但是,你可能已经注意到,FileProcessingService.Process 中有编码的延迟(我们用简单的延迟模拟某种 I/O 操作)。在现实场景中,这可能是对外部 HTTP 端点的调用或与存储的交互。这使我们得出结论,调用 200,000 次 FileProcessingService.Process 可能会效率低下。这就是为什么我们要一次将合理数量的数据批量加载到内存中的原因。

修改后的代码如下所示

public class WorkerImproved
{
    //omitted for brevity

    public async Task DoWork()
    {
        const int batchSize = 10000;
        var files = Directory.EnumerateFiles(Path);
        var chunks = files.Chunk(batchSize);
        foreach (var chunk in chunks)
        {
            var filesToProcess = chunk.Select(file => MapToDto(file)).ToList();
            await _processingService.Process(filesToProcess);
        }
    }
}

在这里,我们使用 .NET 7 中出现的 LINQ Chunk 方法为我们进行批量处理。它返回 IEnumerable<string>,因此我们不是一次处理所有文件,而是在给定的时间点处理合理数量的文件。

评估版

Benchmark.NET 生成的结果相当令人信服

关于批量处理的一些话

在这篇文章中,我们简要介绍了软件工程中的常见模式。合理数量的批量处理可以帮助我们避免逐个处理的 I/O 惩罚以及一次将所有项目加载到内存中造成的过度内存消耗。

通常情况下,在对多个项目进行 I/O 操作时,你应该努力使用批量 API。并且一旦项目数量变多,你应该考虑将这些项目分成批次。

关于返回类型的一些话

在处理代码库时,我经常看到类似以下的代码

public IEnumerable<int> Numbers => new List<int> { 1, 2, 3 };

我认为这段代码违反了波斯特原则,随之而来的是,作为属性的使用者,我无法弄清楚我是否可以逐个枚举项目,或者它们是否只是一次性加载到内存中。

这就是我建议更具体地说明返回类型的原因,即

public IList<int> Numbers => new List<int> { 1, 2, 3 };

结论

批量处理是一种很好的技术,可以优雅地处理大量数据。Directory.EnumerateFiles 是允许你为包含大量文件的目录组织批量处理的 API。

历史

  • 2021年3月28日:初始版本
  • 2021年8月12日:使用新的 Chunk 方法替换批量处理代码
© . All rights reserved.