实时分析大数据 #2
StreamInsight:
这是《实时分析大数据》系列文章的第 2 部分,请参阅 #1,#2
引言
在上篇文章中,我们回顾了复杂事件处理 (CEP) 系统在现实世界中的应用,特别是微软的 StreamInsight 在体育广播中的应用。这项技术的潜在应用范围非常广泛,从过滤和处理应用程序日志到算法交易。在本文中,我将通过一个简单的演示应用程序,阐述如何自己使用这项技术。
StreamInsight 的最新版本与 Linq 进行了深度集成,就像 LinqToSql 或 LinqToObjects 一样,它拥有自己的提供程序,允许我们为 CepStream 类型表达时间(即随时间推移)查询。
桥接方法允许我们在接口之间进行转换
结合 LinqRx,我们需要理解的关键接口是:
IQStreamable - 定义了一个远程的时间流事件
IQueryable - 支持将查询函数部署到引擎
IObservable - 允许 Observer 被通知底层流的变化
如果您只对如何使用这些方法的示例感兴趣,请跳至下方的 Processor 部分。
背景
为了支持本文及后续文章,我创建了 2 个支持性基础设施,以帮助我们使用和理解 StreamInsight。
要实时分析数据,我们实际上需要一个实时数据源!我曾希望能够相对容易地找到一个可以订阅的延迟股票市场数据源。不幸的是,任何免费的数据源更新频率都很低,例如每分钟一次,但我们需要更快的数据。因此,我改用叠加原理编写了一个 SignalGenerator,用于生成带有随机泛音的正弦波数据。
其次,我们可能还需要一个可视化信号并比较它以及我们对其进行的任何分析的工具。在 WPF 中,我创建了一个自动缩放和调整大小的动画图表控件(截图如下),它会显示时间序列数据,直到达到限制,然后删除超出限制的数据。因此,您会观察到数据持续生成时波形在移动。
使用代码
从 此链接 下载并安装 StreamInsight 2.1
使用所有默认设置,除了下面的屏幕。要么将安装的实例命名为“default”,要么准备在首次运行时更改附加代码。
将当前用户添加到 StreamInsight 组,因为我们将在以后的文章中使用它。
下载附加的 zip 文件,解压缩代码,然后加载到 Visual Studio 2012 Express for Desktop 或更高版本中。
构建并运行后,选择“Go”按钮,如果一切正常,您应该会看到数据正在生成。
随着时间的推移,您应该会看到一个行进的正弦波…
StreamInsight 依赖于 Reactive Extensions (LinqRX) 版本 1.0.10621,我已经对其进行了引用并静态提供。如果由于任何原因这些引用丢失,您可以从 codeplex 下载,或者给我发邮件,我会解决它。
同样,StreamInsight API 也是静态引用的,但您仍然需要安装 StreamInisght 实例才能使代码正常工作!
管道模式(又称 Pipes and Filters)
无论您在处理数据和流的移动方面做什么,几乎都可以将其表示为一种移动的提取-转换-加载过程,其中每个事件或消息都被源化、处理并传递到别处。这通常被称为管道和过滤,或简称为管道。
MSDN 对管道的定义在这里很合适,因为“任务”可以是任何一种归约、投影或丰富。
出于本文的目的,我将这三个步骤称为 Source、Processor 和 Sink。
1. Source:信号生成器
生成重复波相对简单,但要生成一些略带随机性的内容,我们必须生成一些随机数。我没有使用 Random 类默认的均匀分布,而是使用了一个来自我在代码提示 Emulate-Human-Mouse-Input-with-Bezier-Curves-and-Gaussian-Distributions 中详细讨论的类的正态分布随机数。
public class SignalGenerator
{
NormalDistribution normalDistribution = new NormalDistribution();
public SignalGenerator(double r, double a, int harmonics)
{
this.r = r;
this.a = a;
this.harmonics = harmonics;
waveLength = 2 * Math.PI * r; //circumference of a circle;
tickIncrementRatio = 1.0/360;
}
private long tickCount;
private double r;
private double a;
private double tickIncrementRatio;
private double waveLength;
private int harmonics;
public double Next()
{
double angle = tickCount * tickIncrementRatio * waveLength;
double drift = a * Math.Sin(angle);
double shock = normalDistribution.NextGaussian() ;
//drift + random shock
double result = (drift != 0) ?
drift + (shock * 0.01)
: drift;
tickCount++;
return result;
}
}
拥有 RepeatingTask 和 SignalGenerator 的 PointAdapter 是 StreamInsight 引擎的接口。因此,它由 StreamInsight 创建,并且其生命周期由 StreamInsight 控制,这意味着 RepeatingTask(调用 SignalGenerator 的任务)将在我们初始化应用程序时通过 GeneratorFactory 创建。
在使用此技术将数据推送到系统时,牢记这些要点很重要,因为信号生成是在 StreamInsight 内部而不是从外部进行的。下面的代码是 MSDN 的一个样板方法,它检查 StreamInsight 的当前状态,然后决定是否将新数据插入源流。
public void ProduceEvent(GeneratedEvent data)
{
// Check for the stopping state and stop if necessary.
if (AdapterState.Stopping == AdapterState)
{
Stopped();
return;
}
// Since this method is called from a thread that is independent of
// Start() and Resume(), we need to make sure that the adapter is
// actually running.
if (AdapterState.Running != AdapterState)
{
// Throw away the current event.
return;
}
PointEvent<GeneratedEvent> currEvent = CreateInsertEvent();
if (null == currEvent)
{
// Throw away the current event.
return;
}
currEvent.StartTime = DateTime.Now;
currEvent.Payload = data;
if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
{
ReleaseEvent(ref currEvent);
Ready();
return;
}
}
如您所见,只有当 StreamInsight 应用程序正在运行且未关闭时,才会将新事件推送到引擎。
2. Processor:过滤器、分区和投影
Processor 的职责是初始化和配置嵌入式 StreamInsight 引擎的输入、查询和输出。
defaultServer = Server.Create("default");
demoApp = defaultServer.CreateApplication("demo");
文档或其他代码示例中不那么明显的一点是,尽管它是“嵌入式”的,但实际上您所做的只是创建远程对象,并将序列化数据来回发送到正在运行的服务。
例如,在下面的代码中,我们使用一个工厂定义了一个 GeneratedEvents 的源流,为该工厂发送一个新的配置对象实例。请记住,由于序列化,此对象是按值复制的,请勿尝试传递需要引用的内容,例如 CancellationToken!
var mySource = demoApp.DefineStreamable<GeneratedEvent>(
typeof(GeneratorFactory), new GeneratorConfig()
{ CtiFrequency =1 },
EventShape.Point, null);
如果我们的源包含来自多个设备的数据,我们可以应用于流的最简单的 Linq 查询就是简单的过滤。例如:
var mySignal = mySource.Where(x => x.DeviceId=="Signal1");
我们可以这样可视化这种过滤:
在将信号与底层数据流区分开后,我们可以定义进一步的查询来监视流并对检测到的模式进行操作。因此,我们使用滚动窗口来分区信号。
var myTumblingWindow =mySignal.TumblingWindow(new TimeSpan(0, 0, 1));
如果我们有一个每秒产生一次事件的信号,并且想定义一个 2 秒的滚动窗口(即,一个长度为 2 秒且每 2 秒重复一次的窗口),我们可以这样可视化它:
请注意,窗口在时间段完成之前不存在。因此,在 T=0 和 T=1 时我们没有数据,只有在 T=2,4,6 等时,结果才会被推送到管道中的下一阶段。
var average =myTumblingWindow
.Select(x => x.Avg(y => y.Value));
var averageObservable = average.ToObservable();
在演示代码中,我们创建了一个 1 秒的滚动窗口,然后计算这些窗口的平均值,并将结果投影到一个新流中。
我们使用桥接方法使平均流成为 Observable,并连接一个 Observer 函数,该函数在每次向底层集合插入新项时被调用。
average.Subscribe(new Action<double>(updateSource));
同样,我们对源流也这样做,以便两者都可以被管道传输到 UI,我们将在下一节中讨论。
最后,让我们生成滚动窗口中的最大值和最小值,以便图表控件可以推断出一些历史限制。
var max = tumblingWindow.Select(x => x.Max(y => y.Value)).ToObservable();
var min = tumblingWindow.Select(x => x.Min(y => y.Value)).ToObservable();
3. Sink:图表控件
图表控件本质上只是一个画布,由调度器定时器定期更新。它包含一个 ConcurrentQueue(这可能有点过头),类的用户可以访问该队列来推送新的数据点,然后将这些数据点绘制到屏幕上。
理解下面这个函数是如何实现的并不关键,但如果您不想下载源代码,这里就是它:
private void RenderDataSeries()
{
if (null != DataSeries && DataSeries.Count > 0)
{
long start, end;
Coordinate[] dataSeriesCopy;
lock (DataSeries)
{
start = DataSeries.Min(x => x.X).Ticks;
end = DataSeries.Max(x => x.X).Ticks;
while (TimeSpan.FromTicks(end - start)
.TotalMilliseconds > 100000 //plot only the last 100s of data
&& DataSeries.Count > 0)
{
Coordinate discarded;
DataSeries.TryDequeue(out discarded);
start = DataSeries.Min(x => x.X).Ticks;
}
dataSeriesCopy = DataSeries.ToArray();
//Release the lock and allow any producers to continue
}
您可以看到,我尽量以最小的时间阻塞对入站队列的访问,并删除我们不想绘制到 UI 的数据。
我使用事件元数据来打印图例…
int seriescount = 0;
legend.Children.Clear();
RenderLegend(dataSeriesCopy.GroupBy(x => x.SeriesName)
.Select(y => y.Key).ToArray());
根据画布的当前尺寸,计算每个坐标的缩放因子。
foreach (var seriesGroup in dataSeriesCopy.GroupBy(x => x.SeriesName))
{
Polyline p = new Polyline();
p.Stroke = paintables[seriesGroup.Key];
p.StrokeThickness = 1;
//Don't bother painting the first event
if (seriesGroup.Count() <= 1) continue;
long last = 0;
if (seriesGroup.Key == "Min" | seriesGroup.Key == "Max")
AddLimit(seriesGroup.ToArray(), seriesGroup.Key);
else
{
double first = 0.0;
foreach (Coordinate coord in seriesGroup)
{
double xScale = Convert.ToDouble(coord.X.Ticks - start)
/ Convert.ToDouble(end - start);
double yScale = coord.Y * (canvas.ActualHeight / 2);
double x = X0 + (xScale * canvas.ActualWidth);
double y = (canvas.ActualHeight / 2) - yScale;
p.Points.Add(new Point(x, y));
最后,我为 X 轴的 10 秒序数动画化缩放。
double gapInMillisecs = TimeSpan
.FromTicks(coord.X.Ticks - last).TotalMilliseconds;
if (seriesGroup.Key == "Signal"
&& gapInMillisecs >= 10000)
{
if (initial == 0.0)
{
AddXAxisScale(initial, coord, xScale);
initial = gapInMillisecs;
}
if (first == 0.0)
{
first = TimeSpan.FromTicks(coord.X.Ticks)
.TotalMilliseconds - initial;
AddXAxisScale(first, coord, xScale);
}
else
{
double current = TimeSpan.FromTicks(coord.X.Ticks)
.TotalMilliseconds - initial;
AddXAxisScale(current, coord, xScale);
}
last = coord.X.Ticks;
}
}
}
总而言之,xyGraph 控件比我预期的要麻烦一些,但由于我很难找到免费的、能够轻松进行动画和缩放的控件,所以它工作得相当好。如果您愿意,可以随意重用和修改它。
显然,这里的 Sink 是视觉化的,但希望您能看到,提供您自己的 Observer 和 Action 是相对简单的,它们可以执行任何操作,从将数据记录到数据库、执行买入订单,甚至告诉司机他在哪里开得慢!
结论
有了创建、处理和可视化信号及其派生数据的基础设施,我们就可以开始深入研究 StreamInsight 的更复杂功能了。
正如我希望您能看到的,使用 LinqRx 定义简单查询是相对简单的。实际上,我们所做的与我之前一篇文章 中展示的技术 所能实现的相差无几。然而,正如我们稍后将看到的,事情会很快变得相当复杂,因此我们需要利用完整的 StreamInsight 工具集来帮助我们。
使用 StreamInsight,您最终会像这些人一样。真的!
接下来,我们将开始研究流的连接和其他更复杂的操作,以及调试、管理界面、重放测试用例、高可用性等等…
历史
1.1 创建
1.2 链接路径
1.3 添加了历史限制
1.4 增加了关于 Linq 和 Rx 接口的更多细节
1.5 重构了图表控件并增加了更多功能,以及更详细的解释
增加了更多关于用法模式、图片和图表的细节
1.6 添加了更多图表来解释过滤器、分区和投影