一个基本的CEP引擎,用于实时分析股票市场数据
我们能使用LinqRx作为简陋的复杂事件处理器吗?
引言
如果您从未听说过复杂事件处理 (CEP),以及它是什么以及为什么您可能想使用它,那么 维基百科 是一个不错的起点。或者也许微软的 StreamInsight 的介绍会更好。
市面上有很多免费和付费的解决方案。对于 .NET 开发者来说,主要有两个似乎是:
StreamInsight - 编程非常愉快,但需要 SQL 许可证
NEsper - 有时有点晦涩难懂,但它是 Esper 的免费端口,并且功能稍多一些。
假设我们不想学习新的 API,并且已经熟悉 Rx,而且我们也不想额外支付许可费用。我们能否利用 Rx 来廉价地自己实现?
背景
我不想仅仅展示一些说明理论的代码,而是想提供一个实际的例子。所以我为自己设定了目标:
1) 创建一个东西,可以查询免费的在线股票行情数据源。
2) 定期检索股票行情数据并将其推送到订阅者。
3) 分离这些更新,为实时数据创建流或窗口。
4) 对这些窗口执行时间序列计算。
我使用了 TPL 和一个名为 Yahoo! Managed 的 NuGet 包的组合来解决 1 和 2。虽然雅虎会延迟更新,但我们可以持续轮询它以将更新推送到我们的系统中。不幸的是,Google Finance 不再提供免费 API,所以我们不能使用它。
对于 3 和 4,我改编了 MSDN 中的示例,并从 Intro To Rx 、101 Rx samples 以及 codeproject 上关于 Rx 的无数其他文章中汲取了灵感。
使用代码
所附代码是使用 VSExpress 2012 for Windows Desktop 开发的,因此应该可以从

这里发生的是,首先程序创建一个 QuoteStream 来封装纳斯达克 100 指数的所有组件价格行情,即苹果、微软、英特尔等的价格。其次,因为我们只对苹果感兴趣,所以我们创建一个 QuoteView,它使用过滤器订阅 QuoteStream 的更新。
appleView.SubscribeTo(allQuotes.Where(x => x.ID == "AAPL"));
第三,我们创建一个 QuoteSlidingWindow,它将保留由 TimeSpan 对象和 QuotesData.LastTradeTime 属性定义的窗口的历史数据。同样,滑动窗口订阅 View 发布的内容,并且它本身是可观察的,因此我们可以订阅并使用 Rx 进行计算,例如 5 分钟移动平均价格。
static void CalculateAverage(IList<QuotesData> quotes) { double average = quotes.Average(x => x.LastTradePriceOnly); }
关注点
我使用 **Stream** 这个词来大致类比 StreamInsight 使用的术语,并实现 IObservable<T>。这里的 Stream 对象可以保存从外部源更新的数据,并可以将更新推送到观察者。
**View** 是 Stream 的任何过滤后的派生,但实现 IObserver<T> 和 IObservable<T>,以便更新可以通过中间容器进行管道传输和过滤。
**SlidingWindow** 是数据序列的一个离散窗口,它随着每个更新向前增量移动。类似于 SQL 分区/窗口,并与其他 CEP 实现中的滑动窗口非常相似。当一个值超出距离下一个更新向前延伸的时间段时,窗口将发布其结果,然后删除该值。
要在 C# 中实现这一点,我们需要做两件事。首先,覆盖 IObserver<T> 接口的 OnNext 函数,以便推送到它的更新修改底层数据并适时发布。
public override void OnNext(QuotesData value)
{
DataStream.Enqueue(value);
if (DataStream.Count > 0)
{
var latestQuoteTime = DataStream.Max(x => x.LastTradeTime);
bool publish = false;
//find and remove quotes that are now outside of the sliding window
while (DataStream.Count > 0)
{
var first = DataStream.Peek();
if (latestQuoteTime - first.LastTradeTime >= WindowLength)
{
DataStream.Dequeue();
publish = true;
}
else
break;
}
//we publish to subscribers only if a quote is exiting the window period
if (publish)
{
foreach (var observer in Observers)
observer.OnNext(DataStream.ToList());
}
}
}
其次,我们必须实现 IObservable<IList<T>>,以便我们可以将当前窗口数据管道传输到其观察者。我像这样在 QuoteWindow 基类上完成:
public IDisposable Subscribe(IObserver<IList<QuotesData>> observer)
{
if (!Observers.Contains(observer))
Observers.Add(observer);
return new WindowUnsubscriber(Observers, observer);
}
**TumblingWindow** 与滑动窗口类似,只不过它在序列的第一个值进入时开始,并在超出窗口长度的值进入时结束。例如,如果窗口是 5 分钟,第一个值是 13:00:00,那么 13:05:00 或更晚的值将完成窗口,发布结果,然后清除任何底层数据。
我们可以重用我们为 SlidingWindow 已经创建的许多代码来 TumblingWindow 的实现。实际上,QuoteTumblingWindow 只是覆盖了 OnNext。
public override void OnNext(QuotesData value)
{
//does this new value complete the current window? If so publish and then reset.
if (DataStream.Count > 0)
{
if (value.LastTradeTime - WindowStart >= WindowLength)
{
foreach (var observer in Observers)
observer.OnNext(DataStream.ToList());
DataStream.Clear();
}
}
//start a new window
if (DataStream.Count == 0)
WindowStart = value.LastTradeTime;
DataStream.Enqueue(value);
}
如您所见,我们可以(相对)轻松地实现所需的接口,并进行一些自定义,以实现一个管道和过滤模式,直到最终函数来执行我们需要的任何时间序列计算。我们还应该能够根据需要添加和删除对上述每个实体的订阅。
增加价值
在完成了基础架构之后,我们现在可以开始为我们接收到的数据增加价值。交易领域中两个常见的指标是时间加权平均价 (TWAP) 和成交量加权平均价 (VWAP)。
**TWAP** ,传统上是通过找到时间窗口内每个 tick 的开盘价、收盘价、最高价和最低价的平均值,然后取这些值的平均值来计算的。我采用了 Tradestation 上概述的更简单、可能更好的方法来计算每个时间窗口的 TWAP 值。
private static double twap(IList<QuotesData> quotes)
{
if (quotes.Count > 0)
{
DateTime max = DateTime.MinValue;
DateTime min = DateTime.MaxValue;
string ticker = quotes[0].ID;
double totalPrice = 0.0;
int n = quotes.Count;
double twap = 0.0;
foreach (var q in quotes)
{
totalPrice += q.LastTradePriceOnly;
if (q.LastTradeTime > max)
max = q.LastTradeTime;
if (q.LastTradeTime < min)
min = q.LastTradeTime;
}
if (n > 0 & totalPrice > 0.0)
{
twap = totalPrice / n;
return twap;
}
}
return double.NaN;
}
**VWAP** ,与 TWAP 类似,除了我们用交易的股票量来加权每个 tick。不用说,卖出一百万股股票的价格为 1.00 美元比卖出一股股票的价格为 100 美元更具相关性。同样,我为每个发布的时间窗口计算 VWAP。
private static double vwap(IList<QuotesData> quotes)
{
if (quotes.Count > 0)
{
DateTime max = DateTime.MinValue;
DateTime min = DateTime.MaxValue;
string ticker = quotes[0].ID;
double totalWeightedPrice = 0.0;
double totalQuantity = 0.0;
double vwap = 0.0;
foreach (var q in quotes)
{
double quantity = (double)q.Values(QuoteProperty.LastTradeSize);
totalWeightedPrice += (q.LastTradePriceOnly * quantity);
totalQuantity += quantity;
if (q.LastTradeTime > max)
max = q.LastTradeTime;
if (q.LastTradeTime < min)
min = q.LastTradeTime;
}
if (totalQuantity > 0 & totalWeightedPrice > 0.0)
{
vwap = totalWeightedPrice / totalQuantity;
return vwap;
}
}
return double.NaN;
}
要生成更长时间运行的 TWAP 或 VWAP,我们可以随时将结果管道传输到另一个 Window。同样,如果我们想做一些更复杂的事情,比如根据我们的计算执行新的交易,那么我们需要用 IObserver<T> 包装我们的执行 API,然后将构建订单所需的相应数据管道传输过去。但这将是另一天的话题。
结论
在处理了 StreamInsight 和 Esper (Java) 但没有 Rx 之后,这篇文章的研究和编写大约花费了 2 个工时。在英国,IT 经理通常将开发项目成本定为每天 600 英镑,所以我们可以估计在这种情况下成本不到 1500 英镑。与零售价约为 2500 英镑的 SQL Server 标准版相比,再加上实施成本,表面上看我们节省了一笔可观的费用。
然而,限制在于该解决方案的可扩展性不高,并且不支持许多其他真正的 CEP 解决方案所提供的广泛功能。
故障排除
您可能会发现默认实现只检索一个股票报价后就暂停。如果是这样,那么很可能是交易所已过交易时间。如果是这种情况,我提供了富时 100 指数 (FTSE-100) 的符号,以便您可以轻松切换,但如果您想要其他东西,目前您需要从雅虎复制代码/粘贴。
可能的未来更新
更复杂的标准计算,例如投资组合 VaR
这与 StreamInsight 或 NEsper (C#) 或 Esper (Java) 中的相应计算相比如何?
所有反馈都非常感谢。
历史
修订 #1 - 文章和代码的骨架
修订 #2 - 添加了 TumblingWindows、TWAP 和 VWAP 的计算器。