65.9K
CodeProject 正在变化。 阅读更多。
Home

通过 WCF 上传大数据

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.80/5 (16投票s)

2014年11月14日

CPOL

4分钟阅读

viewsIcon

43855

downloadIcon

2063

通过 WCF 服务高效地分块上传大型数据文件,并使用 Entity Framework 将数据存储在 SQL Server 中

 

背景

通过网络传输大量数据一直是一个挑战。存在几种从不同角度解决此问题的方案。这些解决方案中的每一种都提出了不同的使用场景,并各有优缺点。以下是 3 种主要选项:

  • FTP。虽然 FTP 速度快且效率高,但并不容易使用。它没有围绕它设计的良好 API 或对象模型,存在防火墙问题,并且在数据传输期间需要保持长时间会话打开,这不应该被中断。此外,FTP 基础架构需要配置和维护工作。
  • 流式传输。流式传输是高效地通过 HTTP 或 TCP 传输大量数据的解决方案。但是,它不支持可靠的消息传递,并且在高流量场景下扩展性不佳。
  • 分块。此解决方案,在所有其他解决方案中,存在与分块数据并将其重新组合成单个流相关的一些开销。但是,通过高效的架构设计,大部分开销都可以消除。分块解决方案非常适合高流量场景中的大数据传输,可以支持可靠的消息传递、消息和传输级别的安全性,提供高效的内存利用率,并在连接失败的情况下提供一种实现部分上传的直接方法。

在本文中,我们将介绍分块解决方案的一种可能实现。

引言

当前解决方案包括客户端(WPF UI 应用程序)和服务器(WCF 控制台应用程序)。在服务器端,我们有一个 WCF 服务,公开了多种操作数据文件的方法。它使用 Entity Framework Code First 方法与 SQL Server 进行交互以持久化数据。

在数据库端,我们有两个表 - BlobFilesBlobFileChunksBlobFiles 表中的每条记录都对应于上传文件的描述,并包含其 Id (Guid)、名称、描述、大小、创建者用户 Id 和创建时间戳。另一方面,BlobFileChunks 是一个详细表,通过外键引用 BlobFiles。此表中的每条记录都以二进制格式表示数据块,以及它在存储在 ChunkId 列中的原始文件中的顺序号(位置)。

在客户端,我们有一个网格,显示从数据库中提取的数据文件,以及允许刷新数据、上传新文件、删除现有文件、计算文件哈希码并将文件保存到磁盘的几个按钮。后两个操作纯粹是演示性的,用于模拟服务器端对文件内容的实际使用。

Using the Code

在使用代码之前,请确保服务器的 App.config 文件中的连接设置已正确配置。默认情况下,它使用具有集成安全性的本地 SQL Server 实例。

<entityFramework>
  <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework">
    <parameters>
      <parameter value="Data Source=.;Integrated Security=True;MultipleActiveResultSets=True;" />
    </parameters>
  </defaultConnectionFactory>
  <providers>
    <provider invariantName="System.Data.SqlClient" 
              type="System.Data.Entity.SqlServer.SqlProviderServices, EntityFramework.SqlServer" />
  </providers>
</entityFramework>

由于项目包含配置了自动数据库迁移的 EF,因此在首次运行时将自动创建数据库。

不深入细节,让我们专注于解决方案的关键点 - 上传块然后将它们组合在一起的机制。

分块

在客户端,使用以下代码将块上传到服务器

using (var service = new FileUploadServiceClient())
{
  var fileId = await service.CreateBlobFileAsync(Path.GetFileName(fileName),
    fileName, new FileInfo(fileName).Length, Environment.UserName);

  using (var stream = File.OpenRead(fileName))
  {
    var edb = new ExecutionDataflowBlockOptions {BoundedCapacity = 5, MaxDegreeOfParallelism = 5};

    var ab = new ActionBlock<Tuple<byte[], int>>(x => service.AddBlobFileChunkAsync(fileId, x.Item2, x.Item1), edb);

    foreach (var item in stream.GetByteChunks(chunkSize).Select((x, i) => Tuple.Create(x, i)))
      await ab.SendAsync(item);

    ab.Complete();

    await ab.Completion;
  }
}

如您所见,代码中使用了 ActionBlock 类(来自强大的库 TPL Dataflow),它抽象了异步并行处理,允许一次发送最多 5 个块(MaxDegreeOfParallelism),并在内存中维护一个 5 个块的缓冲区(BoundedCapacity)。

通过调用 CreateBlobFileAsync,我们创建了具有文件详细信息的母记录并获取了 BlobFileId。然后,我们打开一个文件流,调用扩展方法 GetByteChunks,该方法返回一个 IEnumerable<byte[]>。此可枚举对象会即时进行评估,并且每个元素都包含一个指定大小的字节数组(显然,最后一个元素可能包含较少的数据)。块大小默认为 2MB,可以根据使用场景进行调整。这些块及其顺序 ID 通过 AddBlobFileChunkAsync 方法并行发送到服务器。请注意,只要它们的顺序 ID 代表它们在文件中的实际位置,块就不必一个接一个地发送。

下面展示了 GetByteChunks 扩展方法的实现

 public static IEnumerable<byte[]> GetByteChunks(this Stream stream, int length)
{
  if (stream == null)
    throw new ArgumentNullException("stream");

  var buffer = new byte[length];
  int count;
  while ((count = stream.Read(buffer, 0, buffer.Length)) != 0)
  {
    var result = new byte[count];
    Array.Copy(buffer, 0, result, 0, count);
    yield return result;
  }
}

组合块

让我们探索服务器端的 ProcessFileAsync 方法

 public async Task<string> ProcessFileAsync(Guid blobFileId)
{
  List<int> chunks;
  using (var context = new FileUploadDemoContext())
  {
    chunks = await context.Set<BlobFileChunks>()
      .Where(x => x.BlobFileId == blobFileId)
      .OrderBy(x => x.ChunkId)
      .Select(x => x.ChunkId)
      .ToListAsync();
  }

  var result = 0;

  using (var stream = new MultiStream(GetBlobStreams(blobFileId, chunks)))
  {
    foreach (var chunk in stream.GetByteChunks(1024))
      result = (result*31) ^ ComputeHash(chunk);
  }

  return string.Format("File Hash Code is: {0}", result);
}

此方法以惰性方式读取整个文件内容并计算其哈希码。首先,我们从数据库中检索与给定 BlobFileId 相关的所有块,并按 ChunkId 排序。然后将这些块 ID 传递给 GetByteChunks 辅助方法,该方法按需逐个从数据库检索这些块的实际二进制内容。

MultiStream 类在组合块方面起着关键作用,如下所述。

MultiStream 类

MultiStream 继承自标准的 Stream 类,并表示一个支持向前搜索的只读流。Seek 方法确保位置永远不会向后移动。

public override long Seek(long offset, SeekOrigin origin)
{
  switch (origin)
  {
    case SeekOrigin.Begin:
      m_position = offset;
      break;
    case SeekOrigin.Current:
      m_position += offset;
      break;
    case SeekOrigin.End:
      m_position = m_length - offset;
      break;
  }

  if (m_position > m_length)
    m_position = m_length;

  if (m_position < m_minPosition)
  {
    m_position = m_minPosition;
    throw new NotSupportedException("Cannot seek backwards");
  }

  return m_position;
}

Read 方法中,我们按“需要”的基础提取块,直到达到最后一个块。这样,一次只有一个块保留在内存中,这非常高效且可扩展。

public override int Read(byte[] buffer, int offset, int count)
{
  var result = 0;

  while (true)
  {
    if (m_stream == null)
    {
      if (!m_streamEnum.MoveNext())
      {
        m_length = m_position;
        break;
      }
      m_stream = m_streamEnum.Current;
    }

    if (m_position >= m_minPosition + m_stream.Length)
    {
      m_minPosition += m_stream.Length;
      m_stream.Dispose();
      m_stream = null;
    }
    else
    {
      m_stream.Position = m_position - m_minPosition;
      var bytesRead = m_stream.Read(buffer, offset, count);
      result += bytesRead;
      offset += bytesRead;
      m_position += bytesRead;
      if (bytesRead < count)
      {
        count -= bytesRead;
        m_minPosition += m_stream.Length;
        m_stream.Dispose();
        m_stream = null;
      }
      else
        break;
    }
  }

  return result;
}

结论

  • 当前方法要求 ClientServer 都了解协议。
  • 该解决方案非常适合内网和互联网场景。
  • 如果需要,可以将 DB 持久化层轻松替换为文件系统。
  • 可以在当前架构之上轻松实现可靠的消息传递。
  • 仍有很大的改进空间,请随时提出您的建议或疑虑。
© . All rights reserved.