Let's IoT Hub: 教程 2
创建一个控制台应用程序,该应用程序监听您的 Raspberry Pi 3 上传到 Azure IoT Hub 的数据。
前言
本教程假定您具备一定的编程背景知识,但对 Raspbian(或任何 Linux 发行版)、GPIO 微处理器(Pi、Arduino 等)或 Python 没有先验经验。可能会引用之前的教程。
我主要使用的编程语言是 Java、C#.NET 和 VB.NET。您可能会注意到我的 Python 代码与 Python 的标准约定和格式之间存在一些细微差异。这不应影响您对代码的理解。
目标
将数据作为设备到云消息从您的 Raspberry Pi 3 发送到 Microsoft Azure 的 IoT Hub,并设置另一个应用程序来检索数据。
示例用例
- 室外温度传感器
- 用于宠物门的运动检测模块
- 繁忙街道的车辆计数系统
入门
您需要完成教程 0 和 1,才能使您的 Pi 处于最佳状态并拥有可用于工作的代码基础。如果您没有完成,本教程中的一般概念仍然适用,但代码示例将直接基于之前的教程。
事先阅读有关电路工作原理(包括电压和电流)的知识将有助于您理解。
Visual Studio(首选 2017 版,社区版或更高版本)也将是完成本教程后半部分所必需的。其他 IDE 可能也可用。
发送消息
返回教程 1 中完成的模块。如果您还记得,我们设置了一个 IoT 客户端并使用了服务器消息来切换 LED。这次,我们将添加一个额外的方法,该方法将接受一个 string
并自动将其传输到服务器,以及一个用于传输键值对以进行更具体数据上传的方法。
不过,我们还是从基础开始。请记住,我们应该将变量、方法和执行代码分开。首先创建一个用于发送消息的基本回调方法,以及我们用于发送简单 string
的方法。
def send_confirm_callback(message, result, context):
# Announces whether the message was successfully sent
print (“Message ID %s status: %s” % (message.message_id, result)
def sendText(str): # Sends a string to the connected IoT Hub
global CLIENT, send_confirm_callback
# Rely on the library to create the proper object
msg = IoTHubMessage(str)
# Set optional parameters
## msg.message_id = “id”
## msg.correlation_id = “cid”
print (“Sending message [%s]...” % (str))
# Send the message. (msg object, callback method, context)
CLIENT.send_event_async(msg, send_confirm_callback, 0)
提示:如前所述,请勿仅复制粘贴此处显示的代码。格式差异可能会导致 Python IDE 中出现意外后果。另外,请确保您使用的是 Python 2。
现在我们有了发送消息的方法,让我们修改执行代码以调用此函数。请注意,IoT Hub 的免费订阅每天限制您发送 8000 条消息,因此如果您希望您的 Pi 全天运行,您将需要仔细设置消息间隔。我们将保持安全,使用 15 秒的间隔。
...
# Begin code execution
turnLightOn() # Ensure start state is ON
print (“Starting IoT Hub Client...”)
main()
while True:
# (LED toggling line has been deleted)
# Tell server we’re awake
sendText(“hello, iothub”)
time.sleep(15)
每隔 15 秒,您的设备就会向服务器发送一条消息。运行模块并查看控制台输出。如果一切顺利,您的消息将成功发送并被 IoT Hub 接收。
现在,让我们暂时打开您的 Azure门户。导航到您的 IoT Hub,然后滚动到 Hub 的“指标”视图。在“可用指标”下,确保选中了“发送遥测消息”框。如果您的代码正在运行,您应该会在图表上看到一些响应,表明您的 Hub 确实正在接收 Pi 的消息。更新可能需要一些时间。
不幸的是,您无法通过 Azure门户查看原始数据。别担心,我们稍后会处理。现在,我们还将设置发送键值对的方法。当您的设备包含多个传感器并需要以有组织的方式报告数据时,键值对是最佳选择。此方法允许您发送一对键和值。
def sendKeyValuePair(key, value)
global CLIENT, send_confirm_callback
msg_txt = (“%s, %s” % (key, value))
msg = IoTHubMessage(msg_txt)
# Message properties are of an IoT HubMap object
propmap = msg.properties()
propmap.add(key, value)
# Set optional parameters
## msg.message_id = “id”
## msg.correlation_id = “cid”
# Send the message. (msg object, callback method, context)
CLIENT.send_event_async(msg, send_confirm_callback, 0)
如果您想尝试一下,请随意将 sendText
调用修改为使用带有两个参数的 sendKeyValuePair
。
发送输入值
为了使我们的 Pi 成为有用的 IoT 设备,它需要能够传输有关其实时状态的信息,而不仅仅是预先编程的消息。我们将设置一个相对简单的开关,以将值发送到服务器。
为了保护我们的 Pi免受过大电流的影响,我们需要使用几个电阻器以及一个开关来实现此功能。以下是电路的零件清单
- 按动开关(或仅是两个可以相互接触的导线)
- 一个电阻,最好是 1 千欧左右
- 一个电阻,最好是 10 千欧左右(或第一个电阻的 10 倍)
- 几根导线,具体取决于您如何设置面包板
如果左侧断开连接,Pi 的引脚将读取一个不确定的状态,因此这不能是一个简单的电源开关引脚电路。相反,我们的想法是让您的 Pi持续读取接地信号,以表示按钮未被按下。当通过开关施加“正”电压(将是 Pi 的 3V 输出)时,我们希望读取开关已被按下并且电流正在流动。为此,我们将以一种方式使用我们的电阻器,使其始终连接到接地,但会向 Pi传输电压信号而不会使其短路。
按照以下方式组装电路,电阻值应尽可能接近
请注意,正如教程 1 中提到的,您可以使用任何编号的 GPIO引脚。在本教程中,我们选择了引脚 5,如图所示。
现在我们需要一个代码块来读取引脚 5 的值。
def readSwitch():
# Reads and returns the signal value for pin 5
return GPIO.input(5) # = GPIO.HIGH or GPIO.LOW, 1 or 0
您可能还记得,我们必须添加一个设置行来将引脚 5 初始化为输入引脚!
...
# Set pin numbering mode
GPIO.setmode(GPIO.BCM)
# Set pin 6 (or whatever number you’re using)
GPIO.setup(6, GPIO.OUT)
# Set pin 5 (or whatever number you’re using)
GPIO.setup(5, GPIO.IN)
# Begin code execution
...
为了测试我们的电路(在您仔细检查连接后),让我们稍微修改一下 While
循环。我们将让 LED响应开关的状态。请注意,此效果通常会以模拟方式实现,LED 直接连接到开关。此代码在根本上是基于软件的开关。
...
# Begin code execution
turnLightOn() # Ensure start state is ON
print (“Starting IoT Hub Client...”)
main()
while True:
# (LED toggling line has been deleted)
# Tell server we’re awake
# sendText(“hello, iothub”) # this line is commented out
time.sleep(1) # reduce the delay
if readSwitch() == GPIO.HIGH:
# Pin is receiving non-ground voltage
turnLightOn()
else:
# Pin is grounded
turnLightOff()
运行模块。当您按下开关(或触摸功能上等效的导线)时,LED应该会亮起。当您松开手时,LED应该会熄灭。因为我们将计时器设置为每 1 秒间隔工作,所以响应可能不会立即发生,但它应该大致遵循开关的位置。
现在,让我们发送一些可能有用的数据。不要发送“hello, iothub
”,而是传输我们开关的状态。将 While
循环完全重写。
...
while True:
# Report status to server
sendKeyValuePair(“buttonState”, readSwitch())
toggleLight() # Indication that code is running
time.sleep(15) # 15 second interval
# (end of while loop)
...
运行您的模块,并在 Azure 上查看您的“指标”页面,以查看您是否收到数据。如果您没有立即看到已接收消息数量的激增,请不要担心;可能需要几秒钟才能刷新。恭喜您,您现在正在将 Pi 的物理状态上传到云端。
准备数据检索
在查找从 Pi检索数据的方法时,您可能会找到一个常见但有些无用的指南,该指南会引导您构建一个在 Azure网页环境中运行的 Web 应用程序。如果您需要以更特定的方式访问此数据,则需要构建一个程序来联系服务器并请求传输回数据。
为了继续,您需要在 Windows计算机上安装 Visual Studio。本指南将使用 .NET API连接到 IoT Hub 服务。
注意:如果您更喜欢 Java 而不是 .NET,则有适用于 Java 的 Event Hubs包。初始化和设置不一定与以下教程类似。有关更多详细信息,请参阅 Microsoft 帮助文档。
首先,让我们设置我们的项目。创建一个新项目,使用您选择的 .NET语言,并将其设置为控制台应用程序。保存项目并在提示时创建一个新解决方案。
在 Visual Studio 中,访问菜单“工具”>“NuGet程序包管理器”>“程序包管理器控制台”。这将打开一个命令行窗口,其中新行显示“PM>”。我们将使用此窗口快速下载和安装我们的依赖项。
运行以下行。每行需要几秒钟才能完成,具体取决于您的互联网和硬盘速度。
install-package microsoft.azure.eventhubs
install-package microsoft.azure.eventhubs.processor
安装这两个库(及其依赖项)后,我们现在需要将它们导入到我们的控制台应用程序中。
在本教程中,我将使用 VB.NET 来演示必要的代码以便于阅读。C# 的等效代码差别不大。
Imports Microsoft.Azure.EventHubs
Imports Microsoft.Azure.EventHubs.Processor
Imports System.Text
Module Module1 'or whatever you named your file
...
提示:VB 使用撇号(')表示注释。
不过,在继续编码之前,我们还必须准备好我们的 IoT Hub。具体来说,我们需要将传入的遥测数据重定向到一个存储容器,以便数据实际可收集。打开您的 Azure门户,让我们开始设置。
设置存储
注意:这部分会很复杂。请仔细按照每个步骤进行操作。
通过访问“创建资源”>“存储”>“存储帐户”>“创建”来创建存储。在向导中,设置一个唯一的名称(最好是与您的 Hub 名称相关的名称),并将资源组设置为与您的 Hub使用的资源组相同。帐户种类应为“常规用途”。
我们需要将传入的遥测数据重定向到存储帐户。打开您的 IoT Hub 页面,然后在“消息”下的菜单中找到“终结点”。添加一个 Azure存储容器,并选择您之前创建的存储帐户。这将带您到一个名为“容器”的菜单,并且该菜单将是空的。创建一个 Blob 或容器并选择它。保存您的容器名称。
添加一个新路由(同样在“消息”下)。数据源应为“设备消息”,终结点应为您刚创建的终结点。禁用该规则,或在查询 string
中写入“true
”,以便所有传入消息都重定向到存储。或者,如果您不创建新路由,则默认情况下所有消息都会进入“事件(消息/事件)”终结点。如果您希望尽量减少混乱,可以选择这样做;但是,这将使数据更难排序。
我们的数据现在遵循此路径
Pi -> IoT Hub Messages -> Your Endpoint -> Your Route -> Storage Account -> Storage Container
既然我们在这里,让我们记下一些非常重要的连接密钥。导航到您的存储帐户,然后在“设置”菜单下找到“访问密钥”。保存第一个连接字符串。
返回您的 IoT Hub页面,然后再次转到“终结点”。单击“事件”终结点(消息/事件),并保存事件中心兼容名称和终结点。
仍然在终结点的属性中,在底部添加一个新的“使用者组”。您需要输入的只是一个名称;“myapp
”就足够了。也请记住这个使用者组的名称。
开始编程
很棒!现在我们已经完成了所有这些设置,我们可以开始编码了。顺便说一句,默认情况下,消息会排队最多一天供消耗(并标记为已读)。如果您的 Pi 当前正在积极上传消息,您将需要下载大量消息才能看到最新更新。如果您尚未停止,请确保您的 Pi 的 Python 代码已使用 Ctrl+C 停止执行。
返回到您的控制台应用程序项目。我们将从一个新的异步方法开始。
Module Module1
Sub Main()
End Sub
Private Async Function BeginListening() As Task
End Function
Async
函数允许代码异步执行,这可以防止应用程序在下载或打印消息时意外挂起。因为我们正在等待服务器给我们新的消息,所以 Async
方法在这里很有效。
Private Async Function BeginListening() As Task
Dim CompatibleName as String = “iothub-ehub-...”
Dim ConsumerGroup as String = “myapp”
Dim CompatibleEndpoint as String = “Endpoint=sb://...”
Dim ConnectionString as String = “DefaultEndpointsProtocol...”
Dim ContainerName as String = “containerName”
提示:再次强调,不要复制粘贴代码。IDE 中的格式差异可能会导致意外行为。
为这些变量中的每一个填写正确的 string
。如果这些不正确,您的应用程序将无法正常运行。创建这些变量后,让我们实例化主机。
Private Async Function BeginListening() As Task
...
Dim host = new EventProcessorHost(
CompatibleName,
ConsumerGroup,
CompatibleEndpoint,
ConnectionString,
ContainerName)
End Function
为了防止控制台应用程序立即终止,我们将等待用户按键。
...
Dim host = new EventProcessorHost(
...)
Console.Writeline(“Listening. Press ENTER to stop.”)
Console.ReadLine()
Await host.UnregisterEventProcessorAsync()
End Function
等等,我们什么时候注册过事件处理器?我们还没有,因为我们需要自己实现它。不幸的是,API没有提供默认的事件处理器供我们使用。
...
End Function 'BeginListening
Partial Public Class BasicProcessor
Implements IEventProcessor
End Class
Partial
标签允许我们选择实现 IEventProcessor
中的哪些方法。由于有几个方法需要实现但我们目前并不真正需要它们,因此 Partial
非常合适。
Partial Public Class BasicProcessor
Implements IEventProcessor
Function ProcessEventsAsync(ByVal context as PartitionContext,
By Val messages As IEnumerable(Of EventData)) As Task
Implements IEventProcessor.ProcessEventsAsync
End Function
End Class
注意:函数声明应在一行上。我已将其分解为更小的部分以方便阅读。
现在我们只需要对 EventProcessorHost
查找并传递给我们的新声明函数的消息做些事情。目前,让我们只打印原始消息文本。
Function ProcessEventsAsync(...) Implements ...
For Each eventData In messages
Dim data = Encoding.UTF8.GetString(
eventData.Body.Array,
eventData.Body.Offset,
eventData.Body.Count)
Console.Writeline("Partition " & context.PartitionId & ", Data: " & data)
Next
Return context.CheckpointAsync()
End Function
我们只是将字节数组解码为 UTF8 string
,打印详细信息,然后通知 Host 我们已读取消息。哦,但别忘了我们发送的数据可能还包括一些键值对!如果您已完成到目前为止的 Python 教程,您的 While
循环将根据开关的状态发送 {“buttonState”, “0/1”}
。让我们也读取这个。
Function ProcessEventsAsync(...) Implements ...
For Each eventData In messages
Dim data = Encoding.UTF8.GetString(...)
Console.Writeline("Partition " ...)
Console.WriteLine(eventData.Properties.Count & " sets of Key-Value pairs")
For Each piece In eventData.Properties
Console.WriteLine("K-V: " & piece.Key & " - " & piece.Value.ToString)
Next
Next
Return context.CheckpointAsync()
End Function
注意:有一个嵌套的 For
循环。别忘了第二个 Next
语句。
最后,我们需要注册我们的事件处理器
...
Dim host = new EventProcessorHost(
...)
Await host.RegisterEventProcessorAsync(Of BasicProcessor)()
Console.Writeline(“Listening. Press ENTER to stop.”)
Console.ReadLine()
Await host.UnregisterEventProcessorAsync()
End Function
现在我们只需要从 Main
方法中调用我们的函数。您的最终代码看起来应该像这样
Imports...
...
Module Module1
Sub Main()
BeginListening.GetAwaiter.GetResult()
Console.Writeline("Finished. Press any key to exit.")
Dim a = Console.ReadKey
End Sub
Private Async Function BeginListening(...)...
...
End Function
End Module
Partial Public Class BasicProcessor
...
End Class
测试您的应用程序
生成并运行您的控制台应用程序。同时,在您的 Pi 上开始运行 Python 代码。确保您的 Python 代码正在成功将数据上传到您的 IoT Hub。
您的控制台输出应该为每条接收到的消息生成三行;第一行告诉我们消息在某个分区中被接收,以及与之相关的即时文本;第二行计算消息中包含的键值对的数量;第三行将打印电路开关的状态,并在键和值之间使用连字符(-)。
结论
恭喜您!您已经组装了一个电路,该电路可以指示您的 Raspberry Pi 3 的开关状态,生成并传输字符串以及键/值数据到您的 IoT Hub,并且创建了一个控制台应用程序来检索正在上传到您 IoT Hub 的数据。
接下来做什么?
找出您想在电路中包含哪些其他传感器,并按照其说明导入相关的库。发送更多键值对以及其他可能有用的数据。实现 IEventProcessor
中的其他函数。