65.9K
CodeProject 正在变化。 阅读更多。
Home

构建 MLOps 模型 API

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2021 年 5 月 10 日

CPOL

1分钟阅读

viewsIcon

8086

downloadIcon

82

在本文中,我们将构建模型 API 以支持预测服务。

在本系列文章中,我们将引导您完成将 CI/CD 应用于 人工智能 任务的过程。您最终将获得一个功能齐全的流水线,满足 Google MLOps 成熟度模型 的 2 级要求。我们假设您对 Python深度学习DockerDevOpsFlask 有一定的了解。

上一篇文章 中,我们讨论了 ML CI/CD 流水线中的单元测试步骤。 在本文中,我们将构建模型 API 以支持预测服务。

下图显示了我们在项目流程中的位置。

代码文件的结构如下

本文中的大部分代码与上一篇文章中的代码几乎相同,因此我们只关注差异。

此仓库 中找到完整的代码,如下面的代码片段所示,这些代码片段是精简版本。

task.py

task.py 文件协调容器内的程序执行,如下所示

import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import jsonpickle
import cv2
from flask import flash,Flask,Response,request,jsonify
import threading
import requests
import time
 
# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/
 
# Starting flask app
app = Flask(__name__)
 
# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
global model
 
@app.before_first_request
def before_first_request():
 def initialize_job():
  if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
   tf.config.set_soft_device_placement(True)
   tf.debugging.set_log_device_placement(True)
  global model
  # Checking if there's any model saved at testing on GCS
  model_gcs = data_utils.previous_model(bucket_name,model_name)
  # If any model exists at prod, load it, test it on data and use it on the API
  if model_gcs[0] == True:
   model_gcs = data_utils.load_model(bucket_name,model_name)
   if model_gcs[0] == True:
    try:
     model = load_model(model_name)
    except Exception as e:
     email_notifications.exception('Something went wrong trying to production model. Exception: '+str(e))
     sys.exit(1)
   else:
    email_notifications.exception('Something went wrong when trying to load production model. Exception: '+str(model_gcs[1]))
    sys.exit(1)
  if model_gcs[0] == False:
   email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
   sys.exit(1)
  if model_gcs[0] == None:
   email_notifications.exception('Something went wrong when trying to check if production model exists. Exception: '+model_gcs[1]+'. Aborting execution.')
   sys.exit(1)
 thread = threading.Thread(target=initialize_job)
 thread.start()
 
 
@app.route('/init', methods=['GET','POST'])
def init():
 message = {'message': 'API initialized.'}
 response = jsonpickle.encode(message)
 return Response(response=response, status=200, mimetype="application/json")
 
@app.route('/', methods=['POST'])
def index():
 if request.method=='POST':
  try:
   #Converting string that contains image to uint8
   image = np.fromstring(request.data,np.uint8)
   image = image.reshape((128,128,3))
   image = [image]
   image = np.array(image)
   image = image.astype(np.float16)
   result = model.predict(image)
   result = np.argmax(result)
   message = {'message': '{}'.format(str(result))}
   json_response = jsonify(message)
   return json_response
 
  except Exception as e:
   message = {'message': 'Error'}
   json_response = jsonify(message)
   email_notifications.exception('Something went wrong when trying to make prediction via Production API. Exception: '+str(e)+'. Aborting execution.')
   return json_response
 else:
  message = {'message': 'Error. Please use this API in a proper manner.'}
  json_response = jsonify(message)
  return json_response
 
def self_initialize():
 def initialization():
  global started
  started = False
  while started == False:
   try:
    server_response = requests.get('http://127.0.0.1:5000/init')
    if server_response.status_code == 200:
     print('API has started successfully, quitting initialization job.')
     started = True
   except:
    print('API has not started. Still attempting to initialize it.')
   time.sleep(3)
 thread = threading.Thread(target=initialization)
 thread.start()
 
if __name__ == '__main__':
 self_initialize()
 app.run(host='0.0.0.0',debug=True,threaded=True)

data_utils.py

data_utils.py 文件与之前的版本唯一的不同之处在于加载模型的部分,从生产注册表中加载。 差异如下

  • status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing',model_filename)).exists(storage_client) 改为 status = storage.Blob(bucket=bucket, name='{}/{}'.format('production',model_filename)).exists(storage_client)
  • blob1 = bucket.blob('{}/{}'.format('testing',model_filename)) 改为 blob1 = bucket.blob('{}/{}'.format('production',model_filename))

Dockerfile

在我们的 Dockerfile 中,替换

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-PredictionAPI.git

构建并运行容器后,您应该可以通过 POST 请求在 http://172.17.0.2:5000/ 访问一个功能齐全的预测服务。

后续步骤

在接下来的系列文章中,我们将看到如何借助 KubernetesJenkins 和 Google Cloud Platform 将各个容器连接成一个实际的流水线。敬请期待!

© . All rights reserved.