自动化 MLOps 部署到 Kubernetes





5.00/5 (2投票s)
在本文中,我们将开发一个半自动的部署到生产环境的脚本,该脚本将完成我们的项目。
在之前的系列文章中,我们解释了如何编写脚本以便在我们的 Docker 容器组中执行,作为 CI/CD MLOps 管道的一部分。在本系列中,我们将设置一个 Google Kubernetes Engine (GKE) 集群来部署这些容器。
本系列文章假设您熟悉深度学习、DevOps、Jenkins 和 Kubernetes 的基本知识。
在本系列的上一篇文章中,我们构建了四个自动化的 Jenkins 工作流。 在本文(本系列的最后一篇文章)中,我们将为我们的 CI/CD MLOps 管道开发一个半自动的生产环境部署。 它是半自动的,因为通常作为产品所有者,您希望在部署到生产环境之前检查单元测试结果,以避免服务故障。 部署到生产环境可以手动完成,但需要自动化才能实现 Google MLOps 成熟度模型的目标。
下图显示了我们在项目模式中的位置。
部署到生产环境涉及
- 在单元测试结束后,将模型文件从 GCS 测试注册表复制到生产注册表
- 清理已完成的 Kubernetes 任务
- 启动预测服务 pod 的系统性关闭(如果已经执行了相应的工作流程),这将迫使 Kubernetes 启动新的 pod,这些 pod 将以零服务停机时间加载新模型
开发 Python 脚本
我们一直在使用 Jenkins 和 Kubernetes 来构建我们的 CI/CD 解决方案。 下一个脚本将向您展示如何使用 Python 与 Jenkins 和 Kubernetes 交互,以便自动化部署到生产环境的任务。 我们的 Python 脚本将在本地运行。
让我们深入研究代码。 首先,我们导入所需的库并定义变量
from kubernetes import client, config from google.cloud import storage import jenkins import time import os bucket_name = 'automatictrainingcicd-aiplatform' model_name = 'best_model.hdf5'
接下来,我们声明一个函数来清除集群中已完成的作业
def clean_jobs(): config.load_kube_config() api_instance=client.BatchV1Api() print("Listing jobs:") api_response = api_instance.list_job_for_all_namespaces() jobs = [] print('job-name job-namespace active succeeded failed start-time completion-time') for i in api_response.items: jobs.append([i.metadata.name,i.metadata.namespace]) print("%s %s %s %s %s %s %s" % (i.metadata.name,i.metadata.namespace,i.status.active,i.status.succeeded,i.status.failed,i.status.start_time,i.status.completion_time)) print('Deleting jobs...') if len(jobs) > 0: for i in range(len(jobs)): api_instance.delete_namespaced_job(jobs[i][0],jobs[i][1]) print("Jobs deleted.") else: print("No jobs found.") return
将模型从 GCS 测试注册表复制到生产注册表的函数如下
def model_to_production(): storage_client = storage.Client.from_service_account_json('AutomaticTrainingCICD-68f56bfa992e.json') bucket = storage_client.bucket(bucket_name) status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing',model_name)).exists(storage_client) if status == True: print('Copying model...') source_blob = bucket.blob('{}/{}'.format('testing',model_name)) destination_blob_name = '{}/{}'.format('production',model_name) blob_copy = bucket.copy_blob(source_blob, bucket, destination_blob_name) print('Model from testing registry has been copied to production registry.') else: print('No model found at testing registry.') return
下一个函数检查预测服务是否处于活动状态。 如果是,则启动系统性 pod 关闭; 否则,将触发 AutomaticTraining-PredictionAPI Jenkins 工作流
def check_services(): api_instance = client.CoreV1Api() api_response = api_instance.list_service_for_all_namespaces() print('Listing services:') print('service-namespace service-name') services = [] for i in api_response.items: print("%s %s" % (i.metadata.namespace, i.metadata.name)) services.append(i.metadata.name) if True in (t.startswith('gke-api') for t in services): print('gke-api service is active. Proceeding to systematically shutdown its pods...') shutdown_pods() return else: jenkins_build() return
如果预测服务处于活动状态,则以下函数负责 pod 关闭
def shutdown_pods(): config.load_kube_config() api_instance = client.CoreV1Api() print("Listing pods:") api_response = api_instance.list_pod_for_all_namespaces(watch=False) pods = [] print('pod-ip-address pod-namespace pod-name') for i in api_response.items: print("%s %s %s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name)) pods.append([i.metadata.name, i.metadata.namespace]) print('Shutting down pods...') print('Deleting only gke-api pods...') if len(pods) > 0: for i in range(len(pods)): if pods[i][0].startswith('gke-api') == True: api_instance.delete_namespaced_pod(pods[i][0],pods[i][1]) print("Pod '{}' shut down.".format(pods[i][0])) time.sleep(120) print("All pods have been shut down.") else: print("No pods found.") return
如果预测服务未激活,则会触发以下函数。 它部署预测服务
def jenkins_build(): print('gke-api service is not active. Proceeding to build AutomaticTraining-PredictionAPI job at Jenkins.') server = jenkins.Jenkins('https://:8080', username='your_username', password='your_password') server.build_job('AutomaticTraining-PredictionAPI') print('AutomaticTraining-PredictionAPI job has been triggered, check Jenkins logs for more information.') return
最后,主函数; 它以所需的顺序执行整个脚本
def main(): clean_jobs() model_to_production() check_services() if __name__ == '__main__': main()
运行脚本
一旦您运行我们开发的 Python 脚本文件,您应该得到以下响应
所有旧的、已完成的作业都将被删除,该模型将被复制到生产注册表,并且这些 pod 将被成功终止。 要仔细检查您是否获得了新的 pod,请在脚本执行前后运行 kubectl get pods
。 您应该会看到不同的 pod 标识符
要查看最终产品是什么样子(包括界面“奖励”),请查看此内容。 界面的公共 IP 地址是我们访问服务的地方
服务界面如下所示
最后,这是您提交图像后显示的预测
结论
这结束了我们的系列。 我们希望您喜欢这次旅程,并且您可以利用它来获得处理具有挑战性的 ML 任务的知识!