65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Overlord 管理、监控和控制所有物联网设备

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.15/5 (6投票s)

2015年3月24日

CPOL

33分钟阅读

viewsIcon

24234

downloadIcon

379

Overlord 是一个开源的 .NET 物联网管理平台,使用 C# 编写,运行在 Microsoft Azure 上。

目录

  1. 概述
  2. 状态
  3. 目标
  4. 为什么使用 .NET 构建云物联网应用程序?
    1. 静态类型和编译器基础设施
    2. Visual Studio 2013
    3. 性能
    4. 文档
  5. 设计
  6. Azure 服务
  7. 存储
    1. 数据流
    2. 云物联网应用的数据并行性
    3. 为云物联网应用存储时间序列数据
    4. 实体
  8. 日志记录
  9. 安全
  10. 测试
  11. 关注点

概述

Overlord 是一个云规模的物联网平台,提供一个 HTTP REST API,用于与任何可以连接到 Web 或任何私有网络的设备进行双向数据传输和通信(通过 TCP/IP)。Overlord 提供了一个 API 来管理设备、传感器、读数、通道、警报和消息。设备可以注册传感器,并通过简单的 REST API 将数据推送到服务器,服务器会确认接收并响应任何为该设备生成的消息或警报。设备传感器数据(按通道分组)可以通过简单的请求-响应模型流式传输到订阅的设备,客户端发送最后一个通道数据项的索引或日期。设备可以为通道设置警报,这些警报可以在传感器数据超过某个阈值时触发。可以通过 API 从一个设备发送按需消息到另一个设备。设备的的消息和警报被放入队列中,并按接收顺序取出,当设备确认已处理消息或警报时。

您可以将 Overlord 用作传感器网络上的聚合节点,用于存储或复制您的传感器数据到公共云。Overlord 不需要您的物联网设备理解 OData 等协议。您可以从 Python 或 Bash 命令行脚本中使用 Overlord,而无需安装任何客户端库。

Overlord 托管在 Github 上。

状态

0.1.0 版本将于 2015 年 3 月 31 日 完成。截至目前,我已提交了安全层和约 80% 的存储层。我还编写了一套通过了存储层和安全层单元测试的测试用例。

下载当前的 Overlord 源代码。

您可以在 Visual Studio 2013 中打开解决方案。默认情况下,Overlord 在调试模式下将使用 Azure Storage Emulator。已启用 Nuget 包恢复,因此项目所需的所有 NuGet 包都应已安装。在 Overlord.Testing 项目中运行所有单元测试,并确保它们都成功完成,特别是 ingestSensorValues 单元测试。您可以直接在 Visual Studio 中查看设备、用户、通道、设备通道、摘要表和队列等的内容。

您可以查看我的 Github  以获取进一步的更新和发布。

目标

市面上有许多物联网管理平台,其中一些是免费且开源的。微软自己也公布了其 Azure Intelligent Systems Service 的计划,该服务利用微软为其 Azure PaaS 构建的大量服务,让用户管理任何可想象设备的所有数据。Overlord 是一个开源的物联网平台,与其他物联网平台相比,其理念略有不同。

许多 HTTP 物联网平台共同的一个缺点是,物联网设备需要使用特殊的客户端库或支持 JSON 解析等操作来发送和接收数据。一个成本非常低的 Arduino 设备可能只有 32K 的闪存用于代码和 2K 的 SRAM 用于数据,并且可能使用以太网或 Wi-Fi 盾牌,这些在网络堆栈中没有加密和 HTTPS 功能。或者,假设您正在使用像 Freescale 开发的低成本微控制器,其带有嵌入式以太网接口。这是 MQTT 和 CoAP 等协议旨在解决的场景,但这些协议使用 UDP 并为传感器节点和设备之间的 M2M 通信而设计,而不是将数据发送到公共云。

您可能希望通过 HTTP 发送和流式传输数据到云端,而无需执行 JSON 解析或支持 WebSocket 等操作,这可能需要更像 Raspberry Pi 这样的全功能计算机的物联网设备。您可能拥有现有的物联网应用程序或网络,它们使用某种特定的语言、库或技术,您不想对其进行重大修改即可使用云数据存储和通信。

Overlord 的目标是成为一个零占用的物联网数据存储和通信平台。设备无需任何特殊的客户端库即可将数据发送到 Overlord 或流式传输数据通道或接收警报和消息。任何拥有标准 TCP/IP 堆栈且能够发出 HTTP 请求的设备都可以连接到 Overlord,而无需额外的客户端库或执行 JSON 或 XML 解析或使用 WebSocket 或 HTTP keep-alives。  对于运行 Linux 的嵌入式设备,例如我正在用作测试设备的 Raspberry Pi 2,您可以通过终端会话使用 Overlord API,例如,您可以编写一个简单的 shell 脚本来轮询 GPIO 引脚并使用 wget 或 curl 将其状态发送到 Overlord。对于可以使用 C 编程的 Arduino 或 Freescale Freedom 等嵌入式设备,您可以使用标准的 TCP/IP 网络堆栈或 Arduino WebClient 等客户端库连接到 Overlord 服务器来发送和接收数据,而无需额外的库。您的设备发送和接收的数据格式为扁平的 CSV 类型结构,可以用 C 编写的程序或 sed 等标准 Unix shell 工具轻松解析。您可以利用 Overlord 来镜像您的设备当前收集的数据,而无需大幅修改您现有的控制器设备或数据节点。

Overlord 被设计成高度可扩展且可插入,支持不同的托管环境、存储提供商和网络协议。您可以将自己的 Overlord 实例托管在您的 Azure 网站上,甚至可以将其作为私有网络上的独立服务器进程运行,而 Azure 只用作存储后端。

为什么使用 .NET 构建云物联网应用程序?

市面上有许多云应用程序使用 JavaScript、Python 或 Ruby 等语言,并在 Node.js 等平台上运行。例如,MeshBlu 是一个运行在 Node 上的开源物联网管理平台。

在我看来,像 C# 这样强制执行强类型和静态类型检查的语言比 JavaScript、CoffeeScript 和 Python 等语言更适合进行云规模的开发。像 .NET Framework 和 .NET 运行时这样的框架,以及像 Task Parallel Library 这样的类库,更适合云应用所需的此类多层开发、安全性和性能要求。构建云应用使用 .NET 的一些好处包括:

1. 静态类型和编译器基础设施

我从不认为 C# 等语言的静态类型是障碍。当我编写 JavaScript 代码时,我总是要调整参数顺序并进行防御性编程以防止意外值。

软件开发技术变化非常迅速。对于一个有多个层和许多移动部件的应用程序(如 Overlord),或者我必须学习新库或 API 的情况,我无法想象没有编译器类型检查的“安全毯”。无论是使用 Azure Table Data Service 还是 Windows Identity Framework,我事先就知道方法的返回和参数对象类型,这多次挽救了我。在 JavaScript 中使用 jQuery.Ajax 后,我知道在不确定方法调用会返回什么的情况下学习库有多难(我得到的是什么:对象还是字符串还是字符串化对象还是???? )

静态类型和 .NET 的高级编译器基础设施带来的许多好处包括:

扩展方法

扩展方法是我在任何语言中遇到的最酷的东西之一。扩展方法基本上是在编译时由编译器翻译的别名,这意味着您可以非常简单地扩展现有类型。扩展名仅在您拥有的命名空间中可见,这又是一个巨大的优点。

扩展方法是解决常见问题的优雅方案,这些问题通常需要笨拙的解决方案。例如,在我的存储库中,我有一些 System.String 扩展

namespace Overlord.Storage
{
    public static class StringExtensions
    {
        public static Guid UrnToGuid(this string urn)
        {
            string prefix = "urn:uuid:";            
            if (urn.IndexOf(prefix) == 0)
            {
                return Guid.ParseExact(urn.Substring(prefix.Length), "D");
            }
            else
            {
                return Guid.ParseExact(urn, "D");
            }
        }

        public static string UrnToId(this string urn)
        {
            string prefix = "urn:uuid:";
            if (urn.IndexOf(prefix) == 0)
            {
                return urn.Substring(prefix.Length);
            }
            else return urn;
        }

        public static Guid ToGuid(this string s)
        {            
            return Guid.ParseExact(s, "D");
        }
    }
}

这使我可以在一个地方处理解析 GUID 标识符(如设备 ID)的问题。所以,我可以自然地说:

"urn:uuid:d155074f-4e85-4cb5-a597-8bfecb0dfc04".UrnToGuid()

非常自然。原则上,动态语言(如 JavaScript)也可以实现这种类型的扩展,但显然让 C# 编译器处理工作意味着为您节省了大量工作,并且比运行时扩展对象原型更安全。

LINQ

LINQ 在我刚开始的时候花了我一些时间才弄明白,而且我仍然是新手。但一旦你“理解了”,它就能让处理集合、列表和数组变得容易得多。使用 LINQ 以类型安全的方式查询数据集合比使用 JavaScript 中的 Underscore.js 等库要容易和安全得多。

LINQ 使 C# 能够实现您在函数式语言中可能看到的那种单行解决方案。例如,当我添加设备读数时,我必须检查传感器名称和传感器值是否有效。当您在设备上创建传感器时,必须命名它们,例如“D2”或 N1”,其中第一个字母代表传感器数据类型:DateTime 或 Number(以我们的两个示例为例)。在将传感器值写入存储之前,我可以使用简单的 LINQ 查询和名为 IsValidSensorName 的字符串扩展方法来测试是否有任何传感器名称不正确。

if (values.Any(v => !v.Key.IsVaildSensorName()))

我可以使用 LINQ 查询抛出异常,以生成一个格式正确的错误传感器值列表。

if (values.Any(v => !v.Key.IsVaildSensorName()))
{
    string bad_sensors = values.Where(v => !v.Key.IsVaildSensorName())
                    .Select(v => v.Key + ":" + v.Value).Aggregate((a, b) => 
                        { return a + " " + b + ","; });
    throw new ArgumentException("Device reading has bad sensor names. {0}", bad_sensors);
}

Where LINQ 函数首先过滤掉所有无效的传感器名称-值对。Select LINQ 函数然后将这些值投影到一个简单的字符串列表中。然后Aggregate LINQ 函数将此列表聚合到一个用空格和逗号格式化的单个列表中。(我认为您还可以使用 LINQ Concat 查询一步完成这些操作。)

Task Parallel Library

The Task Parallel Library是一套完整的 .NET 类型和方法,用于轻松安全地在 .NET 中执行并行和并发操作。TPL 消除了您处理云应用程序中多线程问题的需求,并提供了熟悉的语言构造,用于将数据和任务并行性添加到您的代码中。

LINQ 和 TPL 使我们能够用更少的代码行完成复杂的代码,否则需要更多代码行。例如,在我使用的 Digest worker role 中,我必须执行以下操作:

  1. 从 Azure Storage Queue 读取最多 32 条消息。
  2. 将每条消息从其序列化字符串形式转换为 IStorageDeviceReading 实例。
  3. 根据读数的设备 ID 将每条消息分区到组中。每个组必须按读数时间升序排序。
  4. 现在,为每个组并行地将读数放入表存储中。
IEnumerable<CloudQueueMessage> queue_messages = storage.GetDigestMessages(32);
if (queue_messages != null)
{
    IEnumerable<IStorageDeviceReading> messages =
    queue_messages.Select((q) => JsonConvert
         .DeserializeObject<IStorageDeviceReading>(q.AsString));
    IEnumerable<IGrouping<Guid, IStorageDeviceReading>> message_groups = messages
        .OrderBy(m => m.Time)
        .GroupBy(m => m.DeviceId);
    Parallel.ForEach(message_groups, message_group =>
    {
        Log.Partition();
        foreach (IStorageDeviceReading m in message_group)
        {
            storage.AddDigestDeviceReading(m);
        }
    });

LINQ 和 TPL 使我能够以一种非常紧凑、可读的方式完成这个复杂的操作。

C# 拥有如此多的语言特性,结合 .NET 类库和运行时,可以使您的代码更短、更具可读性、更易于理解。但您仍然不必与 JavaScript 等动态类型语言的模糊性打交道。云规模的物联网应用程序必须以安全的方式处理大量不同类型的数据,可以从 .NET 和 C# 等静态类型语言中受益匪浅。

2. Visual Studio 2013

Visual Studio 2013 Community Edition 是免费的!很难找到一个 IDE,无论免费还是付费,您都可以说它比 Visual Studio 更好。构建端到端云应用程序所需的一切,从测试到版本控制(您可以直接从 VS 同步到 Github)到服务器管理,再到 HTML 和 JS 编辑,再到对 Python 等开源语言的支持,都集中在一个地方。此外,它还有出色的 NuGet 包管理器,用于管理 jQuery 等第三方库,并且有大量社区制作的扩展(如 xUnit 测试运行器)可与 Visual Studio 集成。

Visual Studio 2013 Community 附带一套完整的工具,用于管理您的 Azure 服务以及开发和部署您的云应用程序。您可以直接将 ASP.NET 项目发布到 Azure 网站,或者将您的 Web 应用程序和类库作为 Azure Cloud Service 发布。您可以直接查看和编辑 Azure 存储表和 SQL 数据库。直接在 VS 中查看我的 Azure 表和队列内容的能力节省了大量时间,并使云应用的开发和部署尽可能顺畅。我还可以创建 C 和 Python 项目,以便将来使用不同的客户端和网络库测试我的物联网 API,这也非常棒。

3. 性能

Node.js 因其异步设计而备受赞誉,并且如今在 Web 开发人员中非常流行。但据我所知,我看不到 Node 的任何功能是 C# 中的异步方法做不到的。C# 中已经包含了对异步调用、lambda 表达式和类型推断的编译器支持,以及 TPL 类库。JavaScript 是一种不错的轻量级语言,用于脚本文档和连接 UI 组件,但我会犹豫是否将其用于高流量网络应用程序。构建使用大量不同组件和库的应用程序时,动态类型似乎不是一个好主意。

.NET 应用程序都编译为原生代码,因此对于长期运行的进程,我认为 .NET 代码将比解释型语言具有明显优势。我计划为 Overlord 和 Node.js 物联网平台(如 MeshBlu)编写一系列基准测试,并查看它们的比较结果,敬请关注。

4. 文档

.NET 已经存在很长时间了,并且其商业企业根源意味着文档始终是优先事项。例如,我在 Python 中工作时没有遇到过同样的理念。开源在很多方面都很好,但新库的文档不是其中之一。在构建 Overlord 时,我从未在一个问题上卡住太久,以至于 MSDN 或 StackOverflow 文章没有为我指出错误所在……即使我使用的是 Azure Storage SDK 等新版本库。

除了 .NET 和 Azure 文档之外,微软还发布了大量架构指导、模式、代码示例和完整的示例,用于构建全栈云应用程序。 the patterns and practices 团队编写了 Cloud Design Patterns免费电子书Building Cloud Apps with Microsoft Azure。此外,还有大量的代码示例和博客文章,涵盖了微软作家和博主以及独立作者关于 Azure 和 ASP.NET 和 Entity Framework 等的所有内容。人们长期以来一直在使用 .NET 构建分布式多层应用程序,因此要找到构建任何类型云应用程序的指导、解决方案和模式并不难。

设计

Overlord 的设计遵循了促进模块化、可插拔性和可测试性的核心原则。Overlord 被分解为几个不同的 C# 项目:

Overlord.Security:这是一个 C# 库,实现了 Overlord 的基于声明的身份和授权 模型

Overlord.Storage:这是一个 C# 库,提供了实现 Overlord 用户、设备、传感器、读数、通道、警报和消息存储的接口。它自带一个 Azure Table Storage 的存储提供程序。其他存储提供程序可以使用 IStorage* 接口轻松插入。

Overlord.Digest.Azure 这是一个 C# 库,负责所有传入的传感器数据处理和用于处理传出消息和警报的队列处理。当传感器数据被推送到 Overlord 时,会生成一个队列消息,Digest 会读取该消息,检查其所属的通道以及传感器上设置的警报,然后将数据写入正确的数据表。它目前被编写为 Azure Cloud Service Worker Role。

Overlord.Core:这是一个 C# 库,实现了核心 REST API 操作:用户、设备、传感器、读数、通道、警报和消息的 CRUD。添加设备和传感器、存储传感器读数、流式传输数据通道、发送和接收警报和消息,都通过核心 API 公开的操作完成。核心 API 不关心用于访问它的网络协议,因此支持 TCP/IP 和 HTTP 以外的其他协议(如 UDP 和 CoAP 等物联网协议)非常容易。

Overlord.Http:这是一个 ASP.NET Web 项目,通过 HTTP 提供对 Overlord 核心 API 的访问。它使用了 ASP.NET MVC Web Api 2 框架。

Overlord.Testing:这是一个 C# 库,包含应用程序每个层的单元测试和其他测试。它依赖于 xUnit 测试库。

为这些层设置单独的项目,可以让我非常快速地测试应用程序的每个部分。如果我想测试一个存储方法,就不需要模拟 Web Api 控制器对象,也不需要在测试运行期间加载 ASP.NET MVC 库。未来的计划是创建一个仪表板 UI,用户可以在其中监控他们的设备并绘制传入的传感器数据图。

Azure 服务

Azure Table Storage 是一个低成本、高可扩展性的数据存储平台,非常适合存储和检索大量非关系型结构化数据。

Azure Queue Storage:Overlord 使用存储队列进行低成本、高可扩展的异步数据处理和设备间的消息传递。传入的设备读数被序列化为队列消息,然后由 Digest 云服务工作角色 FIFO 顺序摄取。Overlord 按需或在 Digest 触发传感器警报时将消息传递给设备。

Azure Cloud Services:Overlord 使用 Cloud Service 中的工作角色异步处理传入的传感器数据,并将其 FIFO 作为设备可以读取和查询的时间有序数据流发布为通道。

Azure Websites:Overlord HTTP 服务器是一个运行在 Azure Websites 上的 ASP.NET Web 应用程序。我也可以选择将其作为 Cloud Service Web Role 部署。

Microsoft Azure 提供了大量技术,如 Event HubsService Bus,可用于构建捕获遥测数据或进行消息传递的应用程序。我选择编写自己的存储和消息传递层,因为我想学习如何设计和编码云规模的系统。Overlord 的目标是实现一个 REST API,资源受限的设备可以使用该 API,而无需进行 OData 消息解包等繁重的工作。

存储

存储是 Overlord 中最大的层,也是我花费时间最多的地方。我基本上选择创建自己的存储和消息传递层,而不是使用 Azure SQL 数据库或 Azure Event Hubs 或 Service Bus 等服务。这花费了很多精力,但我在此过程中玩得很开心,学到了很多东西,以及如何思考异步操作、并发和数据并行性。

Overlord 存储围绕可扩展的键值存储而设计。Azure Table Storage 是第一个编写的存储提供程序,但其他类型的键值数据存储(如 Redis)可以通过实现 IStorage 接口来使用。设备调用 API 发送设备读数。设备读数是一组一个或多个数据对,包含传感器名称和传感器值。传感器名称必须以 S、I、N、D、L、B 开头并以数字结尾,例如 N1、S25、B43 都是有效的传感器名称。每个有效的传感器名称对应一种传感器类型:String、Integer、Number (Double)、DateTime、Logical (bool) 和 Binary (byte[])。

设备读数以异步方式以高吞吐量发送到 API。HTTP API 服务器是一个运行在 Azure Websites(或 Azure Cloud Services)上的 ASP.NET MVC Web 应用程序,可以扩展以适应来自世界各地的任何数量的设备和读数。设备和用户还调用 API 来请求数据通道。通道是传感器数据的时间有序系列。通道是 Overlord 发布传感器数据的方式,也是用户和设备订阅的对象。一个通道可以包含来自许多不同设备的传感器。例如,我们可以创建一个通道,其中包含位于特立尼达的所有设备的温度传感器。

为了完成这种典型云应用的流入和发布数据,我们必须仔细考虑我们的数据流。

数据流

Overlord Data Flow

基本数据流是:

  1. 设备调用 REST API AddReading 方法,以键值对格式添加传感器数据。
  2. API 层将设备读数传递给存储层。
  3. 存储层将设备读数序列化为队列消息,并将消息放入 Digest 队列。
  4. Digest 进程从设备读数队列中 FIFO 读取多达 32 个读数,按设备 ID 对它们进行分组,然后在并行调用每个组的 storage.IngestSensorValues 方法。
  5. IngestSensorValues 分析每个设备读数中的每个传感器键值对,并将传感器数据写入设备通道表。每个设备都有一个设备通道表,其中包含该设备的所有传感器读数,按读数时间排序。
  6. IngestSensorValues 分析每个传感器值,并检查该值是否在为该传感器设置的任何警报的阈值内。
  7. 如果阈值在为传感器设置的警报范围内,IngestSensorValues 会为该设备的消息队列添加一条消息。
  8. IngestSensorValues 检查每个传感器是否属于任何其他通道,并将传感器值数据写入这些通道表。
  9. 设备或用户可以通过 REST API 查询通道数据。

云物联网应用的数据并行性

以下是我从 Azure Storage 设计以及微软模式和实践团队及其他地方的建议中吸取的关于在云应用程序中执行数据操作的一些经验教训。

  • 在进行网络或磁盘 I/O 时,一次性写入和读取或发送和接收尽可能多的数据会快得多。您应该尝试为单个操作利用尽可能多的磁盘和网络带宽。
  • 使所有 API 调用异步化,并使用队列按 FIFO 顺序处理传入的异步数据。
  • 将高吞吐量数据分区(分段),使其可以安全地并发写入并并行读取。在 Overlord 中,传感器读数是高吞吐量数据实体,设备 ID 和传感器名称构成了自然的划分方案。一个设备永远不会需要写入属于另一个设备的传感器数据,设备上的一个传感器也不会需要写入属于另一个传感器的传感器数据,或者需要顺序获取一个或多个设备的传感器数据。
  • 并行化您的 I/O,以便在分区数据上并发执行一次操作,而不是串行执行。
  • 使用 Task Parallel Library 和模式来执行并行操作,避免低级地管理自己的多线程。
  • 在 Azure Table Storage 中,尽可能查询 PartitionKey 和 RowKey 的精确匹配。这是 Table Storage 最快的读取操作类型,远快于对两个索引进行范围查询,或者更糟糕的是对属性值进行查询。

在云物联网应用中存储时间序列数据

我采用了 Semantic Logging Application Block 和 Windows Azure Diagnostics 团队用于在 Azure Table Storage 中存储高吞吐量时间序列数据的方法。对于我的传感器数据,我将每个行的 Partition key 设置为 UTC 时间的 ticks 数,四舍五入到最近的分钟。然后,我将读数的精确时间存储为 RowKey 或行属性之一,具体取决于我计划对数据进行的查询类型。这使我能够快速、并发地查询不同时间间隔内的传感器数据,同时对表 PartitionKey 和 RowKey 进行精确匹配。例如,如果我想查询过去一小时内添加到设备上的 4 个传感器的所有传感器数据,我可以执行一个并行查询,获取对应于过去一小时内每个分钟和每个 4 个传感器名称的行键的表行。理论上,我可以执行 60 个并发 Table Storage 查询来获取这些数据。

实体

Overlord 数据模型中有 8 个实体:用户、设备、传感器、读数、通道、通道项、警报和消息。基本上,一个用户拥有多个设备,并且可以添加、删除和更新每个设备的属性,如名称和位置。

用户实体 (IStorageUser)

ID Token ETag 名称 设备
{4567-2134-4444-1234} 密码 时间戳 用户 01 [{1234-6789-0004-1234},...]

一个设备有多个传感器。设备向 Overlord 注册单个传感器,并使用传感器名称将数据推送到 Overlord。

设备实体 (IStorageDevice)

ID Token ETag 用户 ID 名称 描述 Location 传感器
{1234-6789-0004-1234} 密码 时间戳 用户 01 设备 01 Raspberry Pi 2 Gasparillo, Trinidad W.I. S1, S2, N1

传感器实体 (IStorageSensor)

设备 ID (传感器) 名称 描述 单位 通道 警报
{1234-6789-000-1234} N1 数字温度传感器 摄氏度 [{5555-5555-6666-6666},...] [{0<val<100},...]

读数是一组传感器值以及设备传感器被读取并发送到 Overlord 的时间。

设备读数实体 (IStorageDeviceReading)

时间(到分钟) 时间(精确) 传感器值
0635631838800000000 0635631838834500000 {S1:"",S2="",N1:30}

通道是传感器数据的分组,形成一个按时间排序的序列。

通道实体 (IStorageChannel)

ID 名称 传感器类型 描述 单位 警报
{5555-5555-6666-6666} 特立尼达天气 N0 特立尼达的天气传感器 摄氏度 [{40<val<100},...]

通道项实体 (IStorageChannel)

时间(到分钟) 时间(精确) 设备 ID (传感器值)
0635631838800000000 0635631838834500000 {1234-6789-0004-1234} {N1:30}

警报实体 (IStorageChannel)

传感器类型 最小值 最大值 Message
N0 40 100 "太热了!"
N0 -30 10 "太冷了!"

消息实体 (IStorageChannel)

接收设备 ID DataType
{1234-6789-000-1234} S0 太热了!

实体数据规范化还是重复?

在关系数据库设计中,最重要的优点之一是规范化数据表:最小化相关实体之间数据的重复。这既有助于提高数据库性能,也(最重要的是)有助于数据的_一致性_。但在使用 Azure Table Storage 等结构化存储服务时,规范化可能是一种阻碍。实体之间的重复数据是完全可以的,并且极大地简化了查询。 

例如,我的设备通道数据可以存在于其他通道表中,而不是被迫通过关系在一个地方访问。这使我能够根据我计划存储的数据类型来调整我的存储提供程序的性能。

总的来说,在设计云物联网应用时,您需要仔细考虑使用预定义存储层(如关系数据库)是否优于围绕 Redis 或 Azure Table Storage 等基本键值存储设计您的存储层。围绕数据设计存储层而不是反过来,可能需要相同的工作量,并带来很多好处。在 Overlord 中,我选择了前者,这为我提供了最大的灵活性来最大化安全性、一致性和性能。我可以控制如何存储和检索时间序列数据(如传感器读数)以及离散实体(如用户和设备)。

日志记录

对于需要处理和执行海量数据和操作的云规模应用程序来说,日志记录至关重要。一个优秀的日志记录组件,能够最大程度地灵活表达您编写和分析日志事件的方式,可以让您监控云应用程序的每个方面,并追踪由编码错误、扩展性或性能问题导致的任何应用程序故障的根源。

Overlord 使用微软模式和实践库中的 Semantic Logging Application Block语义日志记录 是您使用代码而不是文本字符串来构建事件日志,并在如何收集和分析日志事件方面提供了极大的_一致性_和_强大功能_。解释语义日志记录的最佳方式是看它的实际应用。

[EventSource(Name = "AzureStorage")]
public class AzureStorageEventSource : EventSource
{
    public class Keywords
    {
        public const EventKeywords Configuration = (EventKeywords)1;
        public const EventKeywords Diagnostic = (EventKeywords)2;
        public const EventKeywords Perf = (EventKeywords)4;
        public const EventKeywords Table = (EventKeywords)8;          
    } 
    public class Tasks
    {
        public const EventTask Configure = (EventTask)1;
        public const EventTask Connect = (EventTask)2;
        public const EventTask WriteTable = (EventTask)4;
        public const EventTask ReadTable = (EventTask)8;          
    }

在这里,我们定义了一个事件源,其中包含我们要为 AzureStorage 组件记录的所有事件。我们按任务和关键字对事件进行分类。关键字是 2 的倍数,因此可以 OR 组合。

我们在 EventSource 上定义了用于记录消息的方法。这是语义日志记录的关键部分:使用表示应用程序正在做什么的方法。

[Event(5, Message = "FAILURE: Write Azure Table Storage: {0}\nException: {1}", Task = Tasks.WriteTable,
Level = EventLevel.Error,
            Keywords = Keywords.Diagnostic | Keywords.Table)]
internal void WriteTableFailure(string message, string exception)
{
    this.WriteEvent(5, message, exception);
}

[Event(6, Message = "SUCCESS: Write Azure Table Storage: {0}", Task = Tasks.WriteTable, 
Level = EventLevel.Informational,
    Keywords = Keywords.Diagnostic | Keywords.Table)]
internal void WriteTableSuccess(string message)
{
    this.WriteEvent(6, message);
}

现在,我们拥有了一个统一、一致的方式来记录对 Azure Table Storage 的读写操作,确保所有关于操作的相关信息都以一致的方式存储。Overlord 中的每个层都可以定义自己的事件源,并以与存储层相同的方式记录事件。我还使用扩展方法轻松捕获异常,而无需额外输入。

public static class AzureStorageEventExtensions
    {
      ...
        public static void ReadTableFailure(this AzureStorageEventSource ev, 
            string message, Exception e)
        {
            ev.ReadTableFailure(message, e.ToString());
        } 
    }

所以,我可以这样说:

catch (Exception e)
{
Log.WriteTableFailure(string.Format("Failed to add device entity: {0}, Id: {1}, Token {2}.",
    device.Name, device.Id.ToUrn(), device.Token), e);

您可以记录到不同的事件接收器,这只是不同日志目标的另一种说法。SLAB 足够灵活,可以将不同类型的事件记录到不同的位置。我选择记录到纯文本文件,因为这是最简单的选项。所以在我的日志文件中,有类似以下的条目:

EventId : 6, Level : Informational, Message : SUCCESS: Write Azure Table Storage: Added device reading entity: Partition: 0635631739800000000, 
RowKey: 9ac31883-f0e3-4666-a05f-6add31beb8f4_0635631740429485024, Sensor values: S1:9KR8hJMpk4s34VndWn16Z6GIMyvIAJWk0ZUkbNOe3vn,D1:3/28/2015 1:57:13 PM , 

每当我写入一个传感器值时。当我准备好一个更具可扩展性的日志记录组件时,我可以使用 Azure Table Storage 接收器来处理我的日志事件。

安全

这是另一个耗时完成的层。对于一个可能供数百万设备和用户使用的系统,安全性是至关重要的。我们必须确保每个用户和设备都隔离在单一的安全上下文中,无法访问任何未经授权的数据。我们必须最大限度地减少意外或故意损坏或泄露数据的可能性。

Overlord 使用基于声明的 身份授权 模型,以及基于角色的访问检查,使用来自 Windows Identity Framework 和 .NET System.Security.Permissions 命名空间中的类。WIF Claims Programming Model 的一个详细介绍 在这里。WIF 提供了一种统一的方式来处理来自 Windows 用户帐户、Active Directory 甚至 Windows Live 等云标识提供程序的授权和身份验证提供程序。您可以将任何类型的身份验证方案插入 WIF,并使用 ClaimsPrinicpalPermission 等属性来声明性地断言应用程序的横切安全方面。

云应用程序通常由多个组件或层组成,这些组件或层在逻辑上或网络上相互隔离。例如,Overlord 存储库在逻辑上与其他应用程序分离,并将数据存储在与 Overlord Http 侦听器运行位置不同的网络服务器上。

声明代表一个安全断言,即 Overlord 的一个层声明并被其他层信任为真实。当设备或用户成功通过身份验证时,安全层会设置关于当前设备或用户身份和角色的声明,其他层(如 Storage 和 Core)会信任这些声明。当需要存储或 API 操作时,当前层必须断言该操作已被请求,以便较低层(如 Storage)可以信任该操作是故意的。

因此,例如,在我们的存储层中,DeleteUser 方法声明为:

[PrincipalPermission(SecurityAction.Demand, Role = UserRole.Administrator)]
[ClaimsPrincipalPermission(SecurityAction.Demand, Resource = Resource.Storage, 
Operation = StorageAction.DeleteUser)]
public bool DeleteUser(IStorageUser user)

在我们的存储层中,对于每个 CRUD 操作,我们都要求线程主体属于特定的角色。所以我们的 IStorage.DeleteUser 方法要求附加到我们线程的principal 的身份属于“Administrator”用户角色。但我们也要求在执行存储操作时,该操作是合法地从更高层发出的。我们通过使用一种称为 StorageAction.DeleteUser 的声明类型来做到这一点。

Overlord 的任何部分需要删除用户都必须将此声明添加到当前线程身份。如果缺少此声明,则对 IStorage.DeleteUser 的任何调用都会抛出安全异常。这最大限度地减少了 Overlord 无意中(或可能被恶意攻击者故意)执行任何层不明确声明它想要执行的存储操作的可能性。因此,可以进行如下测试:

[Fact]
public void CanAuthorizeDeleteUser()
{
    OverlordIdentity.InitializeUserIdentity(user_01_id.UrnToId(), "admin", new string[0]);            
    AzureStorage storage = new AzureStorage();
    OverlordIdentity.AddClaim(Resource.Storage, StorageAction.FindUser);
    IStorageUser user = storage.FindUser("d155074f-4e85-4cb5-a597-8bfecb0dfc04".ToGuid(), "admin");

    //Security Exception thrown here:
    Assert.Throws(typeof(System.Security.SecurityException), () => storage.DeleteUser(user));

    //Intialize an admin user identity:
    OverlordIdentity.InitializeAdminUserIdentity(user_01_id.UrnToId(), "admin");         

    //But exception is still thrown here because it doesn't have the correct StorageAction claim              
    Assert.Throws(typeof(System.Security.SecurityException), () => storage.DeleteUser(user));
}

因为当没有线程身份处于所需角色时访问存储层的任何方法都会抛出安全异常。但即使身份_确实_具有所需角色,它仍然需要正确的 StorageAction 声明来执行操作。

当我们的核心 API 库加载时,我们的安全层会初始化当前线程主体的身份。我们获取当前身份(可能是一个 Windows 身份),并从中创建一个新的 ClaimsIdentity。

private static void InitalizeIdentity()
{
    //Get the current user identity as a stock IIdentity
    IIdentity current_user_identity = Thread.CurrentPrincipal.Identity;
    //Create a new ClaimsPrincipal using our stock identity
    ClaimsPrincipal principal = new ClaimsPrincipal(new ClaimsIdentity(current_user_identity, 
        null, current_user_identity.AuthenticationType,
        ClaimsIdentity.DefaultNameClaimType, ClaimTypes.Authentication.Role));
    //Assign to our current thread principal
    Thread.CurrentPrincipal = principal;
    ClaimsIdentity new_user_identity = (ClaimsIdentity)Thread.CurrentPrincipal.Identity;                        
}

ClaimTypes.Authentication.Role 实际上是我们拥有的一个类型,我们为其定义了一个自定义名称,以避免与任何标准或 Windows 角色冲突。

namespace Overlord.Security.ClaimTypes
{
 
    public class Authentication
    {
        public const string Role = "urn:Overlord/Identity/Claims/Roles";
        

这意味着我们可以添加、删除和检查应用程序角色,而不会干扰身份可能从其他地方获取的任何现有角色。

然后,我们将这个新身份对象分配给当前线程主体的身份。

ClaimsIdentity new_user_identity = (ClaimsIdentity)Thread.CurrentPrincipal.Identity;

一旦我们的 Overlord 库被卸载,并且线程主体的身份恢复到其执行我们库之前的状态,我们的新身份对象就会超出作用域。

在我们的应用程序配置中,我们声明了一个自定义授权管理器类。

<system.identityModel>
    <identityConfiguration>
      <claimsAuthorizationManager type="Overlord.Security.AuthorizationManager,Overlord.Security" />
    </identityConfiguration>
  </system.identityModel>

然后,当调用受保护的方法(如 DeleteUser)时,授权管理器可以决定如何授权调用。

 public override bool CheckAccess(AuthorizationContext context)
        {                        
            return OverlordIdentity.HasClaim((string) context.Resource.First().Value, 
               (string) context.Action.First().Value);            
        }

我们检查一个声明类型,该声明类型的名称是正在访问的资源(在上例中是 Storage),其值是正在执行的资源操作的名称(在上例中是 DeleteUser)。所以我们可以非常肯定,任何对存储层的调用都必须是由一个有意将存储操作添加到当前身份拥有的声明列表中的方法调用的。请注意,在我们的 DeleteUser 方法的最后,我们执行

OverlordIdentity.DeleteClaim(Resource.Storage, StorageAction.DeleteUser)

这会将声明从身份中删除,并确保任何进一步对存储层的调用都必须显式声明它们正在调用 DeleteUser 操作。

使用基于声明的授权允许云应用程序中的多个组件安全地通信安全上下文和授权信息。Overlord 将当前已认证的用户或设备 ID 存储为线程主体的身份上的声明。因此,每次执行操作时,关于请求该操作的用户或设备的信息始终可用于在同一线程上执行的所有代码。这消除了基于当前用户或设备 ID 频繁过滤查询的需要,并最大限度地减少了访问或修改不属于当前用户或设备的数据的可能性。

测试

测试是任何应用程序构建的基础。单元测试使我能够在不同层之间进行更改,而这些更改必须产生相同的行为。如果单元测试失败,则意味着该层行为不正常。

在处理 Azure Storage SDK 等 SDK 时,单元测试非常宝贵,因为它们可以捕获我犯下的错误(例如并发写入实体),然后再将组件集成到其他层。例如,考虑以下代码:

IStorageUser user = storage.FindUser(AzureStorageTests.user_02_id.UrnToGuid(), AzureStorageTests.user_02_token);

OverlordIdentity.AddClaim(Resource.Storage, StorageAction.AddDevice);
IStorageDevice device_01 = storage.AddDevice(user, 
    AzureStorageTests.device_01_name, AzureStorageTests.device_01_token, null, AzureStorageTests.device_01_id);

OverlordIdentity.AddClaim(Resource.Storage, StorageAction.AddDevice);
IStorageDevice device_02 = storage.AddDevice(user, AzureStorageTests.device_02_name, AzureStorageTests.device_02_token, null, 
    AzureStorageTests.device_02_id);

这段看似无害的代码实际上会与 Azure Table Storage 产生并发错误。为什么?因为在我的 AddDevice 方法中,我说:

TableOperation update_user_operation = TableOperation.Merge(CreateUserTableEntity(user));
result = this.UsersTable.Execute(update_user_operation);

我在添加设备时更新用户对象。但是当我尝试添加第二个设备时,我仍然在使用原始用户对象。当我尝试第二次更新用户时,Azure Table Storage 会抱怨,因为我使用的是用户对象的先前版本。在我 AddDevice 方法中,我需要做的是:

TableOperation update_user_operation = TableOperation.Merge(CreateUserTableEntity(user));
result = this.UsersTable.Execute(update_user_operation);
user.ETag = result.Etag;

这将更新后的 ETag(Azure Table Storage 在更新用户操作后返回)放在我正在添加设备的 `User` 对象上。

我使用 xUnit 进行单元测试。xUnit 可作为 NuGet 包获得,并具有 GUI 测试运行器,该运行器集成到 Visual Studio IDE 的测试 GUI 中。

测试数据

我使用以下方法生成测试数据。这两个方法生成随机字符串和日期时间。

public static Random rng = new Random();
/// <summary>
/// Genarate a random string of characters. Original code by Dan Rigby:
/// http://stackoverflow.com/questions/1344221/how-can-i-generate-random-alphanumeric-strings-in-c
/// </summary>
/// <param name="length">Length of string to return.</param>
/// <returns></returns>
public static string GenerateRandomString(int length)
{
    var chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    var stringChars = new char[length];            
    for (int i = 0; i < stringChars.Length; i++)
    {
        stringChars[i] = chars[rng.Next(chars.Length)];
    }
    return new String(stringChars);
}
        
public static DateTime GenerateRandomTime(int? year, int? month, int? day, int? hour)
{
    int y = year.HasValue ? year.Value : DateTime.Now.Year;
    int m = month.HasValue ? month.Value : DateTime.Now.Month;
    int d = day.HasValue ? day.Value : DateTime.Now.Day;
    int h = hour.HasValue ? hour.Value : rng.Next(24);            
    return new DateTime(y, m, d, h, rng.Next(60), rng.Next(60))
}

传感器名称是以 S、I、N、L、D、B 开头,后面跟一个数字的字符串。字母指定传感器值类型:字符串、整数、数字(双精度)、逻辑(真/假)或字节数组。此方法生成一个包含随机有效传感器名称及其正确类型值的字典。

public static IDictionary<string, object> GenerateRandomSensorData(int num_sensors)
{
    string[] sensors = { "S", "I", "N", "L", "D", "B" };
    IDictionary<string, object> sensor_values = new Dictionary<string, object>();
    for (int i = 1; i <= num_sensors; i++)
    {
        string sensor_name = sensors[rng.Next(0, 5)] + rng.Next(num_sensors).ToString();
        if (sensor_values.Keys.Contains(sensor_name)) continue;
        if (sensor_name.ToSensorType() == typeof(string))
            sensor_values.Add(new KeyValuePair<string, object>(sensor_name, GenerateRandomString(20)));
            else if (sensor_name.ToSensorType() == typeof(DateTime))
                sensor_values.Add(new KeyValuePair<string, object>(sensor_name, GenerateRandomTime(null, null,
                        null, null)));
            else if (sensor_name.ToSensorType() == typeof(int))
                sensor_values.Add(new KeyValuePair<string, object>(sensor_name, rng.Next(5, 1000)));
            else if (sensor_name.ToSensorType() == typeof(double))
                sensor_values.Add(new KeyValuePair<string, object>(sensor_name, rng.NextDouble()));
            else if (sensor_name.ToSensorType() == typeof(bool))
                sensor_values.Add(new KeyValuePair<string, object>(sensor_name, rng.NextDouble() > 0.5));
            else
            {
                byte[] b = new byte[100];
                rng.NextBytes(b);
                sensor_values.Add(new KeyValuePair<string, object>(sensor_name, b));
            }
        }
        return sensor_values;
}

因此,我可以在测试中快速创建大量设备,然后添加一组有效的传感器数据。随着我的应用程序代码的增长,我运行的测试数量(尤其是并行数据操作的测试)也在增长。我可以使用 Task Parallel Library 的 Parallel.Foreach 来模拟设备并行添加传感器数据。

Parallel.For(0, devices.Count, d =>
{
    storage.AuthenticateAnonymousDevice(devices[d].Id.ToUrn(), devices[0].Token);                
    OverlordIdentity.AddClaim(Resource.Storage, StorageAction.AddDeviceReading);                
    storage.AddDeviceReading(TestData.GenerateRandomTime(null, null, null, null),                        
        TestData.GenerateRandomSensorData(10));
        IDictionary<string, object> sensor_values = TestData.GenerateRandomSensorData(10);
        OverlordIdentity.AddClaim(Resource.Storage, StorageAction.AddDeviceReading);
        storage.AddDeviceReading(TestData.GenerateRandomTime(null, null, null, null),
            TestData.GenerateRandomSensorData(sensor_values));

        //Sleep for a random interval
        Thread.Sleep(TestData.GenerateRandomInteger(0, 1000));

        //Add another set of sensor data
        OverlordIdentity.AddClaim(Resource.Storage, StorageAction.AddDeviceReading);
            storage.AddDeviceReading(TestData.GenerateRandomTime(null, null, null, null),
        TestData.GenerateRandomSensorData(sensor_values));
});

关注点

LINQ

当使用 LINQ 过滤器或可枚举类型的 .Contains() 方法比较对象时,默认情况下比较是_按引用_进行的。这意味着,即使两个对象的每个成员都相等,.NET 仍然不认为这两个对象是相等的。因此,例如,如果您在 Where 或 filter 条件中的两个对象不是同一个_引用_,LINQ 的 Select 操作可能无法按预期工作。

如果您想在比较对象时使用其他语义,则需要为您的对象实现 IEqualityComparator。因此,由于我存储对象的方式冗余,有时我会从 Table Storage 中取出两个不同的对象,并且希望它们可以被视为相等。Azure Table Storage 已经提供了 3 个字段来唯一标识一个对象:PartitionKey、RowKey 和 Timestamp。因此,对于我的 IStorageDevice 类,我说:

public class IStorageDeviceEq : IEqualityComparer<IStorageDevice>
    {
        public bool Equals(IStorageDevice d1, IStorageDevice d2)
        {
            if ((d1.Id == d2.Id) && (d1.Token == d2.Token) && (d1.ETag == d2.ETag))
            {
                return true;
            }
            else
            {
                return false;
            }
        }

        public int GetHashCode(IStorageDevice d)
        {            
            return (d.Id + d.Token + d.ETag).GetHashCode();
        }
    }
}

Id、Token 和 ETag 是我用来唯一标识 IDeviceStorage 对象的 3 个字段。所以这样可行:

IStorageUser user = storage.FindUser(user_01_id.UrnToGuid(), "admin");
    IStorageDevice device = storage.FindDevice(device_01_id.UrnToGuid(), 
        "XUnit_CanFindDevice_Test_Token");            
Assert.Contains(device, user.Devices, new IStorageDeviceEq());

即使对象引用 device 与 user.Devices 中的对象不同,使用 IStorageDeviceEq 比较器也允许 IEnumerable.Contains() 执行的比较将每个对象视为相同。

同时还请注意,我可以做到:

Assert.True(user.Devices.ContainsDevice(device));

因为在我的 IListExtensions 类中,我有一个扩展到 IList 的功能,它使用我的自定义 IEqualityComparator 来进行设备比较。这展示了扩展方法的有用性和强大功能,以及它们对代码可读性和正确性的贡献。

语义日志记录应用程序块

您的每个自定义 EventSource 类都是静态的,意味着在您的 AppDomain 或进程的所有类之间只共享一个副本。您可以在应用程序入口点一次性初始化您的事件侦听器,每个库或组件都会记录到同一个侦听器。例如,在我 Digest worker role 的 OnStartup() 方法中:

EventTextFormatter formatter = new EventTextFormatter() { VerbosityThreshold = EventLevel.Error }; 
digest_event_log_listener.EnableEvents(Log, EventLevel.LogAlways, AzureDigestEventSource.Keywords.Perf 
| AzureDigestEventSource.Keywords.Diagnostic); 
digest_event_log_listener.LogToFlatFile("Overlord.Digest.Azure.log", formatter, true); 
storage_event_log_listener.EnableEvents(AzureStorageEventSource.Log, EventLevel.LogAlways, AzureStorageEventSource.Keywords.Perf 
| AzureStorageEventSource.Keywords.Diagnostic); 
storage_event_log_listener.LogToFlatFile("Overlord.Storage.Azure.log", formatter, true);

这会将所有 AzureStorageEvent 和 AzureDigestEvents 发布到当前进程工作目录中的平面文件接收器。

如果您使用 FlatFile 事件接收器,那么您进行日志记录的每个进程都会对文件拥有锁定。这意味着,如果您尝试在同一进程中两次启用事件,则会由于文件锁定而遇到 System.IO 异常。

因此,您应该确保只在应用程序库加载时初始化您的事件侦听器一次。在我 worker role 的 OnStop() 方法中,当库被卸载时,我执行:

digest_event_log_listener.DisableEvents(AzureDigestEventSource.Log);
storage_event_log_listener.DisableEvents(AzureStorageEventSource.Log);

这会刷新并关闭任何活动的接收器。

历史

为参加 Azure IoT 竞赛创建了第一个版本。

2015 年 3 月 26 日星期四:更新了更多关于目标、存储和发布传感器数据、日志记录和测试的信息。

2015 年 3 月 31 日星期二:添加了设计图、存储部分。修复了拼写错误。

© . All rights reserved.