65.9K
CodeProject 正在变化。 阅读更多。
Home

Interactive Brokers 市场记录器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.76/5 (10投票s)

2016年3月14日

MIT

8分钟阅读

viewsIcon

47906

downloadIcon

1337

一个工具,它捕获一组符号的实时市场数据,并将它们以1秒快照的形式保存到SQL数据库中。

注意:下载前,请查阅要求设置说明

引言

MarketRecorder 连接到 Interactive Broker 的 (IB) TWS API,并捕获一组符号的实时流式事件,如买卖价、成交量、最高价、最低价、近期交易等。这些原始事件数据每秒直接存储到新的数据库记录中。

此工具可用于下载市场数据的不同原因。对我而言,我使用实时数据通过 AI 预测股票,但它也可以用于几乎任何事情。要使用它,需要一个 Interactive Broker 的账户或免费的 TWS 演示版本。演示版本是虚假信息,但可用于测试。

本文是 MarketRecorder 的一篇博文,它是几个松散关联的应用程序之一。此应用程序主要将原始实时逐笔数据直接保存到数据库。保存这些近乎随机事件的杂乱数据很重要,因为它能真实地反映实际交易。还有一个名为 BriefMaker 的第二项目,它将这些数据提升到一个新的水平,通过将事件数据转换为更熟悉和实用的时间间隔摘要,例如每6秒一次的最高价/最低价/收盘价/成交量。这种类型的数据可以放入具有固定时间间隔作为一轴的电子表格中。BriefMaker 将在另一篇文章中介绍。

Market Recorder 只记录事件。保存这些流式事件的优点在于,事件构成了市场。市场是由几乎随机发生的各种事件组成的混乱集合。这种混乱的数据形式就是市场。一旦这些数据被转换,例如由 BriefMaker 转换为6秒摘要快照,市场的真实性就会丢失一部分,并且无法恢复。保存原始事件数据允许应用程序(如 BriefMaker)使用不同的参数或代码重新运行。

使用 API 收集数据

下面是 MarketRecorder 的数据流图。

市场数据首先通过互联网或专用线路直接进入 Interactive Brokers (IB) TWS 应用程序。TWS 具有内置 API,允许用户执行几乎任何操作,从访问市场数据到检查订单状态再到执行交易。

要检索数据,我们必须首先建立 TWS API 连接。为清晰起见,我已删除了一些非必需行。

logger.Info("Connecting to Interactive Brokers API.");
IBClient client = new IBClient();
Thread.Sleep(1000);

int retryCt = 5;
while (!client.Connected)
{
    Thread.Sleep(2000); 
    if ((retryCt--) <= 0)
    {
        logger.Info("Tried to reconnect 5 times but failed - aborting.");
        return; // abort the function.
    }

    try
    {
        logger.Info("Connecting to TWS..." + "(Try " + (4 - retryCt) + " of 5)");
        client.Connect(settings.IBHostToConnectTo, settings.IBPortToConnectTo, 1);
        
        // Set the client ID.  Here we just use 1.
        client.RequestIds(1);
    }
    catch (Exception ex)
    {
        logger.Info("IB Connecting Exception: " + ex.Message);
    }
}
logger.Info("TWS Client is now Connected.");

接下来,我们注册一些我们感兴趣接收事件的交易代码符号。

foreach (var symbol in symbols)
{
    Contract item = new Contract(symbol.Symbol.Trim(), symbol.Market, symbol.securityType, "USD");

    // Lets now register for event updates... 
    client.RequestMarketData(symbol.SymbolID, item, null, false, false);

    // More available options not used in Market Recorder... 
    // client.RequestMarketDepth(symbol.SymbolID, item, 10);
    // client.RequestContractDetails(symbol.SymbolID, item);
    // client.RequestFundamentalData(symbol.SymbolID, item, "Estimates");
    // client.RequestFundamentalData(symbol.SymbolID, item, "Financial Statements");
    // client.RequestFundamentalData(symbol.SymbolID, item, "Summary");
}

现在应用程序等待更新。每当 TWS 发生事件时,例如新的最佳买入价,API 就会在 MarketRecorder 中触发一个事件方法。然后,该方法将数据捕获到 BinaryWriter (capturingWriter) 中。

void client_TickPrice(object sender, TickPriceEventArgs e)
{
 int secondFraction = (int)(DateTime.Now.Ticks % TimeSpan.TicksPerSecond / (TimeSpan.TicksPerSecond/256));
 lock (capturingWriterLock)
 {
   capturingWriter.Write((byte)secondFraction ); // record sub-second time (1/256th resolution)
   capturingWriter.Write((byte)e.TickType);      // kind of data (like bid, ask, last...)
   capturingWriter.Write((byte)e.TickerId);      // The Symbol ID (like AMD, INTC, INDU..)
   capturingWriter.Write((float)e.Price);        // The data - in this case price.
 }
     ...
}

将数据写入数据库

每秒,capturingWriter 会被上传到 StreamMoments 数据库表。

// Round down to nearest second
DateTime time = new DateTime((DateTime.Now.Ticks / TimeSpan.TicksPerSecond) * TimeSpan.TicksPerSecond);

// Set capturing stream to processing stream 
BinaryWriter ProcWriter = capturingWriter;
lock (capturingWriterLock) { capturingWriter = manufacturedWriterForFuture; }

// Finish up processing stream
ProcWriter.BaseStream.Position = 0;
ProcWriter.Write(time.Ticks);
ProcWriter.Flush(); 

// Notify any WCF Clients of a new TstsRow 
OpenAndSendWithWCF(ProcWriter);

// Save to the database 
byte[] data = ((MemoryStream)ProcWriter.BaseStream).ToArray();
StreamMoment briefToSave = new StreamMoment() { SnapshotTime = time, Data = data };
try
{
    dc.StreamMoments.InsertOnSubmit(briefToSave);
    dc.SubmitChanges();
}
catch (Exception ex)
{
    logger.Error("Exception with SubmitChanges(): {0}", ex.Message);
}

// Display pushed results
logger.Debug("Snapshot complete ({0} bytes at {1})", data.Length, time.ToString("HH:mm:ss.fff"));

// Clean-up
ProcWriter.BaseStream.Dispose();
ProcWriter.Dispose();

// Get next BinaryWriter ready ahead of time.
manufacturedWriterForFuture = new BinaryWriter(new MemoryStream(MemoryStreamReserveSpace));

数据库中 StreamMomentFormat 的输出格式

每秒 MarketRecorder 在 StreamMoments 表中开始一个新的行。每秒完成后,新的一行会被添加到 StreamMoments 表中。每一行都有一个 SnapshotTime(DateTime2)Data(image) 列。

SnapshotTime:这是数据收集结束时的时间。它被四舍五入到秒的末尾。要获取快照时间的开始,只需减去1秒。

Data:数据字段保存着流式数据。其大小取决于有多少数据通过网络传输以及市场有多活跃。该字段的前8个字节是流式事件结束时的 DateTimeNow.Ticks。接下来的7个字节是第一个事件,接下来的7个字节是第二个事件,依此类推。

每个 tick 以以下格式保存:(总共7个字节)

格式 描述 Range
字节型 当前亚秒(分辨率:1/256秒)(约4毫秒分辨率) 0 - 255
字节型 Ticker Type - 保存 tick 数据类型:最高价、最新价、成交量、买入价、卖出价等。 0 - 57
字节型 Ticker ID - 符号表中的符号 ID(例如:AMD、INTC、^DJI) 0 - 255
Float Value - 这是实际数据。(例如:成交量、价格等) (float)

要获取每个事件的亚秒时间,可以使用如下方法:

EventDateTime = SnapshotTime - 1 + (subSecond/256);

下载的符号位于 Symbols 表中。它预先加载了示例。

为了节省空间,子秒时间和 TickerID 使用了字节。但这也会带来一些限制。亚秒精度限制为1/256秒,即3.9毫秒。然而,由于网络连接抖动,这种精度可能足够了。TickerID 也是一个字节,所以当前系统最多只能处理256个交易代码 - 但一次下载这么多已经很多了。另外,IB 对标准账户有100个交易代码的限制,所以256个已经远远超过了。

特点

NLog 日志记录 - 这是一个流行的日志库。调试实时数据很困难,因为市场数据不会停下来等你调试代码。拥有一个像 NLog 这样的良好日志记录器,可以让我们回溯并找出问题。

高性能 - 主要目标之一是最小化延迟。在处理交易算法时,低延迟通常很重要。在可能的情况下,数据会尽快提供给数据库或 WCF 传输。为了加快速度,MarketRecorder 和 BriefMaker 之间通过 Window Communication Foundation (WCF) 建立了直接链接。这绕过了数据库的往返通信。

内置自动重连 - 有时会发生各种问题,例如网络中断或 TWS 出现问题,需要重新初始化 API。MarketRecorder 检查以确保数据正在流入,如果没有数据,它将重新启动连接。注意:有时当流入数据量很小时,例如盘后交易,它会触发自动重连功能。

命令行关机 - 可以通过命令行使用“MarketMaker.exe /q”关闭程序。对我来说,我过去每天都安排一个计划任务来关闭 MarketMaker。现在我使用 TWSStart (http://twsstart.free.fr)。

符号表

下面是 Symbols 表的样子。 'Name' 右侧的列在 MarketRecorder 中未使用。

  • SymbolID:包含每个符号的 ID。(必须是 0-255)
  • Type:MarketRecorder 在注册符号时使用。
  • Market:MarketRecorder 在注册符号时使用。
  • Name:要检索数据的符号。
  • FullNameTradeEnabledTradeQuanityValueAvgVolMarketCapTickTypesToCapture 字段在 MarketRecorder 中未使用。

直接 WCF 连接

MarketRecorder 具有内置功能,可以直接连接到其他应用程序,例如 BriefMaker。该连接使用 WCF (Windows Communication Foundation) 网络连接,用于最小化延迟。当每个1秒快照完成时,它可以直接发送到另一个应用程序,绕过数据库。数据库记录仍然会被写入,以供历史记录存档。

用户界面及其功能

Delete All 按钮 - 此按钮将删除 StreamMoments 表中的所有内容。

Launch Web Demo 按钮 – 此按钮会打开网站,用户可以在其中下载并连接到 IB 的演示版本。这对于测试很有用。

未来愿望清单

MarketRecorder 使用 krs.Ats.IBNet 库。这是一个很棒的库,但自2012年以来未更新。未来,我想将其迁移到 Interactive Brokers C# API

我希望添加的第二个改进,但我不确定是否能完成,是捕获额外的事件,如 Level II 数据和新闻事件。对于新闻,能够以数值形式捕获其含义或情绪会很棒。

设置说明

  1. 下载并挂载入门用的 MS-SQL 数据库。开始时,将其命名为“Focus”。数据库有几个 SQL 表,但只使用两个:SymbolsStreamMoments
  2. 更新 .config 文件
    • 如果数据库名称不是 focus 那么您需要更新 .config 文件中的连接字符串。
    • 接下来,您会看到 IBHostToConnectTo IBPortToConnectTo... 仅在需要时进行调整。端口号可以在 TWS -> File -> Global Configuration -> API -> Settings 中找到。
    • 如果需要,请配置 ShutDownTime。这是程序自动关闭的时间。我属于 PST 时区,所以它默认设置为 14:15(下午2:15)。
  3. 要使用 TWS 中的 API,您首先需要启用它。在 TWS 中,转到 File -> Global Configuration -> API -> Settings 并勾选“Enable ActiveX and socket clients”复选框。您可能还需要将 127.0.0.1 添加到受信任的 IP 地址列表中。同时注意端口号。此端口号应与上面提到的 .config 文件中的值匹配。
  4. 启动 MarketRecorder。它将自动连接到 TWS 和数据库。如果您遇到错误,通常会在最初几秒内出现。如果它没有连接,请查看日志详细信息或在 Visual Studio 中进行调试。
  5. 在完成所有设置并确认测试数据正常工作后,您可以随意更新符号列表。如果您打算使用 BriefMaker,我建议总共保留32个股票项目(0-31),并避免编辑市场指数(32-38)。

要求

历史

  • 2011-11-17 最初创建
  • 2012-08-31 添加了 FixedStepDispatcherTimer
  • 2013-02-11 添加了单实例限制、命令行退出选项,开始支持 WCF
  • 2014-01-28 完成 WCF,提高了稳定性
  • 2016-03-12 清理并在线发布代码
© . All rights reserved.