
IEI Tank AIoT Developer Kit and Microsoft Azure: Running Machine Learning at the Edge


October 25, 2018

CPOL

This article converts a Python-based motor defect detector solution into a deployable Azure module.

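Using Microsoft Azure* IoT Edge to deploy machine learning models remotely to the edge helps scale IoT applications. Azure IoT Edge works by containerizing a solution as a module and then pushing it to an edge device, where it runs. This article converts the Python*-based Motor Defect Detector solution into a deployable Azure module. The motor defect detector performs predictive maintenance on motor bearings, using the K-means clustering algorithm to determine whether they are likely to fail. The module analyzes simulated data from the motor and sends status information for each bearing back to IoT Hub.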
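At prediction time, a trained K-means model assigns each new feature vector to the cluster whose centroid is nearest. The sketch below shows that step in plain Python. The two-dimensional centroids are made up for illustration (the real kmeanModel.npy model works on five top-frequency features per bearing, learned from the NASA bearing data), and `nearest_centroid` and `predict` are hypothetical helper names, not part of the project.

```python
import math

# Hypothetical 2-D centroids for illustration only. The real model stores
# centroids learned from five top-frequency features per bearing.
CENTROIDS = [
    (1000.0, 2000.0),  # cluster 0
    (4000.0, 4500.0),  # cluster 1
    (8000.0, 9000.0),  # cluster 2
]


def nearest_centroid(x, centroids):
    """Return the index of the centroid closest to x (Euclidean distance)."""
    def distance(c):
        return math.sqrt(sum((a - b) ** 2 for a, b in zip(x, c)))
    return min(range(len(centroids)), key=lambda k: distance(centroids[k]))


def predict(samples, centroids=CENTROIDS):
    """Assign each sample to its nearest centroid, as KMeans.predict does."""
    return [nearest_centroid(x, centroids) for x in samples]


if __name__ == "__main__":
    print(predict([(1100.0, 1900.0), (7900.0, 8800.0)]))  # [0, 2]
```

In the project itself, some clusters (5, 6, and 7 in main.py) are treated as failure clusters, so the share of recent samples that land in them drives each bearing's status.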

Figure 1: Flow of the module from the development machine to the edge device

Hardware

  • Development machine running Windows 10®
  • IEI Tank* AIoT Developer Kit with Ubuntu* 16.04 preinstalled, used as the edge device

Setting Up the Development Environment and Microsoft Azure*

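Follow the Azure IoT Edge Python module tutorial to make sure all prerequisites are in place. It walks through setting up Visual Studio Code and other resources on the development machine, and creating all the required dependencies in Azure. The main components needed in Azure are a standard-tier IoT Hub, an edge device registered to that hub, and a registry to store container images.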

Tips

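The folder containing Cookiecutter must be added to the development machine's Path environment variable. On the development machine it is installed in the following location: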

C:\Users\\AppData\Roaming\Python\Python36\Scripts\
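A quick way to confirm that the Path change took effect is to check from Python. This is a minimal sketch: it assumes the per-user Python 3.6 Scripts folder shown above (rebuilt here from the `APPDATA` environment variable), and `scripts_dir_on_path` and `cookiecutter_location` are hypothetical helper names.

```python
import os
import shutil


def scripts_dir_on_path(scripts_dir, path=None):
    """Return True if scripts_dir is one of the entries in PATH.

    Entries are normalized with normpath/normcase, so a trailing slash does not
    matter and, on Windows, neither does letter case.
    """
    if path is None:
        path = os.environ.get("PATH", "")
    target = os.path.normcase(os.path.normpath(scripts_dir))
    return any(
        os.path.normcase(os.path.normpath(entry)) == target
        for entry in path.split(os.pathsep)
        if entry
    )


def cookiecutter_location():
    """Return the path of the cookiecutter executable, or None if PATH lacks it."""
    return shutil.which("cookiecutter")


if __name__ == "__main__":
    # Assumed location: %APPDATA%\Python\Python36\Scripts (per-user pip installs).
    scripts = os.path.join(os.environ.get("APPDATA", ""), "Python", "Python36", "Scripts")
    print("Scripts folder on PATH:", scripts_dir_on_path(scripts))
    print("cookiecutter found at:", cookiecutter_location())
```

Run it from a new terminal after editing Path; terminals opened earlier keep the old value.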

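The development machine should use Python 3, and the following packages also need to be installed so that Visual Studio Code does not report errors in the tutorial code: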

pip install azure-iothub-device-client pandas numpy scikit-learn scipy

If Pylint is not yet available in Visual Studio Code, you may need to install it as well:

pip install pylint

Restart Visual Studio Code so that it picks up the newly installed packages.
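After restarting, you can check that the packages resolve in the interpreter Visual Studio Code uses. A minimal sketch, assuming the package list above; note that two import names differ from their pip names (`iothub_client` comes from `azure-iothub-device-client`, and `sklearn` from `scikit-learn`). The `REQUIRED` mapping and `missing_modules` helper are illustrative, not part of the tutorial.

```python
import importlib.util

# pip package name -> module name used in `import` statements.
REQUIRED = {
    "azure-iothub-device-client": "iothub_client",
    "pandas": "pandas",
    "numpy": "numpy",
    "scikit-learn": "sklearn",
    "scipy": "scipy",
    "pylint": "pylint",
}


def missing_modules(import_names):
    """Return the import names the import system cannot locate (nothing is imported)."""
    return [name for name in import_names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED.values())
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```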

Creating the Module

On the development machine, we will use Visual Studio Code to create the module that will be deployed to the Tank edge device.

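  1. Right-click "modules" and select "New IoT Edge Solution".
  2. Name it MotorDefectDetectorSolution.
  3. Select **Python Module** and name the module KmeansModule.
  4. Enter the registry address: .azurecr.io/KmeansModule
  5. The new Edge Solution opens.
  6. Copy the *kmeanModel.npy* file from the Motor Defect Detector GitHub* repository into KmeansModule. This is the model file.
  7. Create *utils.py* and copy in the code below. *utils.py* handles most of the math. It was adapted from the utils.py file on GitHub to use the first test set by default and to remove unused plotting functions.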
    #importing the libraries
    import numpy as np
    import pandas as pd
    #import matplotlib.pyplot as plt
    
    from scipy.fftpack import fft
    from scipy.spatial.distance import cdist
    #from sklearn import cluster
    
    # cal_Labels takes the number of files as input and generates labels based on a 70-30 split.
    # Number of files: testset1 = 2148, testset2 = 984, testset3 = 6324
    def cal_Labels(files):
        range_low = files*0.7
        range_high = files*1.0
        label = []
        for i in range(0,files):
            if(i < range_low):
                label.append(0)
            elif(i >= range_low and i <= range_high):
                label.append(1)
            else:
                label.append(2)
        return label
    
    # cal_amplitude takes the FFT data and n (the number of maximum amplitudes) as input and returns the top n frequencies with the highest amplitude
    def cal_amplitude(fftData,n):
        ifa = []
        ia = []
        amp = abs(fftData[0:int(len(fftData)/2)])
        freq = np.linspace(0,10000,num = int(len(fftData)/2))
        ida = np.array(amp).argsort()[-n:][::-1]
        ia.append([amp[i] for i in ida])
        ifa.append([freq[i] for i in ida])
        return(ifa,ia)
    
    # cal_max_freq calculates the top n frequencies with the highest amplitude and returns a list for each maximum
    def cal_max_freq(files,path):
        freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = ([] for _ in range(5))
        for f in files:
            temp = pd.read_csv(path+f,  sep = "\t",header = None)
            temp_freq_max1,temp_freq_max2,temp_freq_max3,temp_freq_max4,temp_freq_max5 = ([] for _ in range(5))
            rhigh = 8
            for i in range(0,rhigh):
                t = fft(temp[i])
                ff,aa = cal_amplitude(t,5)
                temp_freq_max1.append(np.array(ff)[:,0])
                temp_freq_max2.append(np.array(ff)[:,1])
                temp_freq_max3.append(np.array(ff)[:,2])
                temp_freq_max4.append(np.array(ff)[:,3])
                temp_freq_max5.append(np.array(ff)[:,4])
            freq_max1.append(temp_freq_max1)
            freq_max2.append(temp_freq_max2)
            freq_max3.append(temp_freq_max3)
            freq_max4.append(temp_freq_max4)
            freq_max5.append(temp_freq_max5)
        return(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5)
    
    
    def create_dataframe(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5,bearing):
        result = pd.DataFrame()
        result['fmax1'] = list((np.array(freq_max1))[:,bearing])
        result['fmax2'] = list((np.array(freq_max2))[:,bearing])
        result['fmax3'] = list((np.array(freq_max3))[:,bearing])
        result['fmax4'] = list((np.array(freq_max4))[:,bearing])
        result['fmax5'] = list((np.array(freq_max5))[:,bearing])
        x = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]]
        return x
    Code 1: utils.py
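  8. Copy the main.py code below into the default *main.py* file. Main is where the program runs and sends messages about each bearing's status to the module output. To simulate data generation, the script downloads the NASA data set used in the original GitHub project and extracts the first test set. It then moves the files of the first test set into the /tmp/test folder one at a time. The program reads data from that folder, simulating a motor that runs and collects data over time.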
    import random
    import time
    import sys
    import iothub_client
    # pylint: disable=E0611
    from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
    from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
    
    import pandas as pd
    import numpy as np
    from utils import cal_max_freq
    import os
    import urllib.request
    import shutil
    
    def checkBearings(hubManager):
        datadir= '/tmp/1st_test/'
        filedir = '/tmp/test/'
        try:
            if not os.path.exists(datadir): 
                os.makedirs(filedir, exist_ok=True)
                print("data not found, downloading")
                urllib.request.urlretrieve("https://ti.arc.nasa.gov/c/3/", "/tmp/IMS.7z")
                print("downloaded, now unzipping")
                os.system("7za x /tmp/IMS.7z -o/tmp/")
                os.system("unrar x /tmp/1st_test.rar /tmp/")
                print("unzipped")
                files = [x for x in os.listdir(datadir)]
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
        except IOError as e:
            print(e)
            print("error end")
    
        # load the model
        filename = "kmeanModel.npy"
        model = np.load(filename).item() 
    
        # iteration for 1st_test data
        rhigh = 8
    
        moredata= True
    
        while moredata:
            try:
                # load the files
                all_files = os.listdir(filedir)
                freq_max1,freq_max2,freq_max3,freq_max4,freq_max5  =  cal_max_freq(all_files,filedir)
            except IOError as e:
                print("you have entered either the wrong data directory path or filepath ")
                print(e)
                print("error end")
                break
    
    
            #testlabels = []
            for i in range(0,rhigh):
                print("checking for the bearing",i+1)
                result = pd.DataFrame()
                result['freq_max1'] = list((np.array(freq_max1))[:,i])
                result['freq_max2'] = list((np.array(freq_max2))[:,i])
                result['freq_max3'] = list((np.array(freq_max3))[:,i])
                result['freq_max4'] = list((np.array(freq_max4))[:,i])
                result['freq_max5'] = list((np.array(freq_max5))[:,i])
    
                X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]]
    
                label = model.predict(X)
                labelfive = list(label[-100:]).count(5)
                labelsix = list(label[-100:]).count(6)
                labelseven = list(label[-100:]).count(7)
                totalfailur = labelfive+labelsix+labelseven#+labelfour
                ratio = (totalfailur/100)*100
                if(ratio >= 25):
                    print("bearing"+ str(i+1) + " is suspected to fail")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is suspected to fail", 0)
                else:
                    print("bearing"+ str(i+1) + " is working in normal condition")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is working in normal condition", 0)
    
            files = [x for x in os.listdir(datadir)]
            if len(files):
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
            else:
                moredata = False
                print("done")
    		
    
    # messageTimeout - the maximum time in milliseconds until a message times out.
    # The timeout period starts at IoTHubModuleClient.send_event_async.
    # By default, messages do not expire.
    MESSAGE_TIMEOUT = 10000
    
    # global counters
    RECEIVE_CALLBACKS = 0
    SEND_CALLBACKS = 0
    
    # Choose HTTP, AMQP or MQTT as transport protocol.  Currently only MQTT is supported.
    PROTOCOL = IoTHubTransportProvider.MQTT
    
    # Callback received when the message that we're forwarding is processed.
    def send_confirmation_callback(message, result, user_context):
        global SEND_CALLBACKS
        print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        print ( "    Properties: %s" % key_value_pair )
        SEND_CALLBACKS += 1
        print ( "    Total calls confirmed: %d" % SEND_CALLBACKS )
    
    
    # receive_message_callback is invoked when an incoming message arrives on the specified
    # input queue (in this sample, "input1"). This module only counts and accepts incoming
    # messages; the forward to "output1" from the filter-module template is commented out below.
    def receive_message_callback(message, hubManager):
        global RECEIVE_CALLBACKS
        message_buffer = message.get_bytearray()
        size = len(message_buffer)
        #print ( "    Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        #print ( "    Properties: %s" % key_value_pair )
        RECEIVE_CALLBACKS += 1
        #print ( "    Total calls received: %d" % RECEIVE_CALLBACKS )
        #hubManager.forward_event_to_output("output1", message, 0)
        return IoTHubMessageDispositionResult.ACCEPTED
    
    
    class HubManager(object):
    
        def __init__(
                self,
                protocol=IoTHubTransportProvider.MQTT):
            self.client_protocol = protocol
            self.client = IoTHubModuleClient()
            self.client.create_from_environment(protocol)
    
            # set the time until a message times out
            self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
            
            # sets the callback when a message arrives on "input1" queue.  Messages sent to 
            # other inputs or to the default will be silently discarded.
            self.client.set_message_callback("input1", receive_message_callback, self)
    
        # Forwards the message received onto the next stage in the process.
        def forward_event_to_output(self, outputQueueName, event, send_context):
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
        # Send the message 
        def send_event_to_output(self, outputQueueName, message, send_context):
            event=IoTHubMessage(message)
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
    
    def main(protocol):
        try:
            print ( "\nPython %s\n" % sys.version )
            print ( "IoT Hub Client for Python3" )
    
            hub_manager = HubManager(protocol)
    
            print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
            print ( "The sample is now waiting for messages and will wait indefinitely.  Press Ctrl-C to exit. ")
    
            checkBearings(hub_manager)
    
            while True:
                time.sleep(1)
    
        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubModuleClient sample stopped" )
    
    if __name__ == '__main__':
        main(PROTOCOL)
    Code 2: main.py
  9. Update *requirements.txt*. This installs the motor defect detector's dependencies.
    azure-iothub-device-client==1.4.0
    numpy>=1.11.2
    scipy>=1.1.0
    pandas>=0.23.4
    scikit-learn>=0.19.1
    Code 3: requirements.txt
  10. Finally, update **Dockerfile.amd64**. Note that by default the container includes only Python 2.7, so Python 3 must be installed and the Python path updated.
    FROM ubuntu:xenial
    
    WORKDIR /app
    
    RUN apt-get update && \
        apt-get install -y --no-install-recommends libcurl4-openssl-dev libboost-python-dev p7zip-full unrar python3-pip python3-dev python3-setuptools && \
        cd /usr/local/bin && \
        ln -s /usr/bin/python3 python && \
        pip3 install --upgrade pip && \
        rm -rf /var/lib/apt/lists/* 
    
    COPY requirements.txt ./
    RUN pip3 install -r requirements.txt
    
    COPY . .
    
    RUN useradd -ms /bin/bash moduleuser
    USER moduleuser
    
    CMD [ "python3", "-u", "./main.py" ]
    Code 4: Dockerfile.amd64
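The features the model consumes come from `cal_max_freq` and `cal_amplitude` in utils.py: for each bearing channel, take the FFT of a snapshot and keep the frequencies with the largest amplitudes. The sketch below reproduces that idea on a synthetic signal so it can be checked in isolation. It assumes a 20 kHz sampling rate (consistent with the 0 to 10,000 Hz range in `cal_amplitude`) and 20,480 samples per snapshot, and it swaps in `np.fft.rfft`/`np.fft.rfftfreq` for exact bin frequencies instead of the `np.linspace` approximation used in utils.py, so values can differ slightly from the project's. `top_frequencies` is a hypothetical name.

```python
import numpy as np


def top_frequencies(signal, fs, n=5):
    """Return the n frequencies (Hz) with the largest FFT amplitude, largest first."""
    amp = np.abs(np.fft.rfft(signal))                 # one-sided amplitude spectrum
    freqs = np.fft.rfftfreq(len(signal), d=1.0 / fs)  # frequency of each rfft bin
    idx = np.argsort(amp)[-n:][::-1]                  # indices of the n largest amplitudes
    return freqs[idx]


if __name__ == "__main__":
    fs = 20000                   # assumed sampling rate (Hz)
    t = np.arange(20480) / fs    # assumed snapshot length: 20,480 samples
    # Synthetic "bearing" signal: a strong 1 kHz tone plus a weaker 3 kHz tone.
    x = 2.0 * np.sin(2 * np.pi * 1000 * t) + 1.0 * np.sin(2 * np.pi * 3000 * t)
    print(top_frequencies(x, fs, n=2))
```

Both tones complete a whole number of cycles in the window, so each one's energy falls into a single bin and the result is easy to verify; real vibration data spreads energy across neighboring bins.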

After the files are added, the module structure in Visual Studio Code should look like this:

Figure 2: IoT Edge solution in Visual Studio Code
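Before building the image, the data-replay idea from main.py (moving one snapshot at a time from the extracted data set into /tmp/test) can be tried locally with temporary folders. In this sketch `move_oldest` is a hypothetical helper, and it orders files by name on the assumption that snapshot names are timestamps (as in the NASA IMS set, for example 2003.10.22.12.06.24); main.py itself picks the file with the smallest `os.path.getctime`.

```python
import os
import shutil
import tempfile


def move_oldest(src_dir, dst_dir):
    """Move the earliest snapshot from src_dir to dst_dir.

    Returns the moved file name, or None when src_dir is empty. Assumes file
    names are timestamps, so sorting by name gives chronological order.
    """
    names = sorted(os.listdir(src_dir))
    if not names:
        return None
    oldest = names[0]
    shutil.move(os.path.join(src_dir, oldest), os.path.join(dst_dir, oldest))
    return oldest


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
        # Two empty placeholder snapshots, created out of order on purpose.
        for name in ("2003.10.22.12.09.13", "2003.10.22.12.06.24"):
            open(os.path.join(src, name), "w").close()
        while True:
            moved = move_oldest(src, dst)
            if moved is None:
                break
            # main.py recomputes features over the whole destination folder after each move.
            print("replayed", moved, "->", sorted(os.listdir(dst)))
```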

Deploying the Module

1. In Visual Studio Code, right-click deployment.template.json and select **Build and Push IoT Edge Solution**.

Figure 3: Location of Build and Push IoT Edge Solution

Building the container with everything it needs takes some time.

2. Next, right-click the Azure IoT Hub device to deploy to and select **Create Deployment for Single Device**.

Figure 4: Deploying the module

3. Log in to the IoT Hub edge device, in this case the Tank.

Monitor progress with the following command:

sudo iotedge logs KmeansModule -f

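Downloading and extracting the data takes some time. The module first downloads the data, extracts it, and then starts moving it into the folder. After that, it sends messages back to IoT Hub.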
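The text of each message is decided by a simple threshold rule in `checkBearings`: of the last 100 cluster labels predicted for a bearing, the share that falls in clusters 5, 6, or 7 is compared with 25%. The function below restates that rule in isolation so it can be tested without the model. `bearing_status` is a hypothetical name, and, like main.py, it divides by 100 even when fewer than 100 labels exist.

```python
from collections import Counter

FAILURE_CLUSTERS = (5, 6, 7)  # clusters main.py counts as failure indicators


def bearing_status(labels, failure_clusters=FAILURE_CLUSTERS, window=100, threshold=25.0):
    """Return the status text main.py would send for one bearing.

    Counts how many of the last `window` labels fall in the failure clusters and
    expresses that as a percentage of `window` (not of the labels present).
    """
    recent = Counter(list(labels)[-window:])
    failures = sum(recent[c] for c in failure_clusters)
    ratio = failures * 100 / window
    if ratio >= threshold:
        return "suspected to fail"
    return "working in normal condition"


if __name__ == "__main__":
    labels = [0] * 90 + [5] * 10 + [6] * 20   # 30 failure labels in the last 100
    print("bearing1 is " + bearing_status(labels))  # bearing1 is suspected to fail
```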

On the development machine, you can view the messages in Visual Studio Code by selecting the **…** menu next to **Azure IOT HUB Devices** and choosing **Start Monitoring D2C Message**.

Figure 5: Monitoring device-to-cloud messages

Figure 6: IoT Hub messages

Useful Module Commands on the Tank

List installed modules

iotedge list

Remove the container

sudo docker rm -f KmeansModule

View the container's logs

sudo iotedge logs KmeansModule -f

Conclusion

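The Motor Defect Detector Python project has now been converted into a module on Azure IoT Hub and deployed to an edge device. As a next step, Azure can use routing to turn these messages into actionable events.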
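Because the module sends plain-text bodies such as `bearing3 is suspected to fail`, any downstream consumer has to recover the bearing number and status from the text before it can act on them. A minimal sketch of that parsing step, assuming bodies arrive exactly as main.py formats them; `parse_status` and `needs_attention` are hypothetical helpers, not an Azure API.

```python
import re

# Matches the plain-text bodies main.py sends, e.g. "bearing3 is suspected to fail".
MESSAGE_PATTERN = re.compile(r"^bearing(\d+) is (suspected to fail|working in normal condition)$")


def parse_status(body):
    """Return (bearing_number, status) for a KmeansModule message, or None if it does not match."""
    if isinstance(body, (bytes, bytearray)):
        body = body.decode("utf-8")
    match = MESSAGE_PATTERN.match(body.strip())
    if match is None:
        return None
    return int(match.group(1)), match.group(2)


def needs_attention(body):
    """True when the message reports a bearing that is suspected to fail."""
    parsed = parse_status(body)
    return parsed is not None and parsed[1] == "suspected to fail"


if __name__ == "__main__":
    for body in (b"bearing3 is suspected to fail", "bearing4 is working in normal condition"):
        print(parse_status(body), needs_attention(body))
```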

About the Author

Whitney Foster is a software engineer in Intel's Core and Visual Computing Group, working on scale enabling projects for the Internet of Things and computer vision.
