探寻流式处理……第 1 部分(共 2 部分)





5.00/5 (6投票s)
探讨流(streams)……流式处理(streaming)……函数式流式处理(functional streaming)……仅此而已
引言
在开始讨论流式处理或任何相关细节之前,我们想对本文中使用的词汇进行说明,以避免混淆。
- 我们将使用“流”(Stream 或 Streams)这个词来讨论能够实现以下功能的底层代码实现(接口、相关类等):
- 通过串联重复使用形成管道
- 转换
byte
序列并将其转发到链中的下一个流
事实上,在讨论这些流实现时,只要能获得期望的结果,我们对其实现的复杂性和来源(例如,是框架的一部分、开源库还是自制方案)都保持中立。实际上,这个假设非常重要,否则我们将无法获得一个真正与上下文无关的 API,该 API 能够支持对底层字节流的几乎任何操作(我们将在下文看到)。
- 我们将使用“流式处理”(Streaming)这个词来描述字节序列通过这样一个基于流的管道的流动过程。
实际上,作为这种声明的一个特例,当管道中的一个流是网络(HTTP)流,并且流动的字节内容是多媒体内容时,我们就得到了多媒体流式处理(或通常所知的流媒体)的实现。因此,重要的是要记住,在我们的讨论中,我们不会将“流式处理”一词的范围局限于多媒体流式处理。
相关知识
流无疑是一种独一无二、为特定目的而生的“猛兽”。虽然流可以被看作是字节序列的生成器,但它通过其动态性展现了真正的力量。让我们来解释一下。事实上,在运行时,任何其他对象,无论是 string
、array
、自定义 class/
struct
对象中的数据,甚至是源代码本身,都不过是内存中某处的字节序列。即使是一个简单的 integer
值也可以被看作是一个4字节的序列(对于.NET中的int32)。此外,可以从这些对象的字节表示中提取子序列来执行一些精细的操作;然而,这样的字节序列缺乏流式处理天生就具有的动态性。而且,在我们的讨论中,当开始讨论数据流式处理时,这种与字节和运行时处理相关的流动概念就变得非常方便。接下来,我们将**不仅**努力详细解释这一现象,**而且**将提出一个全新的 API 来处理数据流式处理的需求。
一听到“流式处理”这个词,脑海中就会浮现出许多画面,比如在线观看视频、观看活动直播、在线听喜欢的歌曲等。我们几乎立刻就会把多媒体内容(如音频和视频)与这个词联系起来。因此,对我们来说,转换思路并设定一个讨论平台非常重要。为此,我们首先想通过扩展我们定义中的流式处理范围,来个性化“流式处理”的定义,可以简单地写成:
“连续地发送/传输数据1,可能以大小可变的二进制数据块形式进行;同时,允许接收端2在可能的情况下,以独立的方式(即不缓冲数据1)持续处理这些数据块。”
1术语“数据”在这里是上下文相关的。对我们来说,它指的是构成整个数据集的任何内容,例如,整个视频、仅1秒钟的视频剪辑、一个简单的“Hello World!
”字符串或一个永无止境的数据序列。
2术语“接收端”用于标识串联中的下一个流。
有了这样一个定义:
- 我们更感兴趣的是数据的
BYTE
格式,而不是媒体类型。因此,我们希望处理任何可以立即以字节形式获得或可转换为字节(无论转换的复杂性如何)的数据。 - 我们希望连续传输数据,即数据一可用就传输,并且可能以数据块的形式传输。因此,我们力求在流式处理过程中的任何时候都不在内存中缓冲整个数据集。
- 我们对底层协议/API 不做要求,只要我们能够连续发送这些数据块即可。
- 我们希望设计一种能够支持任何此类任意数据端到端处理的方案/框架/机制。
- 我们对接收方不做要求,只要它能够接受这些数据块(即,无论其数据处理能力如何)。
实现说明
- 从理论角度来看,本文具有通用性,可能对多种语言/框架有效;然而,我们用 C# .NET 实现了我们的想法,并将在整个讨论中提供一些 .NET 代码片段。
- 想要在 Visual Studio 中按原样编译所附源代码的读者,应确保已安装 .NET Framework 4.7.2 SDK,并且 C# 语言版本为 7.1 或更高(如 MSDN 博客中所述)。
注意:本文中提供的数据统计是在以下系统配置下获得的。
先说原因
在尝试理解为什么我们想到这样的实现之前,我们应该先了解我们现有的工具。假设我们有两个 Stream
实例 _readableStream
和 _writableStream
;顾名思义,我们可以从 _readableStream
读取数据,并向 _writableStream
写入数据。进一步假设,我们手头有一个简单的任务,需要将数据从 _readableStream
复制到 _writableStream
。大多数语言/框架提供以下(或多或少)的实现方式来实现它:
/////////////////////
//// PSUEDO CODE ////
/////////////////////
//define some temporary byte array as buffer
byte[] buffer = new byte[buffer_size];
//continuously read from readable stream
while ((readLength = _readableStream.read(buffer)) > 0)
{
//write on writable stream as long as we read at least 1 byte
_writableStream.write(buffer, 0, readLength);
}
从上面的代码片段中,我们注意到通过使用固定大小的缓冲区(通常为几KB大小),我们实现了这种流到流的复制。复杂度与流的长度成线性关系,而且我们不消耗太多空间;这很合理。
但等一下,我们在这里做了一个假设,即流与 I/O 设备(特别是 _writableStream
)相关联,如文件、网络等。但是,当我们的 _writableStream
是一个内存流(C# .NET 中的 MemoryStream
)时会发生什么?这时,我们立即增加了空间复杂度。如果两个流(_readableStream
和 _writableStream
)都是内存流呢?那么空间需求就翻倍了。
但我们为什么如此关心这个问题呢?简单来说,内存虽然便宜,但不是免费的,也不是无限的。尽管如此,原因并非如此简单。因此,为了避免赘述,作者邀请读者阅读一篇由 Doug Duerner 和 Yeon-Chang Wang 撰写的优秀文章,题为“堆还是不堆;这是个大对象问题?”,以了解与大对象(如字符串、列表或一般数组)相关的空间复杂度增加的细节。
总的来说,减少运行时内存是我们的第一个原因。同样,我们的下一个原因是延迟,通过在复制操作期间重用(一次性分配的)相同缓冲区,可以减少延迟,而无需花费宝贵的 CPU 时间来重新调整大小/复制字节数组(在内存中)来缓冲全部数据。
虽然通常较少被提及,我们的下一个原因是代码组织(例如,可读性、可测试性、关注点分离等);我们的目标是准备一个直观且富有表现力的 API 来执行流式操作。此外,我们希望在我们的 API 中嵌入某种人工智能,以允许我们在流链中实现运行时可塑性(即,管道的条件化组装)。最终,我们希望能够自由地构建管道,对运行中的字节块执行任意操作(通配符),同时不失其相关优势。事实上,我们将构建一些特定的流式操作来展示这种通配符能力。
务实一点
如果你一直跟着我们到这里,你可能会争辩说,流式处理在常规应用程序中并非那么常用,甚至大多数应用程序也只是进行文件读/写。我们不能对此争辩,因为这是基于经验的论点。然而,以下非详尽列表确实提供了流式处理的用例:
- WebAPI
- Base64 转换
- 对象序列化
- 数据加密
- 数据压缩
- 哈希计算
- 文件处理……等等……
衡量一个简单任务的性能
在深入细节之前,让我们从一个简单的例子开始。假设我们手头有以下任务:
定义给定一个二进制文件的路径,读取其所有字节。首先,使用 GZip 压缩算法对其进行解压缩,然后使用 JSON 序列化器将数据反序列化为一个定义好的对象数组(即 List<T>,其中 T 是已知的)。
从上面的陈述中,我们可以识别出三(3)个不同的操作,即:
- 从给定文件中读取所有字节
- 使用 GZip 算法解压缩这些字节
- 使用 Json 序列化器,从解压缩的字节中创建
List<T>
(T
是已知的,或者是泛型占位符,这无关紧要)
为了保持代码可读性并忽略任何性能/代码优化(暂时),我们考虑实现以下三(3)个函数:
public byte[] PullAllBytesFrom(FileInfo file)
{
return File.ReadAllBytes(file.FullName);
}
public byte[] DecompressUsingGzip(byte[] compressedBytes)
{
var unzippedData = new MemoryStream();
using (var unzipper = new GZipStream(new MemoryStream(compressedBytes),
CompressionMode.Decompress, false))
{
unzipper.CopyTo(unzippedData);
}
return unzippedData.ToArray();
}
public List<T> DeserializeAs<T>(byte[] data)
{
// ===> Using Newtonsoft.Json (we will call it with T = List<T>)
return JsonConvert.DeserializeObject<T>(new UTF8Encoding().GetString(data));
}
我们本可以用其他方式编写代码,但是,我们将这三(3)个操作分开创建的原因是后续讨论的主题,我们将在那里详细说明。目前,我们只想关注以下代码的性能:
////////////////
//// CODE ID 1
//// We will use this ID as reference below during our discussion
///////////////
public List<T> DeserializeListFrom<T>(FileInfo compressedJsonFile)
{
var fileBytes = PullAllBytesFrom(compressedJsonFile);
var uncompressedBytes = DecompressUsingGzip(fileBytes);
return DeserializeAs<List<T>>(uncompressedBytes);
}
如果你运行类似编写的“DeserializeListFrom
”代码(如果你已经从本文下载了附加的源代码,可以运行 PerfCompareNonStreamingWithStreamingAsync
方法),你将从 Visual Studio 诊断工具中看到以下类似的性能图(注意:API 方法是我们的实现,也是本次讨论的主题,而 DeserializeListFrom
是与上面代码片段中所示类似编写的方法):
观察这张图,我们看到在代码执行期间发生了一个耗费内存的操作,并且字节数组可能被重新分配了多次(因此,也重新复制了)。总的来说,很明显我们有机会在内存方面取得巨大优势,并在 CPU 时间上也有显著提升。因此,了解了问题所在,我们可以进一步深入研究。
定义目标
基于我们到目前为止的讨论,我们希望:
- 避免使用内存缓冲区来改善运行时内存
- 只使用必要的固定大小缓冲区
- 能够创建高效的端到端(从源到目标)管道(操作链)
- 创建一个提供以下特性的 API:
- 可组合性:操作的组合
- 可读性:组合是声明性的
- 可维护性:为每个底层组合操作推行单一职责原则
- 弹性:对任何特殊的和/或常规的数据处理需求开放
- 可重用性:允许以确定性的方式对组合链进行运行时变动
本文的其余部分将介绍我们为实现上述目标所做的工作。
流的概述
从表面上看,所有流看起来都差不多,很难将它们分门别类。然而,为了充分利用流式处理的能力,我们确实需要了解这些流实现的不同特性。
单向与双向
幸运的是,在 .NET 中,存在一个为流(Streams)定义良好的接口(在 System.IO
命名空间内,Stream
被定义为 Abstract
类),并且所有的流实现都继承自它。我们来仔细看看它的一些功能,如下所示:
// from https://referencesource.microsoft.com/#mscorlib/system/io/stream.cs
public abstract class Stream : MarshalByRefObject, IDisposable
{
public abstract bool CanRead { get; }
public abstract bool CanWrite { get; }
public abstract int Read(byte[] buffer, int offset, int count);
public abstract void Write(byte[] buffer, int offset, int count);
/* ...
* Other methods and properties
* ...
*/
}
因此,除了 Read
和 Write
方法外,Stream
还公开了 CanRead
和 CanWrite
的布尔值;因此,如果一个流支持 Read
操作,它应该为 CanRead
返回 true
,同样,如果它支持 Write
操作,它应该为 CanWrite
返回真值。事实上,并非所有流的实现都对这两个属性返回 True
。因此,我们可以说,当一个流要么是 Readable
要么是 Writable
(但不是两者兼有)时,它是单向的(例如,具有读取权限的 FileStream
);同样,当它同时是 Readable
和 Writable
时,它是双向的(例如,writable=true
的 MemoryStream
)。
开放式与封闭式
一些流的实现实际上是封闭的,因为它们绑定到目标设备;例如,FileStream
绑定到磁盘上的物理位置。另一方面,一些流的实现是开放的(无关的),对于读或写操作所涉及的目标,即它们操作于抽象之上(例如,.NET 中的抽象类 Stream
)。这样的流通常在构造时(即构造函数调用时)需要一个 Stream
的实例;例如,GZipStream
的构造函数在解压缩/压缩期间分别接受另一个 Stream
的实例进行读/写操作,但它并不关心给定的 Stream
是 MemoryStream
还是 FileStream
。尽管给出的解释(和流的分类)看起来很简单,但它使我们能够在流式处理期间创建一个链(管道)。
事实上,正如我们稍后将看到的,基于流的这些独特特性,我们提出的 API 能够在不依赖于两个独立流式操作之间的中间完整数据缓冲的情况下,串联地创建流式操作链。
MemoryStream 的特性
注意:下面列出的问题或多或少同样适用于 Byte[]
(字节数组)和 List<Byte>
(字节列表)。
MemoryStream
有其独特之处。其底层是一个简单的 Byte
数组,在写入操作期间会调整其容量(其方式类似于 List<Byte>
,即分配一个更大的数组并从现有数组中重新复制字节),在读取操作期间则遍历该数组。虽然目前的实现工作得很好,但这些数组(缓冲区
)的调整大小操作确实给 CPU 带来了一些压力(内存分配/数据复制)。如果涉及的数据(总字节数)很大,这些操作可能会显著影响性能。虽然很难用一个单一的数字指出数据大小的限制,但一旦数组大小达到85000字节,我们就会触及大对象堆(LOH),任何(在写入操作期间)调整此数组大小以获得更大容量的新调用都将最终处理 LOH。简而言之,一旦 MemoryStream
参与任何与流相关的操作,就应该小心。
我们已经在上面(在“先说原因”标题下的伪代码中)看到,流到流的复制采用固定大小的缓冲区,并在其迭代复制操作中重用该缓冲区(字节块受限于缓冲区容量),而不是使用 MemoryStream
(从源流中读取所有内容到内存,然后将所有内容写入目标流)。此外,我们知道 MemoryStream
是非线程安全的,不可能同时对其进行读写。虽然完全可以创建一个新的线程安全的内存流版本,但这种艰苦的努力未必能带来回报;特别是在写入该内存流的写入器比同一流上的相关读取器快得多的情况下(内部数组最终会增长并可能造成性能瓶颈)。因此,我们认为任何内存中的数据缓冲(超出常规流操作所需的固定大小缓冲区)对于面向流的 API 都是无益的。而且,我们稍后将讨论,我们的方法避免了这种内存中字节数组的使用。
数据流
粗略地说,根据数据源/目标的性质,流式数据流可以列为:
- 一种字节表示到另一种字节表示(例如,文件中的文本数据到一个压缩文件)
- 内存数据结构到字节表示,即序列化 + 一些额外的流处理(例如,将.NET类实例进行json序列化到一个硬盘上的加密文件)
- 从字节表示到内存数据结构,即反序列化 + 一些额外的流处理
基于这些数据流,我们确定了在流式处理期间最常遇到,并可能导致那些不必要的性能问题的数据结构:
string
:通常在序列化、文件读取、字符串连接、Base64 操作等过程中获得。byte[]
:通常从字符串编码、使用内存流、文件读取等获得。MemoryStream
:通常由于流管道未对齐而出现。T[]
或List<T>
或任何类似的对象集合,其中T
是一个已知的可序列化对象:通常是序列化/反序列化操作的目标。
此外,我们确定了最常见的流式操作(也作为框架的一部分提供):
- 文件处理
- 字节编码
- 压缩
- 哈希计算
- Base64 转换
- 加密/解密
最后,我们还识别出以下几个常见需求:
- 流扇出(Stream Fan-out):当一个给定的流需要被输入到多个目标时,例如,为了通过冗余方式维持数据可用性,相同的数据被复制到多个文件流和/或发送到远程服务等。
- 流长度(Stream Length):当关注的是根据所选的编码和处理方式获取数据的字节数时。
总的来说,如果我们将所有这些想法累积起来准备一个思维导图,我们会得到以下简单的插图:
撇开学术兴趣不谈,流式处理的存在在以下情况下才有意义:
- 数据被持久化(例如,作为硬盘上的文件)以及当持久化的数据被消费时
- 数据被传输到另一个实体/进程(例如,从服务器到客户端)
无论用例如何,数据流都可以被建模为数据发布方(即数据生产者)在一端推送数据,而数据消费方在另一端拉取数据。根据数据交换的性质,消费者可以并发运行或顺序运行。例如,在http通信的情况下,当发送方在分块传输编码期间向网络写入数据时,接收方可以同时恢复有效载荷数据;而当发送方将数据写入文件时,接收方(在没有同步的情况下)只有在文件被持久化后才能消费该文件。其次,在这些数据推送过程中,每当发送方应用任何数据转换时,消费者通常需要以相反的顺序应用逆转换以获得原始数据。这些是常见的流式处理场景,其中性能可以得到优化。下图阐明了同样的想法。
因此,我们从上面看到,发送方的所有数据转换操作(由 OP- 表示)都有相应的逆转换(由 INV-OP- 表示),且顺序相反(即,如果发送方在 OP-2 之前应用 OP-1,则接收方在 INV-OP-1 之前应用 INV-OP-2)。因此,假设发送方首先将数据序列化为 json,然后应用 GZip 压缩,那么为了恢复等效的原始数据内存表示,接收方首先应用 GZip 解压缩,然后反序列化 JSON 数据。
注意
一些数据转换本质上是不可逆的,即一旦数据被转换,理论上就不可能获得原始数据;例如,密码学哈希计算。但当然,如果目的只是将数据的哈希值发送给接收方,那么就已经假定接收方不需要原始数据。因此,对于这些类似的情况,上面所示的逆向链在接收方将不存在,但流式处理仍然可以带着其所有好处被使用;例如,获取数据在其目标字节表示形式下的哈希值。
走向实现
我们在上面注意到,为了实现我们的简单任务(在“衡量一个简单任务的性能”标题下),我们编写了三(3)个不同的函数。拥有这些专用函数的想法是为了实现可组合性,例如,如果我们有一个新的功能需求,要求从 json 文件中读取并获得一个已知的对象(不一定是列表),那么最终的代码可能看起来像这样:
public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
var uncompressedBytes = PullAllBytesFrom(uncompressedJsonFile);
return DeserializeAs<T>(uncompressedBytes);
}
因此,在不重写/修改/破坏现有代码的情况下,我们立即(几乎)交付了一个功能。坦白说,这种简单的实现非常符合 SOLID 原则(单一职责)。然而,如果我们注意到,表面上看,这些实现看起来是无害的,但随着我们增加文件字节大小,我们就会发现相关问题。我们意识到“File.ReadAllBytes
”分配的字节数组大小与文件大小成正比,但较少被提及的是,在内部,“File.ReadAllBytes
”仍然使用类似的缓冲区复制循环,正如一开始在伪代码中所示。一旦我们意识到使用那些“固定大小缓冲区复制循环”是流式处理的优势之一(也许是最不被理解/欣赏的),我们就能欣赏接下来的所有内容。
意识到这个问题后,我们可能会想用以下方式更改代码(以提高性能):
public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
using(var fileStream = new FileStream(uncompressedJsonFile, ...))
{
using(var textReader = new TextReader(fileStream, ...))
{
//JsonReader of Newtonsoft.Json
using(var jsonReader = new JsonReader(textReader, ...))
{
//... your serialization code ...//
}
}
}
}
但是,通过这段代码,我们再次意识到我们使代码复杂化了。我们不仅降低了可读性,破坏了可维护性(在某种程度上),而且还增加了不必要的代码冗余;也就是说,使用这种嵌套的“using
”调用,我们需要重写代码的某些(主要)部分两次(一次如上所列,另一次是带 GZip 压缩的)。事实上,从长远来看,我们会发现每次编写任何与流相关的代码时(在同一项目内和跨多个项目),我们都会产生冗余。最重要的是,这样的实现既不可组合,也无弹性,更不可重用(注意:广义上的可重用)。
可视化实现
阅读/解释如此冗长的文本绝非易事,因此,在这里我们努力提供一些插图来理解上述文字。让我们首先尝试理解在我们的“代码 Id 1”(来自上面的“衡量一个简单任务的性能”标题)运行时,在不同时间点会发生什么。
为简单起见,我们假设以下时间尺度:
- 在
T = 0
时,我们称之为起始点,此时我们假设内存使用量为Nil
(零),代码执行正等待执行“PullAllBytesFrom
”这一行。 - 在
T = t1
时,代码“var fileBytes = PullAllBytesFrom(compressedJsonFile);
”已成功执行,我们的fileBytes
变量持有对字节数组的引用。 - 在
T = t2
时,代码“var uncompressedBytes = DecompressUsingGzip(fileBytes);
”已成功执行,我们的uncompressedBytes
变量持有对未压缩字节数组的引用。 - 在
T = t3
时,对“DeserializeAs<List<T>>(uncompressedBytes);
”的调用已完成。
我们做出以下假设以避免可视化中的复杂性:
- GC(垃圾回收器)在 t3 之前不运行
- 框架内部使用的缓冲区大小相对较小,因此可以在可视化中忽略
- 我们可以使用线性近似来进行内存分配/重新分配
- 假设文件大小为 1 MB,而解压缩后的数据和
DeserializedList
各自需要 2 MB。
注意:在T=t3时,我们假设最终的 return 语句将执行,GC 将会发生,内存减少到 2 MB(由列表持有)。
基于以上假设,下面是一个近似的可视化图:
即使在这张忽略了内存浪费(由于重新分配/复制)的简单图片中,我们也能清楚地观察到我们不必要地消耗了高达 5 MB 的内存(在 t3 时达到峰值);而目标状态仅消耗 2 MB 的内存。认识到这一事实后,就很容易设想出理想的目标状态,这与以下可视化图一致:
比较图4和图5,我们立刻就能明白我们希望获得的收益。现在,我们可以实际讨论有助于我们实现目标的实现细节了。
行百里者半九十…
到目前为止,我们已经讨论了与流操作和代码实现相关的问题,并确定了目标。总的来说,我们已经收集了材料,并希望基于这些材料来阐明我们在实现过程中所做的选择。
我们决定将整篇文章分为两部分。我们不想仅仅展示实现,而是想详细阐述其背后的“为什么”。我们仍在撰写文章的第 2 部分以完成我们的讨论,我们希望听到读者的评论,以便提高材料的质量。
我们也邀请读者提前查看附加的示例代码源和实现。