通过 WCF 上传大数据






4.80/5 (16投票s)
通过 WCF 服务高效地分块上传大型数据文件,并使用 Entity Framework 将数据存储在 SQL Server 中
背景
通过网络传输大量数据一直是一个挑战。存在几种从不同角度解决此问题的方案。这些解决方案中的每一种都提出了不同的使用场景,并各有优缺点。以下是 3 种主要选项:
- FTP。虽然 FTP 速度快且效率高,但并不容易使用。它没有围绕它设计的良好 API 或对象模型,存在防火墙问题,并且在数据传输期间需要保持长时间会话打开,这不应该被中断。此外,FTP 基础架构需要配置和维护工作。
- 流式传输。流式传输是高效地通过 HTTP 或 TCP 传输大量数据的解决方案。但是,它不支持可靠的消息传递,并且在高流量场景下扩展性不佳。
- 分块。此解决方案,在所有其他解决方案中,存在与分块数据并将其重新组合成单个流相关的一些开销。但是,通过高效的架构设计,大部分开销都可以消除。分块解决方案非常适合高流量场景中的大数据传输,可以支持可靠的消息传递、消息和传输级别的安全性,提供高效的内存利用率,并在连接失败的情况下提供一种实现部分上传的直接方法。
在本文中,我们将介绍分块解决方案的一种可能实现。
引言
当前解决方案包括客户端(WPF UI 应用程序)和服务器(WCF 控制台应用程序)。在服务器端,我们有一个 WCF 服务,公开了多种操作数据文件的方法。它使用 Entity Framework Code First 方法与 SQL Server 进行交互以持久化数据。
在数据库端,我们有两个表 - BlobFiles
和 BlobFileChunks
。BlobFiles
表中的每条记录都对应于上传文件的描述,并包含其 Id (Guid)、名称、描述、大小、创建者用户 Id 和创建时间戳。另一方面,BlobFileChunks
是一个详细表,通过外键引用 BlobFiles
。此表中的每条记录都以二进制格式表示数据块,以及它在存储在 ChunkId
列中的原始文件中的顺序号(位置)。
在客户端,我们有一个网格,显示从数据库中提取的数据文件,以及允许刷新数据、上传新文件、删除现有文件、计算文件哈希码并将文件保存到磁盘的几个按钮。后两个操作纯粹是演示性的,用于模拟服务器端对文件内容的实际使用。
Using the Code
在使用代码之前,请确保服务器的 App.config
文件中的连接设置已正确配置。默认情况下,它使用具有集成安全性的本地 SQL Server 实例。
<entityFramework> <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework"> <parameters> <parameter value="Data Source=.;Integrated Security=True;MultipleActiveResultSets=True;" /> </parameters> </defaultConnectionFactory> <providers> <provider invariantName="System.Data.SqlClient" type="System.Data.Entity.SqlServer.SqlProviderServices, EntityFramework.SqlServer" /> </providers> </entityFramework>
由于项目包含配置了自动数据库迁移的 EF,因此在首次运行时将自动创建数据库。
不深入细节,让我们专注于解决方案的关键点 - 上传块然后将它们组合在一起的机制。
分块
在客户端,使用以下代码将块上传到服务器
using (var service = new FileUploadServiceClient()) { var fileId = await service.CreateBlobFileAsync(Path.GetFileName(fileName), fileName, new FileInfo(fileName).Length, Environment.UserName); using (var stream = File.OpenRead(fileName)) { var edb = new ExecutionDataflowBlockOptions {BoundedCapacity = 5, MaxDegreeOfParallelism = 5}; var ab = new ActionBlock<Tuple<byte[], int>>(x => service.AddBlobFileChunkAsync(fileId, x.Item2, x.Item1), edb); foreach (var item in stream.GetByteChunks(chunkSize).Select((x, i) => Tuple.Create(x, i))) await ab.SendAsync(item); ab.Complete(); await ab.Completion; } }
如您所见,代码中使用了 ActionBlock
类(来自强大的库 TPL Dataflow),它抽象了异步并行处理,允许一次发送最多 5 个块(MaxDegreeOfParallelism
),并在内存中维护一个 5 个块的缓冲区(BoundedCapacity
)。
通过调用 CreateBlobFileAsync
,我们创建了具有文件详细信息的母记录并获取了 BlobFileId
。然后,我们打开一个文件流,调用扩展方法 GetByteChunks
,该方法返回一个 IEnumerable<byte[]>
。此可枚举对象会即时进行评估,并且每个元素都包含一个指定大小的字节数组(显然,最后一个元素可能包含较少的数据)。块大小默认为 2MB,可以根据使用场景进行调整。这些块及其顺序 ID 通过 AddBlobFileChunkAsync
方法并行发送到服务器。请注意,只要它们的顺序 ID 代表它们在文件中的实际位置,块就不必一个接一个地发送。
下面展示了 GetByteChunks
扩展方法的实现
public static IEnumerable<byte[]> GetByteChunks(this Stream stream, int length) { if (stream == null) throw new ArgumentNullException("stream"); var buffer = new byte[length]; int count; while ((count = stream.Read(buffer, 0, buffer.Length)) != 0) { var result = new byte[count]; Array.Copy(buffer, 0, result, 0, count); yield return result; } }
组合块
让我们探索服务器端的 ProcessFileAsync
方法
public async Task<string> ProcessFileAsync(Guid blobFileId) { List<int> chunks; using (var context = new FileUploadDemoContext()) { chunks = await context.Set<BlobFileChunks>() .Where(x => x.BlobFileId == blobFileId) .OrderBy(x => x.ChunkId) .Select(x => x.ChunkId) .ToListAsync(); } var result = 0; using (var stream = new MultiStream(GetBlobStreams(blobFileId, chunks))) { foreach (var chunk in stream.GetByteChunks(1024)) result = (result*31) ^ ComputeHash(chunk); } return string.Format("File Hash Code is: {0}", result); }
此方法以惰性方式读取整个文件内容并计算其哈希码。首先,我们从数据库中检索与给定 BlobFileId
相关的所有块,并按 ChunkId
排序。然后将这些块 ID 传递给 GetByteChunks
辅助方法,该方法按需逐个从数据库检索这些块的实际二进制内容。
MultiStream
类在组合块方面起着关键作用,如下所述。
MultiStream 类
MultiStream
继承自标准的 Stream
类,并表示一个支持向前搜索的只读流。Seek
方法确保位置永远不会向后移动。
public override long Seek(long offset, SeekOrigin origin) { switch (origin) { case SeekOrigin.Begin: m_position = offset; break; case SeekOrigin.Current: m_position += offset; break; case SeekOrigin.End: m_position = m_length - offset; break; } if (m_position > m_length) m_position = m_length; if (m_position < m_minPosition) { m_position = m_minPosition; throw new NotSupportedException("Cannot seek backwards"); } return m_position; }
在 Read
方法中,我们按“需要”的基础提取块,直到达到最后一个块。这样,一次只有一个块保留在内存中,这非常高效且可扩展。
public override int Read(byte[] buffer, int offset, int count) { var result = 0; while (true) { if (m_stream == null) { if (!m_streamEnum.MoveNext()) { m_length = m_position; break; } m_stream = m_streamEnum.Current; } if (m_position >= m_minPosition + m_stream.Length) { m_minPosition += m_stream.Length; m_stream.Dispose(); m_stream = null; } else { m_stream.Position = m_position - m_minPosition; var bytesRead = m_stream.Read(buffer, offset, count); result += bytesRead; offset += bytesRead; m_position += bytesRead; if (bytesRead < count) { count -= bytesRead; m_minPosition += m_stream.Length; m_stream.Dispose(); m_stream = null; } else break; } } return result; }
结论
- 当前方法要求
Client
和Server
都了解协议。 - 该解决方案非常适合内网和互联网场景。
- 如果需要,可以将 DB 持久化层轻松替换为文件系统。
- 可以在当前架构之上轻松实现可靠的消息传递。
- 仍有很大的改进空间,请随时提出您的建议或疑虑。