65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Azure 事件中心、流分析和 Azure SQL 实现大规模流式处理 - 第一部分:将流式数据引入 Azure

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2022 年 2 月 22 日

CPOL

7分钟阅读

viewsIcon

6600

如何编写 Python 应用程序以通过 Azure 事件中心摄取天气数据到 Azure

Azure 可帮助你引入、处理和存储流式数据。随着公司数据需求的扩展,它会自动进行扩展。此外,Azure 的无服务器选项意味着你无需花费时间维护服务器,并且只需为你使用的内容付费。

本文将演示如何编写一个 Python 应用程序,使用 Azure 事件中心将天气数据引入 Azure。本三部分系列文章的后续文章将 使用 Azure 流分析处理数据,将其存储在 Azure SQL 数据库中,然后 使用 Power BI 分析和可视化数据

完成本教程需要 Azure 订阅。免费 注册并获得 200 美元的积分,用于使用常用服务。

考虑使用 Azure 事件中心

将流式数据引入 Azure 需要一个事件中心来接收流式数据。事件中心使用高级消息队列协议 (AMQP),这是一种开放标准,用于在应用程序之间通信数据。由于它是一个开放标准,许多应用程序和设备都可以直接向其进行流式传输。事件中心还支持 HTTP 和 Kafka。

事件中心随后可以将其消息广播给其他已连接的应用程序,以进行进一步的处理和存储。每个事件中心每秒可处理数万条消息。

物联网 (IoT) 设备、联网车辆或其他应用程序通常可以直接将事件流式传输到事件中心,而无需现实世界中的应用程序。但是,由于本教程没有现成的 IoT 设备,因此我们将使用 Python。

创建事件中心工作区

创建事件中心首先需要一个工作区。工作区是事件中心的容器,用于管理访问、缩放和架构注册表。

进入 Azure 门户,搜索“事件中心”,然后创建一个新的。然后,选择一个订阅和资源组。创建新的资源组可以轻松地在之后通过简单删除资源组来删除所有内容。因此,请创建一个名为 data-streaming-example 的资源组。

接下来,选择一个事件中心名称,例如 data-streaming-evhub。选择一个靠近的区域,例如西欧。

选择“基本”定价层,这对于本示例来说应该足够了。此层提供一个使用者组和 100 个托管连接,这意味着一个额外的应用程序可以消耗事件(默认有一个)并且 100 个应用程序可以同时写入或接收消息。值得注意的是,消息保留期为一天。

标准层提供 20 个使用者组和 1,000 个托管连接,并将消息存储长达七天。此层还允许捕获事件并将它们写入存储帐户中的 blob。此外,此层提供一个架构注册表,用于结构化数据消息,这些消息可以在应用程序之间共享。

高级层提供的功能与标准层相同,但数量更多。标准层比基本层贵约两倍,而高级层比标准层贵约 34 倍(尽管用户不按消息付费,因此差异取决于使用情况)。

最后,选择代表事件中心容量的吞吐量单位。单个吞吐量单位的入口吞吐量最高可达每秒 1 MB 或每秒 1,000 个事件(以先到者为准),出口吞吐量最高可达每秒 2 MB 或每秒 4,096 个事件。标准层允许用户通过启用自动扩展来自动缩放吞吐量单位的数量。

创建命名空间”窗格应如下所示

现在,“**审阅 + 创建**”事件中心命名空间。

或者,使用 Azure CLI 通过以下命令创建命名空间

az group create –name data-streaming-example –location “West Europe”
az eventhubs namespace create –resource-group data-streaming-example 
   –name data-streaming-evhub –sku Basic –location “West Europe”

这些命令将创建资源组和命名空间。

创建事件中心

现在已经有了工作区,可以创建实际的事件中心。事件中心是一个接收消息并将其传递给下游应用程序的通道。

要创建事件中心,请进入命名空间,然后单击顶部的“**+ 事件中心**”按钮。然后,为事件中心命名 — 例如,weather — 以便流式传输天气数据。

分区计数会为下游并行处理组织消息,因此应用程序可能拥有的读取器越多,它需要的分区就越多。在此情况下,请保留默认值 2。

基本层用户无法更改消息保留期或启用捕获,因此请继续并“**创建**”事件中心。

或者,要使用 Azure CLI 创建事件中心,请使用以下命令

az eventhubs eventhub create –resource-group data-streaming-example 
   –namespace-name data-streaming-evhub –name weather –message-retention 1 –partition-count 2

基本层的默认消息保留期无效,因此请将其明确设置为 1。另外,请注意,使用 CLI 时默认分区计数为 4,而使用门户时为 2。

选择数据集

现在事件中心已就绪。接下来,获取一个数据集。该数据通常来自 IoT 设备、联网汽车或其他设备。但是,由于本教程中没有此类数据,因此我们将使用开放数据集,例如来自 Kaggle 数据集weather_description.csv 文件。

CSV 文件包含美国及海外各地城市的每小时天气描述(如晴朗、多云、有雾等)。城市是文件中的列,行包含日期和时间以及每个城市的描述。

由于数据可能来自多个气象站,因此我们将列拆分为行。36 个城市每小时提供 36 行。超过 45,000 行将随着时间的推移流式传输超过 150 万行(这需要一些时间)。但这将在 Python 应用程序中完成。

流式传输到 Azure

是时候开始编码了。确保本地计算机上安装了 Python。此外,创建一个名为 index.py 的文件,并将 weather_description.csv 放在同一文件夹中。

此外,此项目需要一个用于向 Azure 事件中心发送消息的包。使用 azure-eventhub 包(此处为 v5.7.0),它使用 AMQP 协议。

pip install azure-eventhub==5.7.0 –user

完整的应用程序只有 41 行代码

import time
import json
from csv import reader
from azure.eventhub import EventHubProducerClient, EventData

connection_str = '<Your connection string>'
eventhub_name = 'weather'

class Weather:   
    def __init__(self, date, city, description):
        self.date = date
        self.city = city
        self.description = description

    def __str__(self):
        return f"{self.date}, {self.city}, {self.description}"

def send_to_eventhub(client, data):
    event_data_batch = client.create_batch()
    event_data_batch.add(EventData(data))
    client.send_batch(event_data_batch)

def main():
    with open('weather_description.csv', 'r') as weather_descriptions:
        csv_reader = reader(weather_descriptions)
        header = next(weather_descriptions)

        cities = header[:-1].split(',')[1:]
        client = EventHubProducerClient.from_connection_string\
                 (connection_str, eventhub_name=eventhub_name)
        for row in csv_reader:
            date = row[0]
            i = 1
            for city in cities:
                weather = Weather(date, city, row[i])
                send_to_eventhub(client, json.dumps(weather.__dict__))
                print(json.dumps(weather.__dict__))
                i += 1
           
            time.sleep(3)

main()

此代码以一些导入、连接字符串和事件中心名称开头。指向事件中心工作区的连接字符串位于 Azure 门户中,在“**工作区**”下的“**共享访问策略**”中。单击“**RootManageSharedAccessKey**”可找到主密钥、辅助密钥以及两者的连接字符串。复制其中一个连接字符串并将其粘贴到应用程序中。

或者,使用 Azure CLI 使用以下命令获取连接字符串

az eventhubs namespace authorization-rule keys list –resource-group data-streaming-example 
   –namespace-name data-streaming-evhub –name RootManageSharedAccessKey

然后是 send_to_eventhub 函数。首先,代码使用 EventHubProducerClient(函数的一个参数)创建一个数据批。接下来,它将数据添加到批中并发送批。这部分代码很容易理解。

主函数更难理解,主要是因为它将列转换为行。代码打开 CSV 文件并获取标题。然后,它按逗号分割标题并检索除第一个(datetime 列)以外的所有值。现在有一个包含所有城市名称的数组。

然后,代码遍历 CSV 文件中的行。第一个值是 datetime,因此代码会提取它。然后,对于每一行,代码都会遍历城市。之后,只需将索引 i + 1 与城市匹配。

应用程序接下来从数据创建 Weather 对象,并将其转换为 JSON 字符串。发送 JSON 字符串现在可以更轻松地处理数据。之后,应用程序等待三秒钟,然后发送下一行。

测试应用程序

测试很简单。只需启动应用程序,然后观察它向 Azure 事件中心发送消息。代码中的 print(data) 函数应显示应用程序刚刚发送的内容。

在 Azure 中,转到“**工作区**”并找到事件中心。在“**事件中心概述**”中,单击“**weather”事件中心。消息应该会源源不断地涌入(稍有延迟)。

如果一开始看不到数据,请等待一到两分钟查看传入的请求。如果没有,代码中很可能存在问题。确保连接字符串有效。

后续步骤

Azure 在处理数据时具有许多优势,例如自动缩放和只为您使用的内容付费。您的服务还可以连接到 Azure 和本地的各种其他服务。

用户可以使用 Azure 门户或 Azure CLI 设置 Azure 事件中心命名空间和事件中心。然后,该事件中心可以接收来自数据源(如联网车辆或 IoT 设备)的信息。

阅读本三部分系列文章中的 后续文章,了解如何通过将事件中心连接到 Azure 流分析来处理所有这些数据。

要了解有关如何连接到数据源、可视化和发现重要信息并与团队共享这些信息的详细信息,请观看我们关于 Reactor YouTube 的 Power BI 数据分析入门介绍

© . All rights reserved.