物联网让微波炉玉米卷变得完美






4.97/5 (16投票s)
创建一个微波炉物联网应用程序
引言
微波炉实际上可以是相当复杂的烹饪设备。不幸的是,要充分利用它,用户必须输入一长串且通常不直观的功率设置、时间和暂停指令。最终,大多数用户只是按下1分钟,然后检查食物是否需要再加热一分钟,或者是否已经煮过头了。肯定有更好的方法。
提案
我们的项目结合了:一台经过改造以接入互联网的微波炉、一个包含微波炉信息的 Azure 数据库、预包装食品、针对食物/烤箱组合优化的食谱,以及一个扫描食物条形码以进行加热或烹饪的智能手机应用程序。这是一个团队项目,我的儿子 Canin 将负责 Azure 编程以及一个示例 Android 应用程序。我将负责改造微波炉并编写其支持互联网的应用程序。
架构概述
我们基于消息队列模式而非传统的 RESTful 架构设计,有几个重要原因。据估计,到2020年,将有超过500亿台设备连接到互联网,所有这些设备都将产生大量消息。其中一些消息将是无需确认的状态更新,而另一些将是交互式的,所有这些都将连接到微软 Azure 等基于云的服务。利用消息队列代理作为交换点,将生产者和消费者连接起来,促进了各种资源的完全解耦,同时允许协议本身根据负载扩展内部资源(如队列和交换)。
我们不得不使用 RabbitMQ 作为我们的消息代理,而不是 Azure 服务总线,原因有几个:我们熟悉 RabbitMQ,并且项目范围对学习 Azure 服务总线架构施加了时间限制,此外,Windows 10 尚未发布用于 Raspberry Pi,并且服务总线的 JavaScript 客户端仍在成熟中。然而,由于该架构是基于消息的,因此当工具到位时,迁移到 Azure 服务总线应该没有问题。
每个微波炉都将有一个二维码,其中包含一个带有两个字段的 JSON 对象。一个 GUID,用于向 Azure 服务注册并唯一标识设备,以及一个可选的设备标识符(品牌、型号)。这给了制造商两个选择。优选地,他们可以使用 GUID 向云服务注册每台设备。这不仅可以将设备的性能(功率、传感器等)与正确的菜单选择关联起来,还可以跟踪制造商对烤箱的所有者注册,以便进行保修、产品召回或必要的维护。另一个选择是制造商创建一个 GUID,供所有者稍后向云服务注册,以及可选的品牌/型号字段,用于正确的菜单选择。
RabbitMQ 支持基于主题的队列模型,允许消息队列的分层路由。我们正在使用此功能来控制诊断错误代码的分布。此消息的格式是
制造商.分类.型号.[硬件/软件].[信息/警告/严重]
这允许不同的利益相关方精确订阅所需信息。例如,制造商可能希望通过订阅“ACME.*.*.hardware.critical”来获取所有硬件错误代码。他们还可以通过订阅“ACME.Appliances.B7633S2.software.warning”来获取特定型号的软件警告。如果设备由所有者注册,则区域维修设施可以接收代码,并且如果签订了合同,零件可以自动订购并发送到当地经销商。由于消息队列的独特能力,任意数量的消费者可以订阅消息,并且服务可以动态扩展。
这件“东西”
这当然是一个物联网竞赛,我们项目中的“东西”就是微波炉。任何微波炉本质上都是一个处理器,使用键盘和 LCD 显示屏作为用户界面。用户通过一系列适合他们正在烹饪的食物的按钮操作来“编程”处理器。许多食物都有推荐的加热/烹饪程序,但我很少看到办公室休息室里有人真正遵循这些程序,通常都是选择按一下碰运气的方法。一个商用物联网微波炉将拥有一个额外的支持互联网的数据通道,作为替代的数据输入端口。对于这个项目,我们使用了一个在 Python 中编程的 Raspberry Pi 2 型号 B 作为控制器,对一个来自 craigslist 的微波炉进行了改装。我(Todd)曾希望 Windows 10 支持能够可用,但我不得不求助于 Python 编程。我说“求助于”是因为我以前从未使用过 Python,所以不得不边学边谷歌。因此,目前的代码有点像个黑客作品,但随着 Windows 10 对 Pi 的支持发布,它将被重写。
盖子由多个安全星形螺丝固定,这并非没有原因,所以现在可能是发表免责声明的好时机。
注意:使用安全螺丝的原因是微波炉在运行过程中会产生**致命电压**,除非您**完全**了解自己在做什么,否则绝不应打开和改装。即使拔掉电源,它们也会使用一个大的高压电容器,该电容器可以保持电荷,并且在进行任何其他操作之前必须安全放电。如果您不知道我在说什么,您就不应该取下盖子。
Pi 与 Azure 上的消息队列建立连接,以接收、验证和解释传入的加密命令。最初我打算使用固态交叉开关来模拟按键。几次尝试都失败了,可能是由于额外电路的附加电容。
我最终决定放弃模拟,完全移除控制板,并在 Pi 上编写整个微波炉操作系统。我保留了原始键盘,但不得不“钻入”一个新的液晶显示屏,并构建了一个带有高电流继电器(用于灯/风扇/转盘和磁控管)的控制板。键盘、继电器控制和安全联锁开关的专用连接器被拆下以供使用。
基于云的电器的一个优点是它能够执行诊断并报告故障。微波炉有一个高电流保险丝、一个烹饪腔火焰传感器和一个磁控管过温传感器。这三个项目与120伏电源电压串联,任何一个都可能导致微波炉出现“死机”状态。控制板不依赖于这个主电源总线,因此即使在传感器检测到故障时也能保持供电。我需要一种简单的方法来检测三个监控点处的电源丢失,所以我找到了一些多余的手机充电器并拆下了电路板。每个充电器的120伏输入端连接到各自的监控点,它们的5伏(通过一个1K电阻,因为Pi是3.3伏)连接到Pi。这些检测器上的电源丢失会导致下降沿,从而产生中断并发送消息。为了演示,我移除了磁控管过温传感器引线,并连接了一个外部按钮来模拟事件。
这是我们改装的示意图。
微波炉的最终版本。控制面板是可拆卸的,为了方便开发,我们为电源和 HDMI 创建了外部连接。烤箱程序未设置为自动启动,因此使用无线键盘和外部显示器来运行应用程序。
Azure 云
Azure 将维护一个微波炉品牌、型号及其支持命令的数据库,因为并非所有烤箱都支持所有可用命令。它还将包含预包装食品的 UPC 代码和相关的推荐加热程序。该数据库可以扩展,允许用户对食谱的质量进行投票,并提交他们自己针对特定食物的食谱。虽然不是本项目的一部分,但 Azure 机器学习 API 可以用于验证发送到微波炉的命令,以防止意外或恶意损坏微波炉。
该数据库还将用于验证用户微波炉和移动扫描应用程序。此身份验证将确定扫描后微波炉命令集的发送目的地,并防止黑客向未经授权的设备发送恶意命令。
移动应用程序
Android 应用程序将作为条形码扫描器,传输待加热物品的 UPC。它还将经过身份验证并与用户在 Azure 上的帐户关联,同时授权他们的微波炉。该应用程序还可以用于对所使用的特定食谱进行评分。
消息和通信
本项目关键的架构选择之一是各种元素之间的通信方式。为此,我们选择了高级消息队列协议 (AMQP) 消息代理 RabbitMQ。这个开源软件提供了一个抽象的数据中间件,使得消息传递对编码人员友好、高效,并且易于扩展和伸缩,这对于云环境来说非常适合。
RabbitMQ 的工作基于交换机和队列。生产者端点直接发布到给定的交换机,交换机可以直接发布到特定队列,也可以遵循绑定逻辑。消费者端点可以从预定义队列消费,也可以创建独占队列并通过路由逻辑将输入绑定到队列。配置选择完全取决于要实现的目标。
代理管理必须被确认 (ACK) 或被否定确认 (NACK,这也是断开连接或超时等错误情况下的默认行为) 的消息,确保错误消费的消息能够找到预期的接收者,从而为通信提供健壮性。在我们的项目的一些部分中,这很重要(与手机应用程序之间的通信),而在其他时候则不需要(当微波炉处理烹饪指令时,新的烹饪指令被认为是错误的,因此可以安全地丢弃)。
RabbitMQ 还有一些优势,因为它支持多种编程语言,包括 Python、C# 和 Java,所有这三种语言都在本项目中使用。除了这三种语言之外,它还支持 Perl、Ruby、Javascript/node.js、Erlang 和 Clojure,非常适合跨平台需求;虽然 C# 是 Windows VM 和通过 Entity Framework 与 SQL Server 通信的绝佳选择,但 Python 是 Raspberry Pi 的明确选择,而 Java 是 Android 应用程序的本机语言。
RabbitMQ 进一步支持身份验证,它通过 API 扩展了对自身、用户、队列和交换的所有管理。虽然 API 超出了我们项目的范围,但它确实允许在云环境中用户群扩展或缩小时进行自动化配置。
我们发现 RabbitMQ 是一项出色的技术。如果您对整个 AMQP 模型或在各种语言中实现 RabbitMQ 感兴趣,官方网站是一个很好的入门之地。一份 AMQP 快速参考指南可以帮助您入门,然后您可以深入了解您选择的语言中的六个教程。当然,您可以在本文中继续阅读,了解我们如何实现代理。
为了进一步抽象消息系统——允许以后添加新的模块,分别消费和发布到交换机和队列——我们选择采用非常常见的数据概念 JSON。由于其广泛的流行性,JSON 极易找到第三方库(甚至核心库),它们将数据对象序列化和反序列化为这种表示法。这意味着您可以相信未来的代码能够轻松地以您选择的语言消费这些对象。
为了安全起见,JSON 可以被加密,就像我们对服务器到微波炉的通信所做的那样。
使用代码
安卓应用
Android 应用程序旨在说明与云的连接。Android 应用程序是用户交互的关键部分,旨在说明与云的连接。该应用程序允许用户扫描食物,验证烹饪说明,然后将其推送到微波炉。
该应用程序提供了两种不同通信路径的示例:首先,它展示了远程过程调用 (RPC),其中手机发出请求(针对给定微波炉的烹饪说明的条形码扫描请求),然后是一个更标准的工作队列,其中批准的烹饪说明被推送到服务器。这两种交互都与 VM 服务器(特别是与 C# 代码)进行,但 RPC 更复杂一些,因为它不仅是发布者,而且还是返回消息的消费者。
我们把这种通信抽象成一个方法,我将对其进行分解。(顺便说一句,我不会在本文中包含整个方法,但您可以在源代码下载中找到它)
首先请注意,我将返回类型抽象为“T”。这使得该方法无需关心我从可能存在的队列中接收到的消息对象是什么,只需要在该方法被调用时指定即可。尽管该方法只使用了两次,但它很容易再次用于,例如,对服务器的登录调用,请求注册到帐户的微波炉,注册新的微波炉,或对烹饪说明食谱进行评分。
public <T extends BaseMessagingObject> void sendToRabbit(String message, String queue, boolean isRpc, Class<T> returnType ) throws Exception { String returnTypeName = "null";
接下来是代码设置与 RabbitMQ 服务器连接的地方。没有指定交换机或队列;这只是初始连接。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); factory.setPort(PORT); Connection connection; connection = factory.newConnection(); Channel channel = connection.createChannel();
接下来,代码声明了队列。向服务器声明队列或交换机实际上是请求创建该对象。RabbitMQ 的设置非常巧妙,如果您声明一个已经存在的队列或交换机,并且使用已有的参数,它将悄无声息地忽略您的请求。如果您声明一个存在但参数不同的队列或交换机,它将向您报告错误。这使得代码可以安全地始终声明它将处理的队列或交换机,以确保它存在,同时确保交换机或队列符合其预期。
由于我们的队列是静态的,我们不需要捕获具有不同参数的重复队列的错误,因此没有对队列声明使用错误捕获。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
接下来设置各种属性。其中大多数都是存根(stubbed out),因为它们只有在调用是 RPC 调用时才使用(或者至少,没有从默认值更改)。其中包括一个关联 ID。关联 ID 允许 RPC 调用的回复与原始请求匹配。如果您一次向交换机发送一批工作(因此很可能由许多不同的程序或线程处理,并在不同时间返回给您),并且匹配结果与原始请求很重要,那么这很重要。对于我们的项目,我们只使用关联 ID 来确认我们期望返回的单个消息。
Log.d("code", "Queue declared"); AMQP.BasicProperties props = null; String corrId = null; QueueingConsumer consumer = null; String response = null; Log.d("code", "Naming Exchange"); String exchangeName = ""; //QUEUE_NAME.replaceFirst("Queue","Exchange"); String excahngeType = ""; if(isRpc) { Log.d("code", "Starting RPC settings"); corrId = java.util.UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); Log.d("code", "Ending RPC settings"); } else { Log.d("code", "Not RPC so won't set those settings"); }
接下来我们进入实际事件,发布代码。手机处理发布的方式是它不需要绑定或路由已发布的消息,也不需要使用 RabbitMQ 的任何更高级功能,从而保持调用的通用性和简单性。您稍后将在服务器/微波炉交互中看到路由和绑定。
Log.d("code", "About to publish. exchangeName: " + exchangeName + " QueueName: " + QUEUE_NAME); channel.basicPublish(exchangeName, QUEUE_NAME, props, message.getBytes()); Log.d("code", " [x] Sent '" + message + "'");
消息正在前往交换机,并在(一两毫秒内!)进入队列。
接下来,如果是 RPC,它将进入这个简单的循环。一个 `while(true){ ... }` 使它一直运行,直到我们明确请求它中断。在循环内部,它会寻找发布到其唯一回复队列(上面生成的)的消息,并检查关联 ID。虽然消息被错误地发布到自定义返回队列实际上是不可能的——甚至是几乎不可能的——但验证出队列的消息是我们所期望的仍然是最佳实践。
我们的代码随后使用一个简单的返回消息类。该类允许消息的通用化历史记录和检索。虽然我不会详细介绍这个简单的类,但它的主要目的是通过托管的简单模式实现多线程的便捷性(服务器连接作为异步任务调用)。这个类可以扩展为发布/订阅模式,因此应用程序可以监听新的更新。
while (isRpc) { Log.d("code", "Waiting for return message"); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); Log.d("code", "Got message, checking if it matches corrID"); if (delivery.getProperties().getCorrelationId().equals(corrId)) { Log.d("code", "Found correct message"); response = new String(delivery.getBody()); ServerReturn<T> serverReturn = new ServerReturn<T>(); serverReturn.MessageClass = returnType; serverReturn.FromQueue = QUEUE_NAME; serverReturn.Timestamp = new Date(); serverReturn.Message = response; Log.d("code", "Adding to server return"); if(serverReturnManager == null) Log.d("code", "serverReturnManager is null!"); serverReturnManager.addItem(serverReturn); break; } } Log.d("code", "Cleanning up RabbitMQ"); channel.close(); connection.close(); Log.d("code", "Cleaned up RabbitMQ"); }
Android 应用的消息传递到此结束。虽然该应用可以轻松地通过附加功能扩展对服务器的额外调用,但这两种类型的调用——工作请求和过程调用——都说明了云集成以及解耦通信的必要性。
Azure 云
Azure 云以三种不同的方式使用:托管 RabbitMQ 服务器(在 Windows Server VM 上)、托管 C# 代码(通过同一个 Windows Server VM),以及托管数据层(通过基于云的 SQL Server 实例)。
RabbitMQ
我们使用的 RabbitMQ 服务器是相当直接的开箱即用型。如果您想玩转 RabbitMQ,我强烈建议安装 RabbitMQ 管理插件(其中包含一个 Web 界面),因为它大大简化了 RabbitMQ 服务器、其队列、交换机、用户和已处理消息的故障排除、可视化和基本管理。
我们为 C# 代码注册了一个用户,并为 Java 访问设备(手机)和 Python 硬件对象(微波炉)注册了各自 GUID 的用户。保持登录名的唯一性可以增加安全性,并在实际部署时对系统进行审计。出于管理原因,我建议为任何不友好的账户名称标记友好的标签。
我们项目中 RabbitMQ 的有趣之处与服务器安装关系不大,而更多地与我们的代码如何使用 RabbitMQ 与云交互有关,因此我在这里不会深入讨论消息代理服务器本身。
SQL Server
选择 SQL Server 是因为其对 Azure 的支持以及它通过 Entity Framework 轻松与 C# 通信的事实。我们对数据库采用了代码优先的方法,在 C# 中构建数据对象,然后使用 Entity 的内置逻辑将它们作为表推送。
对于正在寻找一种简单、敏捷的数据库构建方式,并且支持程序员思维的开发人员,我强烈推荐 Entity Framework。虽然它确实有其自身的局限性和细微差别,但它非常适合快速开发强大的应用程序,尤其是在像我们这样的小型团队中。
C# 或“我们云的心脏”
C# 用于创建基于代码的服务器,该服务器既充当所有通信的中心,也充当数据库的访问点。通过使用关系数据库 SQL Server 进行数据存储,并使用 RabbitMQ 进行消息传递,服务器甚至与自身的实例也是抽象的。为了说明这一点,服务器可以以多线程方式启动,或者作为多个实例在多个 VM 上并行运行。
C# 代码中有很多内容(包括数据和消息对象、与 Entity Framework 的交互以及多线程工作管理),但我将重点介绍 RabbitMQ 通信,特别是手机到服务器的通信。
如果您正在查看源代码,这具体在 CookingInstructionQueue 类中。
首先,它尝试将消息出队。我设置了一个特殊方法,该方法在超时时出队,并由一个名为 DoesTimeoutForMessageWait(int, out BasicDeliverEventArgs, SmartNukeQueueDescription) 的 try/catch 块包围。您不需要像我一样使用超时(没有它,代码将简单地永远等待队列中的消息),但超时有几个优点。首先,超时允许处理该方法的线程安全地检查是否请求了取消,其次,有时出队会挂在关闭的连接上,而超时允许您在需要时修复/重新建立连接。
try { if (!SmartNukeQueueLibrary.DoesTimeoutForMessageWait(timeout, out basicDeliverEventArgs, _fromInputQueue)) { Console.WriteLine(_fromInputQueue.QueueName + ": Started processing message.");
接下来,我调用一个简单的反序列化方法,该方法依赖于类库 JSON.Net。该方法允许我选择将消息反序列化为 JSON(本项目中使用的)、BSON,或者根本不反序列化。BSON 是 JSON 的一个改进,允许传输更复杂的数据(更接近二进制信息,而不是文本),并且显然具有其优点,但在本项目中并不需要。
CookingInstructionConfirmation confirmationMessage = SmartNukeQueueLibrary.Deserialize<CookingInstructionConfirmation>( Encoding.UTF8.GetString(basicDeliverEventArgs.Body), SerializeTypes.Json); Console.WriteLine(_fromInputQueue.QueueName + ": Deserialized.");
接下来进行简单的授权。它检查访问设备 GUID 是否与微波炉 GUID 相关联。虽然这在消息中很容易伪造,但可能的 GUID 数量之大使得随机 GUID 冲突几乎不可能,而发生双重冲突(在注册的微波炉和连接的访问设备(手机)上)的可能性更是微乎其微。顺便说一句,我们承认这不应该是生产应用程序中唯一的授权,但它是可以进行授权的**一个**实例。
值得一提的是,GUID 的一个重要优点是它允许实体的分布式注册;因为 GUID 冲突的可能性低得简直是天文数字,事实上,借用维基百科上一个生动的描述...
引用例如,考虑可观测宇宙,它包含大约 5×10^22 颗恒星;每颗恒星都可以拥有 6.8×10^15 个全球唯一的 GUID。
或者,引用 StackOverflow.com 上的 Tom Ritter 的话(http://stackoverflow.com/questions/184869/are-guid-collisions-possible)
引用要使冲突概率达到 1%,您需要生成大约2,600,000,000,000,000,000个 GUID
您可以以分布式方式生成 ID,而无需担心重复。这与物联网分布式模型非常吻合,也是 Active Directory 使用 GUID 的部分原因。但我离题了,回到代码...
if (DbManager.IsDeviceAuthorizedForDevice(confirmationMessage.OriginalRequestMessage.MessageCreatedBy, confirmationMessage.OriginalRequestMessage.MicrowaveID)) { var cookingInstruction = DbManager.GetCookingInstruction( confirmationMessage.OriginalRequestMessage.MicrowaveID, confirmationMessage.OriginalRequestMessage.Barcode); Console.WriteLine(_fromInputQueue.QueueName + ": Got cooking instruction.");
此时,指令要么已拉取,要么已生成错误消息。需要明确的是,指令是通过数据库查找匹配食物条形码和相关微波炉型号来找到的。
然后填充返回消息...
MicrowaveCookingInstructionMessage message = new MicrowaveCookingInstructionMessage { Instructions = cookingInstruction.CookingInstructionEvents, MessageCreatedBy = ServerProperties.ID, MicrowaveGuid = confirmationMessage.OriginalRequestMessage.MicrowaveID, Timestamp = DateTime.UtcNow, Title = cookingInstruction.FoodItem.FoodName }; Console.WriteLine(_fromInputQueue.QueueName + ": Ceated return message.");
var serializedCookingInstruction = SmartNukeQueueLibrary.Serialize(message, SerializeTypes.Json); Console.WriteLine(_fromInputQueue.QueueName + ": Serialized.");
并加密。使用 AES 加密——每条消息都有一个唯一的向量(或 IV)——确保通信安全。我们用此来表明消息可以很容易地在消息本身内进行安全传输,同时结合任何基于网络的安全性。如果将其扩展到一次性密钥或时间戳/ID 跟踪,则可以轻松避免重复消息注入。
var encrypted = MessageEncryptionManager.CreateEncryptedMessage(serializedCookingInstruction, confirmationMessage .OriginalRequestMessage .MicrowaveID); Console.WriteLine(_fromInputQueue.QueueName + ": Encrypted.");
代码已发布!总而言之,与 RabbitMQ 的通信方式与您在 Java 中看到的非常相似,但我以两种不同的方式编写它,以展示即使是简单应用程序的灵活性;一旦您理解了一种语言/驱动程序的 RabbitMQ,迁移到其他语言和驱动程序就会变得相当简单直接。我们有没有说过我们喜欢 RabbitMQ?
您会注意到我们使用微波炉的 GUID 作为路由键进行发布。监听微波炉已经将一个独占的自定义队列绑定到交换机,路由键就是微波炉的 GUID。然后 RabbitMQ 将消息从交换机复制到队列,这样微波炉就可以消费它。通过这种方式,消息被路由到正确的设备,并且可以从单个交换机向任意数量的微波炉供电,而无需担心消息丢失或错误消息到达错误的微波炉。
Console.WriteLine(_fromInputQueue.QueueName + ": Publishing."); _toHardwareObjectQueue.BasicPublish(Encoding.UTF8.GetBytes(encrypted), routingKey: confirmationMessage.OriginalRequestMessage.MicrowaveID.ToString()); Console.WriteLine(_fromInputQueue.QueueName + ": Published."); }
现在,如果有人使用无效的访问设备 GUID、无效的微波炉 GUID 或无效的访问设备与微波炉组合请求烹饪指令,我们就会收到错误消息。由于访问设备/微波炉的组合应该已在登录时建立,并在初始扫描请求时确认(授权失败时会向用户返回错误),因此假设此时的任何失败都是黑客尝试,因此不应向注入者发送任何响应。
else { Console.WriteLine("Unauthorized attempt from (as claimed, but unconfirmed) AccessDevice ID: " + confirmationMessage.OriginalRequestMessage.MessageCreatedBy + " for hardware device " + confirmationMessage.OriginalRequestMessage.MicrowaveID); }
接下来是 ACK 的分发。
if (_fromInputQueue.Channel.IsOpen) { _fromInputQueue.Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); } else { _fromInputQueue.Initialize(); } if (_toHardwareObjectQueue.Channel.IsOpen) { _toHardwareObjectQueue.Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); } else { _toHardwareObjectQueue.Initialize(); } }
这段代码是基于超时的 else 语句。如果超时,连接会关闭(如果打开)并重新建立,以防网络出现干扰连接的故障,导致超时。这并非严格要求(RabbitMQ 尚未在声称已连接但实际上未连接时误导我们),但它易于实现,并且几乎不消耗任何资源。
else { Console.WriteLine(_fromInputQueue.ExchangeName + " did timeout."); _fromInputQueue.Initialize(); _toHardwareObjectQueue.Initialize(); } }
如果发生严重故障,代码会尝试对消息进行 NACK(否定确认),以便 RabbitMQ 将消息重新排队。然后我们继续重新初始化队列,这将断开我们的服务器与 RabbitMQ 服务器的连接,从而默认 NACK 掉我们服务器消费的任何未处理消息。因此,我们的 NACK 调用是冗余的,但明确您的意图和消息传递仍然是良好的实践。
catch (Exception ex) { try { _fromInputQueue.Channel.BasicNack(basicDeliverEventArgs.DeliveryTag, false, !basicDeliverEventArgs.Redelivered); } catch (Exception exInner) { Console.WriteLine(exInner); } Console.WriteLine(ex); _fromInputQueue.Initialize(); _toHardwareObjectQueue.Initialize(); }
这就是其中一种通信情况的基本通信方法;烹饪确认已出队并反序列化,然后通过 SQL 服务器查找进行处理。创建了一个新消息,序列化并加密。然后将消息推送到所有微波炉已绑定的交换机,并且原始命令请求消息已从该方法消费的原始队列中 ACKed。相当时髦。
微波炉
#setup door interlock -- ground when door opens GPIO.setup(doorSwitch, GPIO.IN, pull_up_down=GPIO.PUD_UP) GPIO.add_event_detect(doorSwitch, GPIO.BOTH, callback=door_open_callback, bouncetime=300) # setup flame sensor GPIO.setup(flameDetector, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.add_event_detect(flameDetector, GPIO.FALLING, callback=flame_callback, bouncetime=300) # setup magnetron over temp # #this is the correct setup for the sensor # #GPIO.setup(magnetronOverTemp, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) #GPIO.add_event_detect(magnetronOverTemp, GPIO.FALLING, callback=magtemp_callback, bouncetime=300) # This setup is for demo using external trigger switch GPIO.setup(magnetronOverTemp, GPIO.IN, pull_up_down=GPIO.PUD_UP) GPIO.add_event_detect(magnetronOverTemp, GPIO.FALLING, callback=magtemp_callback, bouncetime=300) # setup fuse failure detect GPIO.setup(fuse, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.add_event_detect(fuse, GPIO.FALLING, callback=fuse_callback, bouncetime=300) # ------------------------------------------------------------------------ # flame_callback # ------------------------------------------------------------------------ def flame_callback(channel): send_email("Flame Sensor Alarm", MICROWAVE_GUID, CELL_TO_ADDRESS) send_diagnostic("Hardware", "ERROR", MICROWAVE_GUID + ": Flame Sensor Alarm") return # ------------------------------------------------------------------------ # magtemp_callback() # ------------------------------------------------------------------------ def magtemp_callback(channel): #send_email("Magnetron Overtemp Alarm", MICROWAVE_GUID) # Send Text as part of demo send_email("", MICROWAVE_GUID +" Magnetron Overtemp ALarm", CELL_TO_ADDRESS) send_diagnostic("Hardware", "ERROR", MICROWAVE_GUID + ": Magnatron Overtemp Alarm") return # ------------------------------------------------------------------------ # fuse_callback() # ------------------------------------------------------------------------ def fuse_callback(channel): send_email("Fuse Failure Alarm", MICROWAVE_GUID, CELL_TO_ADDRESS) send_diagnostic("Hardware", "ERROR", MICROWAVE_GUID + ": Fuse Alarm") return
为正常操作之外的事件(例如:门打开,以及火焰探测器或磁控管过热等传感器事件)设置了中断。中断具有关联的回调例程,用于处理中断,然后将控制权返回给主应用程序。传感器事件会生成一条消息,该消息路由到相应的诊断分层消息队列。
# ------------------------------------------------------------------------ # connect_azure() # ------------------------------------------------------------------------ def connect_azure(): print 'in connect' global channel global connection global queue_name try: credentials = pika.PlainCredentials(USER_ID, PASSWORD) connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST, 5672, '/', credentials)) channel = connection.channel() print "Channel defined" channel.exchange_declare(exchange = "CookingInstructionCommandExchange", durable = False, type = 'topic') result = channel.queue_declare(exclusive = True) queue_name = result.method.queue channel.queue_bind(exchange = "CookingInstructionCommandExchange", queue = queue_name, #"CookingInstructionCommandQueue" routing_key = USER_ID) Define que for diagnostic reports -- setup with routing key channel.exchange_declare(exchange = "DiagnosticExchange", durable = False, type = 'topic') result = channel.queue_declare(exclusive = True) queue_name = result.method.queue channel.queue_bind(exchange = "DiagnosticExchange", queue = "DiagnosticQueue", routing_key = create_routing_key()) except: logging.warning("Can't open RabbitMQ") return # ------------------------------------------------------------------------ # receive_from_que() # ------------------------------------------------------------------------ def receive_from_que(): #print "in receive_from_que" method_frame, header_frame, body = channel.basic_get(queue_name) #"CookingInstructionCommandQueue" if method_frame: print method_frame, header_frame, body print "Ready to ack" channel.basic_ack(method_frame.delivery_tag) return body else: return None
该应用程序建立了两个带有“主题”队列的 RabbitMQ 交换机,一个用于接收食谱,另一个用于传输诊断信息。队列管理器是在 Azure 上运行的进程。这使得微波炉与消息的消费者之间完全解耦。
# ---------------------------------------------------------------- # MainLoop # ---------------------------------------------------------------- def main_loop(): try: print "in main loop" state = "mainLoop" description = "Burritto" beep("YourMealIsReady.wav") update_display(0, "Ready") while (True): d = receive_from_que() if d: salt,msg,msgl = d.split("&") msgli=int(msgl) dd = decrypt_two(salt, msg, msgli) recipe(dd) #use j instead of d for testing without Azure/RabbitMQ end_cook() else: digit = check_keypad() if digit == None: continue else: manual_entry(d) time.sleep(.5) # Send Text send_email("", description +" is Finished Cooking", CELL_TO_ADDRESS) except Exception as e: logging.warning("Main loop try exception:{}".format(e)) GPIO.cleanup() connection.close() GPIO.cleanup()
初始化后,微波炉处于主(Main())循环中,等待键盘输入或来自队列的消息。当消息(JSON 对象)到达队列时,它会被解密并发送到 recipe() 函数。
# ------------------------------------------------------------------- # recipe() # ------------------------------------------------------------------- def recipe(json): global done_cooking global description done_cooking = False encoded = Payload(json) p = decrypt_two(encoded) description = p.Title update_display(0, description) update_display(1, "Start/Clear") beep("PressStartBeginCook.wav") # check for kepress d = start_cancel() if (d == "cancel"): cancel() return i = 0 try: arrayLength = len(p.Instructions) while (i < arrayLength): # print p.Instructions[i]['ID'] # print p.Instructions[i]['Type'] # print p.Instructions[i]['Time'] # print p.Instructions[i]['DisplayMessage'] # print p.Instructions[i]['PercentagePower'] if (p.Instructions[i]['Type'] == 1): #pause to cool operationTime = p.Instructions[i]['Time'] update_display(0, p.Instructions[i]['DisplayMessage']) new_cook(operationTime, power) elif (p.Instructions[i]['Type'] == 2): #this is a defrost cycle power = .5 operationTime = p.Instructions[i]['Time'] update_display(0, p.Instructions[i]['DisplayMessage']) new_cook(operationTime, power) elif (p.Instructions[i]['Type'] == 3): #This is standard time/power cook lcd.clear() operationTime = p.Instructions[i]['Time'] power = p.Instructions[i]['PercentagePower'] update_display(0, p.Instructions[i]['DisplayMessage']) new_cook(operationTime, power) #4 reserved for convection elif (p.Instructions[i]['Type'] == 5): #user interacation update_display(0, p.Instructions[i]['DisplayMessage']) beep("PleaseTurnFood.wav") lcd.clearRow(1) while (True): entry = check_keypad() if (entry == None): continue if (entry == stopClear): cancel() return elif (entry == start): break else: #this should beep print "incorrect key" i += 1 state = "endRecipe" update_display(0, "Burrito") update_display(1, "Is ready") beep("YourMealIsReady.wav") done_cooking = True except IndexError: logging.warning("Index Error in recipe") pass return # -------------------------------------------------------------------------- # new_cook() # -------------------------------------------------------------------------- def new_cook(hhmmss, power): MIN_TIME = 30 update_display(1, hhmmss) cookTimeSeconds = get_seconds(hhmmss) if (cookTimeSeconds < MIN_TIME): onTime = cookTimeSeconds offTime = 0 else: onTime = MIN_TIME * power offTime = 30 - onTime totalCycleTime = onTime + offTime startTime = time.time() timeToStopCooking = startTime + cookTimeSeconds secondsIntoCycle = 0 # Turn on light/fan/turntable control_main_relay(1) # Turn on magnetron and start cooking control_mag_relay(1) MagIsOn = True while (time.time() <= timeToStopCooking): displayRemainingTime = get_hhmmss(int(timeToStopCooking - time.time())) update_display(1, displayRemainingTime) loopStartTime = time.time() secondsRan = time.time() - startTime secondsIntoCycle = secondsRan % totalCycleTime #Turns Mag off if not supposed to be on if (MagIsOn) and (secondsIntoCycle > onTime): control_mag_relay(0) MagIsOn = False #Turns Mag on if supposed to be on if (not (MagIsOn)) and (secondsIntoCycle <= onTime): control_mag_relay(1) MagIsOn = True #update timers secondsLeftToSleep = loopStartTime + LOOP_TIME - time.time() time.sleep(secondsLeftToSleep) d = check_keypad() if (d == "cancel"): cancel() break # make sure magnetron is turned off!! control_mag_relay(0) #turn off main relay control_main_relay(0) HHMMSS = " " power = 0 return
recipe() 函数使用 GUID 的食谱部分来执行烹饪过程的每个步骤。
当 recipe() 遍历命令步骤时,每个步骤都会根据命令类型进行评估。一个步骤由描述、操作类型、时间、和功率级别组成。例如:烹饪步骤将显示“正在烹饪”并更新倒计时器,而用户交互将暂停,提示用户执行“翻转食物”或“等待搅拌”等操作,并在用户点击继续按钮时继续。对于已定义的烹饪事件,每一步都会调用 new_cook()。任何操作都可以通过点击停止/取消按钮来取消。在过程结束时,显示会更新,并且可以选择向手机发送一条短信。
关注点
我们制作了一段演示我们项目的短视频:http://youtu.be/S2FJmgqd840
历史