65.9K
CodeProject 正在变化。 阅读更多。
Home

Azure Function 作为 Azure Stream Analytics 作业的输出作业拓扑

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2018 年 12 月 19 日

CPOL

6分钟阅读

viewsIcon

12752

在本文中,我们将了解如何将 Azure Function 设置为 Azure 流分析作业的输出作业拓扑。听起来很有趣,对吧?

Azure IoT 开发套件

引言

在过去的几天里,我一直在玩我的 Azure IoT 开发套件 MXChip。在本文中,我们将了解如何将 Azure Function 设置为 Azure 流分析作业的输出作业拓扑。听起来很有趣,对吧?在我们之前的 文章 中,我们已经了解了什么是 Azure 流分析作业以及如何通过门户和 Visual Studio 创建它。如果您还没有阅读这些文章,我强烈建议您阅读它们。现在让我们开始本文。

背景

正如我之前提到的,在本文中,我们将

  1. 使用我们现有的 Azure 流分析作业
  2. 创建一个新的 Azure Function 应用
  3. 将新创建的 Azure Function 设置为流分析作业的输出作业拓扑
  4. 监控来自流分析作业到 Azure Function 的数据。

玩转 Azure Function

是的,我们要玩玩它。让我们去创建一个吧。

创建 Azure Function

要创建 Azure Function 应用程序,您需要登录到您的 Azure 门户并单击“创建资源”图标,然后您可以搜索“Function App”。

在下一个屏幕上,提供以下信息

  1. 应用名称
  2. 订阅
  3. 资源组
  4. 操作系统
  5. 托管计划
  6. Location
  7. 运行时堆栈
  8. 存储
  9. Application Insights

在这里,消耗计划托管允许您按执行付费,而应用服务计划允许您拥有预定义的容量。对于运行时堆栈,我们将使用 .NET,但您可以自由使用任何您想要的。

创建完成后,您应该可以在“Function Apps”部分看到它。

创建 Azure Function 解决方案和 Function

现在让我们进入 Visual Studio,为我们的 Azure Function 创建一个新的解决方案。

Azure Function App 项目类型

现在您可以右键单击新创建的项目并添加一个新的 HttpTrigger Function。暂时我们将访问权限保持为 Anonymous。我将我的函数命名为“GetData”。目前,我们只是从流分析作业中获取数据,并检查其长度。

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;namespace ml.IoTPlatform.AzureFunctions
{
    public static class GetData
    {
        [FunctionName("GetData")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            ILogger log)
        {
            log.LogInformation($"GetData function triggered with Uri {req.RequestUri}");         
            string content = await req.Content.ReadAsStringAsync();
            log.LogInformation($"String content is {content}");
            dynamic data = JsonConvert.DeserializeObject(content);
            log.LogInformation($"Data count is {data?.Count}");
            if (data?.ToString()?.Length > 262144)
            {
                return new HttpResponseMessage(HttpStatusCode.RequestEntityTooLarge);
            }
            return req.CreateResponse(HttpStatusCode.OK, "Success");
        }
    }
}

正如您所见,我们目前并没有做太多事情,我们只是将数据接收为 HttpRequestMessage,然后读取内容为 req.Content.ReadAsStringAsync(),然后反序列化对象。如果您不执行此步骤,您可能会收到一个错误:“No MediaTypeFormatter is available to read an object of type ‘Object’ from content with media type ‘application/octet-stream’.” (没有可用的 MediaTypeFormatter 来读取内容类型为“application/octet-stream”的对象。)

我们还检查实体长度,如果太长,我们会发送一个状态码为 413 的 HttpResponseMessage

发布 Azure Function App

要发布您的 Azure Function App,只需右键单击您的项目并单击“Publish”,然后通过选择现有的 Azure Function App 来设置您的发布目标,还记得我们之前创建了一个吗?发布完成后,您可以进入您的 Function App 并查看您的 Function。您还可以使用一些虚拟数据进行测试。

当您尝试从 Visual Studio 发布正在运行的 Function App 时,可能会出现“Web Deploy cannot modify the file on the destination because it is locked by an external process”(Web Deploy 无法修改目标上的文件,因为它被外部进程锁定)的错误。要解决此问题,您可以在 这里 查看我的答案。

门户中的 Function App

Azure 流分析作业

既然我们已经成功配置了 Azure Function App,现在让我们回到 Azure 流分析。

配置 Azure Function 输出

在我之前的 文章 中,我们使用 Visual Studio 创建了一个 Azure 流分析作业解决方案,现在让我们打开该解决方案并为 Azure Function 配置新输出。

解决方案资源管理器

配置 Azure Function 输出时,请确保您选择的是现有的 Azure Function App。

Azure Function 输出配置

更新脚本

我们还应该对我们的 _Script.asaql_ 文件进行一些更改,以支持我们新创建的输出。

WITH BasicOutput AS 
(
SELECT    
    messageId,
    deviceId,
    temperature,
    humidity,
    pressure,
    pointInfo,
    IoTHub,
    MAX(EventEnqueuedUtcTime) AS EventEnqueuedUtcTime,
    EventProcessedUtcTime,
    PartitionId    
FROM
    Input TIMESTAMP By EventEnqueuedUtcTime
    GROUP BY TUMBLINGWINDOW(second, 10), 
    messageId, 
    deviceId,
    temperature,
    humidity,
    pressure,
    pointInfo,
    IoTHub,
    EventEnqueuedUtcTime,
    EventProcessedUtcTime,
    PartitionId
)
SELECT * INTO SQLServerOutput FROM BasicOutput
SELECT * INTO AzureFunctionOutput FROM BasicOutput

更新 TLS 版本

完成这些操作后,只需单击“提交到 Azure”按钮。如果您在此部分有任何疑问,请阅读我之前关于此主题的帖子。现在让我们再次登录门户,查看所有输出、输入和查询是否已发布。

门户中的输出

太棒了!干得好,看起来已经发布了。现在,如果您单击 AzureFunctionOutput,您可能会收到一个警告:“Please make sure that the Minimum TLS version is set to 1.0 on your Azure Functions before you start your ASA job”(请确保在启动您的 ASA 作业之前,将您的 Azure Function 的最低 TLS 版本设置为 1.0)。我宁愿将其视为错误而不是警告,因为如果不进行这些更改,我们的 Azure 流分析作业将无法写入我们的 Azure Function。所以这一点非常重要,我花了几个小时才解决这个问题,并最终找到了根本原因,您可以在 这里 查看我关于此问题的答案。

因此,只需转到您的 Azure Function App,然后单击“Platform Features -> SSL -> Minimum TLS Version”(平台功能 -> SSL -> 最低 TLS 版本)

设置 TLS 版本

有句俗话说,开发者只关心错误而不关心警告,在某些情况下确实如此。嗯,我只是在开玩笑。

输出

完成所有提到的操作后,您就可以开始您的流分析作业了,请确保您的 MXChip 已连接到电源,以便设备可以开始发送数据。

检查 SQL Server 输出

现在让我们登录到我们的 SQL Server 数据库并运行以下查询,以确保我们正在从设备接收数据。

SELECT TOP (1000) [Id]
      ,[messageId]
      ,[deviceId]
      ,[temperature]
      ,[humidity]
      ,[pressure]
      ,[pointInfo]
      ,[IoTHub]
      ,[EventEnqueuedUtcTime]
      ,[EventProcessedUtcTime]
      ,[PartitionId]
  FROM [dbo].[StreamData] order by [EventEnqueuedUtcTime] desc

SQL Server 输出数据

检查 Azure Function 输出

要检查 Azure Function 输出,我们可以回到我们的 Azure Function,单击 Function,然后使用Monitor(监视)选项。

Azure Function 输出数据

请注意,如果您发现某些内容不起作用,您可以随时检查您的 Azure 流分析作业活动日志。

结论

在本篇文章中,我们学习了如何

  1. 使用 Azure 流分析作业
  2. 创建 Azure Function App
  3. 在 Visual Studio 中创建 Azure Function App 解决方案
  4. 编写一个 HttpTrigger 函数并将其发布到 Azure Function App
  5. 将 Azure Function App 设置为 Azure 流分析作业的输出作业拓扑
  6. 在另一个解决方案中使用创建的包

在我们的下一篇文章中,我们将看到如何将此 Azure Function 输出数据发送到 Azure SignalR 服务,然后在 Angular 应用程序中获取相同的数据。我迫不及待地想写我的下一篇文章。

现在轮到你了。你有什么想法?

非常感谢您的阅读。我很快就会带着另一篇关于同一主题的文章回来。我是否遗漏了您认为需要的内容?您觉得这篇文章有用吗?请不要忘记分享您的反馈。

您始终可以在 这里 查看我的 IoT 文章。

© . All rights reserved.