使用 Rx 进行动态聚合订单簿






4.82/5 (5投票s)
使用响应式扩展来构建和消费聚合订单簿。
访问 Bitbucket 仓库以浏览随附代码。欢迎 Fork 并提交 Pull Request...
引言
对于给定的资产,聚合订单簿结合了来自多个交易所的数据,并呈现了两个排序表,分别称为 Bids
(买盘)和 Asks
(卖盘);Bids
包含买入订单,而 Asks
包含卖出订单。在查看订单簿的人看来,Bids
中的最顶行代表他们可以卖出的最高价格,而 Asks
中的最顶行代表他们可以买入的最便宜价格。
在给定时间和给定资产下,聚合订单簿可能看起来像这样
在此示例中,希望购买 10 单位该资产的市场参与者会看到,对于 10 单位的交易量,参与者愿意出售的最便宜价格可以在交易所 B 和 A 上找到,价格分别为 101 和 101.5。在其他条件相同的情况下,拆分订单的最佳方式是在交易所 B 购买 5 单位,在交易所 A 购买 5 单位,总计 1012.5。如果坚持单一交易所,则在 B 处将花费 1030(5*101+ 5*105),或在 A 处花费 1027.5(101.5*5 +104*5)。将订单拆分到多个交易所有助于节省总执行价格。
既然已经确定聚合订单簿并非完全没有意义,我们如何在实时构建它们呢?需要同时从多个场所获取数据,并在每次更新到达时重新排列聚合订单簿。如果采用面向对象思维,这个练习绝非易事。然而,采用更函数式的方法并依赖 .NET 的响应式扩展(Rx),事实证明是一条有益的途径。对于不熟悉 Rx 的人来说,首选的信息来源是 www.introtorx.com。
不可变对象流
该问题可以看作是组装推送到消费者端的不可变对象流。每个交易所都公开一个订单簿流,其中仅包含来自该交易所的订单。我们将这些源合并成一个可观察序列并按顺序处理项目。每次新的订单簿进来时,我们都会将其通过一个累加器函数,该函数将其与聚合订单簿的最新版本结合,并返回中间值。通过累加器函数运行合并流(一个称为扫描的过程)的结果是聚合订单簿的可观察序列。该序列可以被处理以执行诸如更新动态显示、寻找套利机会或仅显示到控制台等操作。
直接跳到本文的结论,以下是上述图表在定义了适当的抽象之后在代码中的体现
IExchangeClient exchangeA = new MockClient(0);
IExchangeClient exchangeB = new MockClient(1);
IExchangeClient exchangeC = new MockClient(2);
var mergedOrderBookStream = Observable.Merge(exchangeA.OrderBookStream,
exchangeB.OrderBookStream,
exchangeC.OrderBookStream);
var aggregatedOrderBookStream = mergedOrderBookStream.Scan(
new AggregatedOrderBook(),
(aob, orderBook) => aob.InsertBook(orderBook));
var consoleSubscription = aggregatedOrderBookStream.Subscribe(Console.WriteLine);
IExchangeClient
与各个交易所通信的代码被抽象到 IExchangeClient
接口后面。
public interface IExchangeClient
{
int ExchangeID
{
get;
}
IObservable<ExchangeOrder> OrderStream
{
get;
}
IObservable<IOrderBook> OrderBookStream
{
get;
}
Task<ExchangeOrder> SubmitOrder(OrderSide oSide, OrderType oType, decimal price, decimal size);
Task<ExchangeOrder> CancelOrder(ExchangeOrder order);
void Connect();
void Disconnect();
}
每个实现都包含将信息传输到和从底层场所传输的特定逻辑,无论是通过 HTTP Web API、WebSockets 还是 FIX 协议,都向应用程序的其余部分公开相同的属性和方法。可查询数据(例如订单和订单簿)以**将数据推送到消费者**的流的形式呈现。在 Rx 中,此类对象由 IObservable
接口定义。
例如,我们假设某个交易所公开了一个 HTTP 方法来获取最新的订单簿,并且我们已经编写了一个 HttpGetOrderBook()
方法来执行该请求。我们希望每 5 秒调用一次此方法,等待回复,然后将结果推送到流中供我们的消费者使用。如果请求耗时超过 10 秒,我们希望它超时但继续重试。公开的订单簿序列必须对每个消费者都相同。以下是我们如何使用 Rx 实现此目的
var _orderBookStream = Observable.Create<IOrderBook>(obs =>
{
obs.OnNext(HttpGetOrderBook());
return Disposable.Empty;
})
.Concat(Observable.Empty<IOrderBook>().Delay(5))
.Repeat()
.Timeout(TimeSpan.FromSeconds(10))
.Retry()
.Publish();
这将调用 HttpGetOrderBook()
,将值推送到订阅者,等待结果序列完成,然后连接延迟 5 秒的空序列。延迟后,该过程会重复,再次发送请求,等待响应,然后再次暂停 5 秒,再重复。如果任何请求耗时过长,我们会引入超时,并在发生超时时重试。Publish
关键字通过将序列转换为 IConnectableObservable
来保证所有订阅者都消耗相同的流。一旦我们在此序列上调用 Connect()
,数据就会开始被推送,所有订阅者都将感知到相同的 orderbooks
流。要断开连接,我们只需处置 Connect()
返回的对象。IExchangeClient
接口公开了两种方法(Connect
和 Disconnect
),允许启动和停止从单个交易所接收数据。
在第一个代码片段中,我使用了三个 MockClients
。这是一种 IExchangeClient
类型,它模拟交易所的活动,可用于执行测试。它实际上是此应用程序中最难构建的部分,可能值得单独写一篇文章,因为其中也有一些非常酷的 Rx 代码。
IOrderBook、OrderBook 和 OrderComparer
OrderBooks
只是围绕两个已排序的 Bids
和 Asks
集合的简单封装。
public interface IOrderBook
{
int ExchangeID
{
get;
}
ImmutableSortedSet<ExchangeOrder> Bids
{
get;
}
ImmutableSortedSet<ExchangeOrder> Asks
{
get;
}
}
已排序的集合使用 ImmutableSortedSet
类型定义,这是一个 .NET 类,它根据可以显式提供的**比较器**处理所有排序逻辑。
public class OrderBook : IOrderBook
{
public OrderBook(int exchnangeID, IEnumerable<ExchangeOrder> bids, IEnumerable<ExchangeOrder> asks)
{
ExchangeID = exchnangeID;
Bids = bids.ToImmutableSortedSet(comparer: OrderComparer.DescBidComparer());
Asks = asks.ToImmutableSortedSet(comparer: OrderComparer.DescAskComparer());
}
public int ExchangeID { get; private set; }
public ImmutableSortedSet<ExchangeOrder> Bids { get; private set; }
public ImmutableSortedSet<ExchangeOrder> Asks { get; private set; }
}
在这种情况下,我们实现了一个订单比较器,它接收两个订单,如果第一个订单分别优于、等效于或劣于第二个订单,则返回 -1、0 或 1。
public class OrderComparer : Comparer<ExchangeOrder>
{
//-1 for Bid comparer
//1 for Ask comparer
readonly int _priceComparisonCoeff;
OrderComparer(int priceComparisonCoeff)
{
_priceComparisonCoeff = priceComparisonCoeff;
}
//!! counterintuitive but we need SortedSets to be in descending order. Instead of calling reverse
//all the time, we implement the behavior in the comparer
//Returns: -1 if x is better than y
// 0 if x is equivalent to y
// 1 if x is worse than y
//
// bids and asks:> market order always wins over limit order
// if both market orders and same timestamp, most remaining volume wins
//
// bids:> most expensive wins
// asks:> cheapest wins,
// if same price, oldest wins,
// if same price and timestamp, most remaining volume wins
//
//ASSUMPTION: x and y are on the same side (BID or ASK).
//We do not look at ID or ExchangeID
//The order of comparisons is: price, timestamp, remainingvolume (note that size does not matter)
public override int Compare(ExchangeOrder x, ExchangeOrder y)
{
//one market order and one limit order
if (x.OType == OrderType.Market && y.OType == OrderType.Limit)
{
return -1;
}
if (x.OType == OrderType.Limit && y.OType == OrderType.Market)
{
return 1;
}
//two limit orders
if (x.Price.CompareTo(y.Price) != 0)
{
return _priceComparisonCoeff * x.Price.CompareTo(y.Price);
}
if (x.UTCTimestamp.CompareTo(y.UTCTimestamp) != 0)
{
return x.UTCTimestamp.CompareTo(y.UTCTimestamp);
}
//two market orders or two limit orders with the same prices
if (x.RemainingVolume.CompareTo(y.RemainingVolume) != 0)
{
return -x.RemainingVolume.CompareTo(y.RemainingVolume);
}
//they have the same characteristics. not necessary same ID
//not good because we are not supposed to have two equivalent orders in the orderbook
return 0;
}
public static OrderComparer DescBidComparer()
{
return new OrderComparer(-1);
}
public static OrderComparer DescAskComparer()
{
return new OrderComparer(1);
}
}
AggregatedOrderBook(聚合订单簿)
聚合订单簿看起来就像它的标准对应物,但它结合了来自多个场所的订单。它包含两个用适当的比较器(用于 Bids
和 Asks
)实例化的 ImmutableSortedSets
,以及一个累加器函数,该函数在每次收到新订单簿形式的更新时,都会重新排列它们之间的订单。InsertBook
方法以常规订单簿为参数(其中包含属于单个交易所的订单),返回聚合订单簿的新副本,其中受更新影响的交易所的所有订单都替换为传入的订单。ImmutableSortedSet
和 OrderComparer
的组合协同工作,以确保 Bids
和 Asks
保持以适当的顺序排列。
public AggregatedOrderBook InsertBook(IOrderBook orderBook)
{
int exchangeID = orderBook.ExchangeID;
var modifiedAggregatedBids = Bids;
var modifiedAggregatedAsks = Asks;
var correspondingBids = Bids.Where(a => a.ExchangeID == exchangeID);
modifiedAggregatedBids = Bids.Except(correspondingBids).Union(orderBook.Bids);
var correspondingAsks = Asks.Where(a => a.ExchangeID == exchangeID);
modifiedAggregatedAsks = Asks.Except(correspondingAsks).Union(orderBook.Asks);
return new AggregatedOrderBook(modifiedAggregatedBids, modifiedAggregatedAsks);
}
Using the Code
通过合并多个源设置聚合订单簿流后,我们可以开始消费这些数据。用 Rx 的术语来说,我们可以**订阅可观察序列**。订阅序列意味着设置一个操作,以便每次有新项目推送时执行。
WPF 应用程序
我们可能想做的第一件事是更新动态显示。WPF 应用程序允许可视化数据流的运动。左侧列是三个单独交易所的控件,我们希望合并它们的订单簿。在这里,我再次使用了 3 个 MockClients
,因此订单是随机生成的。每个场所都有一个 Start
和 Stop
按钮,它们简单地控制着订单流入管道的流程。
我们希望对每个新传入的订单簿执行的操作是替换在主视图模型中定义为简单 Lists
的 Bids
和 Asks
集合。**WPF 绑定机制**和 INotifyPropertyChanged
负责更新 UI。在这里,重要的是要注意我们需要在后台线程上订阅并在 UI 线程上观察。Rx 使用 SubscribeOn
和 ObserveOnDispatcher
方法使其变得非常容易。
//subscribe to the aggregatedOrderBookStream.
//each time a new aggregated orderbook is received, the list of bids and asks is updated in the UI.
//ATTENTION: no elements will be observed until at least one exchange is connected to
aggregateStreamSubscription = aggregatedOrderBookStream.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe((aob) =>
{
Bids = aob.Bids.ToList();
Asks = aob.Asks.ToList();
});
识别套利机会
运行 WPF 应用一段时间后,可能会注意到有时最佳买价会比最佳卖价更贵
这带来了在某个交易所低价买入并在另一个交易所高价卖出的机会。这种免费午餐被称为套利。
在我们的框架中,我们将套利机会表示为围绕两个字典的包装器,其中包含每个交易所的买入和卖出数量。
public class ArbitrageOpportunity
{
public ArbitrageOpportunity(Dictionary<int, decimal> buyDico, Dictionary<int, decimal> sellDico)
{
BuyDico = buyDico;
SellDico = sellDico;
}
/// <summary>
/// dictionary of [exchangeID, buyVolume] representing the amounts that should be bought on each exchange
/// </summary>
public Dictionary<int, decimal> BuyDico { get; private set; }
/// <summary>
/// dictionary of [exchangeID, sellVolume] representing the amounts that should be sold on each exchange
/// </summary>
public Dictionary<int, decimal> SellDico { get; private set; }
}
然后,我们可以通过一个方法扩展 AggregatedOrderBook
类来识别套利机会,并使用简单的 Rx Select
语句将聚合订单簿的可观察序列投影到 ArbitrageOpportunities
序列上。
//project the aggregated orderbook stream onto a stream of arbitrages
var arbitrageStream = aggregatedOrderBookStream.Select(aob => aob.LookForArbitrage());
最后,我们可以订阅套利序列并分派适当的订单以利用该机会。
//pass each non-null arbitrage through the orderDispatcher which handles concurrency and race conditions internally
arbitrageStreamSubscription = arbitrageStream.Where(a => a != null).Subscribe(a => orderDispatcher.Put(a));
最后一段代码只是为了提供一个可能实现的想法,但没有提供相应的代码,因为它超出了本文的范围。
结论
响应式编程模型帮助解决了复杂的并发问题,其代码既可测试又可读。本文中引入的抽象使得能够插入任何类型的交易所 API 并为真实市场构建聚合订单簿。