通过 Twitter 和 Bing Maps 组合探索 Reactive Extensions (Rx)






4.93/5 (33投票s)
在本文中,我将简要介绍响应式扩展,然后通过创建一个 Twitter / Bing Maps 混搭应用来探索它们在 Silverlight 中的使用。
注意:您可以在 我的博客 上查看 Silverlight 示例的实际运行效果。
目录
引言
无论是响应用户输入还是处理来自 Web 服务的响应,Silverlight 应用程序通常都是异步的。该框架提供了 UI 控件,这些控件会响应用户交互而触发事件。还有像 DispatcherTimer
和 WebControl
这样的类,它们会执行一些后台工作,触发事件,并将这些事件方便地封送到 UI 线程。然而,Silverlight 和其他 .NET UI 框架缺乏一种编排这些异步活动的方式。
在本文中,我将探讨响应式扩展 (Rx) 库如何为异步事件和操作提供通用接口,从而提供一种一致的方式来组合、编排和封送跨线程的事件(同时也在此过程中创造一些有趣的混搭应用!)。
本文将逐步构建几个 Rx 驱动的 Silverlight 应用程序,最后一个是一个 Twitter / Bing Maps 混搭应用,它显示了基于用户对 #uksnow 标签的推文的英国降雪地点。
什么是 Rx?
Rx 库的“使命宣言”可以总结如下:
Rx 是一个使用可观察集合组合异步和基于事件的程序的库。
实际上,Rx 提供了一个类似 Linq 的 API 来处理事件,允许开发人员以简洁且熟悉的方式聚合、过滤和查询它们。
在接下来的几节中,我将对 Rx 库进行非常简短的概述,其余部分将侧重于实际且有趣的示例。对于更详细的教程,我建议阅读“DEVHOL202 – 使用 .NET 的响应式扩展解决异步难题”。
获取 Rx 库
Rx 库可从 Rx DevLabs 页面下载。
下载并安装后,添加对 System.Reactive
、System.Observable
和 System.CoreEx
的引用,即可开始使用。
IEnumerable 的“拉取”模型
大概最容易快速理解 Rx 的方法就是将其与 Linq 进行比较。以下示例显示了一些简单的逻辑,用于查找一小序列中的所有奇数。
List<int> numbers = new List<int>()
{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
};
foreach (int number in numbers)
{
if (number % 2 == 1)
{
Debug.WriteLine(number);
}
}
运行上述代码将产生以下输出:
1
3
5
7
9
我们可以使用 Linq 的 Where
运算符重写这个简单的算法,如下所示:
var oddNumbers = numbers.Where(n => n % 2 == 1);
foreach (int number in oddNumbers)
{
Debug.WriteLine(number);
}
Linq 版查找奇数代码产生了与“手动”方法相同的结果,但是,执行方式却大不相同。Linq 查询是通过将 Where
运算符应用于源数据来构造的,然而,此时条件并未对源进行求值。如果我们展开 foreach
循环,可以看到它使用了从查询结果获得的枚举器。
IEnumerable<int> oddNumbers = numbers.Where(n => n % 2 == 1);
IEnumerator<int> enumerator = oddNumbers.GetEnumerator();
while (enumerator.MoveNext())
{
Debug.WriteLine((int)enumerator.Current);
}
(注意:展开实际上会添加一个 try
/ finally
块,请参阅 C# 语言参考)。
枚举器上每次调用 MoveNext
方法都是从我们的查询中“拉取”数据,并在需要时对源的每个元素求值该条件。这种“拉取”模型导致查询的延迟执行(或惰性求值,取决于您偏好的术语)。在上面的示例中,我们的 IEnumerabl
e 源是一个固定大小的列表,尽管更普遍地说,它只是一个我们可以从中拉取项目的数据源,并且不必具有固定的大小。
IObservable 的“推送”模型
我们使用 Rx 库可以实现完全相同的结果,即创建一个奇数列表,如下所示:
List<int> numbers = new List<int>()
{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
};
var observableNumbers = numbers.ToObservable();
var oddNumbers = observableNumbers.Where(n => n % 2 == 1);
oddNumbers.Subscribe(number => Debug.WriteLine(number));
同样,上述代码的输出与 Linq 等效代码完全相同。ToObservable
扩展方法返回一个 IObservable
,它是 Rx 中与 IEnumerable
接口相对应的接口,因为它是一个项目源。Rx 库为操作 IObservable
数据源定义了许多类似 Linq 的扩展方法,如果您通过 Intellisense 探索上面的代码,您会发现许多熟悉的方法(Where
、Select
、Max
、SelectMany
...)。
Rx 库还定义了 IObserver
接口,它是 IEnumerator
接口的对应接口,该接口被 foreach
语法“隐藏”了。IObservable
有一个 Subscribe
方法,您可以在其中提供一个 IObserver
。Observable 源将在项目被推送时调用 IObservable
上的 OnNext
方法。通常 IObserver
被 IObservable
上的 Subscribe
扩展方法隐藏,这些扩展方法会为您创建一个 IObserver
实例,并在 Observable 源调用 OnNext
时调用您的委托方法。
那么,我们已经看到了 Linq 和 Rx 之间的相似之处,但它们之间有什么区别呢?
关键区别在于什么“驱动”了执行。对于 Linq,我们迭代查询结果,“拉取”来自 IEnumerable
源的项目。对于 Rx,一旦做出对 IObservable
源的订阅,项目就会被“推送”到我们的订阅者。
上面这个微不足道的示例是为了强调 Rx 和 Linq 之间的相似性,最终结果是完全相同的。Rx 的优势在于它提供的用于从事件或异步 Web 服务调用创建 IObservable
源的扩展方法,允许开发人员应用熟悉的 Linq 风格操作。接下来我们将看一个稍微更有趣的例子,使用 Rx 来处理和操作事件。
从事件创建 Observable
您可以通过 FromEvent
工厂方法从事件创建 Observable 源,您需要提供事件源(在本例中为 TextBox
)和事件名称。当您订阅此源时,每次触发事件时都会执行您的代码。
// create an observable source from a TextChanged event
var textChangedSource = Observable.FromEvent<TextChangedEventArgs>
(searchTextBox, "TextChanged");
// subscribe to this source
textChangedSource.Subscribe(e => Debug.WriteLine(((TextBox)e.Sender).Text));
键入文本“reactive”会产生以下输出:
r
re
rea
reac
react
reacti
reactiv
reactive
注意:如果上面示例中硬编码的事件字符串让您不适,还有一个稍微更繁琐的 FromEvent
重载,它允许您显式指定添加和删除处理程序的操作。
虽然上面的示例不是特别令人兴奋,但现在我们已经将事件打包成 Observable,我们可以执行类似 Linq 的查询。在下面的示例中,使用 Select
投影运算符创建了一个只包含 TextBox
文本的 Observable。然后通过第二个 Select
进行转换,创建一个提供长度更改“事件”的 Observable。
// create an observable source from a TextChanged event
var textChanged = Observable.FromEvent<TextChangedEventArgs>
(searchTextBox, "TextChanged")
.Select(e => ((TextBox)e.Sender).Text);
// create a 'derived' event
var textLengthChanged = textChanged.Select(txt => txt.Length);
// subscribe to these two sources
textLengthChanged.Subscribe(len => Debug.WriteLine("Length: " + len.ToString()));
textChanged.Subscribe(txt => Debug.WriteLine("Text: " + txt));
同样,键入“reactive”会产生以下输出:
Length: 1
Text: r
Length: 2
Text: re
Length: 3
Text: rea
Length: 4
Text: reac
Length: 5
Text: react
Length: 6
Text: reacti
Length: 7
Text: reactiv
Length: 8
Text: reactive
超级 Rx 示例
使用 Rx,可以以简洁的方式执行基于事件的相当强大的逻辑。如果您将以下代码粘贴到一个新的 Silverlight 项目中,它将提供一个简单的绘图应用程序。
// transform the mouse move event into an observable source of screen coordinates
var mouseMoveEvent = Observable.FromEvent<MouseEventArgs>(this, "MouseMove")
.Select(e => e.EventArgs.GetPosition(this));
// create observable sources from the left button events
var mouseLeftButtonDown = Observable.FromEvent<MouseButtonEventArgs>
(this, "MouseLeftButtonDown");
var mouseLeftButtonUp = Observable.FromEvent<MouseButtonEventArgs>
(this, "MouseLeftButtonUp");
// create a 'drag event', which takes the delta in mouse movements
// when the left button is down
var draggingEvents = mouseMoveEvent.SkipUntil(mouseLeftButtonDown)
.TakeUntil(mouseLeftButtonUp)
.Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
new
{
X2 = cur.X,
X1 = prev.X,
Y2 = cur.Y,
Y1 = prev.Y
})).Repeat();
// subscribe and draw lines
draggingEvents.Subscribe(
p => {
Line line = new Line();
line.StrokeThickness = 2;
line.Stroke = new SolidColorBrush(Colors.Black);
line.X1 = p.X1;
line.Y1 = p.Y1;
line.X2 = p.X2;
line.Y2 = p.Y2;
this.LayoutRoot.Children.Add(line);
});
这里有一些强大的功能正在运行,从将鼠标移动事件转换为更易于使用的屏幕坐标源,通过 SkipUntil
和 TakeWhile
进行事件组合,以及通过 Let
和 Zip
创建增量。
我将不详细描述上面这个超级示例,因为尽管它演示了一些非常强大的技术,但这并不是编写 Silverlight(或 WPF / WinForms)应用程序时通常需要解决的问题。与其关注如何组合多个 UI 事件,不如关注 Rx 如何用于编排异步 Web 请求。
Twitter 即时搜索
这个第一个示例应用程序提供了一个 Google Instant 风格的 Twitter 搜索,即在您键入时查询 Twitter。您现在每秒可以节省 2-5 秒的 Twitter 搜索时间(如果您不明白这个引用,请查看上面的链接)。
处理用户输入
我们将从一个新的 Silverlight 项目开始,并在 MainPage.xaml 中添加一个 TextBox
。
<UserControl x:Class="TwitterRx.MainPage" ...>
<Grid x:Name="LayoutRoot" Background="White">
<TextBox x:Name="searchTextBox"/>
</Grid>
</UserControl>
在 MainPage
的构造函数中,我们可以使用 Rx 从 TextChanged
事件创建 Observable 源。
Observable.FromEvent<TextChangedEventArgs>(searchTextBox, "TextChanged")
.Select(e => ((TextBox)e.Sender).Text) // #1
.Where(text => text.Length > 2) // #2
.Do(txt => Log("TextChanged: " + txt)) // #3
.Throttle(TimeSpan.FromMilliseconds(400)) // #4
.Subscribe(txt => Log("Throttle TextChanged: " + txt)); // #5
private void Log(string text)
{
Debug.WriteLine("[" + Thread.CurrentThread.ManagedThreadId + "] " + text);
}
上面的 Rx 查询做了几件事:
- 使用
Select
投影运算符创建仅包含文本的 Observable 源。 - 使用
Where
运算符删除长度小于等于 2 个字符的文本,对于搜索应用程序,搜索非常短的单词或单个字符几乎没有意义。 Do
运算符允许您在项目从源推送到订阅者时执行某些操作,在这里我们记录输出。Throttle
运算符会忽略在给定时间间隔内到达的项目。这意味着后面的订阅者只会在用户暂停 400 毫秒后接收数据。- 最后,记录输出的订阅。
如果我键入“reactive library”,单词之间有短暂的停顿,输出如下:
[1] TextChanged: rea
[1] TextChanged: reac
[1] TextChanged: react
[1] TextChanged: reacti
[1] TextChanged: reactiv
[1] TextChanged: reactive
[5] Throttle TextChanged: reactive
[1] TextChanged: reactive
[1] TextChanged: reactive l
[1] TextChanged: reactive li
[1] TextChanged: reactive lib
[1] TextChanged: reactive libr
[1] TextChanged: reactive libra
[1] TextChanged: reactive librar
[1] TextChanged: reactive library
[5] Throttle TextChanged: reactive library
输出符合预期;但是,如果您查看括号中的数字(输出托管线程 ID),则会发生一些令人惊讶的事情。第 3 步的日志记录发生在线程 #1,即 UI 线程,然而,第 5 步的日志记录发生在线程 #5。Throttle 步骤需要使用计时器,因此为了释放 UI 线程,Rx 库将我们的观察结果封送到了线程池线程。
Rx 为我们处理线程切换非常棒,但是 Silverlight(和 WPF / WinForms)控件具有线程亲和性,这意味着它们的状态只能从 UI 线程更新。幸运的是,Rx 可以轻松地切换观察源的线程,通过各种 ObserveOn
方法。还有一个 WPF / Silverlight 特定的 ObserveOnDispatcher
方法。
在我们的应用程序中添加一个 ListBox
,然后通过将其添加到绑定到 ListBox
的集合中,我们可以输出要搜索 Twitter 的文本。
var textToSearch = new ObservableCollection<string>();
searchResults.ItemsSource = textToSearch;
Observable.FromEvent<TextChangedEventArgs>(searchTextBox, "TextChanged")
.Select(e => ((TextBox)e.Sender).Text)
.Where(text => text.Length > 2)
.Throttle(TimeSpan.FromMilliseconds(400))
.ObserveOnDispatcher()
.Subscribe(txt => textToSearch.Add(txt));
搜索 Twitter
下一步是获取上述输出并对其执行 Twitter 搜索。这可以通过多种方式实现,如果您正在构建一个支持 Twitter 的应用程序,一种好的方法可能是使用 TweetSharp .NET 库。然而,由于我只想执行一个简单的搜索,所以我决定直接使用 Twitter REST API。通过它,API 查询无需身份验证即可通过 HTTP 进行,作为简单的 GET
请求,例如 http://search.twitter.com/search.atom?q=twitter,响应以 XML 或 JSON 格式提供。
Silverlight 提供了两种进行 Web 请求的机制:WebClient
,它提供了一个简单的接口,通过一个封送到 UI 线程的事件返回字符串响应;以及功能更丰富的 HttpWebRequest
。Rx 库允许您从任何遵循标准 IAsyncResult
模式的类创建 Observable 源。HttpWebRequest
遵循此模式,因此我们在这里使用它。
使用 HttpWebRequest
,您可以调用 BeginGetResponse
并提供在响应返回时调用的回调方法。在回调中,调用 EndGetResponse
以获取响应。
以下代码片段显示了如何查询 Twitter 搜索 API 并将结果读取到 string
中。
public MainPage()
{
InitializeComponent();
var request = HttpWebRequest.Create(new Uri
("http://search.twitter.com/search.atom?q=twitter"));
request.BeginGetResponse(new AsyncCallback(ReadCallback), request);
}
private void ReadCallback(IAsyncResult asynchronousResult)
{
HttpWebRequest request = (HttpWebRequest)asynchronousResult.AsyncState;
WebResponse response = request.EndGetResponse(asynchronousResult);
Debug.WriteLine(WebResponseToString(response));
}
private string WebResponseToString(WebResponse webResponse)
{
HttpWebResponse response = (HttpWebResponse)webResponse;
using (StreamReader reader = new StreamReader(response.GetResponseStream()))
{
return reader.ReadToEnd();
}
}
使用 Rx,FromAsyncPattern
方法可用于创建函数(即匿名委托),该函数在调用时提供执行 Web 请求的 Observable 源。订阅此源将返回响应。上述代码的 Rx 等效代码如下:
var request = HttpWebRequest.Create(new Uri
("http://search.twitter.com/search.atom?q=twitter"));
var twitterSearch = Observable.FromAsyncPattern<WebResponse>
(request.BeginGetResponse, request.EndGetResponse);
twitterSearch().Select(webResponse => WebResponseToString(webResponse))
.Subscribe(responseString => Debug.WriteLine(responseString));
您应该可以使用上述模式来处理任何遵循 IAsyncResult
模式的类,例如 Visual Studio 生成的 Web 服务客户端代理。
组合它们
上面的 twitterSearch
函数的问题在于它将始终执行相同的搜索。我们可以通过创建一个接受搜索词作为输入参数的委托来解决此问题,可以使用一个泛型的 Func
委托。
private string _twitterUrl = "http://search.twitter.com/search.atom?rpp=20&since_id=0&q=";
Func<string, IObservable<string>> searchTwitter = searchText =>
{
var request = (HttpWebRequest)HttpWebRequest.Create(new Uri(_twitterUrl + searchText));
var twitterSearch = Observable.FromAsyncPattern<WebResponse>
(request.BeginGetResponse, request.EndGetResponse);
return twitterSearch().Select(res => WebResponseToString(res));
};
现在我们可以将此函数与我们的事件处理逻辑结合起来搜索 Twitter。
Observable.FromEvent<TextChangedEventArgs>(searchTextBox, "TextChanged")
.Select(e => ((TextBox)e.Sender).Text)
.Where(text => text.Length > 2)
.Throttle(TimeSpan.FromMilliseconds(400))
.SelectMany(txt => searchTwitter(txt))
.Select(searchRes => ParseTwitterSearch(searchRes))
.ObserveOnDispatcher()
.Subscribe(tweets => searchResults.ItemsSource = tweets);
SelectMany
运算符用于将输入文本绑定到一个执行 Twitter 搜索的新 Observable 源。ParseTwitterSearch
方法获取结果响应字符串,并使用一些简单的 Linq to XML 来创建 Tweet
对象的 IEnumerable
。
private IEnumerable<Tweet> ParseTwitterSearch(string response)
{
var doc = XDocument.Parse(response);
return doc.Descendants(_entryName)
.Select(entryElement => new Tweet()
{
Title = entryElement.Descendants(_titleName).Single().Value,
Id = long.Parse(entryElement.Descendants
(_idName).Single().Value.Split(':')[2]),
ProfileImageUrl = entryElement.Descendants
(_linkName).Skip(1).First().Attribute("href").Value,
Timestamp = DateTime.Parse(entryElement.Descendants
(_publishedName).Single().Value),
Author = ParseTwitterName(entryElement.Descendants
(_nameName).Single().Value)
});
}
private string ParseTwitterName(string name)
{
int bracketLocation = name.IndexOf("(");
return name.Substring(0, bracketLocation - 1);
}
订阅会将这个 Tweet
对象“集合”设置为我们搜索结果列表框的 ItemsSource
。Tweet
值对象上的 ToString
方法简单地返回 Title
,产生以下结果:
稍加润色
最后,我们可以通过为列表框应用模板和一些值转换器来美化 Twitter Instant 应用程序。我添加了一个动画加载指示器,并在键入时调整了列表的不透明度,以显示当前搜索已被取消。
Rx 查询添加了一些 Do
运算符来更新 UI 状态。
Observable.FromEvent<TextChangedEventArgs>(searchTextBox, "TextChanged")
.Select(e => ((TextBox)e.Sender).Text)
.Where(text => text.Length > 2)
.Do(s => searchResults.Opacity = 0.5) // reduce list opacity when typing
.Throttle(TimeSpan.FromMilliseconds(400))
.ObserveOnDispatcher()
.Do(s => LoadingIndicator.Visibility = Visibility.Visible) // show the loading
// indicator
.SelectMany(txt => searchTwitter(txt))
.Select(searchRes => ParseTwitterSearch(searchRes))
.ObserveOnDispatcher()
.Do(s => LoadingIndicator.Visibility = Visibility.Collapsed) // hide the
// loading indicator
.Do(s => searchResults.Opacity = 1) // return the list
// opacity to one
.Subscribe(tweets => searchResults.ItemsSource = tweets);
这样,我们就完成了。
我真正喜欢 Rx 处理此类问题的方法是,程序流变成了一个单一的可观察管道,摆脱了事件处理程序、状态标志和笨拙的线程封送逻辑。
注意:您可能已经注意到上述可观察管道中的竞态条件。如果用户键入“reactive”,稍作停顿,导致 Twitter 搜索,然后键入“library”导致第二次 Twitter 搜索,结果取决于哪个先返回。幸运的是,Rx 在这方面有一些诀窍,如果您阅读 DevLabs: Reactive Extensions for .NET (Rx),您会发现 Switch
或 TakeUntil
运算符可以用来解决这个问题。
Bing Maps / Twitter 混搭
在下一个示例中,我们将更进一步,创建一个可观察管道,该管道将 Twitter 搜索与 Bing Maps API 提供的地理编码相结合。
让它下雪
英国人喜欢谈论天气,而没有什么比雪更能引起他们的谈论了。本周,雪下得……非常大! 我最喜欢的混搭应用之一是 Ben Marsh 的 #uksnow,您可以使用 #uksnow Twitter 标签发布您所在地区的英国邮政编码以及降雪量。Ben 的混搭应用在地图上添加小雪图标,实时显示推文。我一直在考虑创建一个 Silverlight 克隆版这个混搭应用,Rx 库和我们最近的过量降雪给了我完美的理由!
轮询 Twitter
与前面的示例一样,uksnow 混搭应用使用了可观察管道。管道的前几步如下所示:
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(30))
.SelectMany(ticks => searchTwitter("%23uksnow", _lastTweetId))
.Select(searchResult => ParseTwitterSearch(searchResult))
.ObserveOnDispatcher()
.Do(tweet => AddTweets(tweet))
.Do(tweets => UpdateLastTweetId(tweets))
...
这次,使用 Observable.Timer
扩展方法每三十秒创建一个“滴答”。使用一个略微修改过的 searchTwitter
函数,如上一个示例所示,用于查询 Twitter 并解析结果。然后将推文推送到 UI 线程,以便可以将它们添加到视图中。AddTweets
方法只是将它们添加到 UI 右侧的列表中,同时确保最多存在 100 条推文。
修改后的 searchTwitter
函数接受一个 lastId
参数,该参数用于查找自给定 ID 以来的所有推文,以便在每次 30 秒定时器滴答时,我们将最近的推文添加到列表中。
private string _twitterUrl = "http://search.twitter.com/search.atom?rpp=100&since_id=";
Func<string, long, IObservable<string>> searchTwitter = (searchText, lastId) =>
{
var uri = _twitterUrl + lastId.ToString() + "&q=" + searchText;
var request = (HttpWebRequest)HttpWebRequest.Create(new Uri(uri));
var twitterSearch = Observable.FromAsyncPattern<WebResponse>
(request.BeginGetResponse, request.EndGetResponse);
return twitterSearch().Select(res => WebResponseToString(res));
};
UpdateLastTweetId
方法更新 _lastTweetId
字段。
private long _lastTweetId = 0;
/// <summary>
/// Update the recorded Id of the most recent tweet
/// </summary>
private void UpdateLastTweetId(IEnumerable<Tweet> tweets)
{
if (tweets.Any())
{
_lastTweetId = Math.Max(_lastTweetId, tweets.Max(t => t.Id));
}
}
注意,因为可观察管道正在更新一个字段,所以我们必须小心潜在的并发问题。在我们的例子中,ObserveOnDispatcher
运算符意味着上面的代码将始终在 UI 线程上调用,但是,我们管道的第二步读取 _lastTweetId
的值,并且不会在 UI 线程上执行。如果减少了定时器滴答,在没有引入锁的情况下,这段代码可能会导致一些问题。
在继续进行接下来的几个管道步骤之前,我们先看看沿管道传递的项目。
/// <summary>
/// A tweet!
/// </summary>
public class Tweet
{
public long Id { get; set; }
public string Title { get; set; }
public string Author { get; set; }
public string ProfileImageUrl { get; set; }
public DateTime Timestamp { get; set; }
public Tweet()
{ }
public Tweet(Tweet tweet)
{
Id = tweet.Id;
Title = tweet.Title;
ProfileImageUrl = tweet.ProfileImageUrl;
Author = tweet.Author;
Timestamp = tweet.Timestamp;
}
public override string ToString()
{
return Title;
}
}
/// <summary>
/// A tweet with a postcode and snowfall factor
/// </summary>
public class UKSnowTweet : Tweet
{
public string Postcode { get; set;}
public int SnowFactor { get; set;}
public UKSnowTweet(Tweet tweet)
: base(tweet)
{
}
public UKSnowTweet(UKSnowTweet tweet)
: this((Tweet)tweet)
{
Postcode = tweet.Postcode;
SnowFactor = tweet.SnowFactor;
}
public override string ToString()
{
return Postcode + " " + SnowFactor.ToString() + " " + base.ToString();
}
}
/// <summary>
/// A geocoded tweet
/// </summary>
public class GeoCodedUKSnowTweet : UKSnowTweet
{
public Point Location { get; set; }
public GeoCodedUKSnowTweet(UKSnowTweet tweet)
: base(tweet)
{
}
}
管道的第一部分创建 Tweet
对象,其中一些可能包含邮政编码和雪况,因此我们可以从中创建 UKSnowTweets
。最后,如果地理编码成功,我们可以创建一个 GeoCodedUKSnowTweet
。
推送到 Bing
让我们看看完成我们管道的接下来的步骤。
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(30))
.SelectMany(ticks => searchTwitter("%23uksnow", _lastTweetId))
.Select(searchResult => ParseTwitterSearch(searchResult))
.ObserveOnDispatcher()
.Do(tweet => AddTweets(tweet))
.Do(tweets => UpdateLastTweetId(tweets))
// the next steps ...
.SelectMany(tweets => tweets)
.SelectMany(tweet => ParseTweetToUKSnow(tweet))
.SelectMany(snowTweet => searchBing(snowTweet))
.ObserveOnDispatcher()
.Subscribe(geoTweet => AddSnow(geoTweet));
在更新最后一条推文 ID 后,使用 SelectMany
运算符将传递到管道中的 IEnumerable<Tweet>
项目投影到单个 Tweet
项目,然后将其传递到下一个管道步骤。下一个 SelectMany
使用 ParseTweetToUKSnow
来匹配推文中的邮政编码和雪况。例如,"Ben_fisher: #uksnow so51 1/10 hoping for a 10 :)"
是一个“有效”的 uksnow 推文。SelectMany
有一个很好的副作用,即如果推文无法解析,它就不会继续到下一个管道步骤。这是解析方法:
/// <summary>
/// Parses the given tweet returning a UKSnowTweet if the postcode and snow
/// regexes match
/// </summary>
private IEnumerable<UKSnowTweet> ParseTweetToUKSnow(Tweet tweet)
{
string postcode = GetFirstMatch(tweet.Title, @"[A-Za-z]{1,2}[0-9]{1,2}");
string snowFactor = GetFirstMatch(tweet.Title, @"[0-9]{1,2}/10");
if (postcode!="" && snowFactor!="")
{
yield return new UKSnowTweet(tweet)
{
Postcode = postcode,
SnowFactor = int.Parse(snowFactor.Split('/')[0])
};
}
}
之后,再次通过 SelectMany
使用 Bing Maps API 进行地理编码。与我们的 Twitter 搜索一样,Observable FromAsynchPattern
用于构建合适的函数。
Func<UKSnowTweet, IObservable<GeoCodedUKSnowTweet>> searchBing = tweet =>
{
var uri = _geoCodeUrl + tweet.Postcode + "?o=xml&key=" + _mapKey;
var request = (HttpWebRequest)HttpWebRequest.Create(new Uri(uri));
var twitterSearch = Observable.FromAsyncPattern<WebResponse>
(request.BeginGetResponse, request.EndGetResponse);
return twitterSearch().Select(res => WebResponseToString(res))
.Select(response => ExtractLocationFromBingGeoCode(response))
.Select(loc => new GeoCodedUKSnowTweet(tweet)
{
Location= loc
});
};
传入一个 UKSnowTweet
,返回一个 Observable,该 Observable 在订阅时,如果地理编码成功,则返回一个 GeoCodedUKSnowTweet
。ExtractLocationFromBingGeoCode
方法同样是一个简单的 Linq to XML,用于解析 XML 响应。
最后,我们 ObserveOnDispatcher
并订阅,将生成的地理编码推文推送到我们的 AddSnow
方法。该方法将一个雪图像添加为 Bing Maps 控件的子项,使用 MapLayer
附加属性将其添加到所需的纬度/经度。
/// <summary>
/// Adds the given tweet to the map
/// </summary>
private void AddSnow(GeoCodedUKSnowTweet geoTweet)
{
var location = new Location(geoTweet.Location.X, geoTweet.Location.Y);
int factor = geoTweet.SnowFactor;
Image image = new Image();
image.Tag = geoTweet;
image.Source = new BitmapImage
(new Uri("/TwitterRx;component/snow.png", UriKind.Relative));
image.Stretch = Stretch.None;
image.Opacity = (double)factor / 10.0;
Map.Children.Add(image);
MapLayer.SetPosition(image, location);
MapLayer.SetPositionOrigin(image, PositionOrigin.Center);
}
至此,我们的混搭就完成了……
注意:要自己运行此示例,您需要下载 Bing Maps Silverlight SDK,并注册以获取 Bing Maps API 密钥。
结论
Rx 库无疑是一个非常聪明而强大的工具,并且与 Linq 的相似之处也确实有助于学习过程。许多我见过的例子都集中在 UI 事件处理上,正如我之前所说,这并不是我经常会做的事情。通常,Silverlight 应用程序由绑定到各种 UI 控件状态的 ViewModel
以及命令组成,因此您可以编写复杂的应用程序而不必接触 UI 事件。在我看来,Rx 的真正优势在于程序的流程编排。我非常喜欢我能够将 uksnow 混搭应用创建为一个单一的 Observable 查询或管道,使应用程序流程清晰、简洁且可维护。
那么……您有什么看法?您认为您会使用 Rx 吗?它是一个巧妙的解决方案,还是有点难以理解?
延伸阅读
我发现以下资源非常有用。
- DevLabs: Reactive Extensions for .NET (Rx) – Rx 库主页,前往此处获取下载、教程等……
- DEVHOL202 – 使用 .NET 的响应式扩展解决异步难题 – Rx 的全面介绍。
- Pencho Popadiyn 的《在 Silverlight 中使用响应式扩展》
- Sacha Barber 的《Rx 的乐趣》
- 101 个 Rx 示例
- .NET (Rx) 响应式扩展论坛
历史
-
2010 年 12 月 2 日 - 文章首次发布