65.9K
CodeProject 正在变化。 阅读更多。
Home

EchoStream - .NET 的 Echo/Tee Stream

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.75/5 (15投票s)

2003年4月7日

12分钟阅读

viewsIcon

91466

downloadIcon

1388

介绍 EchoStream 类并演示其用法。

引言

EchoStream 是一个功能齐全的 .NET 回声流实现。简而言之,回声流会将两个其他流“合并”,并且写入它的任何内容都会依次写入两个底层流。这有时被称为“tee”流。EchoStream 支持这种 tee 功能以及一种特殊的读回声方式,其中读取总是来自两个流之一,并且读取到的任何内容都会回显到第二个流。最后,EchoStream 包含广泛的错误处理选项,允许用户精确选择如何处理回声错误。

背景

在我编写项目的 Java 版本时,我发现一个名为 TeeOutputStream (或 TeeStream) 的类非常有用。它由 Anil Hemrajani 编写,可以在 Sun 的 Java Developer Site 上找到。我修改并扩展了这个类,并编写了一个配套的 TeeInputStream,一切都很顺利。

现在我们的活跃开发平台是 .NET,我发现自己需要完全相同的功能。但是,我自己找不到现成的实现,所以我决定从头开始编写一个。结果就是 EchoStream

EchoStream 是用 C# 实现的,我的代码示例也是用该语言编写的。

Using the Code

EchoStream 最简单的用法是作为 tee 流,即只写流。一个可能需要这种用法的例子是网络通信。您可能需要将通过网络流发送的所有内容记录到本地文件中。以下代码演示了这种用例。

byte[] outBytes = GetDataToWrite();
NetStream netStream = GetNetworkStream();
FileStream logStream = GetLogStream();
EchoStream stream = new EchoStream(
  netStream, logStream, EchoStream.StreamOwnership.OwnNone
);

// Let's write to the echo stream.  The text that is written will end up
// in both netStream and logStream.
stream.Write(outBytes, 0, outBytes.Length);

// Now, we're done with the echo stream.  Closing it will flush the
// underlying streams, but will not close them because of
// EchoStream.StreamOwnership.OwnNone, above.
stream.Close();

源代码相当直观。只要 EchoStream 是打开的,写入它的内容就会传播到传递给其构造函数的两个流中。

如上面的代码所述,您可以使用 EchoStream.StreamOwnership 枚举来控制 EchoStream 在关闭自身时是否关闭其任一、任二或全部 constituent 流。

如果您直接写入任何底层流,EchoStream 将一无所知,并且写入不会被回显。在某些情况下,这样做可能很有用。EchoStream 从不缓冲其输入,因此在底层流的写入操作与 EchoStream 的写入操作之间插入内容应该始终是安全的。

现在,碰巧的是,如果您需要记录通过流从应用程序发出的任何内容,您可能还需要记录从应用程序传入的任何内容。EchoStream 也处理这种情况,它将其中一个输入流标识为 PrimaryStream,另一个标识为 SlaveStream。在写入时,主流和从流的工作方式相同(嗯,几乎相同;见下文)。但是,在读取时,回声流始终从主流读取,从不从从流读取。它从主流读取的任何内容都会被“回显”,即写入到从流中。因此得名。

EchoStream 将传递给其构造函数的第一个流作为主流,第二个流作为从流。这些流随后可以通过 PrimaryStreamSlaveStream 属性访问。

这段代码展示了如何从回声流读取。

NetStream netStream = GetNetworkStream();
FileStream logStream = GetLogStream();
EchoStream stream = new EchoStream(
  netStream, logStream, EchoStream.StreamOwnership.OwnNone
);

// Read from the echo stream.  A read on the echo stream results in a
// read from the primary stream, and a write to the slave stream.
// NOTE: In many cases, you'd use a Reader instead of directly reading
// into a byte[], but I have not used one here for clarity.
byte[] inBytes = new byte[4096];    // Read 4k at a time.
int nRead = stream.Read(inBytes, 0, inBytes.Length);

// At this point, nRead bytes have been read from netStream, and nRead
// bytes have thus been written to logStream, as the input was logged to
// that stream.

作为另一个例子,假设您需要读取和写入网络流,但需要将输入和输出发送到两个单独的流。由于 EchoStream 本身从不进行任何缓冲,因此您可以通过创建两个具有相同主流但从流不同的 EchoStream 对象来完成此操作,如以下代码所示。

byte[] outBytes = GetDataToWrite();
NetStream netStream = GetNetworkStream();
FileStream outLogStream = GetOutLogStream();
FileStream inLogStream = GetInLogStream();

EchoStream outStream = new EchoStream(
  netStream, outLogStream, EchoStream.StreamOwnership.OwnNone
);
EchoStream inStream = new EchoStream(
  netStream, inLogStream, EchoStream.StreamOwnership.OwnNone
);

// Write to the output stream.  This will write to netStream and
// outLogStream.
outStream.Write(outBytes, 0, outBytes.Length);

// Read from the input stream.  This will read from netStream and
// write to inLogStream.
byte[] inBytes = new byte[4096];    // Read 4k at a time.
int nRead = stream.Read(inBytes, 0, inBytes.Length);

// Now, we're done with the echo stream.  Closing it will flush the
// underlying streams, but will not close them because of
// EchoStream.StreamOwnership.OwnNone, above.
stream.Close();

异常处理

上面的示例会顺利运行,直到有一天它在一个磁盘空间只有 20MB 的客户机器上运行。突然,它会抛出异常,网络操作会失败,即使网络上没有发生任何问题。在事后分析中,可能会发现您正在传输的 25MB 数据从未存储在磁盘上,但却发生了磁盘空间错误。答案当然是日志文件。按照代码的编写方式,写入日志文件必须成功,才能使网络操作也成功。

在某些情况下,您可能会发现写入从流与写入主流同样重要,两者都必须成功才能使操作成功。但是,在上面刚刚描述的案例中,我们假设即使在写入日志流时发生任何事情,网络流上的通信也不应中断。

EchoStream 在异常处理方面对主流和从流的处理方式不同,即使在写入情况下也是如此。(这是上面提到的写入情况的一个区别)。当两个流都将被操作修改时,EchoStream 始终先修改主流,并将任何抛出的异常传播回调用者。但是,在主流成功修改后,EchoStream 的错误处理能力就发挥作用了。EchoStream 支持一组属性来控制在写入从流时发生异常时的行为。这些属性如下。

  • SlaveReadFailAction
  • SlaveReadFailFilter
  • SlaveWriteFailAction
  • SlaveWriteFailFilter
  • SlaveSeekFailAction
  • SlaveSeekFailFilter
  • LastReadResult

对于流的每个主操作,EchoStream 都支持一对属性,用于指定失败时要采取的操作,以及一个可选的过滤器。操作可以是 PropogateIgnoreFilter 之一。

默认操作是 Propogate。设置此操作后,主流已修改后对从流执行的操作引起的任何异常都将传播出 EchoStream。这是最高效的行为,因为 EchoStream 在操作的任何时候都不需要进入昂贵的 try 块。

对于上述场景最有用的操作是 Ignore。设置此操作后,主流已修改后对从流执行的操作引起的任何异常都将被 EchoStream 默默捕获并忽略。因此,对于上面描述的示例,在第一次读取或写入之前将以下行添加到代码中,就可以解决日志记录失败导致整个操作失败的问题。

outStream.SlaveReadFailAction = EchoStream.SlaveFailAction.Ignore;
outStream.SlaveWriteFailAction = EchoStream.SlaveFailAction.Ignore;
outStream.SlaveSeekFailAction = EchoStream.SlaveFailAction.Ignore;

inStream.SlaveReadFailAction = EchoStream.SlaveFailAction.Ignore;
inStream.SlaveWriteFailAction = EchoStream.SlaveFailAction.Ignore;
inStream.SlaveSeekFailAction = EchoStream.SlaveFailAction.Ignore;

另外,在此特定情况下,我们只想忽略日志流可能引起的所有潜在异常,我们可以使用以下只写快捷属性。

outStream.SlaveFailActions = EchoStream.SlaveFailAction.Ignore;
inStream.SlaveFailActions = EchoStream.SlaveFailAction.Ignore;

这对于这种情况更具可维护性,因为您不必在以后更改代码以适应 EchoStream 中添加的新异常相关属性;使用快捷属性可以避免这种情况。

最后一个操作 Filter 可能是最不常见但绝对最灵活的操作。您不能直接将任何“操作”属性设置为此值。相反,当您设置任何过滤器属性时,它都会隐式设置,如以下代码所示。

inStream.SlaveWriteFailFilter = new EchoStream.SlaveFailHandler(OnWriteFail);
Debug.Assert(inStream.SlaveWriteFailAction == EchoStream.SlaveFailAction.Filter);
...
private EchoStream.SlaveFailAction OnWriteFail(
			object oSender, EchoStream.SlaveFailMethod failMethod,
			Exception exc)
{
  // Here, examine the given failMethod and the exception that occurred, and
  // return one of SlaveFailAction.Propogate or SlaveFailAction.Ignore.
  // Returning SlaveFailAction.Filter will cause an InvalidOperationException.
}

正如您所见,使用过滤器可以让您检查发生的异常并指示 EchoStream 如何继续。您可以使用相同的方法来处理读取、写入和搜索失败,方法是使用 failMethod 参数来区分它们,或者为每种情况注册不同的方法。此外,与“操作”属性一样,您可以使用 SlaveFailFilters 方法一次性设置所有过滤器的处理程序。

最后,对于 Propogate 情况需要说明一点。您可能会决定为了获得最大的效率,您希望避免 EchoStream 内部昂贵的 try 块,而是允许异常传播回您自己代码中的一个处理程序,该处理程序安装在某个位置,这样它就不必为每次读取和写入操作都进入一次。您的处理程序可以以某种方式处理异常(例如,通过分离 EchoStream 并转为只写入主流,例如,如果由于磁盘空间不足而导致日志流“变坏”),然后重新启动您的读取或写入循环。这可以正常工作,但读取操作中的异常除外。在这种情况下,异常会导致 Read 的返回值从您的角度丢失,因此您无法知道从流中读取了多少数据以处理发生在从流抛出异常之前的发生在主流上的成功读取的结果。这就是 LastReadResult 属性的用武之地。LastReadResult 始终反映流上发生的最后一次 Read 操作的结果,让您可以从中断的地方继续。

附加方法和属性

ReadWrite 已经得到了广泛的讨论,但还有其他方法可能会修改底层流。

SeekPosition 属性的工作方式完全相同。它们使用 SlaveSeekFailActionSlaveSeekFailFilter 属性,与为 ReadWrite 描述的完全相同。此外,这些属性会尝试在两个底层流上正确工作。例如,假设您向一个流写入“I see a little silhoueto”然后将其作为 EchoStream 中的主流。再假设您还没有向从流写入任何内容。向回声流写入“ of a goat.” 结果是主流包含“I see a little silhouette of a goat.”,从流只包含“of a goat.”。现在您意识到您错误地拼写了“lamb”并将“goat”写错了,所以您决定修复这个错误。如果您将流的位置设置为“I see...”字符串的长度,该位置将直接设置在主流上,但将根据主流位置的变化为从流计算相对位置。结果是您得到了预期的结果:主流的流指针回到“silhouette”中“o”之后的那个位置,从流的流指针回到零位置。

SeekPosition 类似的是 SetLength。此方法通过改变主流的大小来改变从流的大小,而不是将它们都设置为相同的大小。这与 SeekPosition 的实现精神相同。

Stream 的最后一个可以修改流本身的方法是 Flush。但是,Flush 只是写入的一个特殊情况,由于缓冲而被延迟。EchoStream 从不缓冲读写操作,但其 constituent 流可能缓冲,因此调用 EchoStream 上的 Flush 仍然可能导致底层流之一出现异常。然而,由于 Flush 只是写入的一个特殊情况,EchoStream 只是使用了与 Write 相同的异常处理属性。通过这种方式,只要您记住 Flush 可能会调用 Write 异常处理框架,底层流进行的任何缓冲对您的错误处理代码来说都是相对透明的。

关注点

EchoStream 的初始实现非常简单明了。直到我开始编写该类的文档时,我才意识到该类在异常处理方面的不足之处。流的性质,例如许多流不支持搜索操作或这些操作很慢,阻碍了我以完全异常安全的方式(即,两个流都成功更改,或者两者都不更改)编写类的方法。然而,因为回声流中的两个流之一通常会从属于另一个(至少在我想到的一些用例中),所以尝试将调用者与某些故障隔离开来,或者在从属流发生故障时让他们控制会发生什么,是有意义的。因此,EchoStream 的异常处理设施应运而生,这在启发 EchoStream 的 Java Tee 流中是完全不存在的。

添加此错误处理代码将源代码(包括文档)的大小从约 360 行增加到约 850 行,并大大增加了某些核心方法类的复杂性。但是,我相信有了这些错误处理功能,该类会更加健壮,并且在我自己以及 CodeProject 社区的任何人的使用下,经过一些隐藏错误的排除后,它将是一个非常有用和可靠的工具。

最后,根据需要,EchoStream 的代码下载还包括我的 Covidimus.Diagnostics.Debug 类。本文没有涵盖该类,但您可能想看一下;它可能对您有用。

增强功能

这个类最可能的一个增强功能实际上是我可能永远不会添加的,原因是为了效率考虑,那就是能够拥有多个从流。我不想直接将此功能添加到 EchoStream,因为它意味着要维护一个从流数组,并在每次修改操作时遍历它们。为了最大程度的健壮性,每个从流都需要单独处理错误(这意味着必须进入的任何 try 块都需要为列表中的每个流进入一次)。即使在 Propogate 的情况下,我也只是不想在最常见的情况是遍历单个项目时,为 EchoStream 方法添加遍历数组的开销。可以通过将多个 EchoStream 对象链接在一起来实现此行为,但这比遍历数组的效率还要低。我认为真正的解决方案是开发一个与 EchoStream 并行的类,名为 MulticastEchoStream,它支持多个从流。我将把这样一个“庞然大物”的开发留给读者作为练习。

历史

  • 2003 年 4 月 7 日 - 首次发布
© . All rights reserved.