GestSpace:Rx框架和Leap Motion的实战案例






4.96/5 (15投票s)
关于Rx框架如何拯救我的高频事件处理生命的故事
关于Rx框架如何拯救我的高频事件处理生命的故事
引言
有时,我会了解到一项看起来颇为酷炫的技术……
但我找不到原因。我知道其中蕴含着一个想法,一个火花,一种我尚未完全理解的灵感,我能感觉到它就在那里,但目前还遥不可及。
在这种情况下,这项技术就是Rx框架(也称为Reactive Framework)。
——Leap Motion,我也爱你,别吃醋。
在阅读了出色的 Intro To Rx 网站后,我的反应是:“这很酷,但为什么要在事件驱动范式对我有效多年之后,还要转向响应式范式呢?”
于是我开始阅读关于Rx的文章,并对一件事感到非常沮丧:有太多的入门文章教你如何用RX做基本操作,但总是认为事件驱动方法更好、更简单。没有一篇文章给我一个明确的理由说明为什么使用响应式框架比事件驱动更容易。
唯一提到的场景是交易报价……但我不在交易行业工作,我为什么要关心呢?
然后,我萌生了一个WPF项目中使用Leap Motion(GitHub链接)的想法,并对Leap Motion的数据吞吐量进行了一些有趣的度量。
- 每秒处理30到100帧
- 每帧1-5根手指(每秒30-500根手指)
- 每帧0-1个手势
- 每帧1-2只手(每秒30-200只手)
这才是真正的高频事件……我终于可以真正尝试Rx框架了……结果棒极了!它拯救了我的生命。
这篇文章不是Intro To Rx,事实上,你不需要理解Rx就能理解我的文章。这是一篇关于RX如何让我的生活变得更好的真实、实际的案例,我将与你分享我的经验。请随意推广或根据你自己的生活进行情境化,并深入研究RX教程或大量出色的channel9视频。
事件流,事件处理器
Rx框架引入了两种类型:Observable<T>,代表某个输出事件的东西,其中T是事件的类型。
以及Observer<T>,它是一个将要观察/处理这些事件的实体。
我不太喜欢谈论Observer/Observable,因为它让我想起了我还是个可怜的Java程序员时的事件驱动编程时代……如果C#发明了事件,那也不是为了多年后回到石器时代。
我将使用Observable<T>这个术语,但我会使用事件流或事件源这个术语来代替。
而不再使用Observer<T>,我会使用事件处理器。
这使得我对事件溯源设计模式的词汇更加贴近。
说“为事件流限流10秒”比说“为Observable限流10秒”更有意义。
“过滤流”比“过滤Observable”更有意义。
我同意对于AI来说,这是语义上相同的,但作为人类,很容易想象对水流进行限流和过滤,而不是对一个“Observable”进行操作。
你想象到的,你就能理解。
将Leap Motion数据转换为事件流
第一个目标是使用Leap Motion数据和RX来获取我调试窗口中的FPS和手指计数。
第一步是实现Leap Motion的基础类Listener。
Leap Motion的开发者只实现了OnFrame,当Leap Motion有新帧到来时,它就会被调用。
OnFrame在次线程上每秒调用30到100次。
我的目标是类ReactiveListener能够访问这些帧作为事件流。
为此,你可以使用rx的Subject<T>类,它继承自Observable<T>,你可以向其中推送事件T。
正如你所见,OnFrame并不复杂。
public override void OnFrame(Controller arg0)
{
NewFrameDetected.OnNext(arg0.Frame());
}
现在我可以将帧作为Observable获取了。
public IObservable<Frame> Frames()
{
return NewFrameDetected;
}
从这里开始,我终于可以在我的ViewModel中用这个漂亮的表达式计算FPS(每秒帧数)了。
reactiveListener
.Frames()
.Timestamp()
.Buffer(2)
.ObserveOn(main.UI)
.Subscribe(o =>
{
var seconds = (o[1].Timestamp - o[0].Timestamp).TotalSeconds;
FPS = (int)(1.0 / seconds);
});
TimeStamp返回带有时间戳的帧。
Buffer表示我关心一次取2帧。(将IObservable<Frame>转换为IObservable<IList<Frame>>)。
ObserveOn指定事件处理器(由Subscribe定义)应在UI线程的SynchronizationContext上执行,以更新UI。
Subscribe更新FPS属性。
正如你在本部分开头GIF视频中看到的,调试窗口显示了Leap Motion检测到多少根手指。
它是如何实现的?
reactiveListener
.FingersMoves()
.ObserveOn(main.UI)
.Subscribe(o =>
{
FingerCount++;
});
但我如何减去手指呢?这有点棘手。这是代码,我会在稍后解释。
reactiveListener
.FingersMoves()
.Select(c => c
.ObserveOn(main.UI)
.Subscribe(cc =>
{
}, () =>
{
FingerCount--;
}))
.Subscribe();
要理解这段代码,你需要理解什么是单个FingerMove。
public IObservable<IGroupedObservable<int, Finger>> FingersMoves()
{
return Frames() /*frames*/
.SelectMany(f => f.Fingers) /*fingers*/
.GroupByUntil(f => f.Id,
g => g.ThrottleWithDefault(TimeSpan.FromMilliseconds(300)) /*duration*/);
}
那么这是什么意思?
首先,我从帧流开始,使用SelectMany提取每一帧中的所有手指。
然后,我按ID对它们进行分组,这样我就可以获得每根手指的数据流。
这是一个简单的流表示,也称为Marble Diagram(时间是水平轴)。
GroupByUntil的第二个参数返回一个事件流(称为“duration”),当duration输出事件时,该组被认为“已关闭”。如果之后有相同ID的手指到来,GroupByUntil会创建一个新组。
在这种情况下,当其组停止接收事件300毫秒时,duration会输出一个值。
那么,让我们回到递减手指计数的表达式。
reactiveListener
.FingersMoves()
.Select(c => c
.ObserveOn(main.UI)
.Subscribe(cc =>
{
}, () =>
{
FingerCount--;
}))
.Subscribe();
我订阅了groups stream输出的每一个finger group。
然后,对于我在这组中看到的每个手指,我都不做任何操作。
Subscribe的第二个参数指定,当组的流关闭时,我将递减FingerCount。
将手指表示为流,并按ID分组非常方便。将它们分组为移动。
Leap Motion有时会错过1到2帧的手指(大约5毫秒),如果仅仅因为手指错过了2帧就认为用户想要进行新的移动,那就太愚蠢了。
300毫秒是一个普通人不会注意到的时间跨度,而且你可以确定Leap Motion不会错误地错过300毫秒的手指,这达到了完美的平衡。
将你的ViewModel与你的事件流连接起来
那么,在开始之前,GestSpace是什么?
GestSpace是一个Leap Motion应用程序,可以用手控制你的电脑。
当GestSpace看到你的手时,它会显示一个可定制的控制面板。
控制面板由可定制的磁贴组成。(事实上,大多数磁贴是与手势关联的可配置键盘快捷键)。
要使用磁贴,你需要用手指选中它(图1)。
然后用你的手锁定它(图2)。
然后移动你的手(图3)。
你的手势取决于磁贴的类型。图3展示了如何调节音量。
这个应用程序有3种状态:
- 控制面板隐藏
- 控制面板可见且处于导航状态
- 控制面板可见且磁贴已锁定
这在枚举MainViewState中得到体现,正如你在这个类图中可以看到的。
你还可以注意到,ViewModel不直接引用之前的ReactiveListener,而是引用ReactiveSpace。
ReactiveListener和ReactiveSpace的区别在于,ReactiveSpace暴露了与我的项目高度耦合的流,而ReactiveListener可以在其他未来项目中使用。
ReactiveSpace暴露了sIsLocked,一个布尔流,每次应该锁定或解锁磁贴时都会发送一个值。
所以,这是我的MainViewModel中发生的事情。
spaceListener
.IsLocked()
.ObserveOn(UI)
.Subscribe(locked =>
{
if(this.State != MainViewState.Minimized)
State = locked ? MainViewState.Locked : MainViewState.Navigating;
});
我如何创建IsLocked流?非常简单。
- 如果有一只手有4根手指,我就被锁定了。
- 如果在过去200毫秒内没有出现带有4根手指的手,我就被解锁了。
- 新订阅者到此流应该立即获得最后接收到的布尔值。
public IObservable<bool> IsLocked()
{
if(_IsLocked == null)
{
var handIsPresent = listener
.Frames()
.SelectMany(f => f.Hands)
.Where(h => h.Fingers.Count >= 4)
.Select(s => true); //Step 1
var handIsAbsent = handIsPresent
.Throttle(TimeSpan.FromMilliseconds(200)); //Step 2
var handPresence =
handIsPresent
.Merge(handIsAbsent.Select(o => false)) /Step 3
.DistinctUntilChanged() //Step 4
.Replay(1); //Step 5
handPresence.Connect(); //The replay stream start listening the input stream
_IsLocked = handPresence;
}
return _IsLocked;
}
步骤1 – 我创建一个流,当有带有4根手指的帧出现时,发送True布尔值(handIsPresent)。
步骤2 – 我创建一个流,当第一个流在200毫秒内没有输出值时发送True布尔值(使用handIsAbsent进行限流)。
步骤3 – 我合并这两个流,并将handIsAbsent的True转换为False。
步骤4 – 我只保留连续不同的值(输出流不能连续发送两个false或两个true)。
步骤5 – 我创建一个流,当新的订阅者订阅时,它会重放这个流的最后一个值。
我说简单,应该改为:一旦你花几个小时去理解这个问题……但现在RX框架表达式对我来说完全自然了。
现在,我如何用手控制音量,
每个磁贴都关联一个继承自PresenterViewModel的类。这些类负责在锁定后订阅ReactiveSpace,并从事件流中更改其属性的值。
音量的PresenterViewModel是ValuePresenterViewModel,目前它只用于音量,你不能为其绑定键盘快捷键。
在WPF端,ValuePresenterViewModel是一个风格化的进度条,所以暴露的属性应该不会让你感到惊讶。
但是,仔细看看ValuePresenterViewModel最后一个构造函数的签名。
它等待一个事件流。
构造函数只需监听getValue,然后更新Value属性。
public ValuePresenterViewModel(double minValue, double maxValue, IObservable<double> getValue, Action<double> setValue)
{
this.setValue = setValue;
this.getValue = getValue;
this.MinValue = minValue;
this.MaxValue = maxValue;
this._Subscription = getValue
.Subscribe(d =>
{
_Value = Normalize(d);
OnPropertyChanged(() => this.Value);
});
}
private double Normalize(double value)
{
value = Math.Min(MaxValue, value);
value = Math.Max(MinValue, value);
return value;
}
然后,音量磁贴就只是传递正确的Observable的问题了。
我使用了出色的 CoreAudioAPI。
MMDeviceEnumerator devEnum = new MMDeviceEnumerator();
MMDevice defaultDevice = devEnum.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eMultimedia);
return new ValuePresenterViewModel(
minValue: 0,
maxValue: 100,
setValue: (v) => defaultDevice.AudioEndpointVolume.MasterVolumeLevelScalar = (float)(v / 100.0),
getValue: Observable.FromEvent<AudioVolumeNotificationData>(
o => defaultDevice.AudioEndpointVolume.OnVolumeNotification += o,
o => defaultDevice.AudioEndpointVolume.OnVolumeNotification -= o)
.Select(n=>n.MasterVolume)
.Merge(Observable.Return(defaultDevice.AudioEndpointVolume.MasterVolumeLevelScalar))
.Select(v=>(double)(v * 100.0)));
你可以看到,我使用Observable.FromEvent将一个传统的事件(object sender + EventArgs)转换为了一个事件流。我使用merge与一个只返回当前音量的事件流,所以,当Presenter订阅时,它会直接获取当前音量,而无需等待下一个OnVolumeNotification触发。
好的……现在让我们看看另一个例子:CyclePresenterViewModel,你可以用它来进行滚动等操作。
与ValuePresenterViewModel不同,你可以脚本化当你在圆圈上画圆时,圆圈会做什么。
在这个截图中,我配置了Circle,当我逆时针旋转时模拟按下键盘的UP键5次,顺时针旋转时模拟按下键盘的DOWN键5次。
这是一个包含3个命令的小语言:PRESS、UP和DOWN,每个命令都可以接受多个参数,还有一个小智能提示来帮助你。
我也可以写:PRESS UP,UP,UP,UP,UP。
键盘模拟功能得益于出色的库 InputSimulator。
然后,正如GIF图像所示,我只需要将手朝正确的方向转动即可触发这些键盘快捷键。
CyclePresenterViewModel的订阅代码简直太简单了,再次感谢RX框架。
我将一个手指的方向投影到当前旋转点的切线上,然后更新旋转。(在rotation的setter中,有逻辑来决定何时触发键盘快捷键)。
protected override IDisposable SubscribeCore(ReactiveSpace spaceListener)
{
return
spaceListener
.ReactiveListener
.FingersMoves()
.Concat()
.ObserveOn(UI)
.Subscribe(c =>
{
var v = c.TipVelocity.To2D();
var cos = Math.Cos(Helper.DegreeeToRadian(Rotation));
var sin = Math.Sin(Helper.DegreeeToRadian(Rotation));
var tan = new Vector((float)sin, (float)cos, 0);
var man = tan.Dot(v);
Rotation -= man / 100; //Some magic happens in the setter
});
}
请记住,FingerMoves是按ID分组的手指,Concat获取它找到的第一个组,并输出其内容。当组关闭时,它会获取下一个组。将任何你想转换的东西变成事件流
我已经向你展示了如何将一个事件转换为事件流,或者将Leap Motion的数据转换为事件流。
但这并不是全部。
通过GestSpace,你可以配置在前景程序更改时自动切换到某个磁贴。
在下面的截图中,我指定当iexplore、firefox或chrome处于前台时,我想切换到浏览器控件。
这样,你就可以根据你在电脑上正在做的事情来情境化你的手势。
这是一个例子,我切换窗口,然后在codeproject上,准备好向上/向下滚动,并转到上一页/下一页。
在上一个截图中,你看到我向用户提示当前的前景程序。如何做到的?
通过轮询,使用事件流!
_ProgListener = new ForegroundProgramListener();
_ProgListener.ForegroundProcess
.ObserveOn(UI)
.Subscribe(pid =>
{
if(pid != _CurrentPid)
{
using(var p = Process.GetProcessById(pid))
{
CurrentProgram = p.ProcessName;
var tile = Tiles.FirstOrDefault(t => t.BelongsToFastContext(p.ProcessName));
if(tile != null)
CurrentTile = tile;
}
}
});
这是ForegroundProcess的代码。
ForegroundProcess
=
Observable.Interval(TimeSpan.FromMilliseconds(800))
.Select(_ =>
{
var hwnd = user32.GetForegroundWindow();
uint pid = 0;
user32.GetWindowThreadProcessId(hwnd, out pid);
return (int)pid;
})
.DistinctUntilChanged();
Observable.Interval是一个事件流,每800毫秒输出一次,然后我获取前台窗口的pid,然后只输出与前一个值不同的值,就完成了。
结论
RX框架让我的生活更轻松。
这是一种新的事件处理思维方式,它对于简单的用例很棒,但对于处理高频事件来说是必不可少的。
理解这个概念需要时间,我建议你阅读Intro To Rx并在channel9上观看视频,这是值得的。
我在GroupByUntil中发现了一个bug,我在这里 记录了,除了这一点,它非常好用。
源代码 在GitHub上,如果你有Leap Motion,请随时进行实验。:)