AI 队列长度检测:计算区域内人数





评分:5.00/5(3 票)
在本文中,我们将训练一个深度学习模型来检测和计算给定区域内的人数。
之前,我们实现了 R-CNN 进行对象检测。虽然这些对象检测算法在检测人脸时效果很好,但当目标对象不清晰时,它们的效果就不佳了。此外,由于它们使用滑动窗口技术,需要进行穷举式搜索,从而影响性能。在本文中,我们将学习如何实现深度神经网络,利用密度图来估算人群或队列中的人数。
我们将使用 ShanghaiTech 数据集。该数据集分为两部分。对于本文,我们将仅使用 B 部分来训练我们的人群检测模型,然后在我们的自定义数据集上进行测试。您可以选择使用任一部分;代码对两者都能正常工作。
让我们开始导入所需的库。
import os
import cv2
import csv
import math
import random
import numpy as np
from scipy.io import loadmat
from keras import backend as K
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from keras.callbacks import ModelCheckpoint
from keras.models import load_model, load_model, Model
from keras.layers import Conv2D, MaxPooling2D, Concatenate, Input
预处理输入数据
我们的数据集包含两个子目录:test_data 和 train_data。这两个目录都包含图像及其对应的真实标签。我们无法以原始格式使用数据,因此需要进行一些预处理。由于我们将使用“按密度计数 CNN”方法,因此真实标签数据也需要是密度图。在这里,我们将尝试从给定的真实标签文件中计算出真实密度图。
首先,让我们定义生成输入图像密度图的函数。
def get_density_map(image, points):
    """Build a density map for ``image`` from annotated ``points``.

    Args:
        image: 2-D array; only its shape is used to size the output.
        points: array whose rows are annotations, with the x coordinate in
            column 0 and the y coordinate in column 1, or ``None``.

    Returns:
        A float64 array with the same shape as ``image``. With two or more
        points, each point adds one Gaussian kernel (15x15, sigma 4.0)
        centred on it. Kernels that would cross the image border are
        replaced by a smaller kernel sized to the part that fits.
    """
    image_density = np.zeros_like(image, dtype=np.float64)
    height, width = image_density.shape
    if points is None:
        return image_density

    num_points = points.shape[0]
    if num_points == 0:
        # Nothing to draw; return before any kernel is built.
        return image_density
    if num_points == 1:
        # int() guarantees a valid index even if round() returns a float.
        x1 = max(0, min(width - 1, int(round(points[0, 0]))))
        y1 = max(0, min(height - 1, int(round(points[0, 1]))))
        # NOTE(review): a lone annotation is stored as the value 255 rather
        # than a Gaussian, so this map does not sum to 1 -- kept as in the
        # original; confirm whether that is intended.
        image_density[y1, x1] = 255
        return image_density

    frame_size = 15
    sigma = 4.0
    half = frame_size // 2
    # The full-size kernel never changes, so build it once for all points.
    column_kernel = cv2.getGaussianKernel(frame_size, sigma)
    full_kernel = np.multiply(column_kernel, column_kernel.T)

    for j in range(num_points):
        # Clamp the point into the image; abs() mirrors negative coordinates.
        x = min(width - 1, max(0, abs(int(math.floor(points[j, 0])))))
        y = min(height - 1, max(0, abs(int(math.floor(points[j, 1])))))

        # Window covered by the kernel, clipped to the image bounds.
        x1, y1 = max(0, x - half), max(0, y - half)
        x2, y2 = min(width, x + half + 1), min(height, y + half + 1)
        rows, cols = y2 - y1, x2 - x1

        if rows == frame_size and cols == frame_size:
            kernel = full_kernel
        else:
            # Near a border: use a kernel sized to the clipped window.
            kernel = np.multiply(
                cv2.getGaussianKernel(rows, sigma),
                cv2.getGaussianKernel(cols, sigma).T,
            )
        image_density[y1:y2, x1:x2] += kernel
    return image_density
现在,我们可以创建我们的训练和验证数据了。首先指定输入图像目录、真实标签目录,以及训练/验证图像和密度图的输出路径。
# Source images and ground-truth annotations of the training split.
input_images_path = './ShanghaiTech/part_B/train_data/images/'
ground_truth_path = './ShanghaiTech/part_B/train_data/ground-truth/'

# Destination folders for the generated patches and their density maps.
# output_path already ends with '/', so sub-folders are appended without a
# leading slash to avoid a doubled separator.
output_path = './ShanghaiTech/processed_trainval/'
training_images_path = output_path + 'training_images/'
training_densities_path = output_path + 'training_densities/'
validation_images_path = output_path + 'validation_images/'
validation_densities_path = output_path + 'validation_densities/'

for i in [output_path, training_images_path, training_densities_path,
          validation_images_path, validation_densities_path]:
    # exist_ok avoids the check-then-create race and makes re-runs harmless.
    os.makedirs(i, exist_ok=True)
现在,我们将遍历所有训练图像并计算它们的密度图。我们将使用真实标签文件为每个图像文件单独计算密度图,并将其保存为相应的 csv 文件。
# Fixed seed so the shuffle and the random crops are reproducible.
seed = 95461354
random.seed(seed)
n = 400  # images IMG_1 .. IMG_n are processed
val_test_num = math.ceil(n * 0.1)  # number of images routed to validation
indices = list(range(1, n + 1))
random.shuffle(indices)


def _write_density_csv(csv_path, density):
    """Write a 2-D density array to ``csv_path``, one CSV row per array row."""
    with open(csv_path, 'w', newline='') as output:
        csv.writer(output).writerows(density)


for idx, i in enumerate(indices, start=1):
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_image = ''.join((input_images_path, 'IMG_', str(i), '.jpg'))
    img = cv2.imread(input_image, 0)
    if img is None:
        # cv2.imread returns None instead of raising when it cannot read.
        raise FileNotFoundError('Could not read image: ' + input_image)
    height, width = img.shape

    # Half-size of a patch: one eighth of the image, rounded down to a
    # multiple of 8 (same value as int(width / 8 / 8) * 8).
    new_width, new_height = (width // 64) * 8, (height // 64) * 8

    # Shift annotations by one (presumably 1-based MATLAB coordinates).
    annotation_Points = image_info[0][0][0][0][0] - 1

    # Enlarge images too small to hold a full patch. cv2.resize expects
    # dsize as (width, height).
    if width <= new_width * 2:
        img = cv2.resize(img, (new_width * 2 + 1, img.shape[0]),
                         interpolation=cv2.INTER_LANCZOS4)
        annotation_Points[:, 0] = annotation_Points[:, 0] * 2 * new_width / width
    if height <= new_height * 2:
        img = cv2.resize(img, (img.shape[1], new_height * 2 + 1),
                         interpolation=cv2.INTER_LANCZOS4)
        annotation_Points[:, 1] = annotation_Points[:, 1] * 2 * new_height / height
    height, width = img.shape

    # Range of patch centres that keeps the whole patch inside the image.
    x_width, y_width = new_width + 1, width - new_width
    x_height, y_height = new_height + 1, height - new_height
    image_density = get_density_map(img, annotation_Points)

    # Nine random crops per image, each 2*new_width by 2*new_height pixels.
    for j in range(1, 10):
        x = math.floor((y_width - x_width) * random.random() + x_width)
        y = math.floor((y_height - x_height) * random.random() + x_height)
        x1, y1 = x - new_width, y - new_height
        x2, y2 = x + new_width - 1, y + new_height - 1
        # x1/y1 are 1-based, hence the -1 when slicing.
        base_image = img[y1 - 1:y2, x1 - 1:x2]
        base_image_density = image_density[y1 - 1:y2, x1 - 1:x2]

        img_idx = ''.join((str(i), '_', str(j)))
        # idx is 1-based, so "<=" sends exactly val_test_num images to
        # validation ("<" would send one fewer).
        if idx <= val_test_num:
            cv2.imwrite(''.join([validation_images_path, img_idx, '.jpg']), base_image)
            _write_density_csv(''.join([validation_densities_path, img_idx, '.csv']),
                               base_image_density)
        else:
            cv2.imwrite(''.join([training_images_path, img_idx, '.jpg']), base_image)
            _write_density_csv(''.join([training_densities_path, img_idx, '.csv']),
                               base_image_density)
print("Successfully processed files!")
遵循相同的模式,我们也需要处理我们的测试数据。
# Test split: one density-map CSV per image, no cropping.
images_path = './ShanghaiTech/part_B/test_data/images/'
ground_truth_path = './ShanghaiTech/part_B/test_data/ground-truth/'
ground_truth_csv = './ShanghaiTech/part_B/test_data/ground-truth_csv/'

# open(..., 'w') does not create missing folders, so create the target first.
os.makedirs(ground_truth_csv, exist_ok=True)

n = 316  # images IMG_1 .. IMG_n are processed
for i in range(1, n + 1):
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_img = ''.join((images_path, 'IMG_', str(i), '.jpg'))
    img = cv2.imread(input_img, 0)
    if img is None:
        # cv2.imread returns None instead of raising when it cannot read.
        raise FileNotFoundError('Could not read image: ' + input_img)
    # Shift annotations by one (presumably 1-based MATLAB coordinates).
    annotationPoints = image_info[0][0][0][0][0] - 1
    image_density = get_density_map(img, annotationPoints)
    with open(''.join([ground_truth_csv, 'IMG_', str(i), '.csv']), 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerows(image_density)
print("Successfully processed files!")
训练模型
完成上述步骤后,我们的数据就准备好了,可以加载它来训练我们的模型了。我们现在将定义一个函数来加载图像和标签(基于数据)。
def x_y_generator(images_path, labels_path, batch_size=64):
    """Yield ``(images, density_maps)`` batches forever.

    Args:
        images_path: sequence (list or ndarray) of image file paths.
        labels_path: sequence (list or ndarray) of density-map CSV paths,
            in the same order as ``images_path``.
        batch_size: number of samples gathered per batch.

    Yields:
        A tuple ``(x, y)`` of arrays. ``x`` holds grayscale images scaled as
        ``(pixel - 127.5) / 128`` with a trailing channel axis. ``y`` holds
        the density maps summed over 4x4 blocks (a quarter of the input size
        per axis), also with a trailing channel axis.

    Raises:
        ValueError: if there are no samples or ``batch_size`` is below 1.
        FileNotFoundError: if an image cannot be read.
    """
    # ravel (not squeeze) so a one-element array still becomes a list.
    if isinstance(images_path, np.ndarray):
        images_path = np.ravel(images_path).tolist()
    if isinstance(labels_path, np.ndarray):
        labels_path = np.ravel(labels_path).tolist()

    data_length = len(labels_path)
    if data_length == 0:
        # The loop below would spin forever without ever yielding.
        raise ValueError('x_y_generator needs at least one sample')
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    std = 4  # side length of the blocks summed into one output cell
    break_point = 0  # 1 when the data ran out while filling a batch
    t = 0  # index of the next sample to load

    while True:
        if not break_point:
            # Start a fresh batch.
            x = []
            y = []
            inner_iteration = batch_size
        else:
            # Data ran out mid-batch: wrap to the start and keep filling
            # the batch that is already in progress.
            t = 0
            inner_iteration = batch_size - data_length % batch_size

        for _ in range(inner_iteration):
            if t >= data_length:
                break_point = 1
                break
            break_point = 0

            raw = cv2.imread(images_path[t], 0)
            if raw is None:
                # cv2.imread returns None instead of raising on failure.
                raise FileNotFoundError('Could not read image: ' + str(images_path[t]))
            img = (raw - 127.5) / 128

            density_map = np.loadtxt(labels_path[t], delimiter=',')
            # Sum each std x std block in one vectorised step; rows/columns
            # that do not fill a whole block are dropped.
            rows = density_map.shape[0] // std
            cols = density_map.shape[1] // std
            quarter_den = (
                density_map[:rows * std, :cols * std]
                .reshape(rows, std, cols, std)
                .sum(axis=(1, 3))
            )

            x.append(img.reshape(*img.shape, 1))
            y.append(quarter_den.reshape(*quarter_den.shape, 1))
            t += 1

        if not break_point:
            x, y = np.asarray(x), np.asarray(y)
            yield x, y
我们可以使用以下函数来读取我们的训练、验证和测试数据。
# Each split is loaded in one go: the batch size equals the number of
# samples, so the first batch from the generator is the whole split.

# training split (train_labels is rebound from label paths to label arrays)
train_generator = x_y_generator(
    train_paths, train_labels, batch_size=len(train_paths)
)
training_img, train_labels = next(train_generator)

# validation split
validation_generator = x_y_generator(
    validation_paths, validation_labels, batch_size=len(validation_paths)
)
validating_img, validation_labels = next(validation_generator)

# test split
test_generator = x_y_generator(
    test_paths, test_labels, batch_size=len(test_paths)
)
testing_img, test_labels = next(test_generator)
我们的数据已经准备就绪,现在可以定义我们的神经网络了。我们将实现一个多列卷积神经网络。它包含三列不同滤波器大小的卷积神经网络。其思想是将图像作为输入馈送到我们的神经网络,并输出具有总体人群计数的密度图。由于这三列对应于不同的滤波器大小,因此每个 CNN 列学习的特征能够适应人员大小的变化,并且可以轻松地用于拥挤的地方或队列中。
def Multi_Column_CNN(input_shape=None):
    """Build the three-column CNN that maps an image to a density map.

    Every column applies conv -> pool -> conv -> pool -> conv -> conv to the
    same input; the columns differ only in filter counts and kernel sizes.
    Their outputs are concatenated along the channel axis and reduced to a
    single channel by a 1x1 convolution.

    Args:
        input_shape: shape passed to ``Input``.

    Returns:
        The assembled ``Model`` (not compiled).
    """
    inputs = Input(shape=input_shape)

    def build_column(filters, first_kernel, later_kernel):
        """Create one column on ``inputs`` and return its output tensor."""
        f1, f2, f3, f4 = filters
        branch = Conv2D(f1, first_kernel, padding='same', activation='relu')(inputs)
        branch = MaxPooling2D(2)(branch)
        branch = Conv2D(f2, later_kernel, padding='same', activation='relu')(branch)
        branch = MaxPooling2D(2)(branch)
        branch = Conv2D(f3, later_kernel, padding='same', activation='relu')(branch)
        return Conv2D(f4, later_kernel, padding='same', activation='relu')(branch)

    # Columns are built in order: largest kernels first, smallest last.
    columns = [
        build_column((16, 32, 16, 8), (9, 9), (7, 7)),    # first column
        build_column((20, 40, 20, 10), (7, 7), (5, 5)),   # second column
        build_column((24, 48, 24, 12), (5, 5), (3, 3)),   # third column
    ]

    # Merge the feature maps of all three columns along the last axis.
    merged = Concatenate(axis=-1)(columns)

    # 1x1 convolution turns the merged features into the density map.
    density_map = Conv2D(1, (1, 1), padding='same')(merged)
    return Model(inputs=inputs, outputs=density_map)
在我们的模型就绪后,我们也定义一些指标来衡量模型的性能。我们将使用标准均方误差和平均绝对误差。
def mean_absolute_error(labels, predictions):
    """Return the sum of absolute differences between the two tensors.

    NOTE(review): despite the name, the total is divided by 1, so no
    averaging takes place -- confirm whether a real mean was intended.
    """
    absolute_differences = K.abs(labels - predictions)
    total = K.sum(absolute_differences)
    return total / 1
def mean_square_error(labels, predictions):
    """Return the sum of squared differences between the two tensors.

    NOTE(review): despite the name, the total is divided by 1, so no
    averaging takes place -- confirm whether a real mean was intended.
    """
    squared_differences = K.square(labels - predictions)
    total = K.sum(squared_differences)
    return total / 1
现在,让我们训练我们的模型。我们还将使用 Keras 的 ModelCheckpoint 回调,分别根据训练损失和验证损失只保存表现最佳的模型,以节省计算资源。
# Keep only the best weights seen so far: one checkpoint follows the
# validation loss, the other follows the training loss.
best_validation = ModelCheckpoint(
    filepath='mcnn_val.hdf5',
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    mode='min',
)
best_training = ModelCheckpoint(
    filepath='mcnn_train.hdf5',
    monitor='loss',
    verbose=1,
    save_best_only=True,
    mode='min',
)

# Height and width are left as None; only the single channel is fixed.
input_shape = (None, None, 1)
model = Multi_Column_CNN(input_shape)
model.compile(
    loss='mean_squared_error',
    optimizer='adam',
    metrics=[mean_absolute_error, mean_square_error],
)

# One sample per step, 100 epochs, validated against the validation split.
history = model.fit(
    x=training_img,
    y=train_labels,
    batch_size=1,
    epochs=100,
    validation_data=(validating_img, validation_labels),
    callbacks=[best_validation, best_training],
)
模型训练所需的时间取决于您使用的资源。模型训练完成后,您可以进行测试。
测试模型
作为基本的测试,我们可以绘制训练数据和验证数据的损失。
# Per-epoch loss curves recorded by model.fit.
val_loss = history.history['val_loss']
loss = np.asarray(history.history['loss'])

# Training loss (blue) in its own figure.
plt.plot(loss, 'b')
plt.legend(['loss'])
plt.show()

# Validation loss (red) in its own figure.
plt.plot(val_loss, 'r')
plt.legend(['val_loss'])
plt.show()
我们的训练模型显示了以下损失图
损失图看起来不错,但让我们获取图像上的预测,看看我们的模型是否能准确地计算出图像中的人数。
from keras import models

# Load the trained model. The custom metric functions must be supplied so
# Keras can rebuild the compiled model.
# NOTE(review): training in this file saves 'mcnn_val.hdf5' to the working
# directory, while this loads it from './ShanghaiTech/part_B/weights/' --
# confirm the checkpoint has been placed there.
model = models.load_model(
    './ShanghaiTech/part_B/weights/mcnn_val.hdf5',
    custom_objects={
        'mean_absolute_error': mean_absolute_error,
        'mean_square_error': mean_square_error,
    },
)

absolute_error = []
squared_error = []

# specifying the number of test to run
num_test = 50
for i in range(min(num_test, testing_img.shape[0])):
    # Add the batch axis expected by model.predict.
    inputs = np.reshape(testing_img[i], [1, *testing_img[i].shape[:2], 1])
    outputs = np.squeeze(model.predict(inputs))
    density_map = np.squeeze(test_labels[i])

    # The sum of a density map is the head count.
    count = np.sum(density_map)
    prediction = np.sum(outputs)

    fg, (ax0, ax1) = plt.subplots(1, 2, figsize=(16, 5))
    # plotting the input and the ground-truth density map with both counts
    plt.suptitle(' '.join([
        'count:', str(round(count, 2)),
        'prediction:', str(round(prediction, 2))
    ]))
    ax0.imshow(np.squeeze(inputs))
    # Stretch the map for display; skip the stretch for a constant map to
    # avoid dividing by zero.
    value_range = np.max(density_map) - np.min(density_map)
    display_scale = 255 / value_range if value_range > 0 else 1.0
    ax1.imshow(density_map * display_scale)
    plt.show()

    absolute_error.append(abs(count - prediction))
    squared_error.append((count - prediction) ** 2)

# Stored under new names so the metric functions mean_absolute_error and
# mean_square_error are not overwritten by numbers.
mae_value = np.mean(absolute_error)
mse_value = np.mean(squared_error)
print('mean_absolute_error:', mae_value, 'mean_square_error:', mse_value)
这里有一些(不错的)预测结果
在现阶段,我们的模型表现良好,但它在计算队列中的人数方面表现如何呢?没有可用的开源数据集来专门训练和测试队列长度模型,因此我们需要生成自己的数据集。
创建自定义队列长度数据集
牢记基础知识,我们只需要一些图像及其对应的真实标签来组成数据集。我们可以直接从 Google 搜索中收集图像。这很容易,对吧?但是如何生成真实标签文件呢?有各种工具可用于标注图像,包括基于 Web 的边界框标注器、头部标注器,或云供应商(如 AWS SageMaker)提供的一些专用工具。您可以选择任何您想要生成真实标签文件的工具。我在这里坚持最基本的方法,并使用 MATLAB 生成真实标签。为了使用 MATLAB 生成真实标签文件,请将您的图像保存在一个名为“images”的目录中,并运行以下脚本
% Annotate every image in 'images/' by clicking on heads, and store the
% clicked points as one ground-truth .mat file per image.
filePath = fullfile('images', '/*.jpg');
ImageFiles = dir(filePath);
n = length(ImageFiles)
read_images_path = 'images/';
store_gt_path = 'ground-truth/';
t = 0; % number of files initially in training set

% save() does not create missing folders, so create the output folder first.
if ~exist(store_gt_path, 'dir')
    mkdir(store_gt_path);
end

for i = 1:n
    % read image files (expects names IMG_<k>.jpg)
    img = imread([read_images_path 'IMG_' num2str(i+t) '.jpg']);
    % resize image files to 768 rows by 1024 columns
    img = imresize(img, [768 1024]);
    imwrite(img, [read_images_path 'IMG_' num2str(i+t) '.jpg'], 'jpg');
    figure
    % show image on screen
    imshow(img)
    % collect clicked points; press Enter to finish
    [x, y] = getpts;
    image_info{1,1}.location = [x y];
    image_info{1,1}.number = size(x, 1);
    save([store_gt_path 'GT_IMG_' num2str(t+i) '.mat'], 'image_info')
    close
end
当脚本运行时,它将逐一遍历“images”目录中的所有图像并在屏幕上显示它们。当显示图像时,点击图像中的人物头部,然后按 Enter 键移至下一张图像。
在自定义数据集上进行测试
一旦您的数据集准备就绪,加载您的训练模型并进行测试。以下是我测试后获得的一些结果
我们的模型表现良好。请注意,这些只是一些“好的”结果。您的结果可能略有不同。
下一步是什么?
在本文中,我们学习了估算图像中人数的方法。您也可能会遇到一些非常糟糕的结果,但我将把模型的精调留给您。此外,这里获得的密度图还可以进一步馈送到全连接网络,以获得更准确的队列人数预测。
在本系列文章的下一篇中,我们将比较从头开始训练模型与使用 YOLO 等更高级的预训练方法。