IEI Tank AIoT 开发套件和 AWS Greengrass:在边缘运行机器学习预测
在本教程中,我们将设置一个基本的机器学习预测模型,作为 Amazon Web Services (AWS) Lambda 函数在 AWS Greengrass 组中运行。
引言
在本教程中,我们将设置一个基本的机器学习预测模型,作为 Amazon Web Services (AWS)* Lambda 函数在 AWS Greengrass* 组中运行。我们将使用基本的 K-Means 聚类来训练电机故障预测模块。Lambda 函数将利用 Greengrass Core 的资源,该核心将设置在 IEI Tank* AIoT 开发套件上。IEI Tank AIoT 开发套件预装了 OpenVINO™ 工具包、Intel® Media SDK 和 Intel® System Studio 2018 等开发工具和 SDK,以帮助您加速部署。Lambda 函数将使用 MQTT 消息将 ML 预测过程的状态更新发送到 Greengrass 组。
必备组件
IEI TANK 配备 Ubuntu* 16.04 操作系统
AWS Greengrass 设置
首先,我们需要在 IEI TANK 上设置 Greengrass Core。请按照链接文档中模块 1 和 2 的说明进行操作,即 AWS Greengrass 中的 Greengrass 环境设置和安装 Greengrass Core 软件。
转到 AWS 控制台,从左上角功能区选择服务,在搜索栏中输入IoT,然后选择IoT Core。在IoT Core页面,从左下角选择软件。通过点击配置下载下载AWS Greengrass Core SDK。选择Python* 2.7,然后点击下载 Greengrass Core SDK。加载完软件包后,解压它。
tar –xzvf greengrass-core-python-sdk-1.0.0.tar.gz
进入 HelloWorld 文件夹并解压文件
cd aws_greengrass_core_sdk/examples/HelloWorld unzip greengrassHelloWorld.zip
解压后文件夹的内容将在本教程稍后用于创建 AWS Lambda 的 zip 文件夹。
IEI Tank* 设置
由于 AWS Greengrass 需要 Python* 2.7,因此我们需要安装专门针对 Python 2.7 的软件包。
sudo apt install python-pip sudo pip2 install pandas numpy matplotlib scipy sklearn sudo pip2 install -U pandas numpy matplotlib scipy sklearn
克隆 Motor-Defect-Detector GitHub* 存储库,然后进入 Kmeans 文件夹。
git clone https://github.com/intel-iot-devkit/motor-defect-detector.git cd motor-defect-detector/Kmeans/
我们将使用轴承数据集进行 K-means 基本模型训练和预测。通过访问网站下载轴承数据集。
安装用于提取文件的应用程序。
sudo apt-get install p7zip-full unrar
解压数据集。
7za x IMS.7z
解压 rar 文件(本教程仅使用第一个和第二个测试集)。
unrar x 1st_test.rar unrar x 2nd_test.rar
将代码降级到 Python* 2.7
在使用 GitHub 存储库代码之前,我们需要进行一些更改,将其从 Python* 3.5 降级到 Python 2.7,然后运行训练脚本。要自行修改脚本,请按照以下两个步骤操作。
在 *Kmeans* 文件夹中,打开 *kmeanstraining.py* 脚本并在第一行添加
from __future__ import print_statement
将文件中的 input
替换为 raw_input
,如下所示:
filedir_testset1 = raw_input("enter the complete directory path for the testset1")
或者,您也可以从本文的示例代码部分获取完全修改后的训练脚本 *kmeanstrainingall.py*。
训练模型
在 *Kmeans* 文件夹中,训练 K-means 模型并按照提示操作。
python kmeanstrainingall.py enter the complete directory path for the testset1 /<path-to>/motor-defect-detector/Kmeans/1st_test/ enter the complete directory path for the testset2 /<path-to>/motor-defect-detector/Kmeans/2nd_test/
在轴承数据集上进行训练,以改进电机故障预测。该方法输出 *kmeanModel.npy* 文件,该文件将用于实际的电机故障预测。
AWS* Lambda 设置
在本节中,我们将创建一个 *kmeans.zip* 压缩文件夹,并用它创建 AWS Lambda 函数。然后,我们将把 Lambda 部署到我们的 Greengrass 组中。
将 Greengrass 文件复制到 Kmeans 文件夹。
cp –r <path-to>/aws_greengrass_core_sdk/examples/HelloWorld/greengrasssdk .
从本文的示例代码部分创建 *kmeans_test.py* 并将其移至 *Kmeans* 文件夹。
将文件压缩成 zip 文件夹。
zip –r kmeans.zip greengrasssdk/ utils.py kmeanModel.npy kmeans_test.py
转到 AWS 控制台,点击左上角的服务,在搜索栏中输入Lambda,然后点击它。Lambda 管理控制台将打开。点击创建函数。
如果未选中,请选择从头开始创作并填写已概述的字段。
点击创建函数。
上传 *kmeans.zip*。将处理程序名称更改为 kmeans_test.function_handler. 点击保存。
点击操作,选择创建新版本,然后添加版本说明。点击发布。
转到 AWS IoT 控制台。从左侧菜单中选择Greengrass,在其下方选择组,然后从主窗口中选择您的组。
从左侧菜单中选择Lambda。点击屏幕右上角的添加 Lambda。
选择使用现有 Lambda。
从菜单中选择 kmeans_test,然后点击下一步。
选择版本,然后点击完成。
点击点状区域,然后选择编辑配置。
将内存限制更改为 1024 MB,将超时更改为 25 秒,并将Lambda 生命周期选择为长期运行的函数。
定位所需的环境变量。例如,要定位 numpy 等 Python 包,请运行此命令。
locate 2.7/dist-packages/numpy
添加环境变量以及软件包和 *2nd_test* 文件夹作为值的路径。
点击页面底部的更新。
点击灰色小后退按钮,选择资源。点击蓝色按钮添加本地资源。
创建本地资源以访问 IEI Tank 上的 *Kmeans* 文件夹。将 kmeans_test Lambda 附加到它,并具有读写访问权限。
为 Python 软件包文件夹和 *2nd_test* 文件夹创建另外两个本地资源,并具有只读访问权限。完成后,您应该会看到类似的屏幕。
转到订阅。点击添加订阅或添加您的第一个订阅。
对于源,从Lambda选项卡中选择,然后选择 kmeans_test。对于目标,选择IoT Cloud。
点击下一步。为主题添加 hello/world,然后点击下一步。
点击**完成**。
在组标题上,点击操作,选择部署,然后等待其成功完成。
转到 AWS IoT 控制台。从左侧菜单中选择测试。在主题字段中键入 hello/world,将 MQTT 负载显示更改为以字符串形式显示,然后点击订阅主题。
一段时间后,屏幕底部应会显示消息。
结论
我们已成功将用于电机故障检测的基本 K-means 模型设置为 Lambda 函数。作为下一步,您可以探索自动更新的功能。一个 Lambda 函数设置为查找新的测试集,一旦找到,它将触发新集合的自动下载,并基于这些集合创建新的学习脚本。然后,模型将更新,以提供新的、改进的预测。
示例代码
kmeanstraingall.py
from __future__ import print_function import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn import cluster from utils import cal_max_freq,create_dataframe,elbow_method import os try: # reading all the files from the testset1, and testset2 filedir_testset1 = raw_input("enter the complete directory path for the testset1 ") filedir_testset2 = raw_input("enter the complete directory path for the testset2 ") all_files_testset1 = os.listdir(filedir_testset1) all_files_testset2 = os.listdir(filedir_testset2) # relative path of the dataset, after the current working directory path_testset2 = "2nd_test/" path_testset1 = "1st_test/" testset1_freq_max1,testset1_freq_max2,testset1_freq_max3,testset1_freq_max4,testset1_freq_max5 = cal_max_freq(all_files_testset1,path_testset1) testset2_freq_max1,testset2_freq_max2,testset2_freq_max3,testset2_freq_max4,testset2_freq_max5 = cal_max_freq(all_files_testset2,path_testset2) except IOError: print("you have entered either the wrong data directory path for either testset1 or testset2") result1 = create_dataframe(testset1_freq_max1,testset1_freq_max2,testset1_freq_max3,testset1_freq_max4,testset1_freq_max5,7) result2 = create_dataframe(testset2_freq_max1,testset2_freq_max2,testset2_freq_max3,testset2_freq_max4,testset2_freq_max5,0) result3 = create_dataframe(testset1_freq_max1,testset1_freq_max2,testset1_freq_max3,testset1_freq_max4,testset1_freq_max5,2) result3 = result3[:1800] result4 = create_dataframe(testset2_freq_max1,testset2_freq_max2,testset2_freq_max3,testset2_freq_max4,testset2_freq_max5,1) result4 = result4[:800] #creating the final result print("creating the final result") frames = [result1,result3,result2,result4] result = pd.concat(frames) X = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]] #elbow method: to calculate the optimal no of cluster #elbow_method(X) #plt.show() #clustering print("clustering") k_means = cluster.KMeans(n_clusters = 8,n_init = 10,max_iter = 1000,n_jobs = -1,random_state = 42) kmeans_model = k_means.fit(X) label = kmeans_model.labels_ #plot the labels print("plotting the labels") plt.scatter((np.array(range(1,len(result)+1))),label) #save the model print("saving the model") filename = "kmeanModel.npy" np.save(filename,kmeans_model)
kmeans_test.py
from __future__ import print_function import time from threading import Timer import os import greengrasssdk import platform import pandas as pd import numpy as np import matplotlib.pyplot as plt from utils import cal_max_freq, plotlabels # Creating a greengrass core sdk client client = greengrasssdk.client('iot-data') # Retrieving platform information to send from Greengrass Core my_platform = platform.platform() def kmeans_test_run(): client.publish(topic='hello/world', payload='Started kmeans test run.') try: filedir = os.environ.get("TESTSET2") client.publish(topic='hello/world', payload='Got data dir.') #filepath ="2nd_test/" filepath = os.environ.get("TESTSET2FOLDER") client.publish(topic='hello/world', payload='Got data folder.') # load the files all_files = os.listdir(filedir) client.publish(topic='hello/world', payload='Got all files.') freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = cal_max_freq(all_files, filedir) client.publish(topic='hello/world', payload='Got all frequencies.') except IOError: print("you have entered either the wrong data directory path or filepath") client.publish(topic='hello/world', payload='Wrong data dir or folder.') # load the model filename = "kmeanModel.npy" model = np.load(filename).item() client.publish(topic='hello/world', payload='Loaded K-means model.') # checking the iteration if (filepath == "1st_test/"): rhigh = 8 else: rhigh = 4 testlabels = [] for i in range(0,rhigh): print("Checking for the bearing",i+1) result = pd.DataFrame() result['freq_max1'] = list((np.array(freq_max1))[:,i]) result['freq_max2'] = list((np.array(freq_max2))[:,i]) result['freq_max3'] = list((np.array(freq_max3))[:,i]) result['freq_max4'] = list((np.array(freq_max4))[:,i]) result['freq_max5'] = list((np.array(freq_max5))[:,i]) X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]] label = model.predict(X) labelfive = list(label[-100:]).count(5) labelsix = list(label[-100:]).count(6) labelseven = list(label[-100:]).count(7) totalfailur = labelfive+labelsix+labelseven#+labelfour ratio = (totalfailur/100)*100 if(ratio >= 25): client.publish(topic='hello/world', payload='Bearing is suspected to fail.') else: client.publish(topic='hello/world', payload='Bearing is in normal condition.') testlabels.append(label[-100:]) # Asynchronously schedule this function to be run again in 5 seconds Timer(5, kmeans_test_run).start() # Start executing the function above kmeans_test_run() # This is a dummy handler and will not be invoked # Instead the code above will be executed in an infinite loop for our example def function_handler(event, context): return
关于作者
Rosalia Nyurguhun 是 Intel 核心和视觉计算集团的一名软件工程师,致力于物联网的规模化赋能项目。