集成 Azure IoT 后端以用于应用程序






3.44/5 (6投票s)
本文介绍如何集成 Arduino、RaspberryPi、Azure EventHubs、WorkerRole、Web服务、WCF 和 RESTful API,构建一个由通用应用连接的简单物联网信息服务后端。
引言
本文展示了从 Arduino 经过 Azure 到终端设备的完整数据路径,用于显示传感器数据。
Arduino(作为传感器)收集温度,并通过 RaspberryPi(作为网关)发送到 Azure EventHubs。
Azure 中有 4 个重要部分:
1. EventHubs 收集来自 RaspberryPi 的事件数据。
2. Azure CloudService Worker Role 将从 EventHubs 接收这些数据,并存储到 Azure 块存储中。
3. 一个由 Web服务编写的支持 SOAP 协议的网站。
4. 一个由 WCF + 启用 REST 支持的网站。
在客户端,在 Windows 8.1 和 Windows Phone 8.1 的同一个 Microsoft 通用应用中,实现了两种不同的方式来从 Azure 网站读取数据,作为可视化图表查看器。
背景
RaspberryPi 和 Arduino 的固件项目修改自 GitHub 上的 MSOpenTech Connect The Dots:https://github.com/msopentech/connectthedots
如果您是 Azure EventHubs 的新手,强烈建议您查看 ConnectTheDots 项目,并按照该项目来创建 EventHubs 服务以及您在 Azure 中需要的内容。
我通过阅读 Sandrino Di Mattia 的文章“Getting started Azure Service Bus Event Hubs: building a real-time log stream”学习了如何接收来自 EventHubs 的数据: http://fabriccontroller.net/blog/posts/getting-started-azure-service-bus-event-hubs-building-a-real-time-log-stream/
您不需要太多 Web 开发经验 :)
使用 WinRT XAML Toolkit 的折线图组件:http://winrtxamltoolkit.codeplex.com/
整个项目已上传到 Bitbucket,您应该在继续阅读之前下载: https://bitbucket.org/thkaw/eh_uapp_bundle/downloads
我在项目中没有删除任何连接字符串,所以您可以下载并直接运行通用应用(Windows 8.1、Windows Phone 8.1)来查看应用的样子。
但是在这个项目中,客户端并不是真正令人惊叹的部分,Azure 才是令人惊叹的部分 :)
架构
物联网场景的硬件应用已经足够,但在 Azure 云架构方面,当时并没有太多示例和场景。因此,我想构建一个传统/现代的后端,具有灵活的数据存储,可以让您选择云数据存储,不仅限于云端,还可以在非 Azure 存储架构中。
所以,这是我的计划。首先,根据 MSOpenTech 项目 ConnectTheDots,有很棒的开发者使用 Arduino 收集温度和湿度。然后通过 RaspberryPi 将这些数据传递到 Azure EventHubs。EventHubs 会直接将数据传递到 ASP MVC 5 网站显示实时数据。
但有一些不足之处,所以我想让它变得更完善。
我想要一个事件数据处理器,可以帮助我处理来自 EventHubs 的数据,例如存储它们,但为什么不使用 Stream Analytics 功能来存储这些数据呢?因为如果这些传感器数据涉及隐私问题,您可能希望将其存储在自己的数据库中。当然,这并非唯一的原因。您可以在 Worker Role 中进行更多编程来处理事件数据,对吧?
好的,第二部分是网站。目前 Windows Phone 8.1 由于 Windows Phone 缺少 System.ServiceModel 命名空间,仍然不支持 Web服务引用。参考: https://social.msdn.microsoft.com/forums/windowsapps/en-US/9ab43a4c-499a-4f2e-81e5-c1ab5acbe9bf/wcf-add-service-reference-not-supported-for-windows-phone-81-xaml-applications?forum=wpdevelop
这真的让我很烦恼。在通用应用中,我可以在 Windows 8.1 应用中使用 Web 服务,但在 Windows Phone 8.1 应用中不行。所以我决定使用 WCF + REST 来构建一个网站后端。
我希望设备应用能够可视化地查看一些值,例如温度(简单易收集)。
构建此架构的另一个原因是安全性,这意味着客户端不会直接访问您的 Azure 存储。它通过 Web 服务/WCF 访问。
同时构建一个 Android 应用,展示如何轻松地通过旧设备访问这些数据。
但您会想,为什么不使用 Mobile Service?当然可以,但我再次重申,我使用 WCF+Web 服务是因为它易于构建且具有更具编程性的场景。因此,您可以尝试调整我的架构,使用 Azure Mobile Service、Stream Analytics 等现代服务。
在开发阶段,我修改了 ConnectTheDots 的 Raspberry Pi 网关程序,使其可以随机发送温度数据到 Azure EventHubs。
还将其部署为 Web Role,这有助于在演示此架构时使其更简单。
Azure 中的主要构建
您可以查看 https://github.com/MSOpenTech/connectthedots/blob/master/Azure/AzurePrep/AzurePrep.md 来构建 EventHubs 和设备 AMQP 连接字符串等所需服务。
建议您实现 ConnectToDots 来理解 EventHubs 的工作原理,这将有助于您理解构建以下主题。
本文将不介绍如何创建 EventHubs。
后端代码
首先,从顶部的“Background”部分下载项目包。
解压它,打开“EH_UAPP_BUNDLE.sln”。解决方案 EH_UAPP_BUNDLE 中有 3 个项目文件夹和 2 个项目。
如果您想查看客户端应用的样子,可以打开通用应用文件夹并选择通用应用要运行的平台。
好的,我们先来看 RaspberryPiGateway。如果您想使用真实的硬件发送真实数据,可以注释掉 SIMULATEDDATA 定义,然后将其放到 RaspberryPi 上。
然后使用我的 Arduino 代码驱动 DHT22
// Example testing sketch for various DHT humidity/temperature sensors // Written by ladyada, public domain // Extend Azure IoT by Nathaniel Chen. #include "DHT.h" #define DHTPIN 2 // what pin we're connected to // Uncomment whatever type you're using! //#define DHTTYPE DHT11 // DHT 11 #define DHTTYPE DHT22 // DHT 22 (AM2302) //#define DHTTYPE DHT21 // DHT 21 (AM2301) // Connect pin 1 (on the left) of the sensor to +5V // Connect pin 2 of the sensor to whatever your DHTPIN is // Connect pin 4 (on the right) of the sensor to GROUND // Connect a 10K resistor from pin 2 (data) to pin 1 (power) of the sensor /*-----( Declare Constants )-----*/ #define LIGHTDIGIPIN 3 //Digital IO PIN3 #define LIGHTANLPIN 0 #define LEDPIN 13 // The onboard LED /*-----( Declare Variables )-----*/ int light_digital; /* Holds the last state of the switch */ float light_lvl; float h; float t; char SensorSubject[] = "wthr"; char DeviceDisplayName[] = "DXRDAA_IOTSensor01"; char DeviceGUID[] = "81E79059-A393-0630-1112-526C3EF9D64B"; DHT dht(DHTPIN, DHTTYPE); void setup() { Serial.begin(9600); // Serial.println("DHTxx test!"); dht.begin(); } void loop() { h = dht.readHumidity(); t = dht.readTemperature(); light_lvl = analogRead(LIGHTANLPIN); light_digital = digitalRead(LIGHTDIGIPIN); if (light_digital == LOW) { digitalWrite(LEDPIN, HIGH); //light_lvl = 1; } else { digitalWrite(LEDPIN, LOW); //light_lvl = 0; } // check if returns are valid, if they are NaN (not a number) then something went wrong! if (isnan(t) || isnan(h)) { Serial.println("Failed to read from DHT"); } else { /* Serial.print("Humidity: "); Serial.print(h); Serial.print(" %\t"); Serial.print("Temperature: "); Serial.print(t); Serial.println(" *C");*/ printWeather(); delay(3000); } } int sequenceNumber =0; void printWeather() { //Serial.println(); Serial.print("{"); Serial.print("\"dspl\":"); Serial.print("\""); Serial.print(DeviceDisplayName); Serial.print("\""); Serial.print(",\"Subject\":"); Serial.print("\""); Serial.print(SensorSubject); Serial.print("\""); Serial.print(",\"DeviceGUID\":"); Serial.print("\""); Serial.print(DeviceGUID); Serial.print("\""); Serial.print(",\"millis\":"); Serial.print(millis()); Serial.print(",\"seqno\":"); Serial.print(sequenceNumber++); Serial.print(",\"hmdt\":"); Serial.print(h, 1); Serial.print(",\"temp\":"); Serial.print(t, 1); Serial.print(",\"tempH\":"); Serial.print(t, 1); Serial.print(",\"lght\":"); Serial.print(light_lvl,2); Serial.println("}"); }
如果您想使用模拟数据(无需任何 raspberrypi、Arduino 硬件),则需要取消注释原始 ConnectTheDots RaspberryPiGateway 项目代码。在 Programe.cs 中,取消注释 SIMULATEDDATA 定义。
// In line 26 // #define DEBUG #define SIMULATEDATA // #define LOG_MESSAGE_RATE//
这会影响**第 473 行**的代码片段,它将随机向 Azure EventHubs 发送温度和湿度。
// In line 473 #if! SIMULATEDATA try { valuesJson = serialPort.ReadLine(); } catch (Exception e) { logger.Error("Error Reading from Serial Portand sending data from serial port {0}: {1}", serialPortName, e.Message); serialPort.Close(); serialPortAlive = false; } #else Random r = new Random(); valuesJson = String.Format("{{ \"temp\" : {0}, \"hmdt\" : {1}, \"lght\" : {2}, \"DeviceGUID\" : \"{3}\", \"Subject\" : \"{4}\", \"dspl\" : \"{5}\"}}", (r.NextDouble() * 120) - 10, (r.NextDouble() * 100), (r.NextDouble() * 100), "DXRDAA-SIM01", "wthr", "Simulator"); Thread.Sleep(5000); #endif
别忘了在文件 **“RaspberryPiGateway.exe.config”** 中更改 AMQPAddress。
<configuration> <appSettings> <add key ="EdgeGateway" value="R Pi"/> <add key="AMQPAddress" value="amqps://D1:R3RT%2FvshJ8ODBj6OIvX91bIzlDZMci01RBMeGfyIx68%3D@dxrdaa01suki-ns.servicebus.windows.net" /> <add key="EHtarget" value="ehdevices" /> </appSettings> </configuration>
如果您想在修改连接字符串时使用模拟器而不是真实硬件,请右键单击项目,然后**部署为 WebRole**。
下一个项目,查看“EH_CTD_CONSOLE01”。
我保留那个旧项目是因为它可以帮助您理解如何通过控制台 C# 代码连接 EventHubs。
此项目连接到 EventHubs 的代码修改自: http://fabriccontroller.net/blog/posts/getting-started-azure-service-bus-event-hubs-building-a-real-time-log-stream/
最重要的是在“AzureWebRole”文件夹中,“WorkerRole1”是 EH_CTD_CONSOLE01 的孪生兄弟。
我已经修改了使用 blob 存储温度值,而不是将其存储到 EH_CTD_CONSOLE01 的本地磁盘。
它已修改为适合部署到 Azure WorkerRole 服务。
所以,让我们检查 AzureWebRole/WorkerRole1/WorkerRole.cs。
首先,此函数将向 Azure 注册事件处理器,让 Azure 知道这里有一个事件处理器需要 Event Data,而 Event Data 在 eventHubName 中指定。
并将每 5 秒运行一次,以上传来自 Class Receiver 的实时温度数据,该 Receiver 将接收来自 EventHubs 的数据。
// private async Task RunAsync(CancellationToken cancellationToken) { Trace.TraceInformation("CTD CONSOLE RECIVER IN WORKERROLE. 2015/03/24"); //eventHubName, numberOfMessages, numberOfPartitions ParseArgs(new string[] { "ehdevices", "10", "8" }); string connectionString = GetServiceBusConnectionString(); NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); Receiver r = new Receiver(eventHubName, connectionString); r.MessageProcessingWithPartitionDistribution().Wait(); // TODO: Replace the following with your own logic. while (!cancellationToken.IsCancellationRequested) { Trace.TraceInformation("Working"); CloudBlockBlob blockBlob = container.GetBlockBlobReference("tempblob.txt"); // Period upload EventHub data to blob. using (var fileStream = System.IO.File.OpenRead(@"temp.txt")) { blockBlob.UploadFromStream(fileStream); } await Task.Delay(5000); } }
接下来检查 **AzureWebRole/WorkerRole1/Receiver.cs**。
该文件显示如何注册一个 SimpleEventProcessor,以及您需要设置的内容以获取不同时间的事件数据。
public async Task MessageProcessingWithPartitionDistribution() { EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, this.eventHubName); // Get the default Consumer Group defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); string blobConnectionString = CloudConfigurationManager.GetSetting("StorageConnectionString"); // Required for checkpoint/state eventProcessorHost = new EventProcessorHost("singleworker", eventHubClient.Path, "App01", this.eventHubConnectionString, blobConnectionString, "ehdevices"); // Read EvnetData from EventHubs, adjust offset to latest position, make sure won't get old data. EventProcessorOptions eventProcessorOptions = new EventProcessorOptions(); eventProcessorOptions.InitialOffsetProvider = (partitionId) => DateTime.UtcNow; Trace.TraceInformation(">>>Registering Event Processor, Please wait.<<<"); await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(eventProcessorOptions); }
接下来检查 **AzureWebRole/WorkerRole1/SimpleEventProcessor.cs**。
该文件包含此 Worker Role 从 EventHubs 接收的事件。并处理、解析事件中的数据,将其保存到 Worker Role 的本地空间,然后在 WokerRole.cs 中定期上传到 Azure Blob。
所以您可以在此处替换为您自己的逻辑代码,例如分析数据、保存到外部 SQL 数据库等。
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events) { try { foreach (EventData eventData in events) { int data; var ReciveData = this.DeserializeEventDataCTD(eventData); string key = eventData.PartitionKey; // Name of device generating the event acts as hash key to retrieve average computed for it so far if (!this.map.TryGetValue(key, out data)) { // If this is the first time we got data for this device then generate new state this.map.Add(key, -1); } // Update data data = Convert.ToInt32(ReciveData.temp); this.map[key] = data; Trace.TraceInformation(string.Format("Data received. Partition: '{0}', Device: '{1}', TEMP: '{2}', HUMI: '{6}', Offset: '{3}', SequenceNumber: '{4}', UTCTime: '{5}'", this.partitionContext.Lease.PartitionId, key, data, eventData.Offset, eventData.SequenceNumber, eventData.EnqueuedTimeUtc, ReciveData.hmdt)); // Write temperature data to local space. using (StreamWriter sw = new StreamWriter(@"temp.txt")) { // Add some text to the file. sw.Write(data); } } // Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts. if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5)) { await context.CheckpointAsync(); this.checkpointStopWatch.Restart(); } } catch (Exception exp) { Trace.TraceInformation("Error in processing: " + exp.Message); } }
此项目中的最后一件事是记住 更改 AzureWebRole/WorkerRole1/app.config 设置为您自己的。
<appSettings> <!-- TODO: Change these three key's value to yours!--> <!--For local develop test--> <!--<add key="StorageConnectionString" value="UseDevelopmentStorage=true" />--> <!--Fill your blob connection string--> <add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=dxrdaa01sukistorage;AccountKey=zNIvaYb2Q1j+Kv3nMyRG3IJOoviw6LKfvD1Rq9y9zeNw5Pey+noAQhdBNEr0kXFrsuvOzeD0WlKA0B0+ccq6eA==" /> <!--Fill your servicebus connection string(Evnethubs)--> <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://dxrdaa01suki-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=gMy6B5W6H0VZqLxYzgEXZlvubKJQpvNVDFo3ov5N6aE=" /> </appSettings>
当您完成所有修改后,右键单击项目进行发布,更改上传设置为您自己的。
让我们来检查 Web 后端。
第一个是“EH_WCF_BACKEND”。
如果您有编写 WCF 的经验,您会发现很容易理解我在此网站上所做的工作。
在 EH_WCF_BACKEND\IService1.cs 中。
我添加了一个名为“GetEHTemp”的契约,并返回 XML 格式的数据,您可以根据需要更改为 JSON 格式。
[ServiceContract] public interface IService1 { [OperationContract, WebGet(UriTemplate = "GetData/{value}"),] string GetData(string value); // If you want using Json format as REST return value, please uncomment below //[OperationContract, WebGet(UriTemplate = "GetEHTemp", ResponseFormat = WebMessageFormat.Json)] [OperationContract, WebGet(UriTemplate = "GetEHTemp")] string GetEHTemp(); [OperationContract] CompositeType GetDataUsingDataContract(CompositeType composite); // TODO: Add your service operations here }
在 EH_WCF_BACKEND\ Service1.svc
展示了如何通过连接到 Azure Blob 来读取文本文件,并将其返回给客户端。
public string GetEHTemp() { try { // Retrieve storage account from connection string. CloudStorageAccount storageAccount = CloudStorageAccount.Parse( ConfigurationManager.AppSettings["StorageConnectionString"]); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference("eventlogs"); // Retrieve reference to a blob named "tempblob.txt". CloudBlockBlob blockBlob = container.GetBlockBlobReference("tempblob.txt"); string temp = blockBlob.DownloadText(); return temp; } catch (Exception e) { // Let the user know what went wrong. Console.WriteLine("The file could not be read:"); Console.WriteLine(e.Message); return "Error"; } }
别忘了在 Web.config 中将 StorageConnectionString 更改为您自己的。
<appSettings> <add key="aspnet:UseTaskFriendlySynchronizationContext" value="true" /> <!--TODO: Change connection blob string to yours--> <add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=dxrdaa01sukistorage;AccountKey=zNIvaYb2Q1j+Kv3nMyRG3IJOoviw6LKfvD1Rq9y9zeNw5Pey+noAQhdBNEr0kXFrsuvOzeD0WlKA0B0+ccq6eA==" /> </appSettings>
全部完成,通过右键单击项目名称上传网站。
下一个“EH_WS_BACKEND”比 WCF 项目更简单。
在 EH_WS_BACKEND \ WebService1.asmx 中。
您只需要处理 WebMethod 来从 Blob 读取数据。
[WebMethod] public string readEVTemp() { try { // Retrieve storage account from connection string. CloudStorageAccount storageAccount = CloudStorageAccount.Parse( ConfigurationManager.AppSettings["StorageConnectionString"]); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference("eventlogs"); // Retrieve reference to a blob named "tempblob.txt". CloudBlockBlob blockBlob = container.GetBlockBlobReference("tempblob.txt"); string temp = blockBlob.DownloadText(); return temp; } catch (Exception e) { // Let the user know what went wrong. Console.WriteLine("The file could not be read:"); Console.WriteLine(e.Message); return "Error"; } }
别忘了在 EH_WS_BACKEND/Web.config 中将 storageConnectionString 修改为您自己的!
<appSettings> <!-- Service Bus specific app setings for messaging connections --> <!--TODO: Change connection blob string to yours--> <add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=dxrdaa01sukistorage;AccountKey=zNIvaYb2Q1j+Kv3nMyRG3IJOoviw6LKfvD1Rq9y9zeNw5Pey+noAQhdBNEr0kXFrsuvOzeD0WlKA0B0+ccq6eA==" /> </appSettings>
最后,将此网站发布到 Azure。
应用程序代码
在应用程序中,我编写了一个通用应用项目,但使用不同的方式连接我的 Web 后端。
首先是 Windows Phone 8.1。
它不支持 Web 服务,因此需要连接到我之前构建的 WCF+REST 后端。
在 EH_APP_TEST.WindowsPhone\MainPage.cs 中。
我有一个定时器,每 5 秒触发一次以更新折线图。请注意,WCF URL 中必须添加随机字符串以避免 Web 内容缓存。
async void timer_Tick(object sender, object e) { // Using random addition value to avoid webdata cache. Random rr = new Random(); int rdn = rr.Next(1, 1000000); // TODO: Change to your site name. Uri uri = new Uri("http://dxrdaactdwcf.azurewebsites.net/Service1.svc/GetEHTemp?" + rdn); HttpClient httpClient = new HttpClient(); string result = await httpClient.GetStringAsync(uri); XDocument doc = XDocument.Parse(result); tb_temp.Text = doc.Root.Value; TempList.Add(new NameValueItem { Name = DateTime.Now.ToString("HH:mm:ss"), Value = Convert.ToInt32(tb_temp.Text) }); // Max node in line chart is 10, you can adjust this value. if (TempList.Count > 10) { TempList.RemoveAt(0); } UpdateCharts(TempList); }
接下来是 Windows 8.1。
它使用 Web 服务,因此连接方面有点复杂。
您需要修改您的 Web 服务后端,通过更新或删除当前 ServiceReference1 并创建新的 ServiceReference。
定时器触发与 Windows Phone 8.1 之间只有很小的差异。
您需要使用 Object 来获取 ServiceReference 中的数据。
async void timer_Tick(object sender, object e) { // Windows App using Webservice to get Temp data(Also can using WCF+REST) // TODO: In solution explore change ServiceReference URL to yours. ServiceReference1.WebService1SoapClient wsc = new ServiceReference1.WebService1SoapClient(); tb_temp.Text = await wsc.readEVTempAsync(); TempList.Add(new NameValueItem { Name = DateTime.Now.ToString("HH:mm:ss"), Value = Convert.ToInt32(tb_temp.Text) }); //Max node in line chart is 10, you can adjust this value. if (TempList.Count > 10) { TempList.RemoveAt(0); } UpdateCharts(TempList); }
好了,尝试运行,按下“开始”按钮,等待几秒钟。如果您做的所有事情都正确。屏幕上将显示一些数据J
关注点
在此场景中,您可以学习如何构建和部署 WorkerRole、WebRole、Website,以及操作 Azure 存储、Azure EventHubs。
当然,这只是一个简单的架构,向您展示了如何将这些技术联系在一起。
您可以基于本文了解更多关于 Azure Cloud 和物联网的内容J
历史
2015/03/25 - 初始版本
2015/03/27 - 调整语法