import time
import warnings
import functools
import base64
from PIL import Image
import numpy as np
import sqlite3
from io import BytesIO
from flaskr.facenet_keras import (
face_encodings
)
import click
import time
from flask import (
Blueprint, flash, g, redirect, render_template, request, session, url_for
)
from werkzeug.security import check_password_hash, generate_password_hash
from flaskr.db import get_db, init_db
# Blueprint that groups the admin views; every route registered on it is
# served under the '/admin' URL prefix.
bp = Blueprint('admin', __name__, url_prefix='/admin')
@bp.route('/')
def root():
    """Redirect the blueprint's root URL to the admin index page."""
    index_url = '/admin/index'
    return redirect(index_url)
@bp.route('/index')
def index():
    """Render ``admin/index.html`` with the user table and stored face features.

    The login page is rendered when the session has no ``is_admin`` entry,
    and a "permission denied" message page when that value is below 1.
    If the session has no ``username`` entry, the session is cleared and the
    client is redirected to the login page.
    """
    admin_level = session.get('is_admin')
    # Nothing recorded in the session: show the login page.
    if admin_level is None:
        return render_template('/admin/login.html')
    # The recorded privilege level is too low: show a message page.
    if admin_level < 1:
        return render_template(
            '/showmsg.html',
            title='无权访问!',
            msg='权限不足,请重新登陆。',
            jump='/admin/login',
        )
    user_rows = get_db().execute(
        'SELECT id, username, is_admin, last_punch FROM user'
    )
    # The stored feature array is handed to the template as a JavaScript
    # assignment statement.
    feature_rows = np.load('face_features.npy').tolist()
    js_code = ''.join(('let datas = ', str(feature_rows), ';'))
    try:
        return render_template(
            'admin/index.html',
            username=session['username'],
            usertab=user_rows,
            js_code=js_code,
        )
    except KeyError:
        # Raised when the session has no 'username' entry: reset the session
        # and send the client to the login page.
        session.clear()
        return redirect('/admin/login')
# Login page.
@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Render the admin login form for GET and POST requests alike."""
    login_page = '/admin/login.html'
    return render_template(login_page)
# Handle a submitted login form.
@bp.route('/login/login', methods=('GET', 'POST'))
def login_login():
    """Authenticate an administrator and populate the session.

    Only POST requests are processed; any other method gets a failure page.
    The session is cleared before the credentials are checked. On success
    the user's ``id``, ``username`` and ``is_admin`` values are stored in
    the session.

    Returns:
        A redirect to ``/admin/index`` on success, otherwise the rendered
        ``/showmsg.html`` page describing why the login failed.
    """
    def failure(msg):
        # Every failure path renders the same message page.
        return render_template('/showmsg.html', title='登陆失败!', msg=msg,
                               jump='/admin/login')

    if request.method != 'POST':
        return failure('request.method 应为 POST。')

    session.clear()
    username = request.form['username']
    password = request.form['password']
    db = get_db()
    try:
        user = db.execute(
            'SELECT * FROM user WHERE username = ?', (username,)
        ).fetchone()
    except sqlite3.OperationalError as err:
        # OperationalError also covers transient failures such as
        # "database is locked". Only a missing table means the schema has
        # not been created yet, so only then is init_db() invoked; a
        # transient error must never trigger a re-initialisation.
        if 'no such table' in str(err).lower():
            init_db()
            return failure('数据库尚未初始化,请稍后再试。')
        return failure('数据库暂时不可用,请稍后再试。')

    if user is None:
        return failure('用户不存在,请检查后重试。')
    if not check_password_hash(user['password'], password):
        return failure('密码错误。')
    if not user['is_admin'] >= 1:
        return failure('权限不足。')

    session['id'] = user['id']
    session['username'] = user['username']
    session['is_admin'] = user['is_admin']
    return redirect('/admin/index')
# Delete a user's face data.
@bp.route('/delete', methods=('GET', 'POST'))
def delete():
    """Delete one user row together with its stored face feature.

    Expects a JSON POST body ``{"idx": <int>}`` where ``idx`` is the
    zero-based position of the user in the ``SELECT * FROM user`` result,
    which is also used as the row index into ``face_features.npy``.

    Returns:
        ``'{"msg" : "delete success"}'`` on success; a JSON message with
        status 403 when the session lacks admin rights; a JSON message with
        status 400 when ``idx`` is missing, not an integer or out of range;
        the rendered ``/showmsg.html`` page for non-POST requests.
    """
    if request.method != 'POST':
        return render_template('/showmsg.html', title='删除失败!',
                               msg='request.method 应为 POST。',
                               jump='/admin/index')

    # This endpoint destroys data, so apply the same privilege rule that the
    # index view applies before showing the user table.
    admin_level = session.get('is_admin')
    if admin_level is None or admin_level < 1:
        return '{"msg" : "permission denied"}', 403

    payload = request.get_json(silent=True)
    idx = payload.get('idx') if isinstance(payload, dict) else None
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(idx, bool) or not isinstance(idx, int):
        return '{"msg" : "invalid idx"}', 400

    db = get_db()
    users = db.execute(
        'SELECT * FROM user'
    ).fetchall()
    features = np.load('face_features.npy')
    # Validate against BOTH stores before changing either one, so a bad
    # index can never remove a feature row without removing a user row.
    if not 0 <= idx < min(len(users), features.shape[0]):
        return '{"msg" : "invalid idx"}', 400

    # Translate the positional index into the database id of that row.
    user_id = users[idx]['id']
    db.execute(
        'DELETE FROM user WHERE id = ?', (user_id,))
    db.commit()
    # Rewrite the feature file only after the database change is committed,
    # so a refused commit leaves the file and the table aligned.
    np.save('face_features.npy', np.delete(features, idx, axis=0))
    return '{"msg" : "delete success"}'
# Log out.
@bp.route('/logout', methods=('GET', 'POST'))
def logout():
    """Drop everything stored in the session, then redirect to the login page."""
    session.clear()
    login_url = '/admin/login'
    return redirect(login_url)
@bp.before_app_request
def load_logged_in_user():
    """Copy the session's ``is_admin`` value onto ``g``.

    Registered through ``bp.before_app_request``. ``g.is_admin`` ends up as
    ``None`` when the session has no such entry.
    """
    # session.get already yields None for a missing key, so one assignment
    # covers both cases.
    g.is_admin = session.get('is_admin')
def login_required(view):
    """Wrap *view* so that only sessions with admin rights reach it.

    The wrapper redirects to ``/admin/login`` when ``g.is_admin`` is
    ``None`` or not greater than zero; otherwise it calls *view* with the
    keyword arguments it received and returns the result.
    """
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        admin_level = g.is_admin
        denied = admin_level is None or admin_level <= 0
        if not denied:
            return view(**kwargs)
        return redirect('/admin/login')
    return wrapped_view
# Register a new user.
@bp.route('/register', methods=('GET', 'POST'))
@login_required
def register():
    """Render the registration form (guarded by ``login_required``)."""
    register_page = '/admin/register.html'
    return render_template(register_page)
# Basic account information submitted by the registration form.
@bp.route('/register/basic_info', methods=('GET', 'POST'))
@login_required
def register_basic_info():
    """Stash the submitted account details in the session.

    For a POST request the form's ``username``, ``is_admin`` and the hash of
    ``password`` are stored, together with the current local time, under
    ``session['basic_info']``; then ``/admin/face.html`` is rendered. Any
    other method gets a failure message page.
    """
    if request.method != 'POST':
        return render_template(
            '/showmsg.html',
            title='注册失败!',
            msg='request.method 应为 POST。',
            jump='/admin/register',
        )
    form = request.form
    username = form['username']
    admin_flag = form['is_admin']
    # Never keep the plaintext password; only its hash goes into the session.
    password_hash = generate_password_hash(form['password'])
    # Current local date and time.
    timestamp = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
    session['basic_info'] = {
        'username': username,
        'is_admin': admin_flag,
        'password': password_hash,
        'last_punch': timestamp,
    }
    return render_template('/admin/face.html')
@bp.before_app_request
def load_basic_info():
    """Copy the session's ``basic_info`` entry onto ``g``.

    Registered through ``bp.before_app_request``. ``g.basic_info`` ends up
    as ``None`` when the session has no such entry.
    """
    # session.get already yields None for a missing key.
    g.basic_info = session.get('basic_info')
def basic_info_registered(view):
    """Wrap *view* so that it only runs once ``g.basic_info`` is set.

    The wrapper redirects to ``/admin/index`` when ``g.basic_info`` is
    ``None``; otherwise it calls *view* with the keyword arguments it
    received and returns the result.
    """
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        if g.basic_info is not None:
            return view(**kwargs)
        return redirect('/admin/index')
    return wrapped_view
# Face enrolment step of the registration flow.
@bp.route('/register/recognize', methods=('GET', 'POST'))
@login_required
@basic_info_registered
def register_recognize():
    """Enrol the submitted face image and create the pending user.

    Expects a JSON POST body with a ``data`` entry (only checked for
    truthiness here) and an ``imgbase64`` entry holding a base64 image,
    optionally preceded by a data-URL header such as
    ``data:image/png;base64,``. The face feature is computed with
    ``face_encodings``, the user described by ``session['basic_info']`` is
    inserted into the ``user`` table, and the feature is appended to
    ``face_features.npy``.

    Returns:
        The rendered ``/showmsg.html`` page reporting success or the reason
        for the failure.
    """
    def show(title, msg):
        # Every outcome renders the same message page with a jump to index.
        return render_template('/showmsg.html', title=title, msg=msg,
                               jump='/admin/index')

    if request.method != 'POST':
        return show('注册失败!', 'request.method 应为 POST。')

    data = request.json['data']
    string_b64 = request.json['imgbase64']
    if not data or not string_b64:
        return show('注册失败!', '图片解析错误,请检查后重试。')

    # A data URL looks like "data:image/<type>;base64,<payload>". The header
    # length depends on the image type (22 characters for png, 23 for jpeg),
    # so split on the comma instead of slicing a fixed prefix. The base64
    # alphabet contains no comma, so a bare payload passes through unchanged.
    payload_b64 = string_b64.split(',', 1)[-1]
    try:
        img_data = base64.b64decode(payload_b64)
        img = Image.open(BytesIO(img_data)).convert('RGB')
    except (ValueError, OSError):
        # binascii.Error derives from ValueError; Pillow reports unreadable
        # image data with an OSError subclass.
        return show('注册失败!', '图片解析错误,请检查后重试。')
    img = np.asarray(img)

    try:
        feature = face_encodings(img)
    except IndexError:
        return show('检测失败!', '未检测到人脸,请重试。')
    feature = np.expand_dims(feature, axis=0)
    features = np.concatenate([np.load('face_features.npy'), feature])

    info = session['basic_info']
    db = get_db()
    try:
        # Both execute() and commit() can fail on a locked database, so both
        # sit inside the guarded section.
        db.execute(
            "INSERT INTO user (username, password, is_admin, last_punch) "
            "VALUES (?, ?, ?, ?)", (
                info['username'],
                info['password'],
                info['is_admin'],
                info['last_punch'],))
        db.commit()
    except sqlite3.OperationalError:
        # Assumes get_db() returns a sqlite3 connection - TODO confirm.
        db.rollback()
        return show('注册失败!', '数据库被锁,请稍后再试。')

    # Write the feature file only after the row is committed: a failed insert
    # must not leave a feature row without a matching user row.
    np.save('face_features.npy', features)
    return show('注册成功!', '您的账户已经注册成功。')
# Change password.
@bp.route('/change_password', methods=('GET', 'POST'))
@login_required
def change_password():
    """Render the change-password form (guarded by ``login_required``)."""
    form_page = 'admin/change_password.html'
    return render_template(form_page)
# 修改密码的请求
@bp.route('/change_password/change_password', methods=('GET', 'POST'))
def change_password_change_password() :
if request.method == 'POST' :
idx = session['id']
password = request.form['password']
db =