通过 IoT 和 Microsoft Cloud 进行环境监测和异常检测






3.12/5 (3投票s)
低成本环境监测传感器的可用性,加上 Microsoft 云的功能,为构建智能城市坚实的基础设施提供了巨大的机会。
引言
低成本环境监测传感器的可用性,加上 Microsoft 云的功能,为构建智能城市坚实的基础设施提供了巨大的机会。我们的应用程序中探索的架构如图 1 所示。
背景
该应用程序能够成功地从连接到 Arduino 兼容板 (Gertduino) 的传感器收集温度数据,该板连接到树莓派 (RPI) 板。一个 python 脚本捕获数据,并定期将其作为事件传输到 Microsoft 云中的事件中心 (Event Hub)。一个工作角色从事件中心的 16 个分区消耗事件,并将信息存储到表存储 (Table Storage) 中。用户使用一个 Web 角色访问该信息,该 Web 角色从表存储中提取该信息。该 Web 角色支持使用 HTML5、CSS3、JavaScript 和 Bootstrap 构建的响应式网站。WCF 服务用于按需从表存储中提取数据。
Using the Code
RPI 和事件中心之间的通信需要一个共享访问签名 (SAS) 密钥,该密钥是使用从 Microsoft MSDN 下载的开源签名生成器应用程序生成的(图 2)。
生成的密钥嵌入到 RPI 上运行的 Python 脚本中,捕获的事件(温度)数据使用 http 传输,采用 JSON 格式。安装并使用 “Requests
” python 包来创建 POST
请求。使用 “Requests
” 使创建这些请求变得非常容易。每个请求大约需要两秒钟才能执行。RPI 通过 USB WiFi 适配器连接到本地 WiFi 网络,并使用电源适配器为电路板供电。
下一个关键工作是创建一个消耗事件的工作角色。由于 RPI 位于单独的位置,因此构建了一个设备模拟器,以使测试更容易。设备模拟器的代码如下。
设备模拟器代码
using System;
using System.IO;
using System.Net;
using System.Threading;
namespace IoTDeviceSimulator
{
class Program
{
static void Main(string[] args)
{
// Generate a SAS key with the Signature Generator.: https://github.com/sandrinodimattia/RedDog/releases
var sas = "SharedAccessSignature sr=https%3a%2f%2fgqc-ns.servicebus.windows.net%2fgqc%2fpublishers%2fmypi%2fmessages&sig=uODLVMFxG3DwM5qGtINr3rkA%3d&se=64709&skn=senderdevice";
// Namespace info.
var serviceNamespace = "gqc-ns";
var hubName = "gqc";
var deviceName = "mypi";
Console.WriteLine("Starting device: {0}", deviceName);
var uri = new Uri(String.Format("https://{0}.servicebus.windows.net/{1}/publishers/{2}/messages", serviceNamespace, hubName, deviceName));
// Keep sending.
while (true)
{
var eventData = new
{
Stage = new Random().Next(20, 50),
Flow = new Random().Next(10, 15)
};
var req = WebRequest.CreateHttp(uri);
req.Method = "POST";
req.Headers.Add("Authorization", sas);
req.ContentType = "application/atom+xml;type=entry;charset=utf-8";
using (var writer = new StreamWriter(req.GetRequestStream()))
{
//{"piesense1":{Stage:12.2,Flow:20.2}, unixtimestamp:12345}
writer.Write(@"{" + deviceName.Replace("-", string.Empty) + ":");
writer.Write("{ Stage: " + eventData.Stage + ",");
writer.Write("Flow: " + eventData.Flow + "}");
writer.Write(", unixtimestamp: " + DateTime.Now.ConvertToUnixTimestamp());
writer.Write("}");
}
using (var response = req.GetResponse() as HttpWebResponse)
{
Console.WriteLine("Sent stage using legacy HttpWebRequest: {0}", eventData.Stage);
Console.WriteLine(" > Response: {0}", response.StatusCode);
}
Thread.Sleep(60000);
}
}
}
public static class ExtensionMethods
{
public static Int64 ConvertToUnixTimestamp(this DateTime pDotNetDateTime)
{
pDotNetDateTime = TimeZoneInfo.ConvertTimeToUtc(pDotNetDateTime);
if (pDotNetDateTime.Ticks >= 621355968000000000)
{
return (pDotNetDateTime.Ticks - 621355968000000000) / 10000000;
}
return 0;
}
}
}
RPI 和设备模拟器生成的事件由一个工作角色消耗(图 3)。
如前所述,我们首先构建了一个桌面原型,创建了一个设备模拟器(图 4)和一个消费者,然后将该消费者代码移入一个工作角色(图 5)。
private async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
var partitionCount = 16;
var serviceNamespace = "gqc-ns";
var hubName = "gqc";
var receiverKeyName = "receiver";
var receiverKey = "uXZt2dtha/ULhgd=";
CancellationTokenSource cts = new CancellationTokenSource();
for (int i = 0; i <= partitionCount - 1; i++)
{
await Task.Factory.StartNew((state) =>
{
Console.WriteLine("Starting worker to process partition: {0}", state);
var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri
("sb", serviceNamespace, ""), new MessagingFactorySettings()
{
TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(receiverKeyName, receiverKey),
TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
});
var receiver = factory.CreateEventHubClient(hubName)
.GetDefaultConsumerGroup()
.CreateReceiver(state.ToString(), DateTime.UtcNow);
while (true)
{
// Receive could fail, I would need a retry policy etc...
var messages = receiver.Receive(10);
foreach (var message in messages)
{
var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject
<Dictionary<string, double>>(Encoding.Default.GetString(message.GetBytes()));
Dictionary<string, EntityProperty> properties =
new Dictionary<string, EntityProperty>();
foreach (var item in eventBody.Keys)
{
properties.Add(item, new EntityProperty(eventBody[item].ToString()));
}
//Console.WriteLine("{0} [{1}] Temperature: {2}",
//DateTime.Now, message.PartitionKey, eventBody.Temperature);
DynamicTableEntity dte = new DynamicTableEntity("ccno1",
DateTime.Now.ToString("yyyy-MM-ddTHH:mm:00zzz")) { Properties = properties };
CloudTable table = getSensorTable();
TableOperation insertOp = TableOperation.InsertOrMerge(dte);
table.Execute(insertOp);
}
if (cts.IsCancellationRequested)
{
Console.WriteLine("Stopping: {0}", state);
receiver.Close();
}
}
}, i);
}
await Task.Delay(1000);
}
}
事实证明这有点挑战性,因为工作角色的行为略有不同(它总是在运行循环中),与桌面应用程序相比。因此,必须非常仔细地检查异步任务的考虑事项。在阅读了几篇关于这个主题的文章后,我们能够组合一个以可靠方式运行的工作角色。表存储中生成的表如图 6 所示。
存储的测量结果可以与使用符合 HTML5、CSS3 和 JavaScript 的 Web 角色在必应地图 (Bing Maps) 背景下进行的环境模拟模型的输出一起显示。该模型由另一个工作角色执行,结果存储在 Blob 存储和表存储中。Web 角色在 Microsoft 云中配置,并托管 WCF 服务以从云存储中提取数据,以进行快速可视化。图 7 显示了建模结果(红色)和传感器数据(蓝色)的示例可视化。
关注点
- 使用 PowerBI 进行可视化
- 使用流分析 (Stream Analytics) 处理事件中心中的事件
- 使用 HDInsight 将大型数据集从表存储移动到 HDInsight 集群中,然后处理这些数据集
- 使用 Microsoft 机器学习工作室 (Machine Learning Studio) 通过 REST API 直接连接到云存储,并运行适当的算法以检测数据中的异常。