MLOps 持续交付与模型单元测试





5.00/5 (1投票)
在本文中,我们将开发一个模型单元测试容器。
在本系列文章中,我们将引导您完成将 CI/CD 应用于 AI 任务的过程。您最终将获得一个功能齐全的流水线,满足 Google MLOps 成熟度模型 中 2 级的要求。我们假设您对 Python、深度学习、Docker、DevOps 和 Flask 有一定的了解。
在本系列的前几篇文章中,我们解释了如何持续集成模型更改,以及如何在收集到新数据时持续训练模型。在本文中,我们将在模拟的生产环境中测试训练好的模型:从测试注册表加载已保存的模型,通过模型 API 的克隆版本将其公开,并对其运行测试。欢迎您在此阶段添加自己的测试。下图显示了我们在项目流程中所处的位置。
代码文件结构如下
从 其仓库 获取原始代码。
data_utils.py
data_utils.py 文件包含两个函数:一个用于检查测试模型注册表中是否存在模型,另一个用于在模型存在时将其下载到本地
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
import os
import cv2
import sys


def previous_model(bucket_name, model_filename):
    """Check whether a model file exists under ``testing/`` in a GCS bucket.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket to inspect.
        model_filename: Object name of the model inside the ``testing/`` prefix.

    Returns:
        ``(status, None)`` where ``status`` is the result of ``Blob.exists``,
        or ``(None, exception)`` if the lookup itself failed.
    """
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob_name = '{}/{}'.format('testing', model_filename)
        status = storage.Blob(bucket=bucket, name=blob_name).exists(storage_client)
        return status, None
    except Exception as e:
        # str(e) is required: concatenating the exception object itself raises
        # TypeError inside the handler and hides the original error.
        print('Something went wrong when trying to check if previous model exists GCS bucket. Exception: ' + str(e), flush=True)
        return None, e


def load_model(bucket_name, model_filename):
    """Download the model stored under ``testing/`` to ``/root/<model_filename>``.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket holding the model.
        model_filename: Object name of the model inside the ``testing/`` prefix;
            also used as the local file name.

    Returns:
        ``(True, None)`` on success, or ``(False, exception)`` if the download
        failed.
    """
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format('testing', model_filename))
        blob1.download_to_filename('/root/' + str(model_filename))
        return True, None
    except Exception as e:
        # str(e) is required here for the same reason as in previous_model().
        print('Something went wrong when trying to load previous model from GCS bucket. Exception: ' + str(e), flush=True)
        return False, e
email_notifications.py
email_notifications.py 文件处理有关代码执行成功或出现问题的通知,并将其发送给产品负责人
import smtplib
import os
# Email variables definition
# NOTE: the values below are placeholders; replace them before running and
# avoid committing real credentials to source control.
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this by the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this by your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'
def send_update(message):
    """Email a status update about a finished unit-testing run.

    The mail is sent from ``sender`` to ``receiver`` through the SMTP server
    configured at module level. Delivery is best effort: any failure is
    printed and swallowed so the caller is never interrupted.

    Args:
        message: Body text of the notification.
    """
    message = 'Subject: {}\n\n{}'.format('An automatic unit testing has ended recently.', message)
    try:
        # The context manager sends QUIT and closes the connection even when
        # starttls/login/sendmail raises, so the connection is not leaked.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)
def exception(e_message):
    """Email an error report about a failure in the testing API.

    The mail is sent from ``sender`` to ``receiver`` through the SMTP server
    configured at module level. Delivery is best effort: any failure is
    printed and swallowed so the caller is never interrupted.

    Args:
        e_message: Description of the failure, used as the mail body.
    """
    try:
        message = 'Subject: {}\n\n{}'.format('Something went wrong with the testing API.', e_message)
        # The context manager sends QUIT and closes the connection even when
        # starttls/login/sendmail raises, so the connection is not leaked.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)
task.py
task.py 文件处理容器执行。它协调 Flask 应用程序的初始化和结束、模型加载、模型测试和电子邮件通知
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import jsonpickle
import cv2
from flask import flash,Flask,Response,request,jsonify
import threading
import requests
import time
# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/

# Starting flask app
app = Flask(__name__)

# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
class_names = ['Normal','Viral Pneumonia','COVID-19']
headers = {'content-type': 'image/png'}
api = 'http://127.0.0.1:5000/' # self app

# A module-level `global model` statement is a no-op and leaves the name
# undefined. Bind it explicitly so the name always exists; it holds None until
# the model has been loaded.
model = None
@app.before_first_request
def before_first_request():
    """Start the background job that loads the model and runs the API test.

    Runs once, before the first request handled by the app. The work is done
    in a separate thread so the triggering request is not blocked.

    NOTE(review): `before_first_request` was deprecated in Flask 2.2 and
    removed in Flask 2.3 — confirm the Flask version installed in the image.
    """

    def initialize_job():
        """Fetch the model from the testing registry, load it, then test the API.

        NOTE(review): the sys.exit() calls below run in a worker thread, where
        SystemExit ends only that thread, not the Flask process — confirm the
        container is stopped by other means.
        """
        global model

        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)

        # Checking if there's any model saved at testing on GCS
        exists, check_error = data_utils.previous_model(bucket_name, model_name)
        if exists is None:
            # check_error is an exception object; it must be converted with
            # str() before concatenation, otherwise this line raises TypeError.
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(check_error) + '. Aborting automatic testing.')
            sys.exit(1)
        if not exists:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            sys.exit(1)

        # A model exists at testing: download it, then load it for the API.
        downloaded, download_error = data_utils.load_model(bucket_name, model_name)
        if not downloaded:
            email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(download_error))
            sys.exit(1)

        try:
            model = load_model(model_name)
        except Exception as e:
            email_notifications.exception('Something went wrong trying to test old /testing model. Exception: ' + str(e))
            sys.exit(1)

        api_test()

    thread = threading.Thread(target=initialize_job)
    thread.start()
@app.route('/init', methods=['GET', 'POST'])
def init():
    """Return a fixed JSON acknowledgement with HTTP status 200."""
    payload = jsonpickle.encode({'message': 'API initialized.'})
    return Response(response=payload, status=200, mimetype="application/json")
@app.route('/', methods=['POST'])
def index():
    """Predict the class index of an image posted as raw bytes.

    The request body is read as uint8 values and reshaped to (128, 128, 3).

    Returns:
        A JSON response ``{'message': <str>}`` where the message is the argmax
        of the model output, or ``'Error: ...'`` if anything failed. On failure
        an error email is sent as well.
    """
    # The route only accepts POST, so no method check is needed here.
    try:
        # Converting the raw request body to uint8. np.frombuffer replaces the
        # deprecated binary mode of np.fromstring.
        image = np.frombuffer(request.data, np.uint8)
        image = image.reshape((128, 128, 3))
        # Add a leading axis of length 1 (also copies the read-only buffer).
        image = np.array([image])
        image = image.astype(np.float16)
        result = model.predict(image)
        result = np.argmax(result)
        message = {'message': '{}'.format(str(result))}
        return jsonify(message)
    except Exception as e:
        message = {'message': 'Error: ' + str(e)}
        json_response = jsonify(message)
        email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
        return json_response
def self_initialize():
    """Poll the local ``/init`` endpoint in a background thread until it answers 200.

    The first successful request is what makes Flask run the
    ``before_first_request`` hook.
    """

    def initialization():
        """Retry GET /init every 3 seconds until the server responds with 200."""
        global started
        started = False
        while not started:
            try:
                # The timeout keeps a stalled connection from blocking the
                # polling loop forever.
                server_response = requests.get('http://127.0.0.1:5000/init', timeout=10)
                if server_response.status_code == 200:
                    started = True
            except requests.exceptions.RequestException:
                # Expected while the server is not accepting connections yet;
                # fall through and retry. Other errors are no longer hidden.
                pass
            if not started:
                time.sleep(3)

    thread = threading.Thread(target=initialization)
    thread.start()
def api_test():
    """Send the bundled test image to the local API and report the outcome by email.

    The image is converted to RGB, resized to 128x128 and posted as raw bytes.
    The test passes when the API answers with class index 1, which is
    'Viral Pneumonia' in ``class_names`` (presumably the class of
    TEST_IMAGE.jpg — confirm). Every outcome ends with sys.exit().

    NOTE(review): when this runs in a worker thread, sys.exit() ends only that
    thread, not the process — confirm the container is stopped by other means.
    """
    try:
        image = cv2.imread('TEST_IMAGE.jpg')
        if image is None:
            # cv2.imread returns None instead of raising when the file is
            # missing or unreadable; fail with an explicit message.
            raise FileNotFoundError('TEST_IMAGE.jpg could not be read.')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128))
        # tobytes() replaces the deprecated ndarray.tostring() alias.
        result = requests.post(api, data=image.tobytes(), headers=headers)
        result = result.json()
        # An error response carries a non-numeric message, so int() raises and
        # the failure is reported by the handler below.
        prediction = int(result['message'])
        if prediction == 1:
            email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
            sys.exit(0)
        else:
            email_notifications.send_update('Testing stage has crashed. Check the GCP logs for more information.')
            sys.exit(1)
    except Exception as e:
        # SystemExit is not an Exception subclass, so the sys.exit() calls
        # above are not intercepted here.
        email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
        sys.exit(1)
if __name__ == '__main__':
    # Start polling /init before the (blocking) server call below.
    self_initialize()
    # debug is off on purpose: with host 0.0.0.0 the server is reachable from
    # outside the container, and debug mode would expose the interactive
    # debugger and start the auto-reloader's second process.
    app.run(host='0.0.0.0', debug=False, threaded=True)
Dockerfile
我们的 Dockerfile 提供了容器构建的规则
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

# Flask is capped below 2.3 because task.py uses @app.before_first_request,
# which was removed in Flask 2.3.
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python "Flask<2.3" jsonpickle

# "&&" makes the build fail if any step fails (";" would hide the error).
RUN apt-get update && apt-get install -y git libgl1-mesa-dev

# The random bytes change on every build, which invalidates the layer cache
# from this point on so the repository is always cloned fresh.
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

RUN mv /root/AutomaticTraining-UnitTesting/data_utils.py /root
RUN mv /root/AutomaticTraining-UnitTesting/task.py /root
RUN mv /root/AutomaticTraining-UnitTesting/email_notifications.py /root
RUN mv /root/AutomaticTraining-UnitTesting/TEST_IMAGE.jpg /root

EXPOSE 5000
ENTRYPOINT ["python","task.py"]
构建并运行容器后,您将获得一个功能齐全的模型单元测试器。它允许您验证即将部署到生产环境的模型是否输出预期结果,且没有错误或故障。
请随时在此作业中包含其他测试。通常,此类测试取决于业务场景。
后续步骤
在下一篇文章中,我们将构建一个 API,该 API 将从生产注册表加载我们的模型,以启用 Google MLOps 成熟度模型 中描述的预测服务。敬请期待!