构建 Docker 容器以训练深度伪造自编码器





4.00/5 (1投票)
在下一篇文章中,我们讨论了 Google AI Platform 以及如何借助 Docker 容器在那里训练您的模型。
深度伪造——利用深度学习将一个人的脸替换到视频中的另一个人身上——是当今人工智能最有趣且令人担忧的应用方式之一。
虽然 Deep Fake(深度伪造)可用于合法目的,但它们也可能被用于散布虚假信息。由于能够轻易地将某人的脸换到任何视频中,我们还能真正相信自己的眼睛吗?一段看起来真实的政客或演员做着或说着某些惊人之事的视频,可能根本就不是真的。
在本系列文章中,我们将展示 Deep Fake 的工作原理,并演示如何从零开始实现它。然后,我们将了解 DeepFaceLab,这是一个由 Tensorflow 驱动的一体化工具,常用于创建令人信服的 Deep Fake。
在之前的文章中,我们讨论了用于 Deep Fake 的自动编码器以及使其正常工作所需的条件。如果您是刚加入的读者,我最近解释了先决条件,然后直接进入了创建容器并提交到 Google Cloud Platform 的代码,所以请不要跳过,否则这里概述的步骤可能无法按预期工作。在本文中,我将向您展示如何创建、测试和提交所引用的容器。让我们开始吧!
我们的 Docker 容器结构
在我们的案例中,Docker 容器并不复杂,只包含几个文件
/deepfakes
|
| -- Dockerfile
| -- config.yaml
| -- model.py
| -- task.py
| -- data_utils.py
- Dockerfile 陈述了必要的指令,以使用正确的镜像、安装我们训练顺利运行所需的先决条件、将文件复制到 Docker 镜像,并定义此容器的入口点。您可以在这里找到 AI Platform 的 Dockerfile 基础知识。
- config.yaml 包含定义训练层级所需的信息。它本质上是一个说明您训练模型需要多少计算资源的文件。您可以在这里找到构建此文件、理解和定义您需要使用的层级的步骤。如果您计划实施基于 GPU 的训练,可以在这里找到价格信息。
- task.py 处理与模型训练工作流相关的所有事情。它调用所需的函数来获取通过命令行界面(CLI)传递的参数,解压、预处理和分割从 GitHub 仓库获取的数据。它设置 GPU,构建模型,开始训练并将生成的自动编码器保存在 Google Storage 中。
- model.py 只包含一个函数,该函数构建我们的模型并在编译后返回它。
- data_utils.py 包含数据加载、数据集创建和模型保存所必需的函数。
让我们开始看实际的代码吧!但首先,在您的机器上创建一个文件夹来存放所有这些文件。
编写 Dockerfile
正如我在Docker 容器概述文章中提到的,这个文件说明了构建我们容器所需的一切。以下是在 Google AI Platform 中使此容器可执行所需的 Dockerfile
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-3
WORKDIR /root
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev
RUN git clone https://github.com/sergiovirahonda/DeepFakesDataset.git
COPY model.py ./model.py
COPY data_utils.py ./data_utils.py
COPY task.py ./task.py
ENTRYPOINT ["python","task.py"]
第一行表示我们将使用由 GPU 驱动的 TensorFlow 2 镜像,该镜像是基于 Ubuntu 的。第二行定义了工作目录。第三和第四行安装所需的依赖项。第五行克隆了包含我们原始数据集的仓库,从第六行到第八行我们复制了模型训练所需的 .py 文件。最后,我们指出训练过程从 task.py 文件开始,并且必须用 Python 解释。将这些命令复制并粘贴到您的 Dockerfile 中。
定义 config.yaml 文件
这个文件将取决于您的需求。如果您想加速训练过程,它可能会变得更复杂,但在我们的案例中,文件是这样的
trainingInput:
scaleTier: CUSTOM
masterType: standard_gpu
我正在使用一台标准的单 GPU 机器,但如果需要,您可以添加更多 GPU 并实现分布式训练。
编写 task.py 文件
正如我之前提到的,这个文件将处理与训练作业相关的所有事情。它是容器的入口,并调用所有外围文件和函数。
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint
import argparse
import data_utils
import model
import sys
import os
from google.cloud import storage
import datetime
def train_model(args):
X_train_a, X_test_a, X_train_b, X_test_b = data_utils.load_data(args)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
tf.config.set_soft_device_placement(True)
tf.debugging.set_log_device_placement(True)
# If you're going to use MultiGPU - Create a MirroredStrategy
# strategy = tf.distribute.MirroredStrategy()
# print("Number of devices: {}".format(strategy.num_replicas_in_sync))
# with strategy.scope():
# autoencoder_a = model.autoencoder_model()
# autoencoder_b = model.autoencoder_model()
# If you're going to use MultiGPU - Comment out the next two lines
autoencoder_a = model.autoencoder_model()
autoencoder_b = model.autoencoder_model()
print('Starting autoencoder_a training', flush=True)
checkpoint_1 = ModelCheckpoint("autoencoder_a.hdf5", monitor='val_loss', verbose=1,save_best_only=True, mode='auto', save_freq="epoch")
autoencoder_a.fit(X_train_a, X_train_a,epochs=args.epochs,batch_size=128,shuffle=True,validation_data=(X_test_a, X_test_a),callbacks=[checkpoint_1])
print('Training A has ended. ',flush=True)
print('Starting autoencoder_b training', flush=True)
checkpoint_2 = ModelCheckpoint("autoencoder_b.hdf5", monitor='val_loss', verbose=1,save_best_only=True, mode='auto', save_freq='epoch')
autoencoder_b.fit(X_train_a, X_train_a,epochs=args.epochs,batch_size=128,shuffle=True,validation_data=(X_test_b, X_test_b),callbacks=[checkpoint_2])
print('Training B has ended. ',flush=True)
print('Proceeding to save models into GCP.',flush=True)
data_utils.save_model(args.bucket_name,'autoencoder_a.hdf5')
print('autoencoder_a saved at GCP Storage', flush=True)
data_utils.save_model(args.bucket_name,'autoencoder_b.hdf5')
print('autoencoder_b saved at GCP Storage', flush=True)
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('--bucket-name',
type=str,
help='GCP bucket name')
parser.add_argument('--epochs',
type=int,
default=10,
help='Epochs number')
args = parser.parse_args()
return args
def main():
args = get_args()
train_model(args)
if __name__ == '__main__':
main()
请记住,如果您决定增加训练机器的数量并使用多 GPU,您需要为此采取一种策略。一个非常常见的是 MirroredStrategy。
编写 model.py 文件
我已经在前几篇文章中详细介绍了模型开发,这个文件基本上复用了构建我们自动编码器所需的代码。它只包含一个函数,返回我们已经编译好的模型。
from tensorflow.keras.models import load_model
from tensorflow.keras import Model
from tensorflow.keras import layers
from tensorflow.keras import optimizers
import tensorflow as tf
import numpy as np
def autoencoder_model():
#Making encoder:
input_img = layers.Input(shape=(120, 120, 3))
x = layers.Conv2D(256,kernel_size=5, strides=2, padding='same',activation='relu')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(512,kernel_size=5, strides=2, padding='same',activation='relu')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(1024,kernel_size=5, strides=2, padding='same',activation='relu')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Flatten()(x)
x = layers.Dense(9216)(x)
encoded = layers.Reshape((3,3,1024))(x)
encoder = Model(input_img, encoded,name="encoder")
encoder.summary()
#Making decoder:
decoder_input= layers.Input(shape=((3,3,1024)))
x = layers.Conv2D(1024,kernel_size=5, strides=2, padding='same',activation='relu')(decoder_input)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(512,kernel_size=5, strides=2, padding='same',activation='relu')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(256,kernel_size=5, strides=2, padding='same',activation='relu')(x)
x = layers.Flatten()(x)
x = layers.Dense(np.prod((120, 120, 3)))(x)
decoded = layers.Reshape((120, 120, 3))(x)
decoder = Model(decoder_input, decoded,name="decoder")
decoder.summary()
#Making autoencoder
auto_input = layers.Input(shape=(120,120,3))
encoded = encoder(auto_input)
decoded = decoder(encoded)
autoencoder = Model(auto_input, decoded,name="autoencoder")
autoencoder.compile(optimizer=optimizers.Adam(lr=5e-5, beta_1=0.5, beta_2=0.999), loss='mae')
#autoencoder.compile(optimizer=optimizers.Adam(lr=5e-5, beta_1=0.5, beta_2=0.999), loss='mse')
autoencoder.summary()
return autoencoder
编写我们的 data_utils.py 文件
我们最后一个文件由 task.py 调用,用于预处理从 GitHub 仓库克隆的数据并返回分割后的数据,也用于将模型保存到我们的 Google Cloud Storage 存储桶中。
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys
def load_data(args):
sys.stdout.write("Starting data processing")
print('Current directory:', flush=True)
print(os.getcwdb(), flush=True)
entries = os.listdir('/root/DeepFakesDataset')
print(entries)
print('Extracting files', flush=True)
file_1 = '/root/DeepFakesDataset/trump.zip'
file_2 = '/root/DeepFakesDataset/biden.zip'
extract_to = '/root/DeepFakesDataset/'
with zipfile.ZipFile(file_1, 'r') as zip_ref:
zip_ref.extractall(extract_to)
with zipfile.ZipFile(file_2, 'r') as zip_ref:
zip_ref.extractall(extract_to)
print('Files extracted', flush=True)
faces_1 = create_dataset('/root/DeepFakesDataset/trump')
print('First dataset created', flush=True)
faces_2 = create_dataset('/root/DeepFakesDataset/biden')
print('Second dataset created', flush=True)
print("Total President Trump face's samples: ",len(faces_1))
print("Total President Biden face's samples: ",len(faces_2))
X_train_a, X_test_a, y_train_a, y_test_a = train_test_split(faces_1, faces_1, test_size=0.20, random_state=0)
X_train_b, X_test_b, y_train_b, y_test_b = train_test_split(faces_2, faces_2, test_size=0.15, random_state=0)
print(X_train_a.shape)
print(X_train_a[0])
return X_train_a, X_test_a, X_train_b, X_test_b
def create_dataset(path):
images = []
for dirname, _, filenames in os.walk(path):
for filename in filenames:
image = cv2.imread(os.path.join(dirname, filename))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = image.astype('float32')
image /= 255.0
images.append(image)
images = np.array(images)
return images
def save_model(bucket_name, best_model):
print('inside save_model, about to save it to GCP Storage',flush=True)
bucket = storage.Client().bucket(bucket_name)
print(bucket)
blob1 = bucket.blob('{}/{}'.format(datetime.datetime.now().strftime('model_%Y%m%d_%H%M%S'),best_model))
blob1.upload_from_filename(best_model)
构建 Docker 镜像并将其推送到 Google AI Platform
我们快要完成了!只剩下几条命令了。继续努力!
启动一个终端窗口,浏览到存放容器文件的文件夹,并运行以下命令
docker build -f Dockerfile -t $IMAGE_URI ./
这个命令需要几分钟才能完成。它将启动 Docker 镜像下载,并遵循 Dockerfile 中陈述的所有指令。最后,它将在您的本地注册表中创建该容器。
要将 Docker 镜像推送到 GCP,请运行 `docker push $IMAGE_URI`。这需要几分钟才能完成。它本质上会将您刚刚构建的镜像推送到 GCP 注册表,以便您稍后可以用它来启动训练作业,这也是我们接下来要做的事情。一旦推送过程结束,就该最终提交作业以训练我们的模型了。
顺便说一句,如果您在将镜像推送到 GCP 时遇到问题,这个资源可以帮助您找出问题所在。
在 Google AI Platform 上提交和监控训练作业
要向 AI Platform 提交训练作业,请发出以下命令,您唯一需要调整的是 `--epochs` 参数。将其设置为您需要达到良好指标所需的迭代次数。请记住,epochs 数量越高,您的自动编码器效果就越好。一旦您按下回车键,它应该会显示作业已成功提交。
gcloud ai-platform jobs submit training $JOB_NAME --region $REGION --master-image-uri $IMAGE_URI --config config.yaml -- --epochs=10000 --bucket-name=$BUCKET_NAME
要监控您的训练作业并了解上面发生了什么,请发出命令 gcloud ai-platform jobs describe `$JOB_NAME`,它将自动返回可查看训练日志的 URL。另一种方法是使用训练作业页面并选择您提交的作业。请记住,这个过程可能需要几个小时,如果您只用一个 GPU 在单个实例上训练模型,甚至可能需要更长时间。训练结束后,您应该可以在之前创建的存储桶中找到模型文件。
在 GCS 存储桶中查找并下载您训练好的模型
是时候终于下载您期待已久的东西了。打开 GCP 控制台,使用左侧面板找到存储,然后瞧!您训练好的模型就在存储桶里。
每个模型及其相关文件将保存在不同的文件夹中
在里面您会找到相应的 .hdf5 文件,您可以下载它,然后从任何其他地方加载
最后,要下载所有内容,只需全部选中并点击“下载”按钮。下载过程将自动开始。如果您确定不需要进一步训练,那就去关闭项目,以避免您的账单账户产生额外费用。
我们已经到了本文的结尾。我希望您能获得和我一样的结果!在下一篇文章中,我将向您展示如何使用您获得的训练模型,通过交换转换后的人脸来完成 Deep Fake 的整个流程。下次见!