使用 Azure 事件中心、Azure Functions 和 Cosmos DB 大规模流式传输 - 第 3 部分:分析和可视化数据





5.00/5 (2投票s)
使用 Power BI 在 Cosmos DB 中分析和可视化数据
在本系列的前面部分,我们探讨了如何将数据流式传输到事件中心,以及如何在 Cosmos DB 中进行数据的使用、处理和存储。在本系列的最后一篇文章中,我们将使用 Power BI 来分析和可视化 Cosmos DB 中的这些数据。
Power BI 是 Azure 的数据分析工具,可实现公司范围内的自助式大规模分析。通过可视化数据和使用内置的 AI 功能,我们可以找到有意义的见解。我们还可以将这些见解导出并共享到 Excel 或其他数据工具。
我们将使用 Power BI 来分析我们的 Cosmos DB 数据,并查看比特币对美元的价值。前往 GitHub 查找将数据流式传输到事件中心以及在 Cosmos DB 中处理和存储数据所需的全部代码。
在深入了解 Power BI 之前,让我们测试一下将事件中心数据流式传输到 Cosmos DB,以确保我们的函数按预期工作并生成要分析的样本数据。
通过 Azure Functions 将事件中心数据流式传输到 Cosmos DB
在第一篇文章中,我们构建了一个 Java 应用程序来消费实时区块链交易所 feed,并将消息批量发送到事件中心。通过流式传输到事件中心,我们可以轻松扩展以满足消息吞吐量增加的需求。我们还可以附加多个下游使用者来处理消息。
我们将运行此应用程序以开始将数据流入我们的事件中心。请按照第一篇文章中的步骤来运行您的 Java 应用程序。您的应用程序应该会在屏幕上输出消息,如下面的屏幕截图所示。Azure 事件中心门户中的监控指标应该会显示事件中心正在接收消息,如下面的屏幕截图所示。
现在我们的事件中心正在接收消息。我们可以继续处理我们在第二篇文章中创建的 Azure 函数。同样,通过使用 Azure Functions 来处理来自事件中心的消息,我们可以让 Azure 来处理我们应用程序逻辑的基础设施。我们还可以将函数设置为自动缩放以满足消息吞吐量的峰值,这对于满足近乎实时系统的消息处理服务级别协议 (SLA) 非常有帮助。
您可以使用 VS Code 在本地运行 Azure Function,方法是按照第二篇文章中的步骤进行操作,或者将其部署到 Azure。要从 VS Code 进行部署,请打开命令面板并选择Azure Functions:Deploy to Function App,如下面的屏幕截图所示。
该应用程序解析我们的 JSON 格式的区块链交易所消息,为存储做准备。消息包含一个属性,该属性包含一个值数组。
我们解析数组并将值分解为单独的属性,以便在 Cosmos DB 中独立存储和查询它们。这种格式使我们能够分析数据。
当您在本地运行应用程序时,您应该会看到触发的函数,该函数将数据发送到 Cosmos DB。您还可以使用事件中心门户中的监控图来检查有多少出站消息,如下面的屏幕截图所示。事件中心会跟踪入站和出站消息,这可以帮助您了解使用者落后了多少。如果出站消息的数量高于入站消息的数量,那么您的使用者可能正在迎头赶上。
在上图的屏幕截图中,入站和出站消息速率相同,因此我们的使用者已正确缩放以满足入站消息的需求。
现在,我们可以通过从门户中的数据资源管理器运行 SQL 查询来检查数据是否已进入 Cosmos DB。最初运行应用程序后,计数应显示已到达的前几百条消息,如下面的屏幕截图所示。
如果您让应用程序运行一段时间后重新运行计数,您应该会看到记录数量增加,如下面的屏幕截图所示。
让应用程序运行一段时间。我们将在下一步创建 Power BI 报表,因此我们希望确保我们有足够大的数据集进行分析。
使用 Power BI 分析 Cosmos DB 中的数据
首先,下载并安装 Power BI 桌面应用程序,以便在 Power BI 中使用 Cosmos DB 数据源。然后,在 Power BI 中创建一个新报表。在顶部,选择获取数据,然后点击更多。此操作将打开一个窗口,如下面的屏幕截图所示。它显示了 Power BI 中的所有可能的数据连接器。
由于我们的数据在这里,因此我们使用 Azure Cosmos DB。点击连接。Power BI 将提示您输入一些连接详细信息。您可以在 Azure 门户中 Cosmos DB 的密钥部分找到这些详细信息。提供读写和只读密钥,我们可以使用只读密钥,因为 Power BI 不会修改任何数据。您需要 URI 和主密钥。也要输入数据库和集合名称。
此操作将打开数据源查询窗口。默认情况下,它仅显示一个代表整个文档的列。我们将把文档拆分成单独的属性,以便在我们的报表中它们。
点击展开按钮,如下图所示。选择所有列,然后点击确定。
视图应刷新,我们现在应该在各自的列中看到我们的属性。接下来,点击列名左侧的箭头图标,将价格列的数据类型设置为$ 固定小数位数数字。我们这样做是为了让 Power BI 将它们视为数字,并知道哪些计算可用。
此外,将时间戳列转换为典型的日期格式。目前,它是 Unix 时间戳。点击自定义列以创建自定义列以调整日期,如下面的屏幕截图所示。此新列使我们能够编写公式将 Unix 时间戳转换为日期。
这是转换 Unix 时间戳的公式:
= #datetime(1970,1,1,0,0,0) + #duration(0,0,0,Int64.From(Text.Start([Document.timestamp],Text.Length([Document.timestamp])-3)))
公式从 1970 年 1 月 1 日(Unix 时间戳纪元)开始。它删除了 Unix 时间戳的最后三个精度字符,因为它们不是必需的。然后,它将 Unix 时间戳转换为整数。最后,它将这个整数版本的时间戳作为秒级的持续时间添加到纪元日期。
当列准备就绪时,我们需要通过与价格相同的方法告诉 Power BI 此新列是日期。这有助于 Power BI 理解哪些操作是可能的,例如在报表中将字段用作时间序列。
点击关闭并应用以返回报表生成器窗口。
在右侧,您现在应该看到字段列表。下面的屏幕截图显示了我们在报表中使用的字段的精简版本。我们还通过右键单击然后输入新名称来重命名它们。
只需勾选您想要包含的字段,表格就会出现在左侧。
默认情况下,Power BI 会对数据的显示方式做出一些假设。它通常显示按年、月和日拆分的日期,并对任何数字字段求和。
我们不希望这样做。要修改它,请转到值部分,如下图所示。选择时间戳列旁边的箭头,并将其从日期层次结构更改为时间戳。Power BI 仍会聚合,但仅到分钟级别,这没问题。
由于它仍在聚合,因此我们不希望对我们的收盘价和开盘价字段求和。我们希望看到它们在该特定分钟内的最高值。为此,请在值部分选择字段旁边的箭头,然后将值从求和更改为最大值。
最后,在报表生成器中选择表格。然后,在可视化部分中选择折线图选项,如下图所示。
Power BI 然后将表格转换为折线图,显示 BTC 相对于 USD 的开盘价和收盘价随时间的变化。
后续步骤
我们已经将本系列前两篇文章的所有部分整合到本文的 Power BI 仪表板中。仪表板揭示了通过事件中心流入 Cosmos DB 的数据方面的见解。要进一步扩展,您可以创建 Power BI 中的流式数据集,以便在 Cosmos DB 进入新数据时实时更新报表。
本系列涵盖的技术为任何实时数据系统提供了骨干。这些工具和技术构建了一个端到端的无服务器应用程序,您可以在 Azure 中对其进行扩展,以满足数据吞吐量增长的需求。该系统还具有容错能力,只需单击按钮即可提供多种冗余选项。
这些解决方案使工程师能够专注于构建应用程序,而不是管理基础设施。它们提供了一个更简洁的工程流程,同时仍然为未来的扩展做准备。
既然您已经了解了使用 Azure 事件中心、Java 中的 Azure Functions 和 CosmosDB 创建端到端流式传输解决方案是多么直接,请注册您的免费 Azure 帐户。探索所有选项,构建您自己独特的实时数据分析应用程序。
要了解如何使用 Azure Cosmos DB 更改订阅以了解用户模式、执行实时数据分析和可视化,请参阅使用 Azure Cosmos DB 更改订阅可视化实时数据分析。