针对 Kafka 开发者的 Azure 事件中心大规模流式传输(第 2 部分):使用 Python 中的 Azure Functions 处理流数据
本文中,我们将开发几个 Python 中的 Azure 函数,以处理我们收到的有效载荷并将数据保存到 Cosmos DB 实例。
在上一篇文章中,我们创建了一个事件中心实例和一个使用 Kafka API 发送消息的演示应用程序。大多数 Kafka 应用程序只需对应用程序的配置进行少量更改即可与事件中心配合使用。然而,这只是故事的一方面。我们还必须检索消息并使其有用。
为此,我们使用 Azure 函数,这是一种无服务器计算服务,允许我们运行代码而无需管理任何基础设施。Azure 函数有几种不同的绑定,使它们能够侦听包括事件中心在内的多种服务,从而极大地简化了构建体验。它们还会根据负载自动扩展,以处理大量生成的事件中心消息。
我们在 Python 中创建的每个函数也将不同的数据保存到 Azure Cosmos 数据库中。与 Azure 函数一样,Azure Cosmos DB 提供了一个无服务器的 NoSQL 数据库,允许我们保存数据而无需管理任何基础设施。在编写更多 Python 代码之前,让我们创建 Cosmos 数据库和 Azure 函数服务来处理生成的数据。
创建 Cosmos DB 实例
首先,我们需要一个地方来存储数据。
打开 Azure 门户并转到我们在第一篇文章中创建的资源文件夹。单击创建资源按钮,然后搜索Azure Cosmos DB选项。我们使用核心选项来利用 Azure 函数可用的输出绑定。
选择订阅和资源组,然后根据需要为您的服务提供帐户名称和位置。我选择了无服务器选项作为容量模式,但是如果您想要用于生产的专用资源,预配吞吐量可能是一个更好的选择。
单击审查 + 创建以初始化 Cosmos DB 实例。这可能需要几分钟,具体取决于配置,但我们现在应该有一个地方来存储我们的数据。
如果您想了解更多关于 Azure Cosmos DB 的信息,或分享您所知道的,请查看Azure Cosmos DB Conf;这是一个与 Azure Cosmos DB 社区合作组织的免费在线虚拟开发者活动。
创建 Azure 函数应用
现在我们有了一个存储数据的地方,我们需要一个函数来从我们的事件中心检索消息并将其保存到我们的 Cosmos DB 中。
再次打开资源组,点击创建,然后找到Azure 函数选项。像 Cosmos DB 配置一样,选择资源组并为函数应用命名。选择代码选项而不是docker以减少所需的管理。最后,选择Python作为代码库,3.9作为版本,然后点击审查和创建。
与 Cosmos DB 一样,这可能需要几分钟才能创建,但一旦完成,我们现在应该有一个函数应用来托管我们的 Azure 函数。在我们转向代码之前,最后一步是为我们的函数应用创建一个共享访问策略 (SAS),以便从事件中心读取消息。
为此,请打开事件中心命名空间,然后打开我们创建的事件中心,然后单击左侧的共享访问策略菜单。添加一个具有侦听事件中心权限的新策略。
创建我们的第一个函数
我们使用 Visual Studio (VS) Code 来创建我们的第一个函数。此外,我们使用 Azure Function Core 工具和 Azure CLI 在本地开发我们的函数,然后从 VS Code 部署它们。安装这些组件后,打开 VS Code 并单击左侧的扩展按钮。查找并安装Azure 函数扩展,它也应该安装Azure 帐户和Azure 资源扩展。
安装这些扩展后,我们应该会看到一个 Azure 图标,我们可以打开并用于登录我们的 Azure 租户。在函数部分,单击创建新项目选项,然后会提示我们配置我们的函数应用程序。配置步骤如下:
- 选择一个文件夹来创建项目。这是您保存所有函数代码的地方。
- 选择用于开发函数和版本的语言。为了一致性,本文使用 Python 3 绑定。
- 选择 函数触发器类型,这将是 Azure 事件中心触发器。事件中心触发器将函数配置为侦听事件中心消息以运行。
- 输入函数的名称。我们的第一个函数保存客户详细信息,所以让我们将其命名为
airline-customer-save
。 - 登录您的 Azure 帐户,该帐户将在新的浏览器窗口中打开以访问 Azure 资源。
- 选择您的Azure 订阅、事件命名空间和我们在第一篇文章中创建的事件中心。
- 最后,选择我们创建的侦听共享访问策略,并保留$Default主题。
完成这些步骤后,VS Code 会为我们的函数设置样板代码。以这种方式设置的任何 Azure 函数项目都包含文件 host.json 中的一些函数应用设置、文件 local.settings.json 中的一些本地设置以及文件 requirements.txt 中的任何 Python 依赖项。此外,对于函数应用中托管的每个函数,都有一个带有函数名称的文件夹、一个包含函数绑定的 function.json 文件和一个默认名称为 __init__.py 的脚本文件。
如果我们点击左侧的 Azure 图标并打开函数部分,我们应该会看到一个带有箭头的云图标,名为部署到函数应用。
这会要求提供订阅和要部署的函数应用。选择这些后,VS Code 会将主函数部署到我们的函数应用。
我们现在已经部署了函数应用,但在我们添加连接详细信息到函数应用之前,它不会处理任何消息。如果我们查看 function.json 文件,我们会看到事件中心触发器通过变量连接中的名称链接到我们的事件中心。我们必须将名称和事件中心共享访问策略中的连接字符串添加到我们的函数应用。
在 Azure 控制台中打开函数应用,并从左侧菜单中选择配置菜单项。添加一个新的应用程序设置,其名称作为我们代码中的连接字符串,值作为共享访问策略的主连接字符串,然后单击确定。请务必保存这些设置。
使用函数保存数据
我们的函数应用现在正在处理消息,但除了将信息事件注册到日志中之外,它什么也没做。为了保存我们的数据,我们使用输出绑定连接到我们的 Cosmos DB 实例。
打开函数目录中的 function.json 文件,并在事件中心部分之后添加以下 JSON 到绑定中
{
"type": "cosmosDB",
"direction": "out",
"name": "doc",
"databaseName": "airlineData",
"collectionName": "customer",
"id": "id",
"partitionKey": "/id",
"createIfNotExists": "true",
"connectionStringSetting": "AzureCosmosDBConnectionString"
}
此代码块允许我们访问 doc
变量以将对象保存到我们的 Cosmos DB 中。它还指定了数据库名称、集合名称、ID 值和分区键。它还设置了一个标志,用于在数据库和集合不存在时创建它们。最后,连接字符串链接了我们需要检索 Cosmos DB 连接字符串的函数应用设置。
要获取此字符串,我们打开我们的 Cosmos DB 实例,从左侧菜单中选择键选项,然后复制连接字符串。我们可以将此字符串添加到文件 local.settings.json 中以进行本地调试,但我们还必须在我们的函数应用中创建一个新的应用程序设置键。
要将数据保存到 Cosmos DB,我们现在必须更新我们的 Function
参数以包含 doc
变量,并在我们的 Function 中使用该变量来保存数据。我们的代码现在应该看起来像这样:
def main(events: List[func.EventHubEvent], doc: func.Out[func.Document]):
for event in events:
logging.info('Python EventHub trigger processed an event: %s',
event.get_body().decode('utf-8'))
event_body = event.get_body().decode('utf-8')
doc.set(func.Document.from_json(event_body))
后续步骤
我们现在有了一个简单的函数,可以将 Kafka 事件数据直接保存到 Cosmos DB 中。在最后一篇文章中,我们将通过添加 Power BI 在我们的 Cosmos DB 数据集之上构建报告和仪表板来完成此应用程序的构建。为此,我们将扩展此函数来操作此数据,并使用不同的函数保存单个组件以处理每个用例。
要了解如何加速基础应用程序开发,请查看开发人员的低代码应用程序开发 7 步指南。