Rx 的乐趣






4.93/5 (68投票s)
简要了解 DevLabs 响应式框架。
引言
本文旨在初步探索微软 DevLabs 项目“响应式框架”(简称 Rx)。虽然听起来有点奇怪,但本文附带的演示解决方案中没有任何可重用的东西。但它应该能帮助您理解 Rx 的工作原理。
必备组件
由于响应式框架仍处于开发阶段,尚未完全集成到主流 .NET Framework 中(尚未),您仍然需要下载并安装一个 MSI 安装程序,可以在以下网址找到:http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx。演示应用程序依赖于下载并安装“Rx for .NET 4.0”选项。
您还需要 Visual Studio 2010 和 .NET 4.0。
Rx
Rx 是一个用于使用可观察集合来组合异步和基于事件的程序的库。
“AJAX”中的“A”代表异步,事实上,现代基于 Web 和云的应用程序在根本上就是异步的。事实上,Silverlight 禁止所有阻塞的网络和线程操作。然而,异步编程绝不仅限于 Web 和云场景。传统的桌面应用程序也必须在面对长延迟 I/O 操作和其他昂贵的后台任务时保持响应能力。
无论是 Web/云还是客户端应用程序,交互式应用程序的另一个共同属性是它们是事件驱动的。用户通过接收来自鼠标、键盘和其他输入的异步事件流的 GUI 与应用程序进行交互。
Rx 是标准 LINQ 序列运算符的超集,它通过新的 .NET 4.0 接口 IObservable<T>
和 IObserver<T>
将异步和基于事件的计算公开为推送式可观察集合。它们是 .NET Framework 中熟悉的用于拉式可枚举集合的 IEnumerable<T>
和 IEnumerator<T>
接口的数学对偶。
IEnumerable<T>
和 IEnumerator<T>
接口允许开发人员创建可重用的抽象,以消费和转换来自各种具体可枚举集合的值,例如数组、列表、数据库表和 XML 文档。类似地,Rx 允许程序员使用对可观察集合(如 .NET 事件和 APM 计算、PFx 并发 Task<T>
、Windows 7 传感器和位置 API、SQL StreamInsight 时间事件流、F# 一等事件以及异步工作流)的 LINQ 查询来连接复杂的事件处理和异步计算。
- 截至 2010 年 9 月 7 日的响应式框架主页。
我将在这里添加几句话,以澄清我将如何向熟悉通用 .NET 编程的人解释 RX。
在您已经进行的大部分 .NET 编程中,您很可能习惯于钩接事件,监听这些事件触发订阅的委托,然后在一个事件处理程序委托中运行一些代码。您可以将此视为一种拉式机制。这没问题,但使用 Rx,这种情况被颠倒了,当发生某些事情时,它会通过新的 IObservable<T>
和 IObserver<T>
接口立即推送到订阅者。诚然,仅凭这一点似乎并没有什么革命性的,但有几点我认为真正让 Rx 闪耀。我希望在讨论下面所示的演示应用程序时能向您展示这些。
讨论演示应用程序
我在本文顶部提供的 VS2010 解决方案链接中包含了四个小型演示项目。在接下来的子节中,我将解释文章下载演示解决方案中的演示应用程序。我想指出,这些演示应用程序在性质上都相当相似,因为我认为使用 Rx 最重要的方面是订阅 IObservable<T>
,所以这就是为什么演示应用程序都采用相似的视角。这并不是说 Rx 只做这些;绝非如此,它还能做很多其他事情,这只是我个人认为值得探讨和与大家分享的内容。
在下面的每个演示部分开始时,我将简要讨论演示应用程序的功能,然后展示一张截图,然后解释演示项目中的代码。
RxDraw (.NET 4.0)
演示描述
此演示说明了如何使用 Subject<T>
,它同时是 IObserver<T>
和 IObservable<T>
。其中包含一个 RxPanel
用户控件,它包含一个 Subject<T>
,该 Subject<T>
在 MainWindow.xaml.cs 中被订阅。RxPanel
用户控件监听鼠标移动,当它认为已形成完整的折线时,它会通知订阅者。它还说明了当您订阅某项内容时,会获得一个 IDisposable
,这非常棒,可以在 using
语句中使用,并且比需要跟踪的事件处理程序委托更容易管理。
可以看到,在单个 WPF Window
中托管了两个面板。Window
称为 MainWindow.xaml,顶部面板实际上是一个名为 RxPanel
的 UserControl
。
RxPanel
用户控件使用一个名为 Subject<T>
的 Rx 对象,这是一个非常方便的类,它同时是 IObserver<T>
和 IObservable<T>
,这使得它非常易于使用。基本思想是,每当我们获得一个完整的 PolyLine
时,我们将使用 RxPanel
用户控件中包含的 Subject<T>
来向其订阅者发出发生了某事的信号(我认为这是一种反应)。
这是 RxPanel
的完整代码;需要注意的主要地方是两个 Rx 字段以及 MouseUp
事件处理程序中发生的事情。那就是 Subject<T>
被告知要通知其订阅者发生了某事。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
namespace RxDraw
{
/// <summary>
/// Interaction logic for RxPanel.xaml
/// </summary>
public partial class RxPanel : UserControl
{
private Point previousPoint = new Point();
private Polyline currentLine = new Polyline();
//Reactive fields
private readonly Subject<Polyline> mouseMovesInPanel =
new Subject<Polyline>();
public IObservable<Polyline> MouseMoves {
get { return mouseMovesInPanel.AsObservable(); } }
public RxPanel()
{
InitializeComponent();
}
private void Canvas_MouseDown(object sender, MouseButtonEventArgs e)
{
if (e.LeftButton == MouseButtonState.Pressed)
{
currentLine = new Polyline
{
Stroke = Brushes.Magenta,
StrokeThickness = 4
};
canv.Children.Add(currentLine);
currentLine.Points.Add(e.GetPosition(canv));
}
}
private void Canvas_MouseMove(object sender, MouseEventArgs e)
{
if (e.LeftButton == MouseButtonState.Pressed)
currentLine.Points.Add(e.GetPosition(canv));
}
private void Canvas_MouseUp(object sender, MouseButtonEventArgs e)
{
mouseMovesInPanel.OnNext(currentLine);
}
}
}
那么实际的订阅是如何工作的呢?这非常简单。下面显示了 MainWindow.xaml.cs 代码中最相关的部分,它展示了订阅是如何工作的。
private void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
rxPanelSubcription = rxPanel.MouseMoves.Subscribe(SubscriptionMethod);
}
private void SubscriptionMethod(Polyline rxPanelPolyline)
{
canvAutoDrawn.Children.Add(CreateLineFromOther(rxPanelPolyline));
}
private Polyline CreateLineFromOther(Polyline existingLine)
{
Polyline polyline = new Polyline();
polyline.StrokeThickness = 2;
polyline.Stroke = Brushes.Black;
foreach (Point point in existingLine.Points)
{
polyline.Points.Add(new Point(point.X, point.Y));
}
return polyline;
}
到目前为止,这一切看起来都像是标准的事件处理,对吧?嗯,.NET 中标准事件的一个可能存在问题的方面是,事件有时很难确定何时取消订阅 EventHandler
委托。现在使用 Rx,当您订阅 IObservable<T>
时,您将获得一个 IDisposable
,我认为它的生命周期比 EventHandler
更容易处理。IObservable<T>
的另一个非常美好的地方是,它是一个功能齐全的对象,您可以在方法之间传递它,这意味着您可以在代码的任何地方传递 IObservable<T>
。好吧,这也可以使用 EventHandler
委托完成,但会变得更加混乱,并且在订阅时使用 IObservable<T>
是您获得 IDisposable
的地方,这使得它非常容易处理;只需在 IDisposable
订阅上调用 Dispose()
即可完成。甚至可以将其包装在 using
语句中,多方便啊?我确实喜欢这一点。
RxSimpleFormEvent (.NET 4.0)
演示描述
此演示说明了我们如何使用 Rx 有效地使用 LINQ to Events,并在看到订阅的事件更改时做出反应。它还展示了我们如何使用新的 System.Disposables
命名空间,以及如何将 IDisposable
订阅范围限定在整个 Windows Forms 消息循环。
这是一个相当简单的演示,它使用 Rx 来监听表单上的 MouseDown 事件,然后更新表单上的 Label
控件,Rx 是订阅到该控件的 IObservable<T>
。这是它运行时的截图,没什么花哨的。
对于实际的表单,没有什么可说的了;这是表单的全部代码(不包括设计器生成的代码)。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
namespace RxSimpleFromEvent
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
public string RxUpdateableLabelText
{
set
{
label3.Text = value;
}
}
}
}
此演示的实际工作实际上发生在标准 WinForms 项目的 Program.cs(C# Windows Forms 项目)类中。这是 Program
类的完整代码。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;
namespace RxSimpleFromEvent
{
static class Program
{
/// <summary>
/// The main entry point for the application.
/// </summary>
[STAThread]
static void Main()
{
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
Form1 frm = new Form1();
IObservable<IEvent<MouseEventArgs>> moves =
Observable.FromEvent<MouseEventArgs>(frm, "MouseMove")
.Where(x => x.Sender.GetType().Equals(typeof(Form1)));
//subscribe, which gives us an IDisposable, that we can use in a using
//statement, which will wrap the entire Application message pump, so
//will only Dispose when the applications main form (Form1 in this case)
//is closed
using (moves.Subscribe(
//OnNext (Normal operation)
evt =>
{
frm.RxUpdateableLabelText = evt.EventArgs.Location.ToString();
}))
{
Application.Run(frm);
}// moves is unsubsribed at this point, as its an IDisposable, neato
}
}
}
此代码段中有几个有趣的地方需要注意,例如
- 请看我们如何有效地使用标准的 LINQ 语法,尽管我们实际上是在响应事件。好吧,在这种情况下,我只是观察源自
Form1
的事件,所以它不是那么花哨,但您明白了。我们正在对事件使用 LINQ 语法。 - 由于
IObservable<T>
(在此情况下为IObservable<IEvent<MouseEventArgs>>
)为我们提供了一个IDisposable
,我们可以使用IDisposable
将using
语句包装在整个应用程序的消息循环周围。同样,我知道这是一个奇怪的例子,但您可以使用它来自动取消订阅一些您知道会短暂存在的事物,在自己的代码中使用一个相对较小的using
范围。
RxAsyncWebCalls (.NET 4.0)
演示描述
Rx 声称能让我们的生活更轻松的另一个领域是异步调用,过去我们曾使用经典的 Begin/End 异步方法调用。
此演示说明了如何使用常规异步调用和 Rx 来执行异步操作。在此示例中,它调用了一个小型演示 WCF 服务,名为“DemoWCFService”,其中包含一个名为“GetData(int value)
”的方法。如果传入的数字在 1-100 之间,该方法将正常工作,否则将抛出 SOAP 错误。这个小型演示足以向您展示 Rx 可以使用的异步模式,以及如何使用 IObservable<T>.OnCompleted
和 IObservable<T>.OnError()
操作来处理可能发生的错误。
UI 的外观如下,可以看到有四个按钮。
- 使用常规异步调用 WCF 服务,向
GetData(int value)
传递一个有效数字,该数字应该可以正常工作。 - 使用常规异步调用 WCF 服务,向
GetData(int value)
传递一个无效数字,该数字应抛出 SOAP 错误。 - 使用 Rx 调用 WCF 服务,向
GetData(int value)
传递一个有效数字,该数字应该可以正常工作。 - 使用 Rx 调用 WCF 服务,向
GetData(int value)
传递一个无效数字,该数字应抛出 SOAP 错误。
服务合同/服务
为完整起见,以下是 WCF 服务合同。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Web;
using System.Text;
namespace DemoWCFService
{
[ServiceContract]
public interface IService1
{
[OperationContract]
[FaultContract(typeof(GenericFault))]
List<Person> GetData(int value);
}
[DataContract]
public class Person
{
public Person(int age, string firstName, string lastName)
{
this.Age = age;
this.FirstName = firstName;
this.LastName = lastName;
}
[DataMember]
public int Age { get; set; }
[DataMember]
public string FirstName { get; set; }
[DataMember]
public string LastName { get; set; }
}
[DataContract]
public class GenericFault
{
public GenericFault(string errorMessage)
{
this.ErrorMessage = errorMessage;
}
[DataMember]
public string ErrorMessage { get; set; }
}
}
以及服务实现。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Web;
using System.Text;
namespace DemoWCFService
{
public class Service1 : IService1
{
public List<Person> GetData(int value)
{
if (value > 100 || value < 1)
{
string errString = "Dummy error : Service1 can only return 1-100 items";
GenericFault fault = new GenericFault(errString);
throw new FaultException<GenericFault>(fault, new FaultReason(errString));
}
else
{
List<Person> people = new List<Person>();
for (int i = 0; i < value; i++)
{
people.Add(new Person(i, "person " + i.ToString() + "_FName",
"person " + i.ToString() + "_LName"));
}
return people;
}
}
}
}
因此,正如您所见,我们有一个标准的 WCF 服务,它简单地返回一个 List<Person>
,前提是 GetData(int value)
方法用一个介于 1-100 之间的数字调用;否则,将抛出 SOAP 错误。
如前所述,我为常规异步调用和 Rx 都包含了对演示 WCF 服务的良好和不良调用。唯一实际改变的是发送到 WCF 服务 GetData(int value)
方法的数字,因此为简洁起见,我只展示每种常规异步调用和基于 Rx 的选项的示例。您可以在附带的演示代码中检查完整代码。
常规调用
当我们以异步方式调用 WCF 服务或任何 Web 服务时,这是一个相当成熟且经过检验的模式。调用 BeginXXX
方法,获取一个 IAsyncResult
对象,该对象可用于调用 EndXXX
方法,此时您将获得结果。对于演示应用程序,其外观如下。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
//our demo WCF reference
using RxAsyncWebCalls.DummyWCFService;
using System.Threading;
using System.ServiceModel;
namespace RxAsyncWebCalls
{
public partial class Form1 : Form
{
private SynchronizationContext context;
public Form1()
{
InitializeComponent();
this.Load += Form1_Load;
}
void Form1_Load(object sender, EventArgs e)
{
//set up the SynchronizationContext
context = SynchronizationContext.Current;
if (context == null)
{
context = new SynchronizationContext();
}
}
/// <summary>
/// Shows calling a WCF service asynchronously using IAsyncResult
/// Should be successful
/// </summary>
private void BtnStandardWCFCall_Click(object sender, EventArgs e)
{
Service1Client client = new Service1Client();
try
{
pnlWait.Visible = true;
Application.DoEvents();
//conventional calling of WCF Async method
client.BeginGetData(10,
(iar) =>
{
try
{
List<Person> people = client.EndGetData(iar);
context.Send(new SendOrPostCallback(delegate(object state)
{
dgvPeople.DataSource = (List<Person>)state;
}), people);
}
catch (FaultException<GenericFault> fex)
{
MessageBox.Show("Error\r\n" + fex.Detail.ErrorMessage);
}
}, null);
}
catch (Exception ex)
{
MessageBox.Show("Ooops Something wrong\r\n" + ex.Message + "\r\n\r\n" +
ex.StackTrace);
}
finally
{
if (client != null)
{
client.Close();
client.Abort();
pnlWait.Visible = false;
Application.DoEvents();
}
}
}
}
}
除了异步调用之外,我们所做的只是处理 WCF 服务抛出的特定类型的 SOAP 错误(WCF FaultException<T>
),并确保表单控件使用正确的 UI 线程进行更新,方法是使用标准的 Windows Forms SynchronizationContext
。除此之外,这都是相当标准的异步代码。
Rx 异步调用
现在,以下是 Rx 组织关于常规异步调用的说法。
BeginXXX
方法是启动 Web 服务调用的方法。除了接受要传递给 Web 方法的所有参数外,它还接受一个 AsynCallback
委托。当收到服务的响应时,将调用此委托。此代码非常笨拙,并且异步调用的数据方面并不直观。此外,与其他异步数据源(例如我们的 TextBox)的组合变得非常困难。也不清楚如何取消挂起的请求,从而保证不会再调用回调过程。处理错误情况也很困难。
- 摘自响应式框架安装中包含的Rx HOL .NET.pdf。
因此,话不多说,我现在向您展示 Rx 代码,如下所示。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
//our demo WCF reference
using RxAsyncWebCalls.DummyWCFService;
using System.Threading;
using System.ServiceModel;
namespace RxAsyncWebCalls
{
public partial class Form1 : Form
{
private SynchronizationContext context;
public Form1()
{
InitializeComponent();
this.Load += Form1_Load;
}
void Form1_Load(object sender, EventArgs e)
{
//set up the SynchronizationContext
context = SynchronizationContext.Current;
if (context == null)
{
context = new SynchronizationContext();
}
}
/// <summary>
/// Shows calling a WCF service asynchronously using RX
/// Should be Success
/// </summary>
private void BtnAsyncUsingRx_Click(object sender, EventArgs e)
{
Service1Client client = new Service1Client();
try
{
pnlWait.Visible = true;
Application.DoEvents();
Func<int,IObservable<List<Person>>> peopleObs =
Observable.FromAsyncPattern<int, List<Person>>(
client.BeginGetData, client.EndGetData);
//call WCF service, with something that should work ok
IObservable<List<Person>> res = peopleObs(10);
var subscription = res.Subscribe(
people =>
{
context.Send(new SendOrPostCallback(delegate(object state)
{
dgvPeople.DataSource = (List<Person>)state;
}), people);
},
//OnError (Error operation) -- should NOT happen
fault =>
{
FaultException<GenericFault> actualWCFFault =
(FaultException<GenericFault>)fault;
MessageBox.Show("Error\r\n" + actualWCFFault.Detail.ErrorMessage);
},
//OnCompleted - Should happen, as no Fault happened
() =>
{
MessageBox.Show("Completed Ok");
});
}
catch (Exception ex)
{
MessageBox.Show("Ooops Something wrong\r\n" + ex.Message + "\r\n\r\n" +
ex.StackTrace);
}
finally
{
if (client != null)
{
client.Close();
client.Abort();
pnlWait.Visible = false;
Application.DoEvents();
}
}
}
}
}
其中有几个有趣的地方需要注意,例如
- 我们可以使用
Observable.FromAsyncPattern
语法,在单行中将Func<T>
委托声明为IObservable<T>
,其中包含 BeginXXX 和 EndXXX。这实际上非常不错,因为我们可以像调用普通方法一样简单地调用从Observable.FromAsyncPattern
语法获得的异步Func<T>
委托。即便如此,我们仍然需要订阅调用此Func<T>
委托的结果(一个IObservable<T>
)。 - 由于我们使用的是 Rx 的异步功能,因此我们可以利用创建
IObservable<T>
订阅时允许的OnError()
和OnCompleted()
(仅在使用异步 Rx 操作时调用)操作。我们可以在原始演示屏幕截图中看到OnCompleted()
的工作原理,但OnError()
操作代码呢?下面的屏幕截图显示了这一点,这显然是 WCF 服务抛出 SOAP 错误(FaultException<T>
)的结果。
但它实际上更容易阅读/理解和使用吗?我个人认为**不是**,但您可以自己决定;毕竟,玩转最新技术是时下最流行的。我个人仍然会以老式方式进行,我认为它更简洁。并非所有新技术都是好技术,我认为人们太容易被冲昏头脑(包括我自己,但最近我回到了我的根源,让新技术再成熟一些,称我为文艺复兴人吧)。
MefRx (.NET 3.5)
演示描述
当我开始写这篇文章时,我本想让它包含所有 .NET 4.0 项目,但似乎 Rx 的一些内容已集成到 .NET 4 中,而一些内容则没有。而且,由于很多内容都在 System
命名空间中,所以不只是完全限定类型的问题。因此,这是一个奇怪的项目,它引用了 Rx DLL(其他项目也一样),但它不以 .NET 4 为目标,而是以 .NET 3.5 为目标。
此演示说明了我们如何使用 Rx 有效地使用 LINQ to Events,但这次,它还展示了如何使用 MEF 从 Rx 对象中 [Export]
IObservable<EventResult>
,该对象将被添加到 MEF CompositionContainer
中,并可以像任何普通 MEF [Import]
一样被导入。由于 [Import]
是延迟加载的,感兴趣的方可以通过标准的 MEF 延迟加载技术(通常是 Func
委托)来获取 [Import]
。
我承认,就目前而言,这是一段完全无用的代码,但如果有一些绘图库,并且您想基于导出绘图事件等内容来执行某些操作,那么它可能会有用。只是看看这些东西是否可行很有趣,对吧?您永远不知道何时可能需要 MEF 和 Rx。
那么演示做了什么?嗯,当发生 MouseDown
事件时,它基本上只是更新一个 Form Label
,但它使用 MEF 来 Export
和 Import
MouseDown
事件。
这是必不可少的截图。
这个需要多做解释,我们将从 MEF 部分开始。
从一个简单的帮助接口开始。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MefRX
{
public interface IExportableName
{
string ExportableName { get; }
}
}
然后,我们有一个特殊的 MEF Export
属性。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition;
namespace MefRX
{
[MetadataAttribute]
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class ObservableExportAttribute : ExportAttribute
{
public ObservableExportAttribute() :
base(typeof(Func<object, IObservable<EventArgsWrapper>>)) { }
public string ExportableName { get; set; }
}
}
然后,我们有一个简单的单例 MEF CompositionContainer
宿主类,我们在其中仅宿主延迟导出,并使用当前 Assembly
创建实际的 CompositionContainer
。看看我们是如何使用 MEF 的延迟支持的,我们可以在稍后真正需要时使用 Resolve()
方法来获取 Export
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition.Hosting;
using System.ComponentModel.Composition;
namespace MefRX
{
public sealed class SimpleComposer
{
#region Data
static readonly SimpleComposer instance = new SimpleComposer();
#endregion
#region Ctor
static SimpleComposer()
{
}
private SimpleComposer()
{
}
#endregion
#region Public Methods
/// <summary>
/// Resolve an exported function
/// </summary>
/// <param name="name"></param>
/// <returns></returns>
public static Func<object, IObservable<EventArgsWrapper>> Resolve(string name)
{
var rxSenders = Instance.RxEventExporters;
if (rxSenders == null) throw new InvalidOperationException(
"Part composer not initialized");
try
{
return Instance.RxEventExporters.FirstOrDefault(
s => s.Metadata.ExportableName == name).Value;
}
catch (Exception ex)
{
return null;
}
}
public void Compose()
{
var catalog = new AggregateCatalog();
catalog.Catalogs.Add(new AssemblyCatalog(
System.Reflection.Assembly.GetExecutingAssembly()));
var container = new CompositionContainer(catalog);
container.ComposeParts(Instance);
}
#endregion
#region Public Properties
[ImportMany]
public Lazy<Func<object, IObservable<EventArgsWrapper>>,
IExportableName>[] RxEventExporters { get; set; }
public static SimpleComposer Instance
{
get
{
return instance;
}
}
#endregion
}
}
接下来要做的是 Rx 部分,所以我们首先创建 MEF 将找到并放入 CompositionContainer
的 Export
,做法如下。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition;
using System.Windows.Forms;
namespace MefRX
{
[Export]
public class ExportedTriggers
{
[ObservableExport(ExportableName = "MouseDown")]
public IObservable<EventArgsWrapper> MouseDownExport(object obj)
{
var control = obj as Form1;
var trigger = from kd in control.GetMouseDown()
select new EventArgsWrapper()
{
Sender = kd.Sender,
EventArgs = kd.EventArgs,
};
return trigger;
}
}
public static class FormExtensions
{
public static IObservable<IEvent<MouseEventArgs>> GetMouseDown(this Form frm)
{
var allMouseDowns = Observable.FromEvent<MouseEventHandler, MouseEventArgs>
(h => new MouseEventHandler(h),
h => frm.MouseDown += h, //subscription handler
h => frm.MouseDown -= h //unsubscribe handler
);
return allMouseDowns;
}
}
}
其中有一个名为 EventArgsWrapper
的小辅助类,它看起来像这样。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MefRX
{
public class EventArgsWrapper
{
public object Sender { get; set; }
public object EventArgs { get; set; }
}
}
所以现在,我们只需要使用 MEF 进行一些 Import
。如何做到这一点?嗯,如下所示。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.ComponentModel.Composition;
using System.Threading;
namespace MefRX
{
public partial class Form1 : Form
{
private IDisposable obsDisposable;
public Form1()
{
InitializeComponent();
this.FormClosing += Form1_FormClosing;
}
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
obsDisposable.Dispose();
}
private void Form1_Load(object sender, EventArgs e)
{
IObservable<EventArgsWrapper> obs =
SimpleComposer.Resolve("MouseDown").Invoke(this);
//subscribe on WinForms SynchronizationContext
obsDisposable = obs.SubscribeOnWindowsForms(this).Subscribe(x =>
{
MouseEventArgs o = (MouseEventArgs)x.EventArgs;
label1.Text = string.Format("x:{0}-y:{1}", o.X, o.Y);
});
}
}
}
看看我们是如何使用 SimpleComposer.Resolve()
方法的,它将查询延迟导出,使用我在前面展示的 ObservableExportAttribute
应用于 Rx IObservable<T>
的元数据来查找正确的 Export
。另一个有趣的观察点是,我们正在 Windows Forms SynchronizationContext
上进行观察。WPF 也有一个,它使用 Dispatcher
。
哦,您还可以看到我保留了订阅的引用,并在表单的 Closing
事件上调用了 Dispose()
。
最后一点需要注意的是,为了让这一切正常工作,我们必须创建 MEF CompositionContainer
,我们在 Program
类中这样操作。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;
namespace MefRX
{
static class Program
{
/// <summary>
/// The main entry point for the application.
/// </summary>
[STAThread]
static void Main()
{
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
SimpleComposer.Instance.Compose();
Application.Run(new Form1());
}
}
}
现在,将功劳归于应得的人,我将这个 MEF/Rx 示例代码的很大一部分基于 CodeProject 用户 Anoop Madhusudanan 为他极其出色的文章《WPF Extensibility Hacks or WEX》所做的更大项目。
做得好 Anoop,真是太棒了。向您致敬。
延伸阅读
本文和演示解决方案侧重于 IObservable.Subscribe
,因为我认为这是非常重要的一个方面,也是人们首先使用 Rx 的主要原因之一。Rx 还有很多其他功能,例如一些函数式编程技术,如 Fold/zip,以及生成数据序列等等。
例如,这是 Rx 中 IObservable<T>
的一部分扩展方法的屏幕截图;正如您所见,还有很多其他内容。
我建议您使用以下链接进行更多阅读。
一位读者还向我推荐了一个名为“RxSandBox”的优秀工具,您可以在此处获取,它提供了一个优秀的 UI 来尝试 Rx 操作;它确实非常酷,看起来像这样。
本文到此结束
这就是我在这篇文章中想说的全部内容。一如既往,我想说的是,如果您喜欢这篇文章并觉得它很有趣,请花点时间给它投票,并可能留下评论。
谢谢。