65.9K
CodeProject 正在变化。 阅读更多。
Home

通过 IoT 和 Microsoft Cloud 进行环境监测和异常检测

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.12/5 (3投票s)

2015年4月1日

CPOL

3分钟阅读

viewsIcon

22479

低成本环境监测传感器的可用性,加上 Microsoft 云的功能,为构建智能城市坚实的基础设施提供了巨大的机会。

引言

低成本环境监测传感器的可用性,加上 Microsoft 云的功能,为构建智能城市坚实的基础设施提供了巨大的机会。我们的应用程序中探索的架构如图 1 所示。

图 1. 环境监测应用

背景

该应用程序能够成功地从连接到 Arduino 兼容板 (Gertduino) 的传感器收集温度数据,该板连接到树莓派 (RPI) 板。一个 python 脚本捕获数据,并定期将其作为事件传输到 Microsoft 云中的事件中心 (Event Hub)。一个工作角色从事件中心的 16 个分区消耗事件,并将信息存储到表存储 (Table Storage) 中。用户使用一个 Web 角色访问该信息,该 Web 角色从表存储中提取该信息。该 Web 角色支持使用 HTML5、CSS3、JavaScript 和 Bootstrap 构建的响应式网站。WCF 服务用于按需从表存储中提取数据。

Using the Code

RPI 和事件中心之间的通信需要一个共享访问签名 (SAS) 密钥,该密钥是使用从 Microsoft MSDN 下载的开源签名生成器应用程序生成的(图 2)。

图 2. 共享访问签名生成应用程序

生成的密钥嵌入到 RPI 上运行的 Python 脚本中,捕获的事件(温度)数据使用 http 传输,采用 JSON 格式。安装并使用 “Requests” python 包来创建 POST 请求。使用 “Requests” 使创建这些请求变得非常容易。每个请求大约需要两秒钟才能执行。RPI 通过 USB WiFi 适配器连接到本地 WiFi 网络,并使用电源适配器为电路板供电。

下一个关键工作是创建一个消耗事件的工作角色。由于 RPI 位于单独的位置,因此构建了一个设备模拟器,以使测试更容易。设备模拟器的代码如下。

设备模拟器代码

using System;
using System.IO;
using System.Net;
using System.Threading;

namespace IoTDeviceSimulator
{
    class Program
    {
        static void Main(string[] args)
        {
            // Generate a SAS key with the Signature Generator.: https://github.com/sandrinodimattia/RedDog/releases
            var sas = "SharedAccessSignature sr=https%3a%2f%2fgqc-ns.servicebus.windows.net%2fgqc%2fpublishers%2fmypi%2fmessages&sig=uODLVMFxG3DwM5qGtINr3rkA%3d&se=64709&skn=senderdevice";
            // Namespace info.
            var serviceNamespace = "gqc-ns";
            var hubName = "gqc";
            var deviceName = "mypi";
            Console.WriteLine("Starting device: {0}", deviceName);

            var uri = new Uri(String.Format("https://{0}.servicebus.windows.net/{1}/publishers/{2}/messages", serviceNamespace, hubName, deviceName));

            // Keep sending.

            while (true)
            {
                var eventData = new
                {
                    Stage = new Random().Next(20, 50),
                    Flow = new Random().Next(10, 15)
                };

                var req = WebRequest.CreateHttp(uri);

                req.Method = "POST";

                req.Headers.Add("Authorization", sas);

                req.ContentType = "application/atom+xml;type=entry;charset=utf-8";

                using (var writer = new StreamWriter(req.GetRequestStream()))
                {
                    //{"piesense1":{Stage:12.2,Flow:20.2}, unixtimestamp:12345}
                    writer.Write(@"{" + deviceName.Replace("-", string.Empty) + ":");
                    writer.Write("{ Stage: " + eventData.Stage + ",");
                    writer.Write("Flow: " + eventData.Flow + "}");
                    writer.Write(", unixtimestamp: " + DateTime.Now.ConvertToUnixTimestamp());
                    writer.Write("}");
                }

                using (var response = req.GetResponse() as HttpWebResponse)
                {
                    Console.WriteLine("Sent stage using legacy HttpWebRequest: {0}", eventData.Stage);

                    Console.WriteLine(" > Response: {0}", response.StatusCode);
                }

                Thread.Sleep(60000);

            }
        }
    }

    public static class ExtensionMethods
    {
        public static Int64 ConvertToUnixTimestamp(this DateTime pDotNetDateTime)
        {
            pDotNetDateTime = TimeZoneInfo.ConvertTimeToUtc(pDotNetDateTime);
            if (pDotNetDateTime.Ticks >= 621355968000000000)
            {
                return (pDotNetDateTime.Ticks - 621355968000000000) / 10000000;
            }
            return 0;

        }
    }
}

RPI 和设备模拟器生成的事件由一个工作角色消耗(图 3)。

图 3. 消耗事件的工作角色(稍后提供代码片段)

如前所述,我们首先构建了一个桌面原型,创建了一个设备模拟器(图 4)和一个消费者,然后将该消费者代码移入一个工作角色(图 5)。

图 4. 一个设备模拟器(之前提供了代码片段)
private async Task RunAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                Trace.TraceInformation("Working");

                var partitionCount = 16;
                var serviceNamespace = "gqc-ns";
                var hubName = "gqc";
                var receiverKeyName = "receiver";
                var receiverKey = "uXZt2dtha/ULhgd=";

                CancellationTokenSource cts = new CancellationTokenSource();
                for (int i = 0; i <= partitionCount - 1; i++)
                {
                    await Task.Factory.StartNew((state) =>
                    {
                        Console.WriteLine("Starting worker to process partition: {0}", state);
                        var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri
                        ("sb", serviceNamespace, ""), new MessagingFactorySettings()
                        {
                            TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(receiverKeyName, receiverKey),
                            TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
                        });
                        var receiver = factory.CreateEventHubClient(hubName)
                            .GetDefaultConsumerGroup()
                            .CreateReceiver(state.ToString(), DateTime.UtcNow);

                        while (true)
                        {
                            // Receive could fail, I would need a retry policy etc...
                            var messages = receiver.Receive(10);
                            foreach (var message in messages)
                            {
                                var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject
                                <Dictionary<string, double>>(Encoding.Default.GetString(message.GetBytes()));
                               
                                Dictionary<string, EntityProperty> properties = 
                                	new Dictionary<string, EntityProperty>();
                                foreach (var item in eventBody.Keys)
                                {
                                    properties.Add(item, new EntityProperty(eventBody[item].ToString()));
                                }

                                //Console.WriteLine("{0} [{1}] Temperature: {2}", 
                                //DateTime.Now, message.PartitionKey, eventBody.Temperature);
                                DynamicTableEntity dte = new DynamicTableEntity("ccno1", 
                                DateTime.Now.ToString("yyyy-MM-ddTHH:mm:00zzz")) { Properties = properties };
                                CloudTable table = getSensorTable();
                                TableOperation insertOp = TableOperation.InsertOrMerge(dte);
                                table.Execute(insertOp);
                            }

                            if (cts.IsCancellationRequested)
                            {
                                Console.WriteLine("Stopping: {0}", state);
                                receiver.Close();
                            }
                        }
                    }, i);
                }
                await Task.Delay(1000);
            }
        }
图 5. 消耗事件并将它们插入表存储

事实证明这有点挑战性,因为工作角色的行为略有不同(它总是在运行循环中),与桌面应用程序相比。因此,必须非常仔细地检查异步任务的考虑事项。在阅读了几篇关于这个主题的文章后,我们能够组合一个以可靠方式运行的工作角色。表存储中生成的表如图 6 所示。

图 6. 事件数据的表存储

存储的测量结果可以与使用符合 HTML5、CSS3 和 JavaScript 的 Web 角色在必应地图 (Bing Maps) 背景下进行的环境模拟模型的输出一起显示。该模型由另一个工作角色执行,结果存储在 Blob 存储和表存储中。Web 角色在 Microsoft 云中配置,并托管 WCF 服务以从云存储中提取数据,以进行快速可视化。图 7 显示了建模结果(红色)和传感器数据(蓝色)的示例可视化。

图 7. 使用必应地图显示实时传感器数据

关注点

  1. 使用 PowerBI 进行可视化
  2. 使用流分析 (Stream Analytics) 处理事件中心中的事件
  3. 使用 HDInsight 将大型数据集从表存储移动到 HDInsight 集群中,然后处理这些数据集
  4. 使用 Microsoft 机器学习工作室 (Machine Learning Studio) 通过 REST API 直接连接到云存储,并运行适当的算法以检测数据中的异常。
© . All rights reserved.