PipeStream,一个内存高效且线程安全的流






4.72/5 (22投票s)
PipeStream 是一个线程安全的读/写数据流,用于在单个生产者/单个消费者类型问题中的两个线程之间使用。
引言
.NET 提供了一个相当用户友好的线程机制,但缺乏一种内置的方式来轻松地在线程之间流式传输大量数据。在 .NET 2.0 中,Stream.Synchronized
方法通过创建一个线程安全的流包装器,为这个问题提供了一个部分解决方案。这并没有解决处理大量数据所固有的内存问题,因为 MemoryStream
在读取数据时不会动态调整大小。解决这个问题的经典方法是创建一个共享数据结构,例如字节数组,并实现一个锁定机制来确保线程安全。这需要时间来实现,并且可能导致难以发现的 bug。
PipeStream
通过将共享数据结构抽象为 Stream
接口来解决这些问题,使得在线程之间管道传输数据变得容易。
背景
我在开发一个音频文件转码器时构建了这个工具,特别是要将编码为 *.mp3 的有声读物转换为 *.aac(或 *.m4b)以便在 iPod 上使用。由于 *.m4b 的设计目的是在会话之间保留其位置,所以我决定将我现有的多个 *.mp3 有声读物合并成单个 *.m4b 文件。我的第一次尝试是使用几个管道进行一个长字符串命令,但这在一些 MP3 文件具有不兼容参数时失败了。所以,我开始第二次尝试,将 FAAC 编码器包装在 System.Diagnostics.Process
中,通过 StandardInput
流写入,并从 StandardOutput
读取。虽然 FAAC 能够直接从 MP3 编码,但它无法将多个输入文件合并成一个输出。这意味着我需要创建一个连接的独立 MP3 流作为 FAAC 的输入。我决定一个简单的方法是使用包装的 LAME 进程将 MP3 解码为原始 Wave 流,并将该流馈送给 FAAC 的输入。这个策略是成功的,但涉及到尝试在两个进程之间流式传输数据,而这两个进程最好在单独的线程中运行。MemoryStream
可以 包装成线程安全的 shell,但我发现对于较大的有声读物,我会耗尽内存。在试验了其他解决方案后,我尝试构建这个 PipeStream
,我希望它能对有类似场景的人有用。
对于不熟悉 管道 的人来说,它仅仅是在命令行中将一个进程的输出重定向到另一个进程的输入的一种方式,而无需使用任何中间数据存储。我喜欢用一个熟悉的比喻来概念化它——电子游戏!
Waka waka waka... 咳咳。所以,光轮产生一系列能量电池并写入其标准输出。这些能量电池被捕获并缓冲在管道中,供 waka 男通过其标准输入读取。同样的想法也适用于 PipeStream
,它只是将进程替换为线程,能量电池替换为数据字节,waka 男... 开玩笑的,waka 男在这两者中都是一样的。你懂的。
Using the Code
一般来说,在需要大量内存跨线程传输的情况下,像使用其他流一样使用 PipeStream
。例如(一个简单的例子)
-
首先,在生成类中创建
PipeStream
PipeStream mPipeStream; // the shared stream public void ReadWriteMultiThreadTests() { mPipeStream = new PipeStream(); // create some threads to read and write data using PipeStream Thread readThread = new Thread(new ThreadStart(ReadThread)); Thread writeThread = new Thread(new ThreadStart(WriteThread)); readThread.Start(); writeThread.Start(); writeThread.Join(); readThread.Join(); }
-
然后,在生产者线程中写入...
private void WriterThread() { string inputFile = File.ReadToEnd("myFile.txt"); int writeSize = 1024; for (int i = 0; i < str.Length; i += writeSize) { // select a substring of characters from the input string string substring = str.Substring(i, (i + writeSize < str.Length) ? writeSize : str.Length - i); sw.Write(substring.ToCharArray(), 0, substring.Length); } }
-
...最后,在消费者线程中从
PipeStream
读取数据private void ReaderThread() { char[] buffer = new char[80]; while (!sr.EndOfStream) { int readLength = sr.Read(buffer, 0, buffer.Length); // do something productive with buffer } }
我为 Stream
接口添加了一些额外的属性
MaxBufferLength
:获取或设置要在缓冲区中存储的最大字节数。BlockLastReadBuffer
:获取或设置一个值,该值指示在缓冲区变空之前是否阻止最后一个读取方法。
第二个属性在将多个流写入一个读取器的情况下很有价值——在写入器完成并且此属性设置为 false
之前,不会发生最后一个读取。
- 当设置为
true
时,Read()
将阻塞直到它可以填充传入的缓冲区和计数。 - 当设置为
false
时,Read()
不会阻塞,而是返回所有可用的缓冲区数据。
关注点
请注意,底层数据结构是 Queue<byte>
,因此其效率比 MemoryStream
低一个数量级。因此,PipeStream
最适用于 CPU 密集型进程,例如媒体编码,但也适用于需要传输大量数据的“写入-读取-遗忘”场景。
相关文章
- 分段同步访问流数据 - 解决通过分隔访问请求来在多个线程中访问单个流的问题
- C# 进程间同步和通信框架 - 在进程之间传递信息
历史
- 2006-10-17 - 版本 1.0
- 2008-10-9 - 版本 1.1 - 使用
Monitor
而不是手动重置事件,以实现更优雅的同步