寻找流...第 2 部分(共 2 部分)





5.00/5 (2投票s)
讨论流...关于流...关于函数式流...以及其他一切。
本文完成了我们 在第一部分开始的讨论。在此之前,我们讨论了流的几个用例,并通过一个示例展示了一些性能问题。我们看到了实现不当的流管道如何导致内存消耗失控。此外,我们可视化了此类管道中的数据流,并确定了实现的最终目标。确定的目标可以概括为:
- 避免使用内存缓冲区以提高运行时内存使用效率
- 仅使用必需的固定大小缓冲区
- 能够创建从端到端(源到目标)的高效管道(操作链)
- 创建一个提供以下功能的 API:
- 可组合性:操作的组合
- 可读性:组合是声明式的
- 可维护性:为每个底层组合操作促进单一职责原则
- 弹性:开放以满足任何奇特和/或常规的数据处理要求
- 可重用性:允许在确定性的方式下对组合链进行运行时突变
<< 寻找流 - 第 1 部分
在本文中,我们将根据既定目标详细阐述实现方式,以及此类实现的几个战略性用例。以下是目录:
基本构建块
在第一部分中,我们展示了通过 使用嵌套的“using”块 以级联方式传递流来实现效率的一种可能方法。实际上,我们不知不觉地利用了调用链中中间流的开放式特性。我们知道这种流的级联可以提高运行时性能,但它也存在许多其他问题。让我们快速调查其中一些问题。假设我们有一个处理文件读取、解压缩以及从 JSON 文本转换为对象 <T> 的流级联;如下所示:
public T StreamCascade<T>(FileInfo jsonFile, ...)
{
using(var fileStream = new FileStream(uncompressedJsonFile, ...))
{
using(var textReader = new TextReader(fileStream, ...))
{
//JsonReader of Newtonsoft.Json
using(var jsonReader = new JsonReader(textReader, ...))
{
//... your serialization code ...//
}
}
}
}
我们希望能够完全控制其行为,例如:
- 条件式 JSON 反序列化:有时我们希望将 JSON 字符串反序列化为已知对象类型,否则仅收集 JSON 字符串本身,即:
public object DeserializeObjectFrom(FileInfo uncompressedJsonFile)
{
using(var fileStream = new FileStream(uncompressedJsonFile, ...))
{
using(var textReader = new TextReader(fileStream, ...))
{
if (deseralize)
{
//JsonReader of Newtonsoft.Json
using(var jsonReader = new JsonReader(textReader, ...))
{
//... your serialization code ...//
}
} else {
return textReader.ReadToEnd();
}
}
}
}
现在,我们立即开始认识到这种构造的局限性。一些观察结果是:
- 由于存在条件式 `if`,返回类型现在是“
object
”,而不是具体的<T>
类型。 - 条件式 `if` 本身又增加了一个嵌套级别。
现在,为了进一步论证,我们希望同时处理压缩和未压缩的文件路径(即使解压缩也成为条件)。此外,我们希望将所有配置参数暴露给我们的函数,以控制序列化、缓冲区大小、取消、字符编码、文件流参数等的行为。
最重要的是,我们希望添加 base64 操作(也可能是条件式的!)和/或加密支持?总之,我们想让这个函数变得**更丑陋**!
根据一些实验,我们认识到为了实现性能,我们要么创建专用的“using
”级联并复制代码,要么创建一些不可读的复杂代码片段。我们决定这些实现对我们来说是不可接受的;因此,我们决定创建我们的 API。但在我们看它之前,让我们先介绍一些文献。
函数式编程
函数式编程是一种编程范式,其解释超出了本文的范围。在这里,我们将仅简要介绍 头等函数 和 高阶函数,因为它们对我们的实现都很重要。幸运的是,在 C# .Net 中,我们有 函数作为头等公民 的概念、委托的概念、预定义的 Action<>
和 Func<>
委托的存在、使用这些预定义委托定义 Lambda
的便利性。最重要的是,由于这些 Action<>
和 Func<>
委托是通用的,创建高阶函数只是为那些泛型 <T>
(即类型占位符)分配正确类型的问题。让我们考虑一个假设的例子来理解用法。
问题陈述生成一个正整数。如果是奇数,则乘以 2。将结果打印到控制台。
public void GeneratePositiveEvenRandomInteger()
{
var randomGenerator = new Random();
var nextRandom = randomGenerator.Next(0, int.MaxValue/2);
if (nextRandom % 2 == 1)
{
Console.WriteLine(nextRandom * 2);
}
Console.WriteLine(nextRandom);
}
很好。现在,让我们看看如何使用 Function
实现相同的功能。
//this is our First-class function
public int MultiplyByTwo(int value)
{
return value * 2;
}
//our higher-order function that accepts Odd-To-Even delegate as parameter
public int GeneratePositiveEvenRandomInteger(Func<int, int> oddToEvenFunc)
{
var randomGenerator = new Random();
var nextRandom = randomGenerator.Next(0, int.MaxValue/2);
return nextRandom % 2 == 1 ? oddToEvenFunc(nextRandom) : nextRandom;
}
//somewhere else in the code:
//Passing our First-Class function in our Higher-Order function
var results = GeneratePositiveEvenRandomInteger(MultiplyByTwo);
Console.WriteLine(results);
好的!我们仍然看不到任何好处;事实上,乍一看它可能看起来令人不知所措。但请稍等,请注意:
MultiplyByTwo 逻辑已从整个逻辑中分离出来。这意味着如果明天我们需要更改“奇数处理”的逻辑,那么更容易只修改此函数而保留其余逻辑不变(关注点分离)。此外,单元测试这些函数也将更容易!
然而,这可能仍然不令人信服,对吧!让我们根据以下观察进一步扩展这个想法:
- GeneratePositiveEvenRandomInteger 正在做几件事:
- 它在每次调用时创建一个新的
Random
实例。 - 它正在检查
nextRandom
值的奇偶性。 - 根据奇偶性返回相应的值。
- 它在每次调用时创建一个新的
在我们的例子中分离关注点是我们下一步。
闭包
谈论函数式编程而不讨论 闭包 将是遗憾的(您可以参考 这篇文章 以获取一些有趣的细节)。大多数时候,您会看到一个匿名函数(或 lambda)作为示例来演示闭包的概念。让我们从类似的事情开始。假设为了拥有我们的 randomGenerator
(来自上面的代码),我们创建一个工厂函数来包装(闭合)Random()
实例,即:
//our random integer generator
public Func<int> RandomIntFactory(int min, int max)
{
var randomGenerator = new Random();
return () => randomGenerator.Next(min, max);
}
//somewhere else in the code:
var randomIntFactory = RandomIntFactory(0, int.MaxValue/2);
//now, we say randomIntFactory has a closure (or closed over) randomGenerator, min and max
每次我们调用 randomIntFactory()
(注意名称后的括号“()”)时,我们都会得到一个由“randomGenerator
”(同一个实例!)创建的新随机数。为了更好地理解所有这些魔法是如何工作的,我们可以像下面这样修改上面的代码(注意:下面的代码只是传达了这个想法,绝不代表 .Net 代码在执行时的确切运行时行为):
public class ClosureClass()
{
public Random _random;
public int _min;
public int _max;
public ClosureClass(Random random, int min, int max)
{
_random = random;
_min = min;
_max = max;
}
public int NextRandom()
{
return _random.Next(min, max);
}
}
//somewhere else in the code:
//our random integer generator
public Func<int> RandomIntFactory(int min, int max)
{
var closure = new ClosureClass(new Random(), min, max);
//add we return the first-class function of our closure class
return closure.NextRandom;
}
现在,借助头等函数(“NextRandom
”),可以清楚地看到这些值如何在 RandomIntFactory
函数的作用域之外神奇地保留下来,以便进行下一次随机值生成!所以魔法是我们仍然持有对类实例方法的引用,该方法又链接到类实例本身,而实例本身又持有所需的数据(一切现在都已连接并且有意义了!)。
注意: 以后,我们将只在代码中使用匿名函数/lambda,并享受 .Net 本身提供的闭包支持。
遵守规则
在涵盖了闭包之后,我们可以再次讨论我们未完成的 GeneratePositiveEvenRandomInteger
的故事。让我们为每个事物创建一个函数(即,1 个函数对应 1 个职责):
//our random integer generator
public Func<int> RandomIntFactory(int min, int max)
{
var randomGenerator = new Random();
return () => randomGenerator.Next(min, max);
}
//oddness check
public bool IsOdd(int value)
{
return value % 2 == 1;
}
//twice of value
public int MultiplyByTwo(int value)
{
return value * 2;
}
//Identity function
public int Identity(int value)
{
return value;
}
//higher-order function that applies lamdas, on generated value,
//based on predicate
public int GenerateNumberAndApply(Func<int> factory,
Func<int, bool> predicateFactory,
Func<int, int> whenTrue,
Func<int, int> whenFalse)
{
var value = factory();
return predicateFactory(value) ? whenTrue(value) : whenFalse(value);
}
////////////
//somewhere else in the code:
////////////
var factory = RandomIntFactory(0, int.MaxValue/2);
Func<int> generatePositiveEvenRandomInteger = () => GenerateNumberAndApply(factory,
IsOdd,
MultiplyByTwo,
Identity);
//Et voilaaaa...!
Console.WriteLine(generatePositiveEvenRandomInteger());
每次我们调用 generatePositiveEvenRandomInteger
()
(注意名称后的括号“()”)时,我们都会得到一个新生成的随机偶数。好的!但是除了每个函数都变成单行之外,我们仍然看不到任何显著的 genius,对吧!但是,请注意,我们获得了两个(2)非常受欢迎的特性:可维护性和可重用性!
为什么需要可维护性?
虽然与我们原始的 GeneratePositiveEvenRandomInteger 代码 相比,它看起来更长;但我们应该欣赏这样一个事实,即所有函数现在都可以轻松地在其自身隔离的关注范围内进行测试。考虑一种情况,其中所有涉及的函数(即数字工厂、谓词工厂以及 whenTrue 和 whenFalse 逻辑)本质上都很复杂;在这里,我们实现了真正的关注点分离,即使是复杂的工件,我们也可以轻松管理它们复杂的相互作用。最重要的是,考虑 GenerateNumberAndApply
的以下签名,带有泛型(即 <T>
):
//generic higher-order function that applies lamdas, on generated value,
//based on predicate
public T GenerateNumberAndApply<T>(Func<T> factory,
Func<T, bool> predicateFactory,
Func<T, T> whenTrue,
Func<T, T> whenFalse)
{
var value = factory();
return predicateFactory(value) ? whenTrue(value) : whenFalse(value);
}
////////////
//somewhere else in the code:
////////////
var factory = RandomIntFactory(0, int.MaxValue/2);
Func<int> generatePositiveEvenRandomInteger = () => GenerateNumberAndApply<int>(factory,
IsOdd,
MultiplyByTwo,
Identity);
//Et voilaaaa...!
Console.WriteLine(generatePositiveEvenRandomInteger());
现在,我们可以使用任何通用类型,它需要完全相同的条件处理;只需传递兼容的函数即可享受!另请注意,此函数已成为库函数的有力候选!
为什么需要可重用性?
突然,我们得到了以下新需求:
问题陈述生成一个正整数。如果是偶数,则加 1。将结果打印到控制台。
现在,为了满足这个新需求,我们需要进行以下小改动:
//twice of value
public int AddOne(int value)
{
return value + 1;
}
////////////
//somewhere else in the code:
////////////
var factory = RandomIntFactory(0, int.MaxValue/2);
Func<int> generatePositiveOddRandomInteger = () => GenerateNumberAndApply<int>(factory,
IsOdd,
Identity,
AddOne);
//Et voilaaaa...!
Console.WriteLine(generatePositiveOddRandomInteger());
因此,与其重写一个全新的函数,仅仅为了进行这个小改动,我们编写了一个新的单行函数并重用了现有的工件!
不信服!
是的,我们也同意!为了举例说明,这是一个很好的练习,但实际上,它并没有开箱即用地反映问题陈述的功能!也就是说,它缺少可读性。这是真的,所以为代码添加表达性(声明式方式)是我们的下一步。
添加便利语法!
我们知道 C# 不是一种声明式编程语言。为了给我们的代码带来“声明式”的特性,我们借助了该语言的另一项功能:扩展方法(又名 语法糖!)(注意:我们不想陷入关于扩展方法的使用是反模式还是非反模式的混乱争论!下面的例子只是为了演示如何丰富那些纯委托的表达性)。
我们知道,在语言本身(当然是 C#)中,我们无法对 委托 做太多事情,因此对那些 Func<>
和 Action<>
。除了某些调用相关的方法和关联的参数外,框架并没有提供太多。然而,由于扩展方法,我们可以为它们附加(类似于访问者)的功能。让我们针对我们的随机数生成目的构建一些这些方法:
//Extension methods are static and part of static class!
public static class HelperExtension
{
//We carefully choose names of our functions
//to maintain readability!
//instead of naming it CreateGenerator (which it indeed does), we
//call it by the Action it's suppose to perform at execution time.
// we will see the "why" when reading the use-case!
public static Func<int> GenerateInt(this Random value, int min, int max)
{
return () => value.Next(min, max);
}
public static Func<Func<int, int>, Func<int>> If(this Func<int> factory,
Func<int, bool> predicateFunc)
{
//reusing all the function we defined above!
return whenTrue => () => GenerateNumberAndApply(factory,
predicateFunc,
whenTrue,
Identity);
}
public static Func<int> Then(this Func<Func<int, int>, Func<int>> conditionFunc,
Func<int, int> whenTrue)
{
return conditionFunc(whenTrue);
}
public static void And(this Func<int> func, Action<int> apply)
{
apply(func());
}
}
////////////
//somewhere else in the code:
////////////
Action<int> print = val => Console.WriteLine("Value is: " + val);
//And we call as:
var randomGenerator = new Random();
randomGenerator.GenerateInt(0, int.MaxValue/2)
.If(IsOdd)
.Then(MultiplyByTwo)
.And(print);
如果我们忽略一些 C# 语法,在阅读最终的方法调用链时,我们会体验到以下 WOW 效果(即可读性):
WOW 效果(又名可读性)randomGenerator.GenerateInt(0, int.MaxValue/2).If(IsOdd).Then(MultiplyByTwo).And(print); =>
随机生成器,请生成一个新的随机整数,如果数字是奇数,则乘以二,然后打印结果数字!让我们再试一次。
randomGenerator.GenerateInt(0, int.MaxValue/2).If(val => !IsOdd(val)).Then(AddOne).And(print); =>
随机生成器,请生成一个新的随机整数,如果数字不是奇数,则加一,然后打印结果数字!
有了这些知识,让我们探索一种新的流式传输方式。
函数式流
由于我们将使用函数式编程概念,因此我们将“函数式流”一词用于标记我们的 API。我们将在此标题下讨论基本实现,然后再为其添加功能。
重新审视开放式流
在第一部分中,我们 粗略地介绍了开放式流。基本上,当流的构造函数(Ctor)签名符合以下大致伪签名时,我们将流称为开放式流:
class SomeStream : Stream
{
public SomeStream(Stream baseStream, ...)
{
... Ctor implementation ...
}
... class implementation ...
}
在内部,当我们从中读取(如果它是双向的或返回 CanRead=true)时,它会从构造函数注入的流(上面示例中的 baseStream
参数)读取。最终,它可能会操作这些字节并将其作为结果提供。基于配置的缓冲区大小,此类流应该能够根据需要多次读取 baseStream
。在本讨论中,我们将此机制称为 buffer-mutate-forward (BMF) 机制
。我们称展示此特性的流为“突变字节生成器”或简称为“Generators
”(尽管“Generator”一词的范围很广)。以下伪代码展示了相同的概念:
class SomeStream : Stream
{
public SomeStream(Stream baseStream, ...)
{
... Ctor implementation ...
}
public bool CanRead => true;
public int Read(byte[] buffer, int offset, int count)
{
while (buffer != full and baseStream.HasData)
{
baseStream.Read(localBuffer, localOffset, localCount);
//If there is NO transformation required, then the function
//is an IDENTITY function (returns the localBuffer back).
var mutatedBytes = PerformDataTransformation(localBuffer, localOffset, localCount);
mutatedBytes.CopyTo(buffer);
}
}
... class implementation ...
}
同样,当此类流是双向的或至少返回 CanWrite=true 时,我们可以对其进行写入。同样,在内部,它会在注入的流上写入突变后的字节,同时维护缓冲区状态。以下伪代码代表了写入机制:
class SomeStream : Stream
{
public SomeStream(Stream baseStream, ...)
{
... Ctor implementation ...
}
public bool CanWrite => true;
public int Write(byte[] buffer, int offset, int count)
{
//If there is NO transformation required, then the function
//is an IDENTITY function (returns the localBuffer back).
var mutatedBytes = PerformDataTransformation(buffer, offset, count);
baseStream.Write(mutatedBytes, ...);
}
... class implementation ...
}
我们发现这种流实现开箱即用的能力具有巨大的潜力,因此我们将我们的 API 基于此进行构建,而不是从头开始设计。让我们进一步剖析这个概念。
可视化管道
处理任何实现的最佳方法是在编写任何代码之前先进行可视化。因此,让我们通过以下插图看看管道是如何实际工作的:
在上图中,我们识别出三种(3)不同的管道类型:
- 源管道:它是管道的第一根管道。它包含(某种形式的)数据。它在左侧是封闭的,在右侧可以接受新管道。
- 中间管道:这些是可选的,如果连接的话,以串联方式连接到源管道(就像火车车厢连接到引擎一样),最后连接到目标管道。
- 目标管道:它是管道的最后一根管道,终止管道。它在左侧是开放的,可以连接到源管道(如果没有中间管道)或连接到最后一个中间管道。
稍加思考,我们可以认识到我们的
- 开放式流(如上所述和在第一部分中)是中间管道的理想选择。
- 具有
CanRead=true
的封闭式流(在第一部分中讨论)(即读取器流)是基于字符串、byte[]
、T[]
等数据类型的源候选(其中 T 是某种可以序列化为字节或字符串的已定义类型)。.. - 具有
CanWrite=true
的封闭式流(在第一部分中讨论)(即写入器流)是基于StringBuilder
、可写MemoryStream
、List<T>
等数据类型的目标候选(其中 T 是某种可以序列化为字节或字符串的已定义类型)。..
基于上述观察,并且知道如何实现我们所有 *-ability(可重用性、可维护性和可读性)的目标,我们只需要将这些流转换为 Generator
。下一步是创建一个简单的 Generator。
提升生成器
让我们再次从那些嵌套的“using
”块开始我们的旅程。
//NOTE: Comments contains part of explanation!
public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
using(var fileStream = new FileStream(uncompressedJsonFile.FullName, ...))
{
//... in order to create a generator on the above fileStream
// we need to hoist the below testReader out of this function
//
using(var textReader = new StreamReader(fileStream, ...))
{
//JsonReader of Newtonsoft.Json
using(var jsonReader = new JsonReader(textReader, ...))
{
//... your serialization code ...//
}
}
}
}
上述代码的问题在于第二个(第 2 个)“using
”块在第一个(第 1 个)块内部(同理,第三个“using
”在第二个内部……依此类推)。因此,为了分离与每个流相关的关注点,我们需要将内部的那些提升出来!如下所示:
为了进行这种提升,我们回到了函数式编程的原则,正如我们上面讨论过的。令人惊讶的是,我们已经讨论过这样的实现!您还记得我们是如何将“If
”逻辑与“Then
”动作分离的吗?让我们重新审视一下:
//NOTE: Comments contains part of explanation!
//As we did NOT know beforehand what function will take place in case of predicate is TRUTHY
// we turned that FUTURE input as a parameter to our higher-order function!
public static Func<Func<int, int>, Func<int>> If(this Func<int> factory,
Func<int, bool> predicateFunc)
{
//When "If" will execute at Runtime, it is still unaware of the "Then" part
// thus, we let the "Then" part to decide what should be the truthy execution
// so, we make that a parameter to the lambda!
return funcForWhenTrue => () => GenerateNumberAndApply(factory,
predicateFunc,
funcForWhenTrue,
Identity);
//NOTE: though, it is very well possible to pass "whenTrue" as parameter of
// this function, but then we wont be able to Chain the calls like:
// .If(...).Then(...)
//
// Instead the usage would become:
// .If(..., Then)... and of course... this is NOT what we think of a pipeline!
//
// Consider following and decide yourself which version is better:
//
// .If(...).Then(...).If(...).Then(...) <= No nesting of paranthesis
// OR
// .If(..., Then(..., If(..., Then(..., )))) <= Nested paranthesis
//
// We do not know about you, but for us Nested paranthesis are no better than
// nested "using"! (May be some LISPers won't agree with us... lol!)
}
public static Func<int> Then(this Func<Func<int, int>, Func<int>> conditionFunc,
Func<int, int> whenTrue)
{
// Here we pass the known functionality "whenTrue" associated with TRUTHY value
// into the conditionFunc
// Now, this "whenTrue" upon execution will become "funcForWhenTrue" and will get
// passed to "GenerateNumberAndApply" as shown above in "IF" in bold!
return conditionFunc(whenTrue);
}
让我们尝试对这些流进行一些代码提升(基于上述想法):
//NOTE: "..." are operation specific params which we can ignore for the moment
//We are going to PULL the data from this file
//We hoist our 1st "using" and create a separate Func
public static Func<Stream> PullData(this FileInfo someFileInfo, ...)
{
//just return the file stream to be consumed later by "then" parts
return () => new FileStream(someFileInfo.FullName, ...);
}
//To hoist out our 2nd "using", we need our first hoisted "using", lets
//pass it as parameter
public static Func<TextReader> ThenGenerateText(this Func<Stream> prevHoist, ...)
{
return () => new StreamReader(prevHoist(), ...);
}
//To hoist out our 3rd "using", we need our second hoisted "using", lets
//pass it as parameter
public static Func<T> AndJsonDeserialize(this Func<TextReader> prevHoist, ...)
{
return () =>
{
//JsonReader of Newtonsoft.Json
using(var jsonReader = new JsonReader(prevHoist(), ...))
{
//... your serialization code returning instance of <T> ...//
}
};
}
使用上面的代码片段,我们成功地创建了三个(3)个生成器(至少举例来说)。
- PullData:文件(由提供的 FileInfo 指向)的字节(即
byte[]
)的生成器。如果我们愿意,可以立即调用它并对其使用任何 FileStream 方法;但它本身并没有那么有趣,因为它作为我们管道中的源管道。 - ThenGenerateText:一个生成器,能够基于底层提供的流生成文本段(即
char[]
)。同样,立即调用它并不那么有趣,因为它作为我们管道中的中间管道。 - AndJsonDeserialize:基于底层文本段生成器的
<T>
对象实例的生成器!是的,我们可以立即调用它,因为它作为我们管道的目标管道的示例。
使用这三个(3)个示例管道……创建一个从 json 文件到对象的管道就像蛋糕一样简单,如下所示:
var myObject = new FileInfo("Path to my json file")
.PullData(...)
.ThenGenerateText(...)
.AndJsonDeserialize(...);
//Notice the WOW-effect
不信服!
是的,我们也同意!为了举例说明,这是一个很好的练习,但实际上,为遇到的每种类型创建所有这些扩展方法毫无意义……即使在这个小例子中,我们也为 3 种不同类型创建了扩展:
FileInfo
上的 PullDataFunc<Stream>
上的 ThenGenerateTextFunc<TextReader>
上的 AndJsonDeserialize
如果我们继续走这条路,我们将不得不处理许多变化,这会很快导致代码混乱。我们需要一些保持不变的东西,即我们需要识别一个 <T>
,它对我们所有的扩展方法都是相同的(类似通用流对象的东西)!
推送 (PUSH) vs. 拉取 (PULL)
在第一部分中,我们展示了 典型的流用法作为管道,并确定了 相关的工件及其在管道中的相互作用。并且,基于流的性质(单向 vs. 双向),我们确定了可以总结在以下表中的数据流相关需求。
基于推送的流
当存在数据源和可写流管道时,我们将流操作称为“基于推送的流”。换句话说,在管道中,我们至少有一个管道是仅写(即 CanRead=false)。
例如,在 C# 中,GZipStream
(DeflateStream
相同)在用于压缩数据时是仅写的。因此,如果我们想流式传输涉及压缩的数据,我们就别无选择,只能构建仅写管道!
我们将此管道称为“基于推送的管道”,并将此管道的第一根管道(源管道)命名为 Push(...)。因此,“Push(...)
”扩展方法可以应用于以下任何源管道。例如:someString.Push(...)
、someStream.Push(...)
等。
本着同样的精神,我们定义了各种中间管道(扩展方法),同时将“.Then
”作为它们的前缀。
最后,目标管道(也是扩展方法)的前缀为“.And
”。
从源到目标推送数据1
源管道 (.Push 管道) | 中间管道 (.ThenXXX 管道) | 目标管道 (.AndXXX 管道) |
---|---|---|
|
|
|
1API 可扩展到更多源/中间/目标管道,只要它们能产生兼容的签名。 2通配符操作是临时字节操作需求,例如:在不缓冲的情况下计算流中的字节数、流分叉(又名 TEEing)等…… |
由于大多数流操作都可以从 C# 的 Async-Await
API 中受益,因此我们将 Func<PushFuncStream>
确定为我们扩展方法的通用表示,其中 PushFuncStream struct
定义如下:
public struct PushFuncStream
{
// Our Writable stream
public Stream Writable { get; }
// If true, stream is disposed at the end of streaming else left open
public bool Dispose { get; }
// Associated Cancellation token
public CancellationToken Token { get; }
public PushFuncStream(Stream writable, bool dispose, CancellationToken token)
{
Writable = writable.CanWrite.ThrowIfNot(DdnDfErrorCode.Unspecified,
"Cannot write on the stream",
writable);
Dispose = dispose;
Token = token;
}
}
我们将在下一节中看到相关的实现。
基于拉取的流
同样,当存在数据源和可读流管道时,我们将流操作称为“基于拉取的流”。换句话说,在管道中,我们至少有一个管道是仅读(即 CanWrite=false)。
例如,在 C# 中,GZipStream
(DeflateStream
相同)在用于解压缩数据时是仅读的。因此,如果我们想流式传输涉及解压缩的数据,我们就别无选择,只能构建仅读管道!
我们将此管道称为“基于拉取的管道”,并将此管道的第一根管道(源管道)命名为 Pull(...)。因此,“Pull(...)
”扩展方法可以应用于以下任何源管道。例如:someByteArray.Pull(...)
、someStream.Pull(...)
等。
毫不奇怪,对于拉取管道,我们也定义了各种中间管道(扩展方法),同时将“.Then
”作为它们的前缀;以及目标管道(也是扩展方法),其前缀为“.And
”。
从源到目标拉取数据1
源管道 (.Pull 管道) | 中间管道 (.ThenXXX 管道) | 目标管道 (.AndXXX 管道) |
---|---|---|
|
|
|
1API 可扩展到更多源/中间/目标管道,只要它们能产生兼容的签名。 2通配符操作是临时字节操作需求,例如:在不缓冲的情况下计算流中的字节数等…… |
同样,为了利用 C# 的 Async-Await
API,我们将 Func<PullFuncStream>
确定为我们扩展方法的通用表示,其中 PullFuncStream struct
定义如下:
public struct PullFuncStream
{
// Our Readable stream
public Stream Readable { get; }
// If true, stream is disposed at the end of streaming else left open
public bool Dispose { get; }
public PullFuncStream(Stream readable, bool dispose)
{
Readable = readable.CanRead.ThrowIfNot(DdnDfErrorCode.Unspecified,
"Cannot read from the stream",
readable);
Dispose = dispose;
}
}
让我们讨论围绕这些概念的相关实现。
实现说明:值得一提的是,如果我们的管道完全由双向管道组成,那么 PUSH 和 PULL 管道都会产生相同的结果。
API 实现
考虑我们第一部分最初的简单任务将非常有趣,该任务是:
问题陈述给定一个二进制文件的路径,读取其所有字节。首先,使用 GZip 压缩算法对其进行解压缩,然后使用 JSON 序列化器将数据反序列化为定义明确的对象数组(即
List<T>
,其中 T 是已知的)。
我们识别出以下不同的操作,即:
- 从给定文件中读取所有字节
- 使用 GZip 算法解压缩这些字节
- 使用 Json 序列化器从解压缩的字节创建
List<T>
(T
是已知的或只是一个泛型占位符,这无关紧要)
让我们准备我们的函数式流工件来完成任务:
//Comments contains some exaplanations
//"..." are operation specific parameters which we can ignore for our discussion
//STEP 1.
//Our file based byte Generator (PULL based)
// as we need decompression which is read-only as explained above
public static Func<PullStreamFunc> Pull(this FileInfo fileInfo, ...)
{
return () => new PullStreamFunc(new FileStream(fileInfo.FullName, ...), true);
//last true is for dispose (we need to dispose file handle after reading is done)
}
//STEP 2.
//Our Gzip based byte Generator
public static Func<PullStreamFunc> ThenDecompress(this Func<PullStreamFunc> previous,
bool include = true)
{
//we will discuss "include" boolean shortly (and also when discussing Meta-Pipeline)
//Commentary on "ThenApply" is provided below separately
return previous.ThenApply(p => {
//when you will check the signature of "ThenApply" below,
//you will see, p is infact previous (i.e. Func<PullStreamFunc>)
//thus prevSrc is our PullStreamFunc!
var prevSrc = p();
//standard Gzip constructor!
var unzip = new GZipStream(prevSrc.Readable,
CompressionMode.Decompress,
!prevSrc.Dispose);
//We return a Func<PullStreamFunc>, NOT PullStreamFunc itself!
return () => new PullFuncStream(unzip, true);
//last true is for dispose (we dispose the stream once decompression is done)
}, include);
}
//STEP 3.
//Our JSON deserializer
public static T AndParseJson<T>(this Func<PullStreamFunc> previous, ...)
{
var prevSrc = previous();
using(var txtReader = new StreamReader(prevSrc.Readable, ...))
{
//using Newtonsoft.Json
using(var jsonReader = new JsonReader(txtReader, ...))
{
T instance = ... serialization logic ...
return instance;
}
}
}
//Step ThenApply Logic.
public static Func<PullFuncStream> ThenApply(this Func<PullFuncStream> src,
Func<Func<PullFuncStream>, Func<PullFuncStream>> applyFunc,
bool include = true)
{
//applyFunc => It is a higher-order function, that =>
// 1. Accepts an instance of Func<PullFuncStream> as input
// 2. Returns an instance of Func<PullFuncStream> as output
//Do you remember our construct of "If"-"Then" pipes while discussing
//example of Even Random integer generation
//In the "If" condition we were applying Identity operation when condition was Falsy!
//
//This is same logic. When :
// Include = false => we return "src" (Identity operation on Func<PullFuncStream>)
// Include = true => We pass "src" to "applyFunc" to get
// new Func that contains new PullFuncStream with added
// streaming pipe!
return include ? applyFunc(src) : src;
}
使用上述结构,解决方案如下:
问题陈述和解决方案给定一个二进制文件的路径,读取其所有字节。首先,使用 GZip 压缩算法对其进行解压缩,然后使用 JSON 序列化器将数据反序列化为定义明确的对象数组(即
List<T>
,其中 T 是已知的)。
=>
List<T> objectList = new FileInfo("Path of File", ...).Pull()
.ThenDecompress()
.AndParseJson<List<T>>( . . . );
备注:由于我们在整个操作中从未 MemoryStream (内存流)使用任何内存缓冲区;我们实现了 我们在第一部分中可视化实现的目标。
API
注意:如果您不感兴趣使用我们的 API,请随时跳过本节!
有用链接
如上所述,我们将实现分为 2 部分:
- 基于推送的管道:当至少有一个管道是仅写管道时。
- 基于拉取的管道:当至少有一个管道是仅读管道时。
事实上,为了方便某些操作(功能),我们添加了一个适配器,用于将 PULL 管道转换为 PUSH 管道。例如,如果我们使用 PULL 管道从某个 WebAPI 方法读取字节。但同时,我们想将字节压缩保存到本地磁盘;在这种情况下,同样,无需将数据缓冲在内存中,我们可以将 Pull 管道转换为 Push 管道并添加 Compression 管道。
//SYNC PullFuncStream
public static Func<PushFuncStream, Task> ThenConvertToPush(this Func<PullFuncStream> src,
int bufferSize = StdLookUps.DefaultBufferSize)
{
return ...;
}
//ASYNC PullFuncStream
public static Func<PushFuncStream, Task> ThenConvertToPush(this Func<Task<PullFuncStream>> src,
int bufferSize = StdLookUps.DefaultBufferSize)
{
return ...;
}
简化键入
VS Studio 的智能感知(Intellisense)帮助我们在输入几个字符后发现方法名称。利用智能感知的强大功能,为了简化使用,我们将 API 管道分为三个(3)不同的前缀:
- 第一个管道操作具有
.Push
或.Pull
前缀。 - 中间管道是可选的(即管道可以包含零个或任意数量的管道串联),并且总是以
.Then
为前缀。 - 目标管道的前缀为
.And
。
通过了解这些前缀,可以快速发现方法。
一旦确定了源/目标,以下签名就代表了流管道:
// "..." are operation specific parameters which we can ignore for this listing
// "XXX" is the postfix of operation. VisualStudio IntelliSense will come handy to discover all :)
//Pull based
source.PullXXX( ... ).ThenXXX( ... ).ThenXXX( ... ) ... .AndXXX( ... );
//Push based
source.PushXXX( ... ).ThenXXX( ... ).ThenXXX( ... ) ... .AndXXX( ... );
以上所有方法都有其等效的 ASYNC
版本。但链的签名(除了前面加 await
之外)不会改变。下图说明了发送方和接收方管道的概念。
元管道
目前,我们只讨论了基于需求的管道组合。的确,使用这种高度可重用和可组合的管道,我们可以快速生成功能,但对于每个功能,我们仍然需要编写管道组合代码。这就像为 REST 服务添加一个新的 GET 端点以响应新需求。但是,这也很累人,对吧?这就是为什么 Facebook 提出了 GraphQL 的设计!借助元管道,我们可以定义一次某项内容,然后一遍又一遍地使用其子组件(即广义上的可重用性)!
使用我们的 API,创建这种元管道是可能且容易的。我们将“元管道”命名为“Meta-Pipelines”,即其行为由元数据驱动的管道。让我们考虑以下简单的陈述:
问题陈述给定一个文件路径,如果扩展名为 .zip,则假定它是 GZip 压缩文件,否则是常规文件。文件包含某种已知类型 <T> 的记录数组。从文件中读取这些记录并创建 List<T>。
根据上述陈述,我们知道,知道文件路径是否以“.zip”结尾,我们就需要应用压缩。除了压缩,其余代码在两种情况下都是相同的。
如果你还在考虑使用“If-Else”写代码,那就忘了它吧!以下是使用我们的 API 编写元管道的方法:
//Meta Method to identify whether file is compressed or not
public static bool UseZip(string filePathOrName)
{
return filePathOrName.EndsWith(".zip");
}
//somewhere else in the code:
//Meta-Pipelines are created using the "include" boolean we added in the pipes
fileInfo.Pull( ... )
.ThenDecompress( ... , include:UseZip(fileInfo.FullName))
.AndParseJsonArray<T>(...);
请注意,每当 UseZip
返回 false
时;我们的管道就会绕过 Decompress 管道,就好像我们写了代码一样:“fileInfo.Pull( ... ).AndParseJsonArray<T>(...);”(这都归功于“include
”布尔值驱动的身份行为,我们上面解释过)。
这种元驱动管道用法的另一个例子是基于 Web 的流式应用程序,其中压缩、加密、编码等元信息可以从请求标头中获取。可以构建一个具有尽可能多管道的静态管道;并且,在运行时通过 内容协商 阶段基于布尔值来屏蔽不需要的管道。
了解这一点,我们可以设计基于元信息的解决方案,并对程序的运行时行为保持信心。
弹性管道
我们上面已经讲过,只有源管道(.PullXXX 或 .PushXXX)和目标管道(.AndXXX)是我们流式管道的必需管道。所有“.ThenXXX”管道都是可选的;因此,我们可以根据需要使用零个或多个管道来构建管道。这使得我们的 API 非常灵活/有弹性。
虽然我们已经实现了许多日常管道(以及下面将讨论的两个异类管道)作为我们库的一部分;但我们知道我们的库并非详尽无遗,无法满足所有用例(例如,它不包含 Brotli 压缩 的管道)。因此,为了方便扩展,我们保持所有端口开放。如果需要创建新管道以与我们的库一起使用,以下是实现新管道时应遵循的简单规则:
- 如果管道是基于 PULL 的
- 如果管道在新的源上(假设是
TNew
):创建一个接受TNew
实例并返回Func<PullFuncStream>
实例的函数(同样,请适应async
版本)。 - 如果是中间管道:创建一个接受
Func<PullFuncStream>
实例并返回Func<PullFuncStream>
实例的函数(同样,请适应async
版本)。在实现内部,从输入PullFuncStream
的Readable
属性获得的流中读取数据。 - 如果是目标管道:创建一个接受
Func<PullFuncStream>
实例并返回 TOut(或 void)的函数。(同样,请适应async
版本)。
- 如果管道在新的源上(假设是
- 同样,如果管道是基于 PUSH 的:
- 如果管道在新的源上(假设是
TNew
):创建一个接受TNew
实例并返回Func<PushFuncStream>
实例的函数(同样,请适应async
版本)。 - 如果是中间管道:创建一个接受
Func<PushFuncStream>
实例并返回Func<PushFuncStream>
实例的函数(同样,请适应async
版本)。在实现内部,从输入PushFuncStream
的Writable
属性获得的流中读取数据。 - 如果是目标管道:创建一个接受
Func<PushFuncStream>
实例并返回 TOut(或 void)的函数。(同样,请适应async
版本)。
- 如果管道在新的源上(假设是
接下来,我们将讨论我们库中使用的两个(2)此类异类管道,利用核心 API 概念的弹性。
TEEing & Super-TEEing
就像 Unix Pipes (“|”) 一样,Unix TEE(命令 tee
)是另一个著名的操作。TEE 的基本概念是读取一次并复制(输出)多次(松散地讲)。换句话说,对于每个输入,它可以处理将其写入多个目标。相同的概念在下图所示:
TEEing 在流式应用程序中是一个非常有趣用例;将源流复制到多个目标,在数据复制、数据广播等方面具有深远的用途。您可能听说过 Apache Kafka,著名的消息队列(如 Rabbit MQ 等)或 pub-sub 系统,如 REDIS。
在我们的 API 中,我们添加了一个“并发写入器”(中间)管道,它完全执行相同的操作。对于每个接收到的块,它会并发地将数据复制到连接的流!此管道仅在基于 PUSH 的(写入器)管道中可用,这也说得通。尽管如此,由于可以使用我们的 "ThenConvertToPush" 管道将 PULL 管道转换为 PUSH 管道,理论上和实际上,在消耗 PUSH API 时可以执行这种并发写入。
此管道的签名是:
//Intermediary concurrent writer
public static Func<PushFuncStream, Task> ThenConcurrentlyWriteTo(this Func<PushFuncStream, Task> src,
Stream writableStream,
...)
{
//writes concurrently on both PushFuncStream.Writable and
//supplied writableStream
return ...;
}
//////// USE-CASES
//1. Single TEE (replicates someSource's byte representation to someStream and yetAnotherStream)
await someSource.Push()
.ThenConcurrentlyWriteTo(someStream, ...)
.AndWriteStreamAsync(yetAnotherStream);
//2. Concatenated TEE (replicates someSource's byte representation to someStream1, ..., someStreamN
// and yetAnotherStream)
await someSource.Push()
.ThenConcurrentlyWriteTo(someStream1, ...)
.ThenConcurrentlyWriteTo(someStream2, ...)
...
.ThenConcurrentlyWriteTo(someStreamN, ...)
.AndWriteStreamAsync(yetAnotherStream);
我们将串联的 TEEs 称为“Super-TEEs”,它们可以支持流复制以及一些中间数据突变。下图说明了这个概念:
使用我们的 API 创建这种 Super TEEs 是很直观的。考虑一个假设的案例:
案例陈述和解决方案我们想将 List<T> JSON 序列化,将 JSON 保存到本地文件;同时,将 GZipped 字节流式传输到 Web 端点,并且,也将其 Gzip 数据加密后保存在共享磁盘上。
=>
await someList.PushJsonArray( ... )
.ThenConcurrentlyWriteTo( localFileStream, ... )
.ThenCompress( ... )
.ThenConcurrentlyWriteTo( webStream, ... )
.ThenEncrypt<...>( ... )
.ThenConcurrentlyWriteTo( sharedDiskStream, ... );
字节计数
最后,我们展示了我们最后一个异类管道的实现,它简单地计算字节数。用例是将此计数测量嵌入到响应的头值中(例如 HTTP Headers)或预先计算它们以满足某些元查询(如 HTTP Head 方法)。该管道的签名是:
public static Func<PushFuncStream, Task> ThenCountBytes(this Func<PushFuncStream, Task> src,
out IByteCounter byteCounter,
...)
{
return ...;
}
再次考虑以下假设需求:
问题陈述和解决方案读取文件并将其内容保存为 GZipped 文件,同时计算压缩比。
=>
IByteCounter counterBeforeCompress;
IByteCounter counterAfterCompress;
await someFileInfo.Push( ... )
.ThenCountBytes(out counterBeforeCompress)
.ThenCompress( ... )
.ThenCountBytes(out counterAfterCompress)
.AndWriteFileAsync(targetFileInfo, ...);Console.WriteLine("Compression Ratio: " + (counterAfterCompress.ByteCount / counterBeforeCompress.ByteCount * 100));
此管道在PUSH 管道上可用,作为 .Then
(中间)和 .And
(目标)管道。但是,在PULL 管道上仅作为 .Then
(中间)管道。
评论
基于以上(冗长的)讨论,我们最后总结了实现的目标:
- 我们避免了内存缓冲区以提高运行时内存使用效率。
- 所有中间管道都只使用必要的固定大小缓冲区实现。
- 通过端到端(即源到目标)的操作链呈现了高效的管道。
- 创建了一个提供以下功能的 API:
- 可组合性:所有管道(扩展方法)都是可组合的,并且可以根据需要使用零个或多个中间管道来构建链。
- 可读性:所有管道都非常声明式,并且只公开可选参数。
- 可维护性:管道的构建遵循单一职责原则。
- 弹性:可以轻松创建新的(普通和异类)管道,并使其成为现有 API 的有机组成部分。
- 可重用性:通过管道的组合和重组来实现(普通意义上的)可重用性。同时,通过探索 元管道 的概念来实现(广义上的)可重用性。
祝您编码愉快!