使用 Azure Functions 将事件从 Azure 事件中心发送到 Azure SQL 数据库





5.00/5 (5投票s)
引言
IoT 解决方案包含一些经典场景 - 设备将事件发送到某个消息中心,该中心可以接收任意数量的传入消息,然后有一个服务负责接收消息并将其存储起来。Azure 平台提供了多种服务可以帮助你处理这种情况。
在本文中,我将演示如何实现一个 IoT 工作流,其中设备将事件发送到 Azure 事件中心服务,然后使用 Azure Functions 将这些事件移至 Azure SQL 数据库。
消息将采用 JSON 文档的格式 - 以下是一个消息示例
{"eventId":200,"deviceId":1,"value":25,"timestamp":"2017-01-22T16:59:46.6169948Z"}
DeviceId
和 value
是设备发送的假设值。value 可能是温度、压力、速度或设备发送的任何其他值。本文将涵盖以下主题
- 创建和配置 Azure 事件中心
- 创建和配置 Azure Functions,用于将事件从 Azure 事件中心 移至 Azure SQL 数据库
- 创建和配置 Azure SQL 数据库,用于存储事件并使你能够进行分析
创建事件中心
作为第一步,你需要一个 Azure 事件中心 来接收来自设备的事件。你需要以下内容
- 一个名为
code-project-event-hub-demo
的事件中心命名空间 - 一个名为
gatewaytosqldatabase
的事件中心
如果你熟悉事件中心,可以跳过本节的其余部分,直接转到“发送事件到 Azure 事件中心”部分。
否则,你可以按照本节中的说明配置 Azure 事件中心。
要创建 Azure 事件中心,首先需要创建一个包含该事件中心的 Azure 事件中心 命名空间。打开 Azure 门户,选择 **新建** > **物联网** > **事件中心**。在打开的窗格中,输入你的事件中心名称(例如,code-project-event-hub-demo
)
你可以选择定价层(标准或基本),并决定你的 Azure 事件中心应放置在哪资源组和区域。
注意:最好将 Azure 事件中心、Azure Function 和 Azure SQL 数据库放在同一区域,以避免这些服务之间的跨群集传输。
在上一步中,你创建了一个可能包含多个事件中心的命名空间。现在你需要创建至少一个事件中心来接收来自设备的事件。打开事件中心详细信息,选择 +事件中心,然后添加新的事件中心 - 在我的例子中,它将命名为 GatewayToSqlDatabase
你可以选择定义事件中心的属性,例如分区数、保留期等。
发送事件到 Azure 事件中心
在此示例中,我将使用 Microsoft Program Manager Paolo Salvatori 在其 演示示例 中创建的设备模拟器应用程序。此示例包含一个模拟设备并向事件中心发送 JSON 消息的演示应用程序
打开设备模拟器应用程序并填写以下文本框
- 命名空间是本文第一步中创建的 Azure 事件中心命名空间的名称 - 在此示例中为
code-project-event-hub-demo
- 事件中心是事件中心的名称 - 在此示例中为
gatewaytosqldatabase
- 密钥值是 Azure 事件中心命名空间的主密钥或次密钥。在 Azure 门户中打开事件中心命名空间,转到“共享访问策略”> `RootManagerSharedAccessKey` 并复制主/次密钥
如果你输入的值正确,则可以启动设备模拟器,你将在设备模拟器应用程序中看到一些消息已发送。
另外,如果你在 Azure 门户中打开 **事件中心** > **概述** 窗格,你会看到有新消息已发布到事件中心。
使用 Azure Functions 将事件从 Azure 事件中心移至 Azure SQL 数据库
现在我们需要一个服务来监视事件中心并获取发送到那里的事件。首先,你需要通过 Azure 门户使用 **新建** > **计算** > **函数应用** 来创建一个新的 Azure Function。在打开的窗格中,你可以设置函数应用的名称,选择资源组和位置(如果可能,使用与事件中心相同的位置),并选择一个存储帐户。
当 Azure 完成部署后,打开 Azure Function 的详细信息,选择 +新建函数,然后从模板库中选择 `EventHubTrigger`-CSharp(当事件中心收到新事件时运行的 C# 函数)。
选择该类型后,你应该输入以下信息
- 函数名称,例如
code-project-function-demo
- 事件中心命名空间名称
code-project-event-hub-demo
- 创建事件中心连接
- 输入事件中心名称
- 输入事件中心命名空间连接字符串
如果一切设置正确,你可以转到你创建的函数,打开 “</> 开发” 选项卡,查看默认生成的代码。如果你向事件中心添加了一些事件,可以运行该函数,并在“日志”窗口中查看事件中心的消息(如果未显示日志,请选择右上角的“日志”按钮)。
如果你看到任何问题,可以检查你的设置。所有设置都本地保存在 `function.json` 文件中。如果你按“**查看文件**”链接,你可以看到函数中所有文件的列表(最初只有两个文件:`function.json` 和 `run.csx`)。`function.json` 文件的内容应类似于
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "myEventHubMessage",
"direction": "in",
"path": "gatewaytosqldatabase",
"connection": "code-project-event-hub-demo/gatewaytosqldatabase",
"consumerGroup": "$Default"
}
],
"disabled": false
}
现在我们有了接收来自设备(或本例中的设备模拟器)的消息的 Azure 事件中心,以及每当新消息被传递到事件中心时将被调用的 Azure 函数。
现在是时候创建 Azure SQL 数据库并配置 Azure Function,使其将消息写入数据库而不是日志了。
设置 Azure SQL 数据库
在此示例中,我们需要一个 Azure SQL 数据库来存储设备(或本例中的设备模拟器)发送的消息。消息可以存储在各种存储中,例如
- Azure Blob Storage 或 Azure Table Storage,如果你需要以低存储成本存储大量消息。
- Azure Document DB,如果你希望能够搜索、过滤和查询消息。
- Azure SQL 数据库,如果你需要使用完整的 SQL 语言进行高级分析,并且需要将消息中的信息与数据库中的外部实体关联(例如,欺诈检测,你需要分析事件并将其与客户行为相关联)。
在此示例中,我将使用 Azure SQL 数据库作为目标。
- 将所有事件以其原始格式(作为 JSON 文本)存储
- 解析设备发送的 JSON 消息
创建目标表
首先,如果你还没有 Azure SQL 数据库,你需要创建一个。你可以在 Azure 门户中选择 **新建** > **数据库** > **SQL 数据库**
你可以输入数据库名称(例如,code-project-database-demo
),选择数据库所在的服务器(选择一个与事件中心和 Azure Function 相同位置的服务器)以及定价层。
我们需要一个表(我将其命名为 Events
),用于存储来自 Azure 事件中心的消息。这个表可以很简单 - 它可能有一个字符串列用于存储消息,以及一个可选的日期列,用于存储消息接收的时间信息。
CREATE TABLE [dbo].[Events](
Data NVARCHAR(MAX),
DateAdded datetime2 DEFAULT ( GETDATE() ),
INDEX cci CLUSTERED COLUMNSTORE
);
有趣的部分是定义在表上的 INDEX cci CLUSTERED COLUMNSTORE
(或者我将在文章的其余部分称之为 CCI)。聚集列存储索引以列存储格式组织表,该格式针对事件分析进行了优化。列存储格式的两个主要好处是
- 高压缩率 - CCI 可将你的表压缩 10-20 倍(取决于你的数据),因此你可能会发现表空间“消失”了 90-95%。
- 批量模式分析 - CCI 以“批量模式”处理行。与逐行处理数据的标准查询不同,CCI 一次获取 100-900 行并一起处理,因此查询速度可能比标准表快 10 倍。
注意:CLUSTERED COLUMNSTORE
索引仅在你的数据库为 Premium 层时可用。
另一种选择是创建一个包含 JSON 消息中每个字段的单独列的表
CREATE TABLE [dbo].[Events](
[EventId] [int] NOT NULL,
[DeviceId] [int] NOT NULL,
[Value] [int] NOT NULL,
[Timestamp] [datetime2](7) NULL,
INDEX cci CLUSTERED COLUMNSTORE
);
Table
包含从设备发送到 JSON 消息的字段。与前面的示例一样,我在表上添加了 CLUSTERED COLUMNSTORE
索引,以实现高压缩率和快速查询。问题是我们需要在将 JSON 消息添加到表中之前对其进行解析并提取字段。
插入消息的存储过程
我还会创建一个存储过程,该过程将接收发送到事件中心的 JSON 消息,对其进行解析,然后将其放入目标表中。
CREATE PROCEDURE dbo.ImportEvents @Events NVARCHAR(MAX)
AS BEGIN
INSERT INTO dbo.Events (Data)
VALUES( @Events);
END
在这种情况下,存储过程不执行任何复杂操作。它只是获取一个 string
消息并将其插入到 Events
表中。
在此过程中,我们假设你将从事件中心发送单个 JSON 消息。如果你计划发送消息数组,可以使用以下过程
CREATE PROCEDURE dbo.ImportEvents @Events NVARCHAR(MAX)
AS BEGIN
INSERT INTO dbo.Events (Data)
SELECT value FROM OPENJSON( @Events);
END
OPENJSON
函数将获取 JSON 对象数组并将其拆分为数组元素。它会为数组中的每个对象返回一行,以便你可以将其导入 Events
表。
更有趣的情况是将事件插入到为 JSON 消息中的每个字段具有单独列的表中。
CREATE PROCEDURE dbo.ImportEvents @Events NVARCHAR(MAX)
AS BEGIN
MERGE INTO dbo.Events AS ExitingEvent
USING (SELECT *
FROM OPENJSON(@Events)
WITH ([eventId] int, [deviceId] int, [value] int, _
[timestamp] datetime2(7))) AS NewEvent
ON (ExistingEvent.EventId = NewEvent.EventId)
WHEN MATCHED THEN
UPDATE SET
ExistingEvent.DeviceId = NewEvent.DeviceId,
ExistingEvent.Value = NewEvent.Value,
ExistingEvent.Timestamp = NewEvent.Timestamp
WHEN NOT MATCHED THEN
INSERT (EventId, DeviceId, Value, Timestamp)
VALUES(NewEvent.EventId, NewEvent.DeviceId, NewEvent.Value, NewEvent.Timestamp);
END
此过程将使用 MERGE
语句,该语句接受一组新行并检查它们是否存在于现有事件中。如果新事件集中的行与现有事件通过 EventId
值匹配,则 MERGE
语句将更新现有行,否则将从新事件中插入值到 Events
表中。
为了找到序列化在 JSON 消息中的新事件行集,我使用了新的 OPENJSON
函数,该函数解析 JSON 文本并按 WITH
子句中指定的名称提取属性。在这种情况下,我们从 JSON 文本中获取 eventide
、deviceId
、value
和 timestamp
。这些值将由 OPENJSON
函数返回的行返回,MERGE
语句将它们与现有事件进行比较。
你可以在 Paolo Salvatori 创建的 GitHub 示例 https://github.com/azure-cat-emea/servicefabricjsonsqldb 中看到如何使用此过程通过 Worker roles 和 Service Fabric 从事件中心导入事件。
分析消息
Azure SQL 数据库在存储事件消息方面最大的价值在于其最强大的 SQL 语言,可用于分析,例如 ORDER BY
、GROUP BY
、WINDOW AGGREGATE
等。
以下查询显示了如何按设备查找设备报告的平均值
SELECT DeviceId, AVG(value) as avgValue
FROM Events
GROUP BY DeviceId
这里的假设是你使用的是一个为 JSON 消息中的每个字段具有单独列的表。
如果你将消息存储在单个字符串列中,则可以使用 Azure SQL 数据库中提供的 JSON_VALUE
函数从 JSON 字段中获取值并在查询中使用它们。
SELECT JSON_VALUE(Data, ‘$.DeviceId’) as DeviceID,
AVG( (JSON_VALUE(Data, ‘$.value’) as float) ) avgValue
FROM Events
GROUP BY JSON_VALUE(Data, ‘$.DeviceId’)
WHERE DateAdded BETWEEN @startdate and @enddate
你可以看到将事件存储为单列或多列之间的区别。如果你选择一个为每个字段具有单独列的表,则需要在导入时(在过程中)解析 JSON 文本,并且如果消息发生更改,你需要更改表结构。如果你使用单个字符串列存储消息,则插入速度更快,并且在消息结构更改时具有更高的灵活性,但查询速度可能会变慢,因为你需要在查询中解析 JSON。
将消息从 Azure Function 写入 Azure SQL 数据库
现在是时候添加 Azure Function 代码,该代码将把消息从事件中心发送到 Azure SQL 数据库了。
首先,我们需要在 Azure Function 应用程序的连接中设置到 Azure SQL 数据库的连接。打开 Function 应用设置,在“开发”部分选择“**配置应用设置**”。转到“**连接字符串**”,然后添加一个具有以下参数的新连接字符串
- 名称“
azure-db-connection
” - 值 Server=tcp:[servername].database.windows.net;Database=[databasename]; User ID=[username];Password=[password];Trusted_Connection=False; Encrypt=True;
你会看到类似下图的内容
设置好连接后,我们需要添加一些数据访问代码。Azure Function 没有内置的连接到 Azure SQ 数据库的输出。因此,我们需要添加一些数据访问代码。
你可以使用 Entity Framework,但由于我们没有任何 C# 对象,这可能会有所开销。我们只需要执行一个简单的 SQL 语句,将一个 string
参数发送到数据库。
另一种选择是编写 ADO.NET 代码,但这可能会导致大量的数据访问代码,包括打开/关闭连接、处理异常等。
你可以使用 Dapper 框架在 Azure SQL 数据库上执行查询。为了使用 Dapper Framework,我们需要在 `project.json` 文件中添加对 Dapper NuGet 包的引用。在 Azure Functions 中,我们可以使用任何 NuGet 包,因此我们将添加一个 `project.json` 文件,内容如下
{
"frameworks": {
"net46": {
"dependencies": {
"Dapper": "1.50.2"
}
}
}
}
如果你使用 Dapper,你可以轻松地在 SQL 连接上执行任何 SQL 查询
using (var conn = new SqlConnection(_connectionString))
{
conn.Execute("<your query here>");
}
你可以在 Davide Mauri 博客 上找到有关在 Azure Functions 中使用 Dapper 的更多详细信息。
作为替代方案,我们可以使用任何其他数据访问框架。下面是一个引用 Belgrade SQL Client 数据访问库的项目文件示例
{
"frameworks": {
"net46":{
"dependencies": {
"Belgrade.Sql.Client": "0.6.2"
}
}
}
}
这也是一个简单的库,可以让你在 SQL Server 上执行查询。如果你想使用 Belgrade SQL Client,你可以用以下内容替换 `Run.csx` 文件的内容
using System.Configuration;
using System.Data.SqlClient;
using Belgrade.SqlClient.SqlDb;
public static async void Run(string myEventHubMessage, TraceWriter log)
{
if(String.IsNullOrWhiteSpace(myEventHubMessage))
return;
try{
string ConnString = ConfigurationManager.ConnectionStrings_
["azure-db-connection"].ConnectionString;
var cmd = new Command(ConnString);
var sqlCmd = new SqlCommand("ImportEvents");
sqlCmd.CommandType = System.Data.CommandType.StoredProcedure;
sqlCmd.Parameters.AddWithValue("Events", myEventHubMessage);
await cmd.ExecuteNonQuery(sqlCmd);
} catch (Exception ex) {
log.Error($"C# Event Hub trigger function exception: {ex.Message}");
}
}
首先,我们将检查输入是否为空字符串。这可能是因为你只是按“**运行**”来测试函数,在这种情况下,你可能不想将空 string
导入数据库。
然后,你需要获取名为“azure-db-connection
”的连接字符串(你在配置中添加的),创建一个执行 ImportEvents
过程的命令,将命令类型设置为存储过程,添加名为“Events
”的参数,并使用参数提供的值,然后执行该命令。
如果发生任何错误,你将在日志中看到异常。
端到端测试
现在是时候进行端到端的测试了。打开设备模拟器,仔细检查它是否指向你的事件中心,然后启动它。你应该在设备模拟器日志和 Azure Function 日志中看到消息正在通过管道传输。此外,如果你执行 SELECT * FROM
Events,你将看到表的内容已更改。
结论
Azure 平台提供了多种服务,可用于典型的 IoT 工作流
- 用于从设备接收事件的服务 - Azure Event Hub 和 Azure eIoT Hub
- 可用于永久存储事件的服务 - Azure SQL 数据库、Azure Blob Storage、Azure Document DB 和 Azure Table Storage。
- 可用于将事件从接收事件的服务移动到存储事件的服务。这些服务的示例是 Azure Functions、Azure Data Factory 和 Azure Stream Analytics。
在本文中,我演示了如何创建一个工作流,在该工作流中,你在 Azure Events Hub 中接收事件,然后使用 Azure Functions 将它们移至 Azure SQL 数据库。如果你查看代码,你会发现实现此工作流不需要大量代码。
还有其他备选解决方案可用于实现将事件从 Azure 事件中心加载到 Azure SQL 数据库的工作流。Paolo Salvatori 描述了一个类似的解决方案,该解决方案使用 Azure Service Fabric 作为移动消息的机制:https://github.com/azure-cat-emea/servicefabricjsonsqldb。
历史
- 2017 年 2 月 7 日:初始版本