65.9K
CodeProject 正在变化。 阅读更多。
Home

一个基本的CEP引擎,用于实时分析股票市场数据

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.91/5 (10投票s)

2014年4月10日

CPOL

6分钟阅读

viewsIcon

29864

downloadIcon

931

我们能使用LinqRx作为简陋的复杂事件处理器吗?

引言

如果您从未听说过复杂事件处理 (CEP),以及它是什么以及为什么您可能想使用它,那么 维基百科 是一个不错的起点。或者也许微软的 StreamInsight 的介绍会更好。

市面上有很多免费和付费的解决方案。对于 .NET 开发者来说,主要有两个似乎是:

StreamInsight - 编程非常愉快,但需要 SQL 许可证

NEsper - 有时有点晦涩难懂,但它是 Esper 的免费端口,并且功能稍多一些。

假设我们不想学习新的 API,并且已经熟悉 Rx,而且我们也不想额外支付许可费用。我们能否利用 Rx 来廉价地自己实现?

背景

我不想仅仅展示一些说明理论的代码,而是想提供一个实际的例子。所以我为自己设定了目标:

1) 创建一个东西,可以查询免费的在线股票行情数据源。

2) 定期检索股票行情数据并将其推送到订阅者。

3) 分离这些更新,为实时数据创建流或窗口。

4) 对这些窗口执行时间序列计算。

我使用了 TPL 和一个名为 Yahoo! Managed 的 NuGet 包的组合来解决 1 和 2。虽然雅虎会延迟更新,但我们可以持续轮询它以将更新推送到我们的系统中。不幸的是,Google Finance 不再提供免费 API,所以我们不能使用它。

对于 3 和 4,我改编了 MSDN 中的示例,并从 Intro To Rx 101 Rx samples 以及 codeproject 上关于 Rx 的无数其他文章中汲取了灵感。

使用代码

所附代码是使用 VSExpress 2012 for Windows Desktop 开发的,因此应该可以从 构建和运行。对于其他版本,您的使用方式可能会有所不同,但一切顺利的话,您应该会看到类似这样的内容:

这里发生的是,首先程序创建一个 QuoteStream 来封装纳斯达克 100 指数的所有组件价格行情,即苹果、微软、英特尔等的价格。其次,因为我们只对苹果感兴趣,所以我们创建一个 QuoteView,它使用过滤器订阅 QuoteStream 的更新。

 appleView.SubscribeTo(allQuotes.Where(x => x.ID == "AAPL"));

第三,我们创建一个 QuoteSlidingWindow,它将保留由 TimeSpan 对象和 QuotesData.LastTradeTime 属性定义的窗口的历史数据。同样,滑动窗口订阅 View 发布的内容,并且它本身是可观察的,因此我们可以订阅并使用 Rx 进行计算,例如 5 分钟移动平均价格。

        static void CalculateAverage(IList<QuotesData> quotes)
        {
            double average = quotes.Average(x => x.LastTradePriceOnly);
        }  

关注点

我使用 **Stream** 这个词来大致类比 StreamInsight 使用的术语,并实现 IObservable<T>。这里的 Stream 对象可以保存从外部源更新的数据,并可以将更新推送到观察者。

**View** 是 Stream 的任何过滤后的派生,但实现 IObserver<T> 和 IObservable<T>,以便更新可以通过中间容器进行管道传输和过滤。

**SlidingWindow** 是数据序列的一个离散窗口,它随着每个更新向前增量移动。类似于 SQL 分区/窗口,并与其他 CEP 实现中的滑动窗口非常相似。当一个值超出距离下一个更新向前延伸的时间段时,窗口将发布其结果,然后删除该值。

要在 C# 中实现这一点,我们需要做两件事。首先,覆盖 IObserver<T> 接口的 OnNext 函数,以便推送到它的更新修改底层数据并适时发布。

  public override void OnNext(QuotesData value)
        {

            DataStream.Enqueue(value);

            if (DataStream.Count > 0)
            {
                var latestQuoteTime = DataStream.Max(x => x.LastTradeTime);
                bool publish = false;

                //find and remove quotes that are now outside of the sliding window
                while (DataStream.Count > 0)
                {
                    var first = DataStream.Peek();
                    if (latestQuoteTime - first.LastTradeTime >= WindowLength)
                    {
                        DataStream.Dequeue();
                        publish = true;
                    }
                    else
                        break;
                }

                //we publish to subscribers only if a quote is exiting the window period
                if (publish)
                {
                    foreach (var observer in Observers)
                        observer.OnNext(DataStream.ToList());
                }
            }

        } 

其次,我们必须实现 IObservable<IList<T>>,以便我们可以将当前窗口数据管道传输到其观察者。我像这样在 QuoteWindow 基类上完成:

        public IDisposable Subscribe(IObserver<IList<QuotesData>> observer)
        {
            if (!Observers.Contains(observer))
                Observers.Add(observer);
            return new WindowUnsubscriber(Observers, observer);
        } 

**TumblingWindow** 与滑动窗口类似,只不过它在序列的第一个值进入时开始,并在超出窗口长度的值进入时结束。例如,如果窗口是 5 分钟,第一个值是 13:00:00,那么 13:05:00 或更晚的值将完成窗口,发布结果,然后清除任何底层数据。

我们可以重用我们为 SlidingWindow 已经创建的许多代码来 TumblingWindow 的实现。实际上,QuoteTumblingWindow 只是覆盖了 OnNext。

public override void OnNext(QuotesData value)
        {
            //does this new value complete the current window? If so publish and then reset.
            if (DataStream.Count > 0)
            {
                if (value.LastTradeTime - WindowStart >= WindowLength)
                {
                    foreach (var observer in Observers)
                        observer.OnNext(DataStream.ToList());
                    DataStream.Clear();
                }
            }
            //start a new window
            if (DataStream.Count == 0)
                WindowStart = value.LastTradeTime;
            DataStream.Enqueue(value);
 
        } 

如您所见,我们可以(相对)轻松地实现所需的接口,并进行一些自定义,以实现一个管道和过滤模式,直到最终函数来执行我们需要的任何时间序列计算。我们还应该能够根据需要添加和删除对上述每个实体的订阅。

增加价值

在完成了基础架构之后,我们现在可以开始为我们接收到的数据增加价值。交易领域中两个常见的指标是时间加权平均价 (TWAP) 和成交量加权平均价 (VWAP)。

**TWAP** ,传统上是通过找到时间窗口内每个 tick 的开盘价、收盘价、最高价和最低价的平均值,然后取这些值的平均值来计算的。我采用了 Tradestation 上概述的更简单、可能更好的方法来计算每个时间窗口的 TWAP 值。

private static double twap(IList<QuotesData> quotes)
        {
            if (quotes.Count > 0)
            {
                DateTime max = DateTime.MinValue;
                DateTime min = DateTime.MaxValue;
                string ticker = quotes[0].ID;
                double totalPrice = 0.0;
                int n = quotes.Count;
                double twap = 0.0;
                foreach (var q in quotes)
                {
                    totalPrice += q.LastTradePriceOnly;
                    if (q.LastTradeTime > max)
                        max = q.LastTradeTime;
                    if (q.LastTradeTime < min)
                        min = q.LastTradeTime;
                }
                if (n > 0 & totalPrice > 0.0)
                {
                    twap = totalPrice / n;
                    return twap;
                }
            }
            return double.NaN;
        } 

**VWAP** ,与 TWAP 类似,除了我们用交易的股票量来加权每个 tick。不用说,卖出一百万股股票的价格为 1.00 美元比卖出一股股票的价格为 100 美元更具相关性。同样,我为每个发布的时间窗口计算 VWAP。

 private static double vwap(IList<QuotesData> quotes)
        {
            if (quotes.Count > 0)
            {
                DateTime max = DateTime.MinValue;
                DateTime min = DateTime.MaxValue;
                string ticker = quotes[0].ID;
                double totalWeightedPrice = 0.0;
                double totalQuantity = 0.0;
                double vwap = 0.0;
                foreach (var q in quotes)
                {
                    double quantity = (double)q.Values(QuoteProperty.LastTradeSize);
                    totalWeightedPrice += (q.LastTradePriceOnly * quantity);
                    totalQuantity += quantity;
                    if (q.LastTradeTime > max)
                        max = q.LastTradeTime;
                    if (q.LastTradeTime < min)
                        min = q.LastTradeTime;
                }
                if (totalQuantity > 0 & totalWeightedPrice > 0.0)
                {
                    vwap = totalWeightedPrice / totalQuantity;
                    return vwap;
                }
            }
            return double.NaN;
        } 

要生成更长时间运行的 TWAP 或 VWAP,我们可以随时将结果管道传输到另一个 Window。同样,如果我们想做一些更复杂的事情,比如根据我们的计算执行新的交易,那么我们需要用 IObserver<T> 包装我们的执行 API,然后将构建订单所需的相应数据管道传输过去。但这将是另一天的话题。

结论

在处理了 StreamInsight 和 Esper (Java) 但没有 Rx 之后,这篇文章的研究和编写大约花费了 2 个工时。在英国,IT 经理通常将开发项目成本定为每天 600 英镑,所以我们可以估计在这种情况下成本不到 1500 英镑。与零售价约为 2500 英镑的 SQL Server 标准版相比,再加上实施成本,表面上看我们节省了一笔可观的费用。

然而,限制在于该解决方案的可扩展性不高,并且不支持许多其他真正的 CEP 解决方案所提供的广泛功能。

故障排除

您可能会发现默认实现只检索一个股票报价后就暂停。如果是这样,那么很可能是交易所已过交易时间。如果是这种情况,我提供了富时 100 指数 (FTSE-100) 的符号,以便您可以轻松切换,但如果您想要其他东西,目前您需要从雅虎复制代码/粘贴。

可能的未来更新

更复杂的标准计算,例如投资组合 VaR

这与 StreamInsight 或 NEsper (C#) 或 Esper (Java) 中的相应计算相比如何?

所有反馈都非常感谢。

历史

修订 #1 - 文章和代码的骨架

修订 #2 - 添加了 TumblingWindows、TWAP 和 VWAP 的计算器。

© . All rights reserved.