65.9K
CodeProject 正在变化。 阅读更多。
Home

WPF / MVVM 实时交易应用程序

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (90投票s)

2012 年 2 月 10 日

GPL3

19分钟阅读

viewsIcon

307405

该项目是一个使用 WPF / MVVM 开发的实时多线程交易应用程序框架

源代码可在 WpfRealTime CodePlex 网站上找到

前言

2009 年 7 月 1 日,我接到一位招聘经纪人的电话,他承诺给我一个“无法拒绝的报价”。我不知道他是如何得知我的姓名、电话号码或工作单位的,但他似乎在我与他反复强调的 WPF/MVVM 之间建立了非常强的联系,并且在第二个 M 上稍微提高了嗓门。当时我非常满意地安逸地坐在一家银行中间业务 IT 部门一个“不太容易到达的楼层”。我从事的项目虽然具有战略意义,但却毫无意义,这几乎是完美的,所以我想我不会轻易被动摇。

为了消除他对我的 MVVM 联系的怀疑,我同意与客户见面,在一次快速的盘问后,我的 MVVM 参与显而易见。不久之后就有了判决,我被聘请来构建一个“使用 WPF/MVVM 的实时低延迟多线程前台交易应用程序”。

要求

我的新经理言简意赅,毫不犹豫地给我提供了一套需求。它们是这样的:

  • 应用程序需要始终保持响应,“非常快”地显示交易数据,永不挂起、冻结、崩溃或耗尽内存。

我多次搬桌子试图躲藏(这是我以前成功使用过的技巧),但我的经理每次都能找到我,并催促我更新。我别无选择,只能继续我的任务。于是我“谷歌”了一下,显然“响应性和快速”与实时或低延迟有关(我们稍后会详细讨论),“挂起、冻结和崩溃”是一个复杂自适应系统研究的领域,我决定在此基础上添加复合 UI,因为它在现代 UI 系统中是必不可少的。

所以我的技术要求是:

  • 实时可靠性
  • 适应性
  • 可扩展性、可伸缩性和模块化(它们本质上是一回事,但放在一句话里听起来更好)

实时系统开发速成班

“实时系统”一词可以指很多不同的东西。RT 系统的一个特性是它们了解它们运行的硬件,并使用非常低级的编程语言来访问硬件特定的调用。它们通常直接运行在硬件上(没有操作系统),或者借助实时执行系统或实时操作系统。现在,由于它们被设计为在特定硬件上运行,因此它们不易扩展、移植等。这就是为什么通用设计模式不适用于实时系统——它们“过于通用”。

然而,RT 系统的一个主要特性是它们是根据“实时约束”设计的。这并不意味着它们运行速度快(那只是低延迟),而是所有任务都保证在定义的“实时约束”内完成执行。即,我保证在 50 毫秒内显示任何新到达的数据。

所以,每个 RT 系统的核心将是一个实时调度算法。总体的想法是,系统中的所有任务可以分为周期性或偶发性。周期性任务是预先已知的,我们知道它们的开始时间、运行时间(预算或截止时间)、优先级。偶发性任务可以随时启动,我们只能在它们需要被中断之前分配估计的预算(处理器时间)。一些比较知名的算法是“基于优先级的抢占式调度器”和“最早截止日期优先”。顾名思义,最早截止日期优先算法是根据任务的截止日期(任务必须执行的时间)来组织任务的。基于优先级的算法是根据任务的优先级来组织任务,并在出现具有更高优先级的任务时中断当前任务。其中一种基于优先级的算法是速率单调调度器。它是这样工作的:

速率单调调度器将所有周期性任务放入等待队列。在新周期开始时,调度器将检查等待队列并将选定的任务移至“就绪运行”队列。此时,如果“就绪运行”队列中有任何任务的截止日期早于当前调度任务的截止日期,调度器将执行上下文切换。速率单调算法将累积偶发性任务,并将它们作为批次在指定的周期性任务上运行,因为它们被认为是较低优先级。

众所周知,从 .NET 4.0 开始,Microsoft 发布了任务并行库 (TPL),它允许开发人员定义任务并编写自定义任务调度器来运行它们。这是我们实现实时框架的绝佳场所。我们将定义 PeriodicTaskSporadicTask 类,它们将继承自 System.Threading.Task 类,并实现我们自己的 PriorityTaskScheduler 类,它将继承自 TaskScheduler 类。由于我们可以缓冲市场数据的到来(借助 Rx),并以固定的时间间隔将其分发到 GUI,因此我们将这些任务称为周期性任务。任何用户交互(单击按钮等)都无法预测,因此我们将这些任务称为偶发性任务。我们将以速率单调算法为基础,唯一的区别是我们处理偶发性任务的方式。我的要求是“GUI 需要始终保持响应”,这就是为什么我认为偶发性任务的优先级高于任何周期性任务,并且会先运行它们。

复杂自适应系统

虽然我见过很多关于复杂系统应如何适应不同条件、监控资源、自我验证和恢复组件的研究材料,但我还没有在 UI 世界中看到太多相关的实现。因此,我将在框架中内置一些此功能,并希望这足以满足我的“永不挂起、冻结或崩溃”的要求。

由于我使用的是复合或模块化 UI,因此实现一些组件监控功能非常容易。主 Shell 与其模块建立连接,并发送定期的心跳信号以确保系统的所有部分都具有响应性。如果任何模块未能响应,我们将需要从内存中卸载该屏幕,并在可能的情况下重新加载一个新实例。我将每个视图运行为一个新的 Window 类,这样它就可以分配自己的调度器。这样,如果某个特定视图“挂起”,UI 的其余部分仍然“活跃”。我们的主 Shell “永不挂起或冻结”,因为我们不在其调度器上执行任何工作。它的工作是加载系统组件、监控心跳信号并重新加载其他模块/视图。因此,由于每个视图都有自己的调度器,我们可以处理更多的更新而不会降低整个应用程序的速度。

每个视图还将分配一个 MaintenanceEngineer 类,其职责是监控视图的内存使用情况、发送心跳信号以及监控调度器队列。它可以决定我们是否在周期性更新或用户交互(偶发性任务)方面落后。

复合 UI

在复合 UI 方面,WPF 开发人员的选择很多。出于我的目的,我将选择范围缩小到三个框架:Prism、Marlon Grech 的 MeffedMVVM 和 Sacha Barber 的 Cinch。我的原则是,使用一个来源是抄袭,而使用多个来源是研究。这很适合我,因为我只想为我的实时框架窃取一些有用的功能。

Prism

Prism 最好的地方是……嗯……模块化。这是通过 Bootstrapper 类来实现的,该类负责加载模块。我更喜欢 Marlon 使用 MEF 的方式,所以我认为我们会从他的 MeffedMVVM 中窃取它。其次是……解耦……通过 EventAggregator 类实现。同样,我更喜欢 Sacha 用他的 Mediator 类来实现的方式,他从 Karl Shifflett 的作品中借鉴,Karl Shifflett 又从 Marlon 那里借鉴,Marlon 又从 Josh Smith 那里借鉴(但不一定按此顺序)。我也会对其进行一些修改,我们稍后可以讨论。还有区域适配器,我不会使用(因为它过于复杂且在我看来很没用),Unity 我也不会使用,因为我可以使用 MEF 来实例化我的对象,并且我不想构建像 Unity 那样长的依赖链(我有一个想法,ViewModelsServices 应该完全解耦,并且只通过 Mediator 进行通信,它们订阅主题并向这些主题广播异步消息——一种 GUI 企业总线,请耐心等待,也许稍后我可以更好地解释它)。所以实际上,我们从 Prism…嗯…什么都没借。

MeffedMVVM

在 MeffedMVVM 中,Marlon 使用 MEF 来链接 ViewsViewModels。我将使用他的想法,但我将使用 MEF 来解析 ViewModelsServices。它们之间不需要相互引用,因为它们都引用单例 Mediator 类,该类提供发布/订阅机制并选择 TaskScheduler 来运行任务。还有一个强有“一个 View – 一个 ViewModel”的连接,它在设计时定义,因此无需在运行时动态链接它们。Marlon 还为他的视图定义了“设计时”行为,因此每个控件都有一些虚拟数据可以在 Blend 中显示。我将使用这个想法,但将其应用于 Services 而不是 Views。所以……每个 Service 都有一个 MockService 版本,通过简单的配置更改,您应该能够使用真实或模拟服务来运行 GUI。这允许您在服务器关闭(他们那样做)或服务器端组件未准备好时运行和处理 GUI(我认为这并非 Marlon 所指的“设计时数据”,但无论如何……)。所以,从 MeffedMVVM 中,我们将借用……嗯,MEF 和 MVVM。

Cinch

现在,我们将从这里借鉴很多东西。所以……谢谢 Sacha。我们从 WeakEventActionWeakActionEventToCommandTrigger 和稍作修改的 Mediator 开始。它们非常出色,我无需重复它们的工作原理,因为您可以参考 Sacha Barber 在 Cinch by Sacha Barber 上的原始帖子。

MVVM 及其他

您可能已经猜到,我将使用 MVVM,因此有一些 SimpleCommandAttachedProperties 类。现在可能无法追溯作者,但我仍将它们归功于 Sacha Barber。

另一个值得一提的类是 Entity。这是一个旧的 .NET 技巧,可以通过自定义 PropertyDescriptorITypedList 将数据绑定到网格。因此,所有数据都作为 Entity 对象传递,如果您需要有关其工作原理的更多详细信息,请谷歌“property bag .NET”。

我还使用 Reactive Extensions (Rx) 来节流和合并到达的数据。

设计

好吧……我不得不说,这家银行的餐厅太糟糕了,所以我决定继续我的设计。

因此,在所有通常的 MVVM 成分都存在的情况下——ViewsViewModelsServices——我们还有一个 Shell,它只负责加载/卸载模块并监控心跳速率/资源等。我们还有实现业务特定逻辑的模块。每个模块都有 ViewsRibbonViewViewModelsServiceObservers。当 Shell 启动时,Bootstrapper 将使用 MEF 在磁盘上发现模块。它将 RibbonViews 添加到 Shell 的内容中,在单独的 Windows 中加载 Views(每个都有自己的调度器),并创建 Services。它还将实例化 MediatorViewModels 和 services 将在其中“注册它们的兴趣”。所以发生的情况是 Mediator 具有定义系统中所有可能流程的主题列表。它将维护一个委托列表,以便在特定主题上广播消息时运行。

因此,ViewModelsServices 都加载到内存中,通过 Mediator 消息总线进行通信,Mediator 选择正确的 TaskScheduler 来保证实时更新,ViewModels 更新 ViewsShellMaintenanceEngineers 对话以确保每个人都正常工作。

勺子?没有勺子 (xaml.cs)

我一直在争论(与我自己)Bootstrapper 应该解析 View 还是 ViewModel,这引出了 V 应该引用 VM 还是 VM 应该引用视图的老话题。说实话,我认为区别不大,只要你选择一种模式并坚持下去。所有 MVVM、MVC 和 MVP 所做的都非常相似,只有一些细微的差别(参考我之前的 MVVM 帖子)。无论如何,重点是 MVVM 的主要目标是实现“Blend-ability”,对我来说,这意味着一件事——没有 xaml.cs。有很多好的框架,人们仍然会在 xaml.cs 中编写代码,因此您需要在 VM 和 xaml.cs 之间切换以查找内容所在的位置,这会导致一些混乱。

所以我对“无 xaml.cs”采取了字面意义……并删除了 xaml.cs。所以如果你查看项目,只会找到 xaml 文件而没有对应的 xaml.cs。这是可能的,因为您真正需要从 .cs 文件中获得的唯一东西是调用 Initialise() 并分配 DataContext,这正是我将在 BaseViewModel 中做的。所以有效地,每个 ViewModel 创建自己的 View,初始化其内容并分配 DataContext,这对我来说是有意义的,因为控制 View 是 VM 的职责。

在又一次“更新会议”后,我的经理说服我,我仍然没有看到更大的图景。所以,这就是它——更大的图景。

因为我认为绘制“大图”图表是孩子们的游戏——我让我的四岁和三岁的孩子为我做的(好吧……实际上没有,我自己做的:()

Bigger Picture

图 1 - 大图

所以,如果我们从后往前看,第一个数据录入点是 Services。所有服务都有真实和模拟实现类,可以被 MEF 发现和加载。在我的情况下,真实服务在一个单独的进程中加载并通过 WCF NamedPipes 进行通信,而模拟服务则由主应用程序引用。您需要做的就是更改 VS 中的 Solution Configuration 来构建真实或模拟版本。这是通过一个构建后脚本实现的,该脚本在构建后更改配置文件值。这是在没有连接到服务器的情况下测试/开发应用程序的好方法。Services 接收数据并构建键值对数据契约。这些事件被“爆炸”并通过 Rx 进行观察。然后,它们将被节流并通过观察到的固定时间间隔“抛向”GUI(这是我们实时约束的第一部分)。通常,服务和服务观察者之间将是一对一的连接,但并非必然。一个 ServiceObserver 可以观察多个服务并使用 Rx Merge 扩展将这些事件呈现为好像它们来自同一个源(查看 BaseServiceObserver 类)。如果新的版本在旧版本被调度之前到达,ServiceObserver 将合并任何数据。它将使用 EntityBuilder 构建 Entity,并使用 Mediator 将这些 Tasks 广播给所有感兴趣的人。

因此,Entities 将进入 MediatorMediator 将找到已注册兴趣的 ViewModel 并找到它需要运行的委托。Mediator 将创建一个封装了要运行的委托和该委托新到达数据的任务。Mediator 将该任务传递给适当的 TaskSchedulerLongRunningTaskScheduler 将在单独的线程上运行所有任务,并具有处理器亲和性。因此,所有服务都只在一个处理器上处理它们的数据,而将其他处理器留给 PriorityTaskScheduler。如果您仍然记得我们关于实时系统了解它们运行的硬件的说法?所以,在我的情况下,我知道我将使用四核机器,并通过一系列测试确定该应用程序在所有服务运行在一个处理器上,而另外三个由 PriorityTaskScheduler 用于运行周期性和偶发性任务时运行效果最好。这取决于您来优化您的应用程序和使用的硬件。(因为 TS 是系统中所有数据流的一个入口点,所以在那里记录所有您需要统计的数据非常容易)

Processor Affinity

图 2 - 处理器亲和性

PriorityTaskScheduler 将启动三个线程(每个核心一个),它们将处理传入的任务。Tasks 将排在等待队列中,并每隔预定时间移至就绪运行队列(在必要时执行上下文切换)。现在,我们将运行的委托是定义在 ViewModels 上的方法,这些方法已注册了对特定主题的兴趣。它将通知 View Entity 已更新。

传统上,这必须调用 INotifyPropertyChanged。这种方法存在问题。任何 Windows 老手都知道“UI 元素只能在拥有它们的线程上更新”。这实际上并不正确,或者说不完全正确……应该重新表述为“UI 框架开发人员希望您在拥有它们的线程上更新 UI 元素”。因此,WPF 开发人员也不例外,因此他们强迫我们在属性设置器中调用 INotifyPropertyChanged,并在每次需要更新属性时调用 Dispatcher.BeginInvoke。这显然对我们不起作用,因为 Entity 在多个线程上不断地实时更新,所以想象一下我们每次都调用 Dispatcher.BeginInvoke?GUI 根本不会“非常响应”。因此,我们将允许 Entity 在多个线程上更新,并将 INotifyPropertyChanged 移到一个单独的方法中,该方法可以在我们准备好通知 View 时调用。我可以看到有些人开始考虑潜在的竞态条件,这是真的(在某种程度上),属性的值可能与属性的当前值不同(另一个线程进来改变了它)……然而,由于我们保证在指定的实时约束内(例如 50 毫秒)调用 INotifyPropertyChanged,因此我们保证用户将看到我们收到的最新数据,延迟最多为 50 毫秒。对 INPC 的实际调用是在 NotifyCollection 中完成的(我们稍后会更详细地介绍),所以 VM 开发人员需要做的就是调用 AddOrUpdate 方法,然后就搞定了。

现在请记住,我说过通用设计模式并不总是适用于实时应用程序?在这里肯定就是这种情况,因为在纯 WPF UI 体系结构中,Entity 不会了解 UI 特定元素,例如背景颜色等。但是,由于我们不关心模式而关心实时性,因此我们可以通过提前做大量工作来节省大量的 UI 线程时间。这就是为什么 Entity 知道其单元格内容、颜色、大小,并在后台线程上分配它们,以稍后通知 UI 线程。因此,Entity 成为了那个不断更新的、用于 UI 绑定的元素的脉动缓存,并且 GUI 保证在实时约束内被通知到最新状态。

这就是数据在 WPF Real-Time 框架中的流动方式。事件(来自服务器)变成实体(合并和节流),实体变成任务(保证实时执行)。

WPF Real-Time App

图 3 - WPF 实时应用

通过允许数据自由流动而不担心锁,而是保证 RT 约束,我们能够实现“非常响应的 GUI”和“非常快的更新”。这基本上就是大局。我只想强调一些值得为您的未来项目借鉴的主要类。它们在这里:

关键类

BlockingCircularBuffer

我从 C++ BOOST 库的 CircularBuffer 类中借鉴了这一点,并将其用作 .NET 的 BlockingCollection。所以它实际上与 BlockingCollection 类似。唯一的区别是,与 BlockingCollection 不同,它使用固定大小的数组,这更节省内存。

...
        public void Enqueue(T item)
        {
            lock (_lock)
            {
                while (IsFull() || _writingSuspended)
                {
                    Monitor.Wait(_lock);
                }
                _buffer[_writerIndex] = item;
                _writerIndex++;
                _writerIndex %= _size;
                Monitor.Pulse(_lock);
            }
        }
...
        public T Dequeue()
        {
            T item;
            lock (_lock)
            {
                while (IsEmpty())
                {
                    Monitor.Wait(_lock);
                }
                item = _buffer[_readerIndex];
                _readerIndex++;
                _readerIndex %= _size;
                Monitor.Pulse(_lock);
            }
            return item;
        }
...

NotifyCollection

此类实现 INotifyCollectionChanged,因此我们可以在 AddOrUpdate 中只抛出 OnCollectionChangedNotifyCollectionChangedEventArgs 来最大程度地减少更新次数。这使我们可以将数据存储在 Dictionary 中(这样搜索更方便快捷),并且只抛出 OnCollectionChanged 而无需搜索和更新集合的元素。

    public class NotifyCollection<T> : Collection<T>, ITypedList, INotifyCollectionChanged where T : SelfExplained
    {
         ...
        public void AddOrUpdate(IEnumerable<T> items, bool isReplace)
        {
            foreach (var item in items)
            {
                if (!Contains(item))
                {
                    Add(item);
                    OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
                }
                else
                {
                    if (isReplace)
                    {
                        OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, item, item));
                    }
                    else
                    {
                        foreach (var property in item.GetPropertyNames())
                        {
                            item.OnPropertyChanged(property);
                        }
                    }
                }
            }
        }
         ...
    }

WpfGridColumn

这就是绑定魔法发生的地方。

...
        protected override FrameworkElement GenerateEditingElement(DataGridCell cell, object dataItem)
        {
            TextBox editElement = new TextBox {BorderThickness = new Thickness(0.0), Padding = new Thickness(0.0)};

            System.Windows.Data.Binding textBinding = new System.Windows.Data.Binding(cell.Column.Header + ".DisplayValue")
                                                          {Source = dataItem};
            editElement.SetBinding(TextBox.TextProperty, textBinding);

            System.Windows.Data.Binding backgroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Background")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBlock.BackgroundProperty, backgroundBinding);

            System.Windows.Data.Binding foreGroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Foreground")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBlock.ForegroundProperty, foreGroundBinding);

            return editElement;
        }
...

中介器 (Mediator)

关于这个类说得够多了。您知道它做什么……将任务分发给已注册的用户……并选择正确的 TaskScheduler。

...
        public bool Broadcast(string key, params object[] message)
        {
            List<WeakAction> wr;
            lock (_registeredHandlers)
            {
                if (!_registeredHandlers.TryGetValue(key, out wr))
                    return false;
            }

            foreach (var cb in wr)
            {
                Delegate action = cb.GetMethod();
                if (action == null) continue;
                switch (cb.TaskType)
                {
                    case TaskType.Background:
                        // check if already running
                        if (!_baseQueue.ContainsKey(cb.Target.Target))
                        {
                            var bgTask = _backgroundTaskFactory.StartNew(() => action.DynamicInvoke(message));
                            _baseQueue.Add(cb.Target.Target, bgTask);
                        }
                        break;

                    case TaskType.Periodic:
                        var periodicTask = new PeriodicTask(() => action.DynamicInvoke(message), _budget);
                        periodicTask.Start(_preemptiveScheduler);
                        break;

                    case TaskType.Sporadic:
                        // one Periodic to run all sporadics
                        var sporadicTask = new SporadicTask(() => action.DynamicInvoke(message), _budget);
                        sporadicTask.Start(_preemptiveScheduler);
                        break;
                    case TaskType.LongRunning:
                        //UI
                        _staTaskFactory.StartNew(() => action.DynamicInvoke(message));
                        break;
                }

            }

            lock (_registeredHandlers)
            {
                wr.RemoveAll(wa => wa.HasBeenCollected);
            }

            return true;
        }
...

PriorityTaskScheduler

实现速率单调算法。等待队列、就绪运行队列……我们不是已经讨论过它了吗?

...
       public PriorityTaskScheduler()
        {
            //TODO: create category
            //_messageCounter = new PerformanceCounter();
            _tsp = Convert.ToInt32(ConfigurationManager.AppSettings["TASK_SCHEDULER_PERIOD"]);
            _buferSize = Convert.ToInt32(ConfigurationManager.AppSettings["BUFFER_SIZE"]);
            _threshold = Convert.ToDouble(ConfigurationManager.AppSettings["SUSPENSION_THRESHOLD"]);
            _readyToRunQueue = new BlockingCircularBuffer<PeriodicTask>(_buferSize);
            _waitingQueue = new ConcurrentQueue<Task>();

            var executor = new System.Timers.Timer(_tsp);
            executor.Elapsed += WaitingQueueToReadyToRun;
            executor.Start();

            for (int i = 0; i < Environment.ProcessorCount; i++)
            {
                _backgroundTaskFactory.StartNew(() =>
                {
                    while (true)
                    {
                        var task = _readyToRunQueue.Dequeue();
#if DEBUG
                        _log.Debug("Message Dequeued");
                        //_messageCounter.Decrement();
#endif    
                        if (TryExecuteTask(task))
                        {
                            var span = DateTime.UtcNow - task.TimeCreated;
                            if (DateTime.UtcNow > task.Deadline)
                            {
                                _log.Warn(String.Format("Real-time Deadline exceeded : {0}", span.TotalMilliseconds));
                                //throw new ApplicationException("Real-time Deadline exceeded");
                            }
#if DEBUG
                            _log.Debug("Message Done");
#endif
                        }
                    }
                });
            }    
        }

        private void WaitingQueueToReadyToRun(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task task;
            while (_waitingQueue.TryDequeue(out task))
            {
                // check budget and invoke a context switch
                PeriodicTask headTask;
                if (task is PeriodicTask)
                {
                    headTask = (PeriodicTask)task;
                    var nextToRun = _readyToRunQueue.Peek();

                    if ((nextToRun != null) && (nextToRun.Status == TaskStatus.WaitingToRun)
                        && headTask.Deadline < nextToRun.Deadline)
                    {
                        _log.Info("Context switching at: " + DateTime.UtcNow);
                        var dequeuedTask = _readyToRunQueue.Dequeue();
                        _readyToRunQueue.Enqueue(headTask);
                        _readyToRunQueue.Enqueue(dequeuedTask);
                    }

                    _readyToRunQueue.Enqueue(headTask);
                }  
            }
        }

        protected override void QueueTask(Task task)
        {
#if DEBUG
            _log.Debug("Message Enqueued");
            //_messageCounter.Increment();
#endif
            // jump the queue
            if (task is SporadicTask)
            {
                TryExecuteTask(task);
                _log.Info("Sporadic jumped the queue at: " + DateTime.UtcNow);
                return;
            }
            
            _waitingQueue.Enqueue(task); 
        }
...

BaseServiceObserver

使用 Rx 来合并数据。

...
        public virtual void AddServicesToObserve(IEnumerable<IService> services)
        {
            _entityIndex = new ConcurrentDictionary<ulong, Entity>();

            var observer = services.Select(s => 
                Observable.FromEvent<EventArgs<DataRecord>>(h => s.DataReceived += h, h => s.DataReceived -= h))
                .ToList()
                .Merge();

            observer.Select(x => x.EventArgs.Value)
                    .Where(x => x.DataRecordKey != null)
                    .BufferWithTime(TimeSpan.FromMilliseconds(_od))
                    .Where(x => x.Count > 0)
                    .Subscribe(DataReceived, LogError);
        }
...

BaseViewModel

解析 View 并调用 Initialise(),这样我们就可以摆脱 xaml.cs。

...
        protected BaseViewModel(string view, bool vmResolvesView, bool signedMaintenanceContract)
        {
            Log = LogManager.GetLogger(view);

            // ViewModel resolves the view
            if (vmResolvesView)
            {
                var attr = (ViewAttribute)GetType().GetCustomAttributes(typeof(ViewAttribute), true).FirstOrDefault();
                if (attr == null) throw new ApplicationException("ViewAttribute is missing");

                ViewReference = (FrameworkElement)Activator.CreateInstance(attr.ViewType);

                Uri resourceLocater = new Uri("/" + attr.ViewType.Module.ScopeName.Replace(".dll", "") + ";component/Views/" +
                                                attr.ViewType.Name + ".xaml", UriKind.Relative);
                Application.LoadComponent(ViewReference, resourceLocater);
                ViewReference.DataContext = this;
            }
            // View already resolved the viewModel

            if (ViewReference is Window)
            {
                ((Window)ViewReference).Left = Convert.ToDouble(ViewReference.GetValue(WindowProperties.LeftProperty));
                ((Window)ViewReference).Top = Convert.ToDouble(ViewReference.GetValue(WindowProperties.TopProperty));
                ((Window)ViewReference).Width = Convert.ToDouble(ViewReference.GetValue(WindowProperties.WidthProperty));
                ((Window)ViewReference).Height = Convert.ToDouble(ViewReference.GetValue(WindowProperties.HeightProperty));
            }

            if (signedMaintenanceContract)
                DispatcherFacade = new MaintenanceEngineer(view, Log, Dispatcher);

        }
...

MaintenanceEngineer

监控调度器队列并在队列增长时调用 DoEvents 扩展。

...
        private long _operationsQueueCount = 0;
        private readonly ConcurrentDictionary<DispatcherOperation, object> _operations = new ConcurrentDictionary<DispatcherOperation, object>();

        private void Heartbeat(long l)
        {
            var heartbeat = new Heartbeat(_dynamicViewName, String.Format("{0} View heartbeat sent at: {1}", _dynamicViewName, DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, false);
            Action w = () => Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            AddToDispatcherQueue(w);
        }

        private void MonitorDispatcherQueue(long l)
        {
            if (_operationsQueueCount != 0)
                _log.Info(String.Format("Dispatcher Operations In Queue {0}, ", _operationsQueueCount));

            if (_operationsQueueCount > _qs)
            {
                _log.Info("Pushing all Dispatcher operations");
                Application.Current.DoEvents();
                _operations.Clear();
                Interlocked.Exchange(ref _operationsQueueCount, 0);
            }
        }


        #region IDispatcherFacade Members

        public void AddToDispatcherQueue(Delegate workItem)
        {
            var operation = _dispatcher.BeginInvoke(DispatcherPriority.Background, workItem);

            operation.Completed += (s, o) =>
            {
                Interlocked.Decrement(ref _operationsQueueCount);
                object t;
                _operations.TryRemove((DispatcherOperation)s, out t);
            };
            _operations.TryAdd(operation, null);
            Interlocked.Increment(ref _operationsQueueCount);

        }
        #endregion
...

RealService

一些 WCF 内容。

...
        public RealService()
        {
            // out channel
            _channelFactory = new ChannelFactory<IRemoteSubscriptionService>(
                new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/SubscriptionService");
            _proxy = _channelFactory.CreateChannel();

            // in channel
            ServiceHost host = new ServiceHost(new ConnectionListener(this));
            host.AddServiceEndpoint(typeof(IRemotePublishingService),
                 new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/Bond/PublishingService");
            host.Open();
        }

        private void SendData(DataRecord data)
        {
            EventHandler<EventArgs<DataRecord>> handler = DataReceived;
            if (handler != null)
            {
                handler(this, new EventArgs<DataRecord>(data));
            }
        }

        #region IService Members
        [RegisterInterest(Topic.BondServiceGetData, TaskType.Background)]
        public void GetData()
        {
            try
            {
                _proxy.GetData(new RequestRecord() { AssetType = AssetType });
            }
            catch (Exception ex)
            {
                throw new ApplicationException("WCF channel is closed", ex);
            }
        }

        public event EventHandler<EventArgs<DataRecord>> DataReceived;
...

BondServiceObserver

请注意,我们可以在将“GUI 拥有的”实体传递给 Dispatcher 之前对其进行修改。

    [Export(typeof(BaseServiceObserver))]
    public class BondServiceObserver : BaseServiceObserver
    {
        public BondServiceObserver()
        {
            AddEventExploder(EventExploder);
        }

        public override void AddServicesToObserve(IEnumerable<IService> services)
        {
            //pass filter
            base.AddServicesToObserve(services.Where(s => s.AssetType == AssetType.Bond));
        }

        public void EventExploder(IEnumerable<Entity> messages)
        {
            // apply rules
            RuleEngine.ApplyRules(messages);

            // broadcast news or updates using Mediator EventExploder
            Mediator.GetInstance.Broadcast(Topic.BondServiceDataReceived, messages); 
        }
    }

BondViewModel

示例 View Model。

...
        public BondViewModel() : base("Bond Module", true, true)
        {
            DynamicViewName = "Bond Module";
            _grid = GetRef<DataGrid>("MainGrid");
 
            // InputManager.Current.PreProcessInput += new PreProcessInputEventHandler(Current_PreProcessInput);
            // InputManager.Current.PostProcessInput += new ProcessInputEventHandler(Current_PostProcessInput);

            Entities = new NotifyCollection<Entity>(EntityBuilder.LoadMetadata(AssetType.Common, AssetType.Bond));
            CreateColumnsCommand = new SimpleCommand<Object, EventToCommandArgs>((parameter) => true, CreateColumns);
        }

        [RegisterInterest(Topic.BondServiceDataReceived, TaskType.Periodic)]
        private void DataReceived(IEnumerable<Entity> entities)
        {
            Action w = () => Entities.AddOrUpdate(entities, false);
            DispatcherFacade.AddToDispatcherQueue(w);
        }

        [RegisterInterest(Topic.BondModuleHang, TaskType.Sporadic)]
        private void Hang()
        {
            ProcessOnDispatcherThread(() => Thread.Sleep(10000));
        }

        [RegisterInterest(Topic.BondModuleOpen, TaskType.Sporadic)]
        private void OpenClose(bool  state)
        {
            ProcessOnDispatcherThread(() =>
            {
                if (state)
                    ((Window)ViewReference).Show();
                else
                    ((Window)ViewReference).Hide();
            });
        }
...

Bootstrapper

使用 MEF 来发现和加载组件。

...
        private readonly Action<Lazy<IDynamicViewModel>> _createView = ((t) =>
        {
            var vm = t.Value;

            Mediator.GetInstance.Register(vm);
            Window view = (Window)((BaseViewModel)vm).ViewReference;
            RunningWindows.TryAdd(vm.DynamicViewName, view);
            var heartbeat = new Heartbeat(vm.GetType().ToString(), String.Format("{0} View loaded at: {1}", vm.GetType().ToString(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

            Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            //view.Show();
            view.Closed += (sender, e) => view.Dispatcher.InvokeShutdown();
            Dispatcher.Run();
        });

        private void InjectDynamicViewModels(bool multiDispatchers)
        {
            foreach (var lazy in DynamicViewModels)
            {
                Lazy<IDynamicViewModel> localLazy = lazy;
                if (multiDispatchers)
                {
                    Mediator.GetInstance.Broadcast(Topic.BootstrapperLoadViews, localLazy); 
                }
                else
                {
                    CreateView(localLazy);
                }
                
            }
        }

        private void InjectServices()
        {
            foreach (var lazy in Services)
            {
                var service = lazy.Value;
                Mediator.GetInstance.Register(service);
                var heartbeat = new Heartbeat(service.GetType().ToString(), String.Format("{0} Service loaded at: {1}", service.GetType(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

                Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            }
            //inject service observers
            foreach (var lazy in ServiceObservers)
            {
                lazy.Value.AddServicesToObserve(Services.Select(s => s.Value));
            }
        }

        private void InjectStaticViewModels(MainWindow mainWindow)
        {
            foreach (var vm in StaticViewModels)
            {
                Mediator.GetInstance.Register(vm);
                mainWindow.RibbonRegion.Items.Add(new TabItem { Content = ((BaseViewModel)vm).ViewReference, Header = vm.StaticViewName });
            }
        }
...

终章

我的要求是构建一个“使用 WPF/MVVM 的实时低延迟多线程前台交易应用程序”,该应用程序“需要始终保持响应,非常快地显示交易数据,永不挂起、冻结、崩溃或耗尽内存”。

我通过使用 .NET 并行任务框架实现速率单调调度算法实现了这一点,因此它是实时的。

我在多个 Dispatcher 上运行 GUI,因此它非常响应。

我允许数据自由流动并在非 UI 线程上修改“GUI 元素”,因此它更新 GUI 的速度非常快。

我厚颜无耻地窃取了其他 WPF 开发人员的组件,特别是 Sacha Barber、Marlon Grech、Kent Boogaart。

我认为我失败的唯一地方是提供一个通用的机制来处理偶发性事件。目前,由开发人员决定将事件标记为偶发性。理想情况是有一个机制,在每个用户操作进入 WPF 事件路由之前都被自动捕获并作为偶发性事件处理。我曾试图为此使用 System.Windows.Input.InputManager,但失败了。因此,如果有人知道如何拦截 WPF 中的任何用户输入并在我们的 TaskScheduler 中将其作为偶发性事件进行处理,我很想听听您的意见。

© . All rights reserved.