65.9K
CodeProject 正在变化。 阅读更多。
Home

自动化 MLOps 部署到 Kubernetes

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2021 年 5 月 17 日

CPOL

3分钟阅读

viewsIcon

7409

在本文中,我们将开发一个半自动的部署到生产环境的脚本,该脚本将完成我们的项目。

在之前的系列文章中,我们解释了如何编写脚本以便在我们的 Docker 容器组中执行,作为 CI/CD MLOps 管道的一部分。在本系列中,我们将设置一个 Google Kubernetes Engine (GKE) 集群来部署这些容器。

本系列文章假设您熟悉深度学习、DevOps、Jenkins 和 Kubernetes 的基本知识。

在本系列的上一篇文章中,我们构建了四个自动化的 Jenkins 工作流。 在本文(本系列的最后一篇文章)中,我们将为我们的 CI/CD MLOps 管道开发一个半自动的生产环境部署。 它是半自动的,因为通常作为产品所有者,您希望在部署到生产环境之前检查单元测试结果,以避免服务故障。 部署到生产环境可以手动完成,但需要自动化才能实现 Google MLOps 成熟度模型的目标。

下图显示了我们在项目模式中的位置。

部署到生产环境涉及

  • 在单元测试结束后,将模型文件从 GCS 测试注册表复制到生产注册表
  • 清理已完成的 Kubernetes 任务
  • 启动预测服务 pod 的系统性关闭(如果已经执行了相应的工作流程),这将迫使 Kubernetes 启动新的 pod,这些 pod 将以零服务停机时间加载新模型

开发 Python 脚本

我们一直在使用 Jenkins 和 Kubernetes 来构建我们的 CI/CD 解决方案。 下一个脚本将向您展示如何使用 Python 与 Jenkins 和 Kubernetes 交互,以便自动化部署到生产环境的任务。 我们的 Python 脚本将在本地运行。

让我们深入研究代码。 首先,我们导入所需的库并定义变量

from kubernetes import client, config
from google.cloud import storage
import jenkins
import time
import os
 
bucket_name = 'automatictrainingcicd-aiplatform'
model_name = 'best_model.hdf5'

接下来,我们声明一个函数来清除集群中已完成的作业

def clean_jobs():
    config.load_kube_config()
 
    api_instance=client.BatchV1Api()
    print("Listing jobs:")
    api_response = api_instance.list_job_for_all_namespaces()
    jobs = []
    print('job-name  job-namespace  active  succeeded  failed  start-time  completion-time')
    for i in api_response.items:
        jobs.append([i.metadata.name,i.metadata.namespace])
        print("%s  %s  %s  %s  %s  %s  %s" % (i.metadata.name,i.metadata.namespace,i.status.active,i.status.succeeded,i.status.failed,i.status.start_time,i.status.completion_time))
    print('Deleting jobs...')
    if len(jobs) > 0:
        for i in range(len(jobs)):
            api_instance.delete_namespaced_job(jobs[i][0],jobs[i][1])
        print("Jobs deleted.")
    else:
        print("No jobs found.")
return

将模型从 GCS 测试注册表复制到生产注册表的函数如下

def model_to_production():
    storage_client = storage.Client.from_service_account_json('AutomaticTrainingCICD-68f56bfa992e.json')
    bucket = storage_client.bucket(bucket_name)
    status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing',model_name)).exists(storage_client)
    if status == True:
        print('Copying model...')
        source_blob = bucket.blob('{}/{}'.format('testing',model_name))
        destination_blob_name = '{}/{}'.format('production',model_name)
        blob_copy = bucket.copy_blob(source_blob, bucket, destination_blob_name)
        print('Model from testing registry has been copied to production registry.')
    else:
        print('No model found at testing registry.')
    return

下一个函数检查预测服务是否处于活动状态。 如果是,则启动系统性 pod 关闭; 否则,将触发 AutomaticTraining-PredictionAPI Jenkins 工作流

def check_services():
    api_instance = client.CoreV1Api()
    api_response = api_instance.list_service_for_all_namespaces()
    print('Listing services:')
    print('service-namespace  service-name')
    services = []
    for i in api_response.items:
        print("%s  %s" % (i.metadata.namespace, i.metadata.name))
        services.append(i.metadata.name)
    if True in (t.startswith('gke-api') for t in services):
        print('gke-api service is active. Proceeding to systematically shutdown its pods...')
        shutdown_pods()
        return
    else:
        jenkins_build()
        return

如果预测服务处于活动状态,则以下函数负责 pod 关闭

def shutdown_pods():
    config.load_kube_config()
    api_instance = client.CoreV1Api()
    print("Listing pods:")
    api_response = api_instance.list_pod_for_all_namespaces(watch=False)
    pods = []
    print('pod-ip-address  pod-namespace  pod-name')
    for i in api_response.items:
        print("%s  %s  %s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
        pods.append([i.metadata.name, i.metadata.namespace])
    print('Shutting down pods...')
    print('Deleting only gke-api pods...')
    if len(pods) > 0:
        for i in range(len(pods)):
            if pods[i][0].startswith('gke-api') == True:
                api_instance.delete_namespaced_pod(pods[i][0],pods[i][1])
                print("Pod '{}' shut down.".format(pods[i][0]))
                time.sleep(120)
        print("All pods have been shut down.")
    else:
        print("No pods found.")
   return

如果预测服务未激活,则会触发以下函数。 它部署预测服务

def jenkins_build():
    print('gke-api service is not active. Proceeding to build AutomaticTraining-PredictionAPI job at Jenkins.')
    server = jenkins.Jenkins('https://:8080', username='your_username', password='your_password')
    server.build_job('AutomaticTraining-PredictionAPI')
    print('AutomaticTraining-PredictionAPI job has been triggered, check Jenkins logs for more information.')
    return

最后,主函数; 它以所需的顺序执行整个脚本

def main():
    clean_jobs()
    model_to_production()
    check_services()
 
if __name__ == '__main__':
    main()

运行脚本

一旦您运行我们开发的 Python 脚本文件,您应该得到以下响应

所有旧的、已完成的作业都将被删除,该模型将被复制到生产注册表,并且这些 pod 将被成功终止。 要仔细检查您是否获得了新的 pod,请在脚本执行前后运行 kubectl get pods。 您应该会看到不同的 pod 标识符

要查看最终产品是什么样子(包括界面“奖励”),请查看此内容。 界面的公共 IP 地址是我们访问服务的地方

服务界面如下所示

最后,这是您提交图像后显示的预测

结论

这结束了我们的系列。 我们希望您喜欢这次旅程,并且您可以利用它来获得处理具有挑战性的 ML 任务的知识!

© . All rights reserved.