65.9K
CodeProject 正在变化。 阅读更多。
Home

一个改进的 Stream.CopyToAsync() 方法,可报告进度

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.29/5 (4投票s)

2020 年 7 月 24 日

MIT

5分钟阅读

viewsIcon

21647

downloadIcon

316

使用此代码为您的下载或复制添加进度报告

引言

。NET Framework 提供了一个 Stream.CopyToAsync() 方法,该方法支持取消但不支持进度报告,尽管有一个标准的接口 IProgress<T>,可用于报告任务的进度。这使得它不适用于极长的复制操作,或从较慢的流(如 NetworkStream)进行复制,您希望定期更新用户。我的目标是为您提供一个支持进度报告的方法,并解释它的工作原理,包括如何使用它以及如何创建它。这是一个非常简单的项目。演示本身比带有进度报告的 CopyToAsync() 实现更复杂。演示本身,“furl”是一个主要用于从远程站点下载的小工具,但也可以用于在本地复制文件。

概念化这个混乱的局面

我最近爱上了 Task 框架和可等待方法,这促使我深入研究它的黑暗角落并写了一些文章。我决定浮出水面,做一些简单但有用的事情。我们喜欢 Task 框架,对吧?嗯,幸运的是 Stream 在 TAP 模式下提供了可等待的方法,但我们无法传入一个进度对象。进度对象只是一个实现了 IProgress<T> 的类型的实例。这样的类允许任务将进度报告回发起方。如果 CopyToAsync() 接受进度对象,那么它将是使用单个调用进行带有进度报告的长时间下载所需的大部分代码。我们想要的是一个扩展方法,为 CopyToAsync() 创建接受进度对象的重载。

IProgress<T> 接口

此接口提供一个成员,即 Report() 方法。消费者任务会定期调用 Report() 方法来报告任务的进度。如何将进度报告回调用者未定义,但框架提供的基本 Progress<T> 类实现了此接口,同时公开了一个 ProgressChanged 事件,可以挂接到该事件来更新进度。

新的 CopyToAsync() 扩展方法

这些方法提供接受进度对象的重载。为了做到这一点,我们不得不重新实现基本 CopyToAsync() 方法中的功能,以便我们可以在复制循环中报告进度。我们使用异步读写来实现这一点。

编写这个混乱的程序

新的 CopyToAsync() 扩展方法

我们将首先介绍这些,因为它们是项目的心脏。实际上只有一个方法包含代码,因为其余的只是主要方法的重载,而主要方法在这里

/// <summary>
/// Copys a stream to another stream
/// </summary>
/// <param name="source">The source <see cref="Stream"/> to copy from</param>
/// <param name="sourceLength">The length of the source stream, 
/// if known - used for progress reporting</param>
/// <param name="destination">The destination <see cref="Stream"/> to copy to</param>
/// <param name="bufferSize">The size of the copy block buffer</param>
/// <param name="progress">An <see cref="IProgress{T}"/> implementation 
/// for reporting progress</param>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns>A task representing the operation</returns>
public static async Task CopyToAsync(
    this Stream source, 
    long sourceLength,
    Stream destination, 
    int bufferSize, 
    IProgress<KeyValuePair<long,long>> progress, 
    CancellationToken cancellationToken)
{
    if (0 == bufferSize)
        bufferSize = _DefaultBufferSize;
    var buffer = new byte[bufferSize];
    if(0>sourceLength && source.CanSeek)
        sourceLength = source.Length - source.Position;
    var totalBytesCopied = 0L;
    if (null != progress)
        progress.Report(new KeyValuePair<long, long>(totalBytesCopied, sourceLength));
    var bytesRead = -1;
    while(0!=bytesRead && !cancellationToken.IsCancellationRequested)
    {
        bytesRead = await source.ReadAsync(buffer, 0, buffer.Length);
        if (0 == bytesRead || cancellationToken.IsCancellationRequested)
            break;
        await destination.WriteAsync(buffer, 0, buffer.Length);
        totalBytesCopied += bytesRead;
        if (null != progress)
            progress.Report(new KeyValuePair<long, long>(totalBytesCopied, sourceLength));
    }
    if(0<totalBytesCopied)
        progress.Report(new KeyValuePair<long, long>(totalBytesCopied, sourceLength));
    cancellationToken.ThrowIfCancellationRequested();
}

您首先会注意到代码中该方法有很多参数。没关系,因为有几个重载省略了一个或多个参数,但每个都接受一个 progress 参数。

接下来,您会看到我们将 bufferSize 设置为 _DefaultBufferSize(81920,与 CopyToAsync() 的框架版本相同)。然后我们创建一个大小为该值的 buffer。然后,如果 sourceLength 未指定(为 0)并且 source Stream 支持 CanSeek,我们就使用该功能计算复制操作的长度。否则,我们无法报告有界进度 - 我们必须改用报告无界进度。这可能发生在网络上,如果您下载的是通过块发送的内容,没有 Content-Length 响应头,这是很常见的。

接下来,如果 progress 参数不是 null,我们报告初始进度为零。在循环中,当操作被取消或没有更多字节时,循环会终止,我们异步读取到我们之前的缓冲区。然后我们检查是否没有读取到字节或操作是否被取消,在这种情况下我们停止。否则,我们进行写入并更新我们复制的总字节数,我们使用它来进行进度报告。

再次,如果 progress 不是 null,我们报告当前的进度(以字节为单位)。

如果我们实际上复制了任何内容,那么我们报告最终状态。

最后,如果操作被取消,我们抛出异常,让 Task 知道它被取消了。

演示项目 "furl"

如前所述,此项目会将 URL 下载到文件,或将一个文件复制到另一个文件。这很简单,多亏了上面的代码。这是我们的入口点代码,因此您可以看到我们如何发出请求

if(2!=args.Length)
{
    _PrintUsage();
    throw new ArgumentException("Two arguments expected");
}
var url = args[0];
var stopwatch = new Stopwatch();
stopwatch.Start();
if (-1 < url.IndexOf("://"))
{
    var wreq = WebRequest.Create(url);
    using (var wresp = await wreq.GetResponseAsync())
    {
        var httpresp = wresp as HttpWebResponse;
        var sourceLen = -1L;
        if (null != httpresp)
            sourceLen = httpresp.ContentLength;
        // disposed with wresp:
        var src = wresp.GetResponseStream();
        await _CopyToDstAsync(src, args[1],stopwatch);
    }
} else
    using (var src = File.OpenRead(url))
        await _CopyToDstAsync(src, args[1],stopwatch);

我们这里有两个主要的 कोड路径,具体取决于您指定的是文件还是 URL。两者最终都委托给 _CopyToDstAsync() 来进行复制。URL 分支会检查响应是否为 HTTP,如果是,它会查找 Content-Length 头。它将其用作复制操作的总长度。这样,从网络下载至少有时会给您带来有界进度。让我们看一下 _CopyToDstAsync()

// BUG: Progress doesn't always report the last block, so it may not end at 100%
// I'm not sure why
var totalBytes = 0L;
using (var dst = File.Create(path, 81920, FileOptions.Asynchronous))
{
    dst.SetLength(0L);
    var prog = new Progress<KeyValuePair<long, long>>();
    var first = true;
    var i = 0;

    prog.ProgressChanged += delegate (object s, KeyValuePair<long, long> p)
    {
        var str = " Downloaded";
        lock (_Lock)
        {
            if (-1 != p.Value)
            {
                ConsoleUtility.WriteProgressBar((int)(p.Key / (double)p.Value * 100), !first);
                str += string.Format(" {0}kb/{1}kb", p.Key / 1024, p.Value / 1024);
            }
            else
            {
                ConsoleUtility.WriteProgress(i, true);
                ++i;
                str += string.Format(" {0}kb/???kb", p.Key / 1024);
            }
            totalBytes = p.Key;
            first = false;
            Console.Write(str);
            Console.Write(new string('\b', str.Length));
        }
    };
    await src.CopyToAsync(-1, dst, prog);
    stopwatch.Stop();
}
lock (_Lock)
{
    Console.WriteLine();
    Console.WriteLine("Done @ " + 
            Math.Round(totalBytes / 1024d / stopwatch.Elapsed.TotalSeconds) + "kbps");
}

请注意这个 bug。由于一些我找不到原因的原因,有时进度不会报告最后一个块,导致进度报告卡在 99% 左右。请参阅结尾的 Bug 部分。

继续,我们创建一个用于异步 I/O 的文件,然后将长度设置为零。我总是这样做,因为有时框架会打开一个现有文件并将其位置设置为开头,而不是删除它,这意味着如果新文件比旧文件短,多余的字节差额将留在文件末尾。SetLength(0) 确保不会发生这种情况。

接下来,我们创建一个进度对象和一些记账变量。然后我们挂钩刚刚创建的进度对象的 ProgressChanged 事件,并在其中 lock 然后将进度写入 Console。加锁的原因是,如果没有它,控制台不一定会按顺序写入所有内容,导致状态屏幕混乱。

Bug

有一个 bug 我一直没能追踪到,即演示报告的进度有时不会报告最终进度,导致它卡在例如 99%。我不认为这会影响我编写的 CopyToAsync() 方法。我相信 bug 存在于演示项目中,但我不确定在哪里。它是间歇性的。

由于这是在演示应用程序中,并且不是导致程序停止的错误,所以我决定按原样发布。

如果有人发现了这个 bug,请在评论中说明。

谢谢!

历史

  • 2020 年 7 月 24 日 - 首次提交
© . All rights reserved.