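IEI Tank AIoT Developer Kit and Microsoft Azure: Running Machine Learning at the Edge
This article converts a Python-based motor defect detector solution into a deployable Azure module.
Deploying machine learning models remotely to the edge with Microsoft Azure* IoT Edge helps scale IoT applications. Azure IoT Edge works by containerizing a solution into a module and then pushing that module to an edge device, where it runs. This article converts a Python*-based motor defect detector solution into a deployable Azure module. The motor defect detector uses the K-means clustering algorithm to perform predictive maintenance on motor bearings and determine whether they are likely to fail. The module analyzes simulated data from the motor and then sends the status of each bearing back to the IoT Hub.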
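As a rough illustration of how a trained K-means model labels new data, the sketch below assigns each feature vector to its nearest centroid. The centroids and points are made-up two-dimensional values chosen for readability; they are not taken from kmeanModel.npy, and `assign_clusters` is a hypothetical helper, not part of the project.

```python
import numpy as np

def assign_clusters(points, centroids):
    """Return, for each point, the index of the closest centroid (Euclidean distance)."""
    # Pairwise distances with shape (n_points, n_centroids)
    distances = np.linalg.norm(points[:, None, :] - centroids[None, :, :], axis=2)
    return distances.argmin(axis=1)

# Made-up centroids and feature vectors, for illustration only
centroids = np.array([[0.0, 0.0], [10.0, 10.0]])
points = np.array([[1.0, -1.0], [9.0, 11.0], [4.0, 4.0]])

labels = assign_clusters(points, centroids)
print(labels)  # [0 1 0]
```

In the detector, the feature vectors are the five dominant frequencies of each bearing signal, and certain cluster labels are treated as signs of a developing fault.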
Hardware
- Windows® 10 development machine
- IEI Tank* AIoT Developer Kit with Ubuntu* 16.04 preinstalled, used as the edge device
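Set Up the Development Environment and Microsoft Azure*
Follow the Python module tutorial to make sure all prerequisites are in place. It walks through setting up Visual Studio Code and other resources on the development machine, and creating all the necessary dependencies in Azure. The main components needed in Azure are a standard-tier IoT Hub, an edge device registered to that hub, and a registry for storing the container images.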
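Tips
Cookiecutter needs to be added to the "Path" environment variable on the development machine. It is installed at the following location on the development machine.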
C:\Users\\AppData\Roaming\Python\Python36\Scripts\
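The development machine should use Python 3, and the following packages also need to be installed so that Visual Studio Code does not show errors in the tutorial code: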
pip install iothub_client pandas numpy sklearn scipy
If Pylint is not already available in Visual Studio Code, it may need to be installed as well.
pip install pylint
Restart Visual Studio Code so that it can find the installed packages.
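One way to confirm that the packages are visible to the interpreter is a short check like the one below. This is a minimal sketch: `missing_modules` is a hypothetical helper, and the list simply mirrors the import names used by the tutorial code (`sklearn` is the import name of scikit-learn).

```python
import importlib.util

def missing_modules(names):
    """Return the module names that cannot be found on the current import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Import names used by the tutorial code
REQUIRED = ["iothub_client", "pandas", "numpy", "sklearn", "scipy"]

missing = missing_modules(REQUIRED)
if missing:
    print("Not importable from this interpreter:", ", ".join(missing))
else:
    print("All required packages were found.")
```

Run it with the same Python 3 interpreter that Visual Studio Code is configured to use; otherwise the result may not match what the editor sees.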
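Create the Module
On the development machine, we will use Visual Studio Code to create the module that is deployed to the Tank edge device.
- Right-click "modules" and select "New IoT Edge Solution".
- Name it MotorDefectDetectorSolution.
- Select **Python Module** and name the module KmeansModule.
- Enter the registry address: .azurecr.io/KmeansModule
- The new Edge solution opens.
- Copy the *kmeanModel.npy* file from the motor defect detector GitHub* repository into KmeansModule. This is the model file.
- Create *utils.py* and copy in the code below. *utils.py* handles most of the math. It has been edited from the utils.py file on GitHub to use the first test set by default and to remove the unused plotting functions.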
    # importing the libraries
    import numpy as np
    import pandas as pd
    #import matplotlib.pyplot as plt
    from scipy.fftpack import fft
    from scipy.spatial.distance import cdist
    #from sklearn import cluster

    # cal_Labels takes the number of files as input and generates the labels based on a 70-30 split.
    # files for testset1 = 2148, testset2 = 984, testset3 = 6324
    def cal_Labels(files):
        range_low = files*0.7
        range_high = files*1.0
        label = []
        for i in range(0,files):
            if(i < range_low):
                label.append(0)
            elif(i >= range_low and i <= range_high):
                label.append(1)
            else:
                label.append(2)
        return label

    # cal_amplitude takes the FFT data and n (the number of peaks to keep) as input
    # and returns the n frequencies with the highest amplitude, together with those amplitudes
    def cal_amplitude(fftData,n):
        ifa = []
        ia = []
        amp = abs(fftData[0:int(len(fftData)/2)])
        freq = np.linspace(0,10000,num = int(len(fftData)/2))
        ida = np.array(amp).argsort()[-n:][::-1]
        ia.append([amp[i] for i in ida])
        ifa.append([freq[i] for i in ida])
        return(ifa,ia)

    # this function calculates the top 5 frequencies with the highest amplitude
    # for every channel of every file and returns one list per rank
    def cal_max_freq(files,path):
        freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = ([] for _ in range(5))
        for f in files:
            temp = pd.read_csv(path+f, sep = "\t",header = None)
            temp_freq_max1,temp_freq_max2,temp_freq_max3,temp_freq_max4,temp_freq_max5 = ([] for _ in range(5))
            rhigh = 8
            for i in range(0,rhigh):
                t = fft(temp[i])
                ff,aa = cal_amplitude(t,5)
                temp_freq_max1.append(np.array(ff)[:,0])
                temp_freq_max2.append(np.array(ff)[:,1])
                temp_freq_max3.append(np.array(ff)[:,2])
                temp_freq_max4.append(np.array(ff)[:,3])
                temp_freq_max5.append(np.array(ff)[:,4])
            freq_max1.append(temp_freq_max1)
            freq_max2.append(temp_freq_max2)
            freq_max3.append(temp_freq_max3)
            freq_max4.append(temp_freq_max4)
            freq_max5.append(temp_freq_max5)
        return(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5)

    # build a DataFrame with the five peak-frequency features of one bearing channel
    def create_dataframe(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5,bearing):
        result = pd.DataFrame()
        result['fmax1'] = list((np.array(freq_max1))[:,bearing])
        result['fmax2'] = list((np.array(freq_max2))[:,bearing])
        result['fmax3'] = list((np.array(freq_max3))[:,bearing])
        result['fmax4'] = list((np.array(freq_max4))[:,bearing])
        result['fmax5'] = list((np.array(freq_max5))[:,bearing])
        x = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]]
        return x
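To see what the frequency features look like, the following sketch applies the same idea as `cal_amplitude` to a synthetic signal. It is an illustration only: `top_frequencies` is a hypothetical stand-in that uses NumPy's FFT instead of SciPy's, the 20 kHz sampling rate is an assumption inferred from the 0 to 10000 Hz frequency axis in utils.py, and the two sine components are made up.

```python
import numpy as np

def top_frequencies(signal, n, max_freq=10000.0):
    """Return the n frequencies with the largest FFT amplitude, strongest first."""
    spectrum = np.fft.fft(signal)
    half = len(spectrum) // 2                  # keep the non-mirrored half of the spectrum
    amp = np.abs(spectrum[:half])
    freq = np.linspace(0, max_freq, num=half)  # same frequency axis as utils.py
    idx = amp.argsort()[-n:][::-1]             # indices of the n largest amplitudes
    return freq[idx], amp[idx]

# Synthetic vibration signal: assumed 20 kHz sampling rate, 20480 samples,
# a strong 1000 Hz component and a weaker 3000 Hz component.
fs = 20000
n_samples = 20480
t = np.arange(n_samples) / fs
signal = 3.0 * np.sin(2 * np.pi * 1000 * t) + 1.0 * np.sin(2 * np.pi * 3000 * t)

freqs, amps = top_frequencies(signal, 2)
print(freqs)  # approximately 1000 Hz and 3000 Hz
```

The module computes five such peaks per channel and per file, and those five values form the feature vector passed to the K-means model.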
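Code 1: utils.py
- Copy the main.py code below into the default *main.py* file. Main is where the program runs and sends messages about the status of the bearings. To simulate data being generated, the script downloads the NASA dataset used in the original GitHub project and extracts the first test set. It then moves the first test set, one file at a time, into the /tmp/test folder. The program reads its data from that folder, which simulates a motor running and data being collected over time.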
    import random
    import time
    import sys
    import iothub_client
    # pylint: disable=E0611
    from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
    from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError

    import pandas as pd
    import numpy as np
    from utils import cal_max_freq
    import os
    import urllib.request  # urlretrieve lives in the urllib.request submodule
    import shutil

    def checkBearings(hubManager):
        datadir= '/tmp/1st_test/'
        filedir = '/tmp/test/'
        try:
            if not os.path.exists(datadir):
                os.system("mkdir /tmp/test")
                print("data not found, downloading")
                urllib.request.urlretrieve("https://ti.arc.nasa.gov/c/3/", "/tmp/IMS.7z")
                print("downloaded, now unzipping")
                os.system("7za x /tmp/IMS.7z -o/tmp/")
                os.system("unrar x /tmp/1st_test.rar /tmp/")
                print("unzipped")

            # move the oldest data file into the working folder
            files = [x for x in os.listdir(datadir)]
            oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
            os.rename(datadir + oldest, filedir + oldest)

        except IOError as e:
            print(e)
            print("error end")

        # load the model (allow_pickle is required by newer NumPy releases to load a pickled object)
        filename = "kmeanModel.npy"
        model = np.load(filename, allow_pickle=True).item()

        # iteration for 1st_test data
        rhigh = 8
        moredata= True
        while moredata:
            try:
                # load the files
                all_files = os.listdir(filedir)
                freq_max1,freq_max2,freq_max3,freq_max4,freq_max5 = cal_max_freq(all_files,filedir)
            except IOError as e:
                print("you have entered either the wrong data directory path or filepath ")
                print(e)
                print("error end")

            #testlabels = []
            for i in range(0,rhigh):
                print("checking for the bearing",i+1)
                result = pd.DataFrame()
                result['freq_max1'] = list((np.array(freq_max1))[:,i])
                result['freq_max2'] = list((np.array(freq_max2))[:,i])
                result['freq_max3'] = list((np.array(freq_max3))[:,i])
                result['freq_max4'] = list((np.array(freq_max4))[:,i])
                result['freq_max5'] = list((np.array(freq_max5))[:,i])
                X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]]
                label = model.predict(X)
                # count the failure-related labels among the last 100 predictions
                labelfive = list(label[-100:]).count(5)
                labelsix = list(label[-100:]).count(6)
                labelseven = list(label[-100:]).count(7)
                totalfailur = labelfive+labelsix+labelseven#+labelfour
                ratio = (totalfailur/100)*100
                if(ratio >= 25):
                    print("bearing"+ str(i+1) + " is suspected to fail")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is suspected to fail", 0)
                else:
                    print("bearing"+ str(i+1) + " is working in normal condition")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is working in normal condition", 0)

            # move the next data file into the working folder, or stop when none are left
            files = [x for x in os.listdir(datadir)]
            if len(files):
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir + oldest)
            else:
                moredata = False
        print("done")

    # messageTimeout - the maximum time in milliseconds until a message times out.
    # The timeout period starts at IoTHubModuleClient.send_event_async.
    # By default, messages do not expire.
    MESSAGE_TIMEOUT = 10000

    # global counters
    RECEIVE_CALLBACKS = 0
    SEND_CALLBACKS = 0

    # Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported.
    PROTOCOL = IoTHubTransportProvider.MQTT

    # Callback received when the message that we're forwarding is processed.
    def send_confirmation_callback(message, result, user_context):
        global SEND_CALLBACKS
        print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        print ( " Properties: %s" % key_value_pair )
        SEND_CALLBACKS += 1
        print ( " Total calls confirmed: %d" % SEND_CALLBACKS )

    # receive_message_callback is invoked when an incoming message arrives on the specified
    # input queue (in the case of this sample, "input1"). The forwarding to the "output1"
    # queue from the original filter sample is commented out in this module.
    def receive_message_callback(message, hubManager):
        global RECEIVE_CALLBACKS
        message_buffer = message.get_bytearray()
        size = len(message_buffer)
        #print ( " Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        #print ( " Properties: %s" % key_value_pair )
        RECEIVE_CALLBACKS += 1
        #print ( " Total calls received: %d" % RECEIVE_CALLBACKS )
        #hubManager.forward_event_to_output("output1", message, 0)
        return IoTHubMessageDispositionResult.ACCEPTED

    class HubManager(object):

        def __init__(
                self,
                protocol=IoTHubTransportProvider.MQTT):
            self.client_protocol = protocol
            self.client = IoTHubModuleClient()
            self.client.create_from_environment(protocol)

            # set the time until a message times out
            self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)

            # sets the callback when a message arrives on "input1" queue. Messages sent to
            # other inputs or to the default will be silently discarded.
            self.client.set_message_callback("input1", receive_message_callback, self)

        # Forwards the message received onto the next stage in the process.
        def forward_event_to_output(self, outputQueueName, event, send_context):
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)

        # Send the message
        def send_event_to_output(self, outputQueueName, message, send_context):
            event=IoTHubMessage(message)
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)

    def main(protocol):
        try:
            print ( "\nPython %s\n" % sys.version )
            print ( "IoT Hub Client for Python3" )

            hub_manager = HubManager(protocol)

            print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
            print ( "The sample is now waiting for messages and will run indefinitely. Press Ctrl-C to exit. ")

            checkBearings(hub_manager)
            while True:
                time.sleep(1)

        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubModuleClient sample stopped" )

    if __name__ == '__main__':
        main(PROTOCOL)
Code 2: main.py
- Update *requirements.txt*. This installs the dependencies of the motor defect detector.
    azure-iothub-device-client==1.4.0
    numpy>=1.11.2
    scipy>=1.1.0
    pandas>=0.23.4
    scikit-learn>=0.19.1
    sklearn>=0.0
Code 3: requirements.txt
- Update **Dockerfile.amd64** as well. Note that the container includes only Python 2.7 by default, so Python 3 must be installed and the Python path updated.
    FROM ubuntu:xenial

    WORKDIR /app

    RUN apt-get update && \
        apt-get install -y --no-install-recommends libcurl4-openssl-dev libboost-python-dev p7zip-full unrar python3-pip python3-dev python3-setuptools && \
        cd /usr/local/bin && \
        ln -s /usr/bin/python3 python && \
        pip3 install --upgrade pip && \
        rm -rf /var/lib/apt/lists/*

    COPY requirements.txt ./
    RUN pip3 install -r requirements.txt

    COPY . .

    RUN useradd -ms /bin/bash moduleuser
    USER moduleuser

    CMD [ "python3", "-u", "./main.py" ]
After the files are added, the module structure should look like the following in Visual Studio Code.
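Deploy the Module
1. In Visual Studio Code, right-click deployment.template.json and select **Build and Push IoT Edge Solution**.
Building the container with all of its requirements takes some time.
2. Then right-click the Azure IoT Hub device to deploy to and select **Create Deployment for Single Device**.
3. Log in to the IoT Hub edge device, in this case the Tank.
Monitor the progress with the following command: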
sudo iotedge logs KmeansModule -f
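Downloading and extracting the data takes some time. The module first downloads the data, extracts it, and then starts moving it into the working folder. After that, it sends messages back to the IoT Hub.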
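The file-by-file feed described above can be sketched in isolation as follows. This is a simplified illustration, not the module's code: `move_oldest` is a hypothetical helper, and temporary folders with made-up file names stand in for /tmp/1st_test and /tmp/test.

```python
import os
import tempfile

def move_oldest(src_dir, dst_dir):
    """Move the oldest file in src_dir into dst_dir; return its name, or None if src_dir is empty."""
    names = os.listdir(src_dir)
    if not names:
        return None
    oldest = min(names, key=lambda f: os.path.getctime(os.path.join(src_dir, f)))
    os.rename(os.path.join(src_dir, oldest), os.path.join(dst_dir, oldest))
    return oldest

# Demo with temporary folders standing in for the data and working folders
with tempfile.TemporaryDirectory() as root:
    src = os.path.join(root, "1st_test")
    dst = os.path.join(root, "test")
    os.mkdir(src)
    os.mkdir(dst)
    for name in ("a.txt", "b.txt", "c.txt"):
        with open(os.path.join(src, name), "w") as handle:
            handle.write("sample")

    moved = []
    while True:
        name = move_oldest(src, dst)
        if name is None:
            break  # no more data: the simulated motor run is over
        moved.append(name)
    print("moved", len(moved), "files")  # moved 3 files
```

Each pass through the loop corresponds to one new measurement file arriving, after which the module re-evaluates every bearing on all files collected so far.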
On the development machine, the messages can be viewed by right-clicking the **…** next to **Azure IOT HUB Devices** in Visual Studio Code and selecting **Start Monitoring D2C Message**.
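Each message reflects a simple threshold rule in main.py: among the last 100 predicted labels for a bearing, the labels 5, 6, and 7 are counted, and the bearing is flagged when they make up at least 25 percent. The sketch below restates that rule as a standalone function; `is_suspected` and its parameter names are hypothetical, and the label sequences are made up.

```python
def is_suspected(labels, failure_labels=(5, 6, 7), window=100, threshold=25):
    """Return True when failure labels make up at least `threshold` percent of the window."""
    recent = list(labels)[-window:]
    failures = sum(1 for label in recent if label in failure_labels)
    # As in main.py, the count is divided by the fixed window size,
    # even when fewer than `window` predictions are available.
    ratio = failures / window * 100
    return ratio >= threshold

healthy = [0] * 76 + [5] * 24               # 24 percent failure labels
degrading = [0] * 75 + [5] * 15 + [7] * 10  # 25 percent failure labels

print(is_suspected(healthy))    # False
print(is_suspected(degrading))  # True
```

Because only the most recent predictions are considered, failure labels that have dropped out of the window no longer influence the result.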
Useful Module Commands on the Tank
List the installed modules
iotedge list
Remove the container
sudo docker rm -f KmeansModule
View the container's logs
sudo iotedge logs KmeansModule -f
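Conclusion
The motor defect detector Python project has now been converted into a module on Azure IoT Hub and deployed to an edge device. As a next step, routes in Azure can be used to turn these messages into actionable events.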
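As one possible starting point for acting on the messages, a downstream consumer could turn the plain-text status strings produced by main.py into structured records. The sketch below is an assumption about how such a consumer might look; `parse_status` is a hypothetical helper and is not part of the module or of any Azure SDK.

```python
import re

# Matches the two message formats emitted by main.py
_STATUS_PATTERN = re.compile(
    r"^bearing(\d+) is (suspected to fail|working in normal condition)$"
)

def parse_status(message):
    """Parse a status message into a dict, or return None if the format is not recognized."""
    match = _STATUS_PATTERN.match(message.strip())
    if match is None:
        return None
    return {
        "bearing": int(match.group(1)),
        "suspected": match.group(2) == "suspected to fail",
    }

print(parse_status("bearing3 is suspected to fail"))
# {'bearing': 3, 'suspected': True}
```

A record like this is easier to filter on than free text, for example to raise an alert only when `suspected` is true.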
About the Author
Whitney Foster is a software engineer in the Core and Visual Computing Group at Intel, working on scale-enabling projects for the Internet of Things and computer vision.