65.9K
CodeProject 正在变化。 阅读更多。
Home

在后台线程上从 Web API 流式传输大型结果集到 WPF

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2014年12月25日

CPOL

5分钟阅读

viewsIcon

27039

downloadIcon

564

在后台线程上将大型结果集从 Web API 流式传输到 WPF, 并在 Datagrid 上显示。

引言

我最近在做一个 N 层应用程序,我想探索的一个想法是从服务器端将大型结果集流式传输到客户端,并实时显示数据。该应用程序使用 ASP.NET Web API 将大型结果集流式传输到 WPF 客户端的后台线程,并在接收到内容后立即显示在屏幕上。我还将展示如何使用 Dapper(一个简单的 ORM)通过 yield return 按记录返回结果。本文将解释创建示例解决方案的步骤。

背景

我将通过示例代码来演示这个想法,该代码允许用户按名字搜索一个人,并显示匹配搜索条件的姓名列表。

Using the Code

该解决方案包含 2 个项目:服务项目和 WpfClient 项目。该解决方案应包含运行示例代码所需的所有内容。您只需要确保安装了 .NET Framework 4.5,因为我的代码使用了新的 async await 关键字。为了调试,请同时启动这两个项目。

服务项目

服务项目是一个 Web API MVC4 项目。我有 2 个控制器:Home Controller 和 Name Controller。Home Controller 是创建项目时生成的默认控制器,在本例中未使用,所以我们将忽略它。Name Controller 包含我们的服务,它们将演示 Web API 的流式传输功能。

Name Controller 中有两个 Action。分别是 SimulateLargeResultSet Action 和 UsingDapper Action。为了正确地将请求路由到相应的 Action,我们需要在 WebApiConfig.cs 中添加以下路由。

public static void Register(HttpConfiguration config)
        { 
            config.Routes.MapHttpRoute(
                name: "Api",
                routeTemplate: "api/{controller}/{action}",
                defaults: new {first = string.Empty} 
            ); 
        }

SimulateLargeResultSet Action 演示了当 SQL Server 返回大型结果集时会发生的情况。检索大型结果集需要时间。为了模拟延迟,在 for 循环的末尾添加了 Thread.Sleep(250) 。它将创建一个带有 Id 和名称的 Person 对象返回给客户端。

Action 首先创建一个 HttpResponseMessage,这在 Web API Action 中很常见。

HttpResponseMessage response = Request.CreateResponse();

然后,它将 HttpResponseMessage.Content 设置为 PushStreamContentPushStreamContent 是所有魔术发生的地方,因为它实际上支持数据生产者希望使用 stream 直接写入(同步或异步)的场景。

response.Content = new PushStreamContent(....); 

最后,该 Action 返回响应以建立 stream,这在常规 Web API Action 中也很常见。

return response;

PushStreamContent 的一个重载构造函数接受一个 Action 和 mediaType,我们将将其指定为 "text/plain"。该 Action 将每隔四分之一秒将一个 Json 序列化的 Person 对象写入输出 stream

for (int i = 0; i < 20000; i++ )
{ 
    var name = new Person()
    {
         Id = i,
         Name = string.Format("Name for Id: {0}", i),
     };
 
     var str = await JsonConvert.SerializeObjectAsync(name);
 
     var buffer = UTF8Encoding.UTF8.GetBytes(str);
 
     // Write out data to output stream
     await outputStream.WriteAsync(buffer, 0, buffer.Length);
 
     //simulate each additional row scan time required in sql server
     //when we are retrieving many rows
     Thread.Sleep(250);//wait a quarter of a second
 }

示例代码中的另一个 Action 是 UsingDapper Action,它默认不使用,因为它需要设置 SQL Server 数据库。其逻辑与 SimulateLargeResultSet Action 非常相似,但它包含连接到 SQL 数据库并从 Name 表中读取的代码。您需要创建数据库结构并更改连接 string 才能使其正常工作。我使用的是 Dapper v1.12.0.0 来执行 SQL 查询并为每条记录 yield return 一个 person 对象。这样一来,就不必等到整个结果返回后再将输出推送到客户端。我将 buffered 参数设置为 false ,并将 commandtimeout 设置为无限。

var persons = connection.Query<Person>(dynamicQuery,
                                new { FirstName = first },//define the parameter
                                buffered: false, //yield return for each reader
                                commandTimeout: 0, //indefinitely
                                commandType: CommandType.Text);

每个 foreach 执行将从数据库读取结果,将其序列化为 Json,然后写入输出 stream

foreach (var person in persons)
{
     var str = await JsonConvert.SerializeObjectAsync(person);
 
     var buffer = UTF8Encoding.UTF8.GetBytes(str);
 
     // Write out data to output stream
     await outputStream.WriteAsync(buffer, 0, buffer.Length);                                
}

WPF 客户端项目

WPF 客户端项目使用 Prism 4.1 和 MVVM 方法完成。运行示例代码时,您会看到一个 First Name 文本框,用户可以在其中输入要搜索的姓名,以及一个 datagrid ,它将显示来自服务项目的所有结果。

MainWindow.xaml 中有一个名为 MainWindowViewModel ViewModel,其构造函数中包含以下代码行。

BindingOperations.EnableCollectionSynchronization(Persons, _personsLock);

此代码行允许从后台线程修改 Observable 集合,而不会导致 WPF 抛出错误。WPF 要求所有 UI 更改代码都在调度器线程(即 UI 线程)上运行。在较新版本的 .NET Framework 中,它会自动将所有 INotifyPropertyChanged.PropertyChanged marshalling 到 UI 线程,但它不会对 INotifyCollectionChanged.CollectionChanged 进行此操作。您必须使用 EnableCollectionSynchronization(...) 或手动将添加/删除操作 dispatch 回 UI 线程。我发现使用 BindingOperations 比使用调度器快得多。

DataGrid 绑定到 MainWindowViewModel 类中的 Persons 属性,该属性是 ObservableCollection<Person> 类型,我们将实时向其中添加 person 对象。

public ObservableCollection<Person> Persons
        {
            get
            {
                if (_persons == null)
                {
                    _persons = new ObservableCollection<Person>();
                }
                return _persons;
            }
            set
            {
                _persons = value;
                RaisePropertyChanged(() => Persons);
            }
        }

在 View Model 中,有两个命令:Search Command 和 Cancel Command。Search 命令只是启动一个后台线程,调用 Web API 服务流,并返回一个 Person 列表。然后,它会将 person 对象添加到 Persons ObservableCollection 中,该集合将在 UI 中实时反映出来。

using (Stream stream = await response.Content.ReadAsStreamAsync())
{
      // Read response stream
      byte[] readBuffer = new byte[512];    //TODO: make this not fail when 
                        //response object is larger than 512 byte
      int bytesRead = 0;
      while ((bytesRead = stream.Read(readBuffer, 0, readBuffer.Length)) != 0)
      {
            ct.ThrowIfCancellationRequested();
            string personString = Encoding.UTF8.GetString(readBuffer, 0, bytesRead);
 
            var person = JsonConvert.DeserializeObject<Person>(personString);
            persons.Add(person); 
       }
}

While 循环的开始处,它会检查取消令牌是否已被请求。

ct.ThrowIfCancellationRequested()

如果请求了,它将停止从服务流式传输。Cancellation 令牌已连接到 Cancel 命令,该命令又连接到 Cancel 按钮。因此,当用户单击 Cancel 按钮时,它将停止 stream

如果点击了取消按钮,服务方将收到一个 ErrorCode == -2147023667 的异常。

catch (HttpException ex)
{
    if (ex.ErrorCode == -2147023667) // The remote host closed the connection. 
    {
         return;
    }
    
}

结论

好了,各位。希望大家喜欢阅读这篇文章,就像我喜欢写它一样,并觉得这篇文章有帮助。谢谢。

© . All rights reserved.