65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Azure 事件中心、流分析和 Azure SQL 进行大规模流式处理 第 3 部分:分析和可视化数据

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2022 年 2 月 24 日

CPOL

6分钟阅读

viewsIcon

4647

如何转换数据以生成统计信息并使用 Power BI 进行可视化

上一篇文章《使用事件中心和流分析将数据流式传输到 Azure》设置了一个简单的 Python 应用程序,使用事件中心将天气数据发送到 Azure。然后,他们将事件中心连接到流分析作业作为输入,并将数据输出到 Azure SQL 数据库。

本文将演示如何转换数据以生成统计信息并使用 Power BI 可视化数据。

对流式数据进行分组

首先,从流分析作业中获取一些统计信息需要对数据进行分组。事件中心可能在不停止的情况下流式传输数据,但分析需要一组特定的最小值、最大值、计数、总和以及平均值。

Azure 流分析提供了一些特定的窗口函数来实现这一点。窗口函数支持在特定时间范围内对数据进行基于组的操作。

探索窗口函数

在所有窗口函数中,最简单的是翻转窗口。该窗口每 x 个时间单位对数据进行分组——例如,每五秒、每三分钟或每 24 小时。本文使用翻转窗口。

跳跃窗口几乎做同样的事情,只是窗口可以重叠。例如,它可以每十秒聚合一次,同时与前一个窗口重叠五秒。

滑动窗口不太直观。当数据可用时,它会触发,回顾指定的时间段,并聚合该时间段内的所有数据。此窗口可能与先前的数据重叠。

与其他窗口不同,会话窗口没有固定的长度。它的功能是根据前一个窗口的特定持续时间将所有到达的数据分组。例如,它可以将前一个组之后十秒内流式传输的所有数据分组。因此,此窗口函数将数据分组十秒,直到没有数据为止。

最后一个是快照窗口,它只将具有相同时间戳的所有数据分组。

在流分析中进行分组

演示将有助于阐明此窗口操作。此示例计算特定天气事件在十秒内的发生次数(请记住,Python 应用程序每三秒发送两条消息)。通常,这些值会更有意义,可以检索一天、一周等的所有数据。但是,对于本示例来说,十秒就足够了。此持续时间应提供大约三个小时的数据。

首先,为作业创建一个新输出。本教程需要一个新的 SQL 表来接收结果。

CREATE TABLE [dbo].[WeatherCount](
   [id] [int] IDENTITY(1,1) NOT NULL,
   [min_date] datetime2 NULL,
   [max_date] datetime2 NULL,
   [description] nvarchar(256) NULL,
   [count] int NULL
) ON [PRIMARY]

在上一篇文章的同一个流分析作业中创建新输出。必须停止作业才能对其进行更改。将新输出命名为例如 `sql-weather-count-output`。

此输出的存在使得可以更改查询。查询可以包含多个 `SELECT` 语句。因此,只需将新查询添加到 Azure 菜单的查询选项中,按描述分组并在十秒内计算出现次数。

SELECT MIN(CAST([date] AS DATETIME)) AS min_date
   ,MAX(CAST([date] AS DATETIME)) AS max_date
   ,[description]
   ,COUNT(*) AS count
INTO [sql-weather-count-output]
FROM [evhub-weather-input]
GROUP BY [description], TUMBLINGWINDOW(SECOND, 10)

翻转窗口位于 group by 子句中。此代码现在也对日期进行了转换,但 `MIN` 和 `MAX` 不能用于 varchar。当所有内容都正确后,启动分析作业并运行 Python 应用程序,然后两个 SQL 表都应接收到数据。

将数据添加到 Power BI

接下来,将此数据添加到 Power BI。要开始使用 Power BI,请获取一个帐户和 Power BI Desktop 应用程序

打开 Power BI Desktop 应用程序后,首先选择或添加数据源。在数据源下,找到 Azure SQL 数据库。筛选 Azure 特定的数据源应将其放在顶部。

为服务器选项输入现有服务器的名称,例如 example-weather-server.database.windows.net。

然后,选择 **导入** 或 **DirectQuery** 作为数据连接模式。导入选项会导入选定的表和数据,因此用户必须刷新报表才能查看数据更改。**DirectQuery** 直接连接到数据源,因此选择此选项可立即查看导入的数据。

选择数据源后,选择要在报表中使用的数据表。选择 `WeatherCount` 表,然后单击 **加载**。之后,会出现一个空白页面,用户可以在其中添加各种类型的可视化效果,包括条形图、折线图和饼图。选择饼图显示在画布上。

接下来,从 `WeatherCount` 表中单击 **description** 和 **count** 字段。报表现在应该看起来像此屏幕截图。

现在,转到 **文件**,然后单击 **发布**,将此基本报表导出到 Power BI。将其发布到 **我的工作区**,然后在 app.powerbi.com 上找到它。

不幸的是,Power BI 通知数据源缺少凭据,无法访问。通过转到左侧菜单中的 **数据集** 并选择数据源来修复此问题。Power BI 会要求提供凭据,然后存储它们。

之后,拥有专业帐户的用户可以与组织中的其他人共享报表。

添加筛选器

一些 Power BI 用户可能会想,为什么要在流分析中聚合数据,而 Power BI 可以聚合和筛选数据。首先,即时聚合大量数据可能既昂贵又耗时。其次,可能存在多个数据使用者。每次都进行聚合可能会很麻烦且容易出错。

要向数据添加筛选器,请在桌面应用程序中打开报表。在饼图和数据源之间找到 **Filters**。

现在,将 `min_date` 字段拖到 **Filters on this page**。然后,选择 **Basic filtering**、**Advanced filtering**、**Relative date** 和 **Relative time**。在实际场景中,用户可以选择类似“相对日期”和“[在过去] [1] [天]”的内容,以便始终获得前一天的天气报告。然后,他们会锁定筛选器,防止其他用户更改。他们可以将报表命名为“昨日天气数据”。

不幸的是,该筛选器不会显示任何内容,因为我们只读取了几个小时的数据。相反,选择 **Basic filtering** 并选择最后几个值进行筛选。

上面的屏幕截图显示了筛选后的视图。它仅显示过去两个小时的信息。数据和天气类型比以前少,因为在这些小时内,气象站没有测量所有天气类型。

总结

这个三部分的文章系列演示了如何使用事件中心将数据流式传输到 Azure。然后,它使用 Azure 流分析处理这些数据,并将数据存储在 Azure SQL 数据库中。最后,它使用 Power BI 可视化数据。

这些示例只是 Azure 流分析可能实现的功能的冰山一角。例如,我们可以将数据直接流式传输到 Power BI 以创建实时 Power BI 仪表板(有一些限制)。其他输出包括 Cosmos DB、Azure Data Lake、Azure Blob 存储和 Azure Functions。实际上,其中大多数都可以成为 Power BI 数据源,从而产生有用的业务见解。

本文应帮助读者开始使用 Azure 的服务流式传输 IoT、联网汽车和其他联网设备数据,然后处理这些数据以大规模生成有用的分析。

要了解有关如何连接到数据源、可视化和发现重要信息以及与团队共享这些信息的更多信息,请查看我们的Reactor YouTube 关于使用 Power BI 进行数据分析的介绍

© . All rights reserved.