65.9K
CodeProject 正在变化。 阅读更多。
Home

人脸识别的 Web API

2021年7月30日

CPOL

3分钟阅读

viewsIcon

18672

在本文中,我们将把人脸识别模型封装在一个简单的 Web API 中,在树莓派上创建一个客户端应用程序,并运行客户端-服务器系统。

引言

人脸识别是人工智能 (AI) 的一个领域,在过去的十年中,深度学习 (DL) 的现代方法取得了巨大成功。最好的人脸识别系统可以像人类一样,甚至比人类更精确地识别图像和视频中的人物。

我们关于这个系列文章分为两部分

  • 人脸检测,客户端应用程序在图像或视频源中检测人脸,对检测到的人脸图像进行对齐,并将它们提交给服务器。
  • 人脸识别(本部分),服务器端应用程序执行人脸识别。

我们假设您熟悉 DNN、Python、Keras 和 TensorFlow。欢迎下载此项目代码以进行学习。

在之前的文章中,我们学习了如何 在树莓派设备上使用 MTCNN 库检测人脸 以及如何 使用 FaceNet 模型识别人脸。在本文中,我们将看到如何在简单的 Web 客户端-服务器系统中使用这些组件。

客户端应用程序

让我们从在边缘设备上运行的客户端开始。首先,我们为将人脸图像发送到服务器的简单类编写代码

class ImgSend:
    def __init__(self, host, port, debug_mode=False):
        self.host = host
        self.port = port
        self.url = host+":"+str(port)+"/api/faceimg"
        self.dbg_mode = debug_mode
    
    def send(self, img):
        (_, encoded) = cv2.imencode(".png", img)
        
        data = encoded.tostring()
        headers = { "content-type": "image/png" }
        if self.dbg_mode:
            print("Sending request... ")
            #print(data)
        t1 = time.time()
        response = requests.post(self.url, data=data, headers=headers)
        t2 = time.time()
        dt = t2-t1
        if self.dbg_mode:
            print("Request processed: "+str(dt)+" sec")
        
        result = json.loads(response.text)
        
        return result

构造函数接收 hostport 参数,并使用特殊的路径 /api/faceimg 形成最终 URL,以将请求路由到人脸识别方法。在 send 方法中,我们将图像编码为 png 格式,将其转换为字符串,然后使用 requests.post 函数将该字符串发送到服务器。

我们还必须修改本文中描述的(参考“树莓派上的人脸检测”部分)人脸检测器。

class VideoWFR:    
    def __init__(self, detector, sender):
        self.detector = detector
        self.sender = sender
    
    def process(self, video, align=False, save_path=None):
        detection_num = 0;
        rec_num = 0
        capture = cv2.VideoCapture(video)
        img = None

        dname = 'AI face recognition'
        cv2.namedWindow(dname, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(dname, 960, 720)
        
        frame_count = 0
        dt = 0
        if align:
            fa = Face_Align_Mouth(160)
            
        # Capture all frames
        while(True):    
            (ret, frame) = capture.read()
            if frame is None:
                break
            frame_count = frame_count+1
            
            t1 = time.time()
            faces = self.detector.detect(frame)
            f_count = len(faces)
            detection_num += f_count
            
            names = None
            if (f_count>0) and (not (self.sender is None)):
                names = [None]*f_count
                for (i, face) in enumerate(faces):
                    if align:
                        (f_cropped, f_img) = fa.align(frame, face)
                    else:
                        (f_cropped, f_img) = self.detector.extract(frame, face)
                    if (not (f_img is None)) and (not f_img.size==0):
                        response = self.sender.send(f_img)
                        is_recognized = response["message"]=="RECOGNIZED"
                        print(response["message"])
                        if is_recognized:
                            print(response["name"]+": "+response["percent"])
                        
                        if is_recognized:
                            rec_num += 1
                            name = response["name"]
                            percent = int(response["percent"])
                            conf = percent*0.01
                            names[i] = (name, conf)
                            if not (save_path is None):
                                ps = ("%03d" % rec_num)+"_"+name+"_"+("%03d" % percent)+".png"
                                ps = os.path.join(save_path, ps)
                                cv2.imwrite(ps, f_img)
                        
            t2 = time.time()
            dt = dt + (t2-t1)
                    
            if len(faces)>0:
                Utils.draw_faces(faces, (0, 0, 255), frame, True, True, names)
            
            # Display the resulting frame
            cv2.imshow(dname,frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
        capture.release()
        cv2.destroyAllWindows()
        
        if dt>0:
            fps = detection_num/dt
        else:
            fps = 0
        
        return (detection_num, rec_num, fps)

现在我们有了一个人脸识别器,而不仅仅是一个人脸检测器。它包括一个内部 MTCNN 检测器和一个图像发送器。当检测到人脸时,它将被发送到服务器。当从服务器收到响应时,它会被解析并保存到指定的文件夹中。

服务器端应用程序

让我们转到服务器应用程序。我们使用 Flask 微框架将我们的人脸识别代码封装成 Web API

import flask
from flask import Flask, request, Response

print(flask.__version__)

# Initialize the Flask application
app = Flask(__name__)

rec = None
f_db = None
rec_data = None
save_path = None

@app.route("/api/faceimg", methods=['POST'])
def test():
    response = {}
    r_status = 200
    r = request
    
    print("Processing recognition request... ")
    t1 = time.time()
    nparr = np.fromstring(r.data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    
    embds = rec.embeddings(img)
    data = rec.recognize(embds, f_db)
    t2 = time.time()
    dt = t2-t1
    print("Recognition request processed: "+str(dt)+" sec")
    
    rec_data.count()
    ps = ""
    info = ""
    if not (data is None):
        (name, dist, p_photo) = data
        conf = 1.0 - dist
        percent = int(conf*100)
        info = "Recognized: "+name+" "+str(conf)
        ps = ("%03d" % rec_data.get_count())+"_"+name+"_"+("%03d" % percent)+".png"
        response = { "message": "RECOGNIZED",
                     "name": name,
                     "percent": str(percent) }
    else:
        info = "UNRECOGNIZED"
        ps = ("%03d" % rec_data.get_count())+"_unrecognized"+".png"
        response = { "message": "UNRECOGNIZED" }
    
    print(info)
    
    if not (save_path is None):
       ps = os.path.join(save_path, ps)
       cv2.imwrite(ps, img)
        
    # encode response using jsonpickle
    response_pickled = jsonpickle.encode(response)
    return Response(response=response_pickled, status=r_status, mimetype="application/json")

初始化 Flask 应用程序后,我们应用 route 装饰器以使用指定的 URL(与客户端应用程序使用的 URL 相同)触发 test 方法。在这种方法中,我们解码收到的 PNG 人脸图像,获取嵌入,识别面部,并将响应发送回客户端。

在容器中运行系统

最后,这是运行我们的 Web 应用程序的代码

if __name__ == "__main__":
    host = str(sys.argv[1])
    port = int(sys.argv[2])

    # FaceNet recognizer
    m_file = r"/home/pi_fr/net/facenet_keras.h5"
    rec = FaceNetRec(m_file, 0.5)
    rec_data = RecData()
    print("Recognizer loaded.")
    print(rec.get_model().inputs)
    print(rec.get_model().outputs)
    
    # Face DB 
    save_path = r"/home/pi_fr/rec"
    db_path = r"/home/pi_fr/db"
    f_db = FaceDB()
    f_db.load(db_path, rec)
    db_f_count = len(f_db.get_data())
    print("Face DB loaded: "+str(db_f_count))
    
    print("Face recognition running")
          
    #host = "0.0.0.0"
    #port = 50
    app.run(host=host, port=port, threaded=False)

由于我们将在创建的 Docker 容器 中运行 Web 应用程序(参考上一部分),因此我们需要使用适当的网络设置启动此容器。使用以下命令从镜像创建一个新容器

c:\>docker network create my-net
c:\>docker create --name FR_2 --network my-net --publish 5050:50 sergeylgladkiy/fr:v1

FR_2 容器启动时,它将主机机器的端口 5050 转发到容器的内部端口 50。

现在我们可以在容器中运行应用程序(请注意,由于它在容器内部,我们指定内部端口 50)

# python /home/pi_fr/pi_fr_facenet.run_align_dock_flask.lnx.py 0.0.0.0 50

当服务器启动时,我们可以在树莓派设备上运行客户端。这是我们用来启动应用程序的代码

if __name__ == "__main__":
    #v_file = str(sys.argv[1])
    #host = str(sys.argv[2])
    #port = int(sys.argv[3])
    
    v_file = r"/home/pi/Desktop/PI_FR/video/5_2.mp4"
    host = "http://192.168.2.135"
    port = 5050
    
    # Video Web recognition 
    save_path = r"/home/pi/Desktop/PI_FR/rec"
    d = MTCNN_Detector(50, 0.95)
    sender = ImgSend(host, port, True)
    vr = VideoWFR(d, sender)

    (f_count, rec_count, fps) = vr.process(v_file, True, save_path)

    print("Face detections: "+str(f_count))
    print("Face recognitions: "+str(rec_count))
    print("FPS: "+str(fps))

请注意,客户端使用主机机器的 IP 和端口(5050),而不是容器的 IP 和内部端口号(50)。

以下两个视频显示了我们的客户端-服务器系统是如何工作的

正如您所看到的,识别请求的处理速度非常快;它只用了大约 0.07 秒。这是正确系统架构的证明。客户端仅将裁剪和对齐的检测到的人脸图像发送到服务器,从而减少了网络负载,而识别算法在强大的服务器计算机上运行。

后续步骤

在本系列的 下一篇文章 中,我们将展示如何在 Kubernetes 上运行人脸识别服务器。敬请关注!

© . All rights reserved.