65.9K
CodeProject 正在变化。 阅读更多。
Home

AI 队列长度检测:计算区域内人数

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2020 年 10 月 28 日

CPOL

5分钟阅读

viewsIcon

9792

downloadIcon

244

在本文中,我们将训练一个深度学习模型来检测和计算给定区域内的人数。

之前,我们实现了 R-CNN 进行对象检测。虽然这些对象检测算法在检测人脸时效果很好,但当目标对象不清晰时,它们的效果就不佳了。此外,由于它使用了滑动窗口技术,搜索会变得详尽,从而影响性能。在本文中,我们将学习如何实现深度神经网络,利用密度映射来估算人群或队列中的人数。

我们将使用 ShangaiTech 数据集。该数据集分为两部分。对于本文,我们将仅使用 B 部分来训练我们的人群检测模型,然后在其自定义数据集上进行测试。您可以选择使用任一部分;代码对两者都能正常工作。

让我们开始导入所需的库。

import os
import cv2
import csv
import math
import random
import numpy as np
from scipy.io import loadmat
from keras import backend as K
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from keras.callbacks import ModelCheckpoint
from keras.models import load_model, load_model, Model
from keras.layers import Conv2D, MaxPooling2D, Concatenate, Input

预处理输入数据

我们的数据集包含两个子目录:test_data 和 train_data。这两个目录都包含图像及其对应的真实标签。我们无法以原始格式使用数据,因此需要进行一些预处理。由于我们将使用“按密度计数 CNN”方法,因此真实标签数据也需要是密度图。在这里,我们将尝试从给定的真实标签文件中计算出真实密度图。

首先,让我们定义生成输入图像密度图的函数。

def get_density_map(image, points):
    image_density = np.zeros_like(image, dtype=np.float64)
    height, width = image_density.shape
    if points is None:
        return image_density
    if points.shape[0] == 1:
        x1 = max(0, min(width-1, round(points[0, 0])))
        y1 = max(0, min(height-1, round(points[0, 1])))
        image_density[y1, x1] = 255
        return image_density
    for j in range(points.shape[0]):
        frame_size = 15
        sigma = 4.0
        Height = np.multiply(cv2.getGaussianKernel(frame_size, sigma), (cv2.getGaussianKernel(frame_size, sigma)).T)
        x = min(width-1, max(0, abs(int(math.floor(points[j, 0])))))
        y = min(height-1, max(0, abs(int(math.floor(points[j, 1])))))
        if x >= width or y >= height:
            continue
        x1 = x - frame_size//2 + 0
        y1 = y - frame_size//2 + 0
        x2 = x + frame_size//2 + 1
        y2 = y + frame_size//2 + 1
        dfx1, dfy1, dfx2, dfy2 = 0, 0, 0, 0
        change_Height = False
        if x1 < 0:
            dfx1 = abs(x1) + 0
            x1 = 0
            change_Height = True
        if y1 < 0:
            dfy1 = abs(y1) + 0
            y1 = 0
            change_Height = True
        if x2 > width:
            dfx2 = x2 - width
            x2 = width
            change_Height = True
        if y2 > height:
            dfy2 = y2 - height
            y2 = height
            change_Height = True
        x1h, y1h, x2h, y2h = 1 + dfx1, 1 + dfy1, frame_size - dfx2, frame_size - dfy2
        if change_Height is True:
            Height = np.multiply(cv2.getGaussianKernel(y2h-y1h+1, sigma), (cv2.getGaussianKernel(x2h-x1h+1, sigma)).T)
        image_density[y1:y2, x1:x2] += Height
 
    return image_density

现在,我们可以创建我们的测试和验证数据了。指定输入图像文件的目录、输入真实标签文件的目录、测试和验证图像、以及标签和输出路径。

input_images_path = ''.join(['./ShanghaiTech/part_B/train_data/images/'])
output_path = './ShanghaiTech/processed_trainval/'
 
training_images_path = ''.join((output_path, '/training_images/'))
training_densities_path = ''.join((output_path, '/training_densities/'))
validation_images_path = ''.join((output_path, '/validation_images/'))
validation_densities_path = ''.join((output_path, '/valalidation_densities/'))
 
ground_truth_path = ''.join(['./ShanghaiTech/part_B/train_data/ground-truth/'])
 
for i in [output_path, training_images_path, training_densities_path, validation_images_path, validation_densities_path]:
	if not os.path.exists(i):
    	os.makedirs(i)

现在,我们将遍历所有训练图像并计算它们的密度图。我们将使用真实标签文件为每个图像文件单独计算密度图,并将其保存为相应的 csv 文件。

seed = 95461354
random.seed(seed)
 
n = 400
 
val_test_num = math.ceil(n*0.1)
indices = list(range(1, n+1))
random.shuffle(indices)
 
for idx in range(1, n+1):
    i = indices[idx-1]
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_image = ''.join((input_images_path, 'IMG_',str(i), '.jpg'))
    img = cv2.imread(input_image, 0)
    height, width = img.shape
    new_width, new_height = width / 8, height / 8
    new_width, new_height = int(new_width / 8) * 8, int(new_height / 8) * 8
    annotation_Points =  image_info[0][0][0][0][0] - 1
    if width <= new_width * 2:
        img = cv2.resize(img, [h, new_width*2+1], interpolation=cv2.INTER_LANCZOS4)
        annotation_Points[:, 0] = annotation_Points[:, 0] * 2 * new_width / width
    if height <= new_height * 2:
        img = cv2.resize(img, [new_height*2+1, w], interpolation=cv2.INTER_LANCZOS4)
        annotation_Points[:, 1] = annotation_Points[:,1] * 2 * new_height / height
    height, width = img.shape
    x_width, y_width = new_width + 1, width - new_width
    x_height, y_height = new_height + 1, height - new_height
 
    image_density = get_density_map(img, annotation_Points)
    for j in range(1, 10):
 
        x = math.floor((y_width - x_width) * random.random() + x_width)
        y = math.floor((y_height - x_height) * random.random() + x_height)
        x1, y1 = x - new_width, y - new_height
        x2, y2 = x + new_width - 1, y + new_height - 1
        base_image = im[y1-1:y2, x1-1:x2]
        base_image_density = image_density[y1-1:y2, x1-1:x2]
        base_image_annPoints = annotation_Points[
            list(
                set(np.where(np.squeeze(annotation_Points[:,0]) > x1)[0].tolist()) &
                set(np.where(np.squeeze(annotation_Points[:,0]) < x2)[0].tolist()) &
                set(np.where(np.squeeze(annotation_Points[:,1]) > y1)[0].tolist()) &
                set(np.where(np.squeeze(annotation_Points[:,1]) < y2)[0].tolist())
            )
        ]
 
        base_image_annPoints[:, 0] = base_image_annPoints[:, 0] - x1
        base_image_annPoints[:, 1] = base_image_annPoints[:, 1] - y1
        img_idx = ''.join((str(i), '_',str(j)))
 
        if idx < val_test_num:
            cv2.imwrite(''.join([validation_images_path, img_idx, '.jpg']), base_image)
            with open(''.join([validation_densities_path, img_idx, '.csv']), 'w', newline='') as output:
                writer = csv.writer(output)
                writer.writerows(base_image_density)
        else:
            cv2.imwrite(''.join([training_images_path, img_idx, '.jpg']), base_image)
            with open(''.join([training_densities_path, img_idx, '.csv']), 'w', newline='') as output:
                writer = csv.writer(output)
                writer.writerows(base_image_density)
print("Successfully processed files!")

遵循相同的模式,我们也需要处理我们的测试数据。

images_path = ''.join(['./ShanghaiTech/part_B/test_data/images/'])
ground_truth_path = ''.join(['./ShanghaiTech/part_B/test_data/ground-truth/'])
ground_truth_csv = ''.join(['./ShanghaiTech/part_B/test_data/ground-truth_csv/'])
 
n = 316
 
for i in range(1, n+1):
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_img  = ''.join((images_path, 'IMG_', str(i), '.jpg'))
    img = cv2.imread(input_img, 0)
    annotationPoints =  image_info[0][0][0][0][0] - 1
    image_density = get_density_map(img, annotationPoints)
    with open(''.join([ground_truth_csv, 'IMG_', str(i), '.csv']), 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerows(image_density)
print("Successfully processed files!")

训练模型

完成上述步骤后,我们的数据就准备好了,可以加载它来训练我们的模型了。我们现在将定义一个函数来加载图像和标签(基于数据)。

def x_y_generator(images_path, labels_path, batch_size=64):
    break_point = 0
    t = 0
    images_path = np.squeeze(images_path).tolist() if isinstance(images_path, np.ndarray) else images_path
    labels_path = np.squeeze(labels_path).tolist() if isinstance(labels_path, np.ndarray) else labels_path
    data_length = len(labels_path)
    while True:
        if not break_point:
            x = []
            y = []
            inner_iteration = batch_size
        else:
            t = 0
            inner_iteration = batch_size - data_length % batch_size
        for i in range(inner_iteration):
            if t >= data_length:
                break_point = 1
                break
            else:
                break_point = 0
            img = (cv2.imread(images_path[t], 0) - 127.5) / 128
            density_map = np.loadtxt(labels_path[t], delimiter=',')
            std = 4
            quarter_den = np.zeros((np.asarray(density_map.shape).astype(int)//std).tolist())
            for r in range(quarter_den.shape[0]):
                for c in range(quarter_den.shape[1]):
                    quarter_den[r, c] = np.sum(density_map[r*std:(r+1)*std, c*std:(c+1)*std])
            x.append(img.reshape(*img.shape, 1))
            y.append(quarter_den.reshape(*quarter_den.shape, 1))
            t += 1
        if not break_point:
            x, y = np.asarray(x), np.asarray(y)
            yield x, y

我们可以使用以下函数来读取我们的训练、验证和测试数据。

# read training data
train_generator = x_y_generator(train_paths, train_labels, batch_size=len(train_paths))
training_img, train_labels = train_generator.__next__()
 
# read validation data
validation_generator = x_y_generator(validation_paths, validation_labels, batch_size=len(validation_paths))
validating_img, validation_labels = validation_generator.__next__()
 
# read test data
test_generator = x_y_generator(test_paths, test_labels, batch_size=len(test_paths))
testing_img, test_labels = test_generator.__next__()

我们的数据已经准备就绪,现在可以定义我们的神经网络了。我们将实现一个多列卷积神经网络。它包含三列不同滤波器大小的卷积神经网络。其思想是将图像作为输入馈送到我们的神经网络,并输出具有总体人群计数的密度图。由于这三列对应于不同的滤波器大小,因此每个 CNN 列学习的特征能够适应人员大小的变化,并且可以轻松地用于拥挤的地方或队列中。

def Multi_Column_CNN(input_shape=None):
    inputs = Input(shape=input_shape)
 
    # first column 
    conv_1 = Conv2D(16, (9, 9), padding='same', activation='relu')(inputs)
    conv_1 = MaxPooling2D(2)(conv_1)
    conv_1 = (conv_1)
    conv_1 = Conv2D(32, (7, 7), padding='same', activation='relu')(conv_1)
    conv_1 = MaxPooling2D(2)(conv_1)
    conv_1 = Conv2D(16, (7, 7), padding='same', activation='relu')(conv_1)
    conv_1 = Conv2D(8, (7, 7), padding='same', activation='relu')(conv_1)
 
    # second column 
    conv_2 = Conv2D(20, (7, 7), padding='same', activation='relu')(inputs)
    conv_2 = MaxPooling2D(2)(conv_2)
    conv_2 = (conv_2)
    conv_2 = Conv2D(40, (5, 5), padding='same', activation='relu')(conv_2)
    conv_2 = MaxPooling2D(2)(conv_2)
    conv_2 = Conv2D(20, (5, 5), padding='same', activation='relu')(conv_2)
    conv_2 = Conv2D(10, (5, 5), padding='same', activation='relu')(conv_2)
 
    # third column 
    conv_3 = Conv2D(24, (5, 5), padding='same', activation='relu')(inputs)
    conv_3 = MaxPooling2D(2)(conv_3)
    conv_3 = (conv_3)
    conv_3 = Conv2D(48, (3, 3), padding='same', activation='relu')(conv_3)
    conv_3 = MaxPooling2D(2)(conv_3)
    conv_3 = Conv2D(24, (3, 3), padding='same', activation='relu')(conv_3)
    conv_3 = Conv2D(12, (3, 3), padding='same', activation='relu')(conv_3)
 
    # merge feature map of third column in last dimension and get density map
    conv_merge = Concatenate(axis=-1)([conv_1, conv_2, conv_3])
    # getting density map as output
    density_map = Conv2D(1, (1, 1), padding='same')(conv_merge)
 
    model = Model(inputs=inputs, outputs=density_map)
    return model

在我们的模型就绪后,我们也定义一些指标来衡量模型的性能。我们将使用标准均方误差和平均绝对误差。

def mean_absolute_error(labels, predictions):
    return K.sum(K.abs(labels - predictions)) / 1
 
def mean_square_error(labels, predictions):
    return K.sum(K.square(labels - predictions)) / 1

现在,让我们训练我们的模型。我们还将使用 Keras 的 ModelCheckpoint 来节省计算资源,并且仅为训练和验证保存最佳模型。

best_validation = ModelCheckpoint(
    filepath= 'mcnn_val.hdf5', monitor='val_loss', verbose=1, save_best_only=True, mode='min'
)
best_training = ModelCheckpoint(
    filepath= 'mcnn_train.hdf5', monitor='loss', verbose=1, save_best_only=True, mode='min'
)
 
input_shape = (None, None, 1)
model = Multi_Column_CNN(input_shape)
model.compile(loss='mean_squared_error', optimizer='adam', metrics=[mean_absolute_error, mean_square_error])
history = model.fit(
    x=training_img, y=train_labels, batch_size=1, epochs=100,
    validation_data=(validating_img, validation_labels),
    callbacks=[best_validation, best_training]
)

模型训练所需的时间取决于您使用的资源。模型训练完成后,您可以进行测试。

测试模型

作为基本的测试,我们可以绘制训练数据和验证数据的损失。

val_loss, loss = history.history['val_loss'], history.history['loss']
loss = np.asarray(loss)
plt.plot(loss, 'b')
plt.legend(['loss'])
plt.show()
plt.plot(val_loss, 'r')
plt.legend(['val_loss'])
plt.show()

我们的训练模型显示了以下损失图

损失图看起来不错,但让我们获取图像上的预测,看看我们的模型是否能准确地计算出图像中的人数。

from keras import models
#load the trained model
model = models.load_model('./ShanghaiTech/part_B/weights/mcnn_val.hdf5', custom_objects={'mean_absolute_error': mean_absolute_error, 'mean_square_error': mean_square_error })
absolute_error = []
squared_error = []
# specifying the number of test to run
num_test = 50
for i in range(testing_img.shape[0])[:num_test]:
    inputs = np.reshape(testing_img[i], [1, *testing_img[i].shape[:2], 1])
    outputs = np.squeeze(model.predict(inputs))
    density_map = np.squeeze(test_labels[i])
    count = np.sum(density_map)
    prediction = np.sum(outputs)
    fg, (ax0, ax1) = plt.subplots(1, 2, figsize=(16, 5))
	# plotting the density maps along with predicted count
    plt.suptitle(' '.join([
        'count:', str(round(count, 2)),
        'prediction:', str(round(prediction, 2))
    ]))
    ax0.imshow(np.squeeze(inputs))
    ax1.imshow(density_map * (255 / (np.max(density_map) - np.min(density_map))))
    plt.show()
    absolute_error.append(abs(count -  prediction))
    square_error.append((count -  prediction) ** 2)
mean_absolute_error = np.mean(absolute_error)
mean_square_error = np.mean(square_error)
print('mean_absolute_error:', mean_absolute_error, 'mean_square_error:', mean_square_error)

这里有一些(不错的)预测结果

在现阶段,我们的模型表现良好,但它在计算队列中的人数方面表现如何呢?没有可用的开源数据集来专门训练和测试队列长度模型,因此我们需要生成自己的数据集。

创建自定义队列长度数据集

牢记基础知识,我们只需要一些图像及其对应的真实标签来组成数据集。我们可以直接从 Google 搜索中收集图像。这很容易,对吧?但是如何生成真实标签文件呢?有各种工具可用于标注图像,包括基于 Web 的边界框标注器、头部标注器,或云供应商(如 AWS SageMaker)提供的一些专用工具。您可以选择任何您想要生成真实标签文件的工具。我在这里坚持最基本的方法,并使用 MATLAB 生成真实标签。为了使用 MATLAB 生成真实标签文件,请将您的图像保存在一个名为“images”的目录中,并运行以下脚本

filePath = fullfile('images', '/*.jpg');
ImageFiles = dir(filePath);
n = length(ImageFiles)
read_images_path = 'images/';
store_gt_path = 'ground-truth/';
t = 0;                      	%number of files initially in training set
 
for i=1:n
   	# read image files
	img = imread([read_path 'IMG_' num2str(i+t) '.jpg']);
# resize image files
	img = imresize(im, [768 1024]);
	imwrite(img,[read_images_path 'IMG_' num2str(i+t) '.jpg'], 'jpg');
	figure
   	# show image on screen
	imshow(img)
	[x,y] = getpts;
	image_info{1,1}.location = [x y];
	image_info{1,1}.number = size(x,1);
	save([store_gt_path 'GT_IMG_' num2str(t+i) '.mat'], 'image_info')
	close
end

当脚本运行时,它将逐一遍历“images”目录中的所有图像并在屏幕上显示它们。当显示图像时,点击图像中的人物头部,然后按 Enter 键移至下一张图像。

在自定义数据集上进行测试

一旦您的数据集准备就绪,加载您的训练模型并进行测试。以下是我测试后获得的一些结果

 

我们的模型表现良好。请注意,这些只是一些“好的”结果。您的结果可能略有不同。

下一步是什么?

在本文中,我们学习了估算图像中人数的方法。您也可能会遇到一些非常糟糕的结果,但我将把模型的精调留给您。此外,这里获得的密度图还可以进一步馈送到全连接网络,以获得更准确的队列人数预测。

本系列文章的下一篇中,我们将比较从头开始训练模型与使用 YOLO 等更高级的预训练方法。

© . All rights reserved.