更新 APP/auth.py

This commit is contained in:
wangwei 2024-10-14 15:37:51 +08:00
parent be8628dceb
commit 342a6c4f99

View File

@ -1,356 +1,407 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from flask_login import login_user, logout_user, login_required, current_user
from app import db
from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory
from urllib.parse import urlparse # 替换 werkzeug.urls import
import traceback
import random
import string
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \
verify_jwt_in_request
from functools import wraps
from datetime import datetime, timedelta
bp = Blueprint('auth', __name__)
@bp.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@bp.route('/register', methods=['POST'])
def register():
current_app.logger.info('Received registration request')
data = request.get_json()
current_app.logger.info(f'Registration data: {data}')
email = data.get('email')
if email:
existing_user = User.find_by_username_or_email(email)
if existing_user:
return jsonify({'message': '该邮箱已被注册'}), 400
required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender']
missing_fields = [field for field in required_fields if field not in data or not data[field]]
if missing_fields:
current_app.logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}')
return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400
try:
# 生成用户名和密码
username = generate_username(data['first_name'], data['last_name'])
password = generate_password()
current_app.logger.info(f'Generated username: {username}')
user = User(username=username)
user.set_password(password)
db.session.add(user)
# 设置用户详细信息
for field in UserField.query.all():
if field.name in data:
user.set_detail(field.name, data[field.name])
current_app.logger.info(f'Set user detail: {field.name} = {data[field.name]}')
# 设置新用户标记
user.login_info.is_new_user = True
user.login_info.has_changed_initial_password = False
db.session.commit()
current_app.logger.info(f'User {username} registered successfully')
return jsonify({
'message': '注册成功',
'username': username,
'password': password
}), 201
except Exception as e:
current_app.logger.error(f'Registration error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500
def generate_username(first_name, last_name):
base_username = f"{first_name.lower()} {last_name.lower()}"
username = base_username
counter = 1
while User.query.filter_by(username=username).first():
username = f"{base_username} {counter}"
counter += 1
return username
def generate_password():
characters = string.ascii_letters + string.digits + string.punctuation
return ''.join(random.choice(characters) for i in range(16))
@bp.route('/login', methods=['POST'])
def login_api():
current_app.logger.info('Received login request')
data = request.get_json()
if not data:
current_app.logger.warning('Invalid request data')
return jsonify({'message': '无效的请求数据'}), 400
username_or_email = data.get('username')
password = data.get('password')
current_app.logger.info(f'Login attempt for user: {username_or_email}')
if not username_or_email or not password:
current_app.logger.warning('Missing username/email or password')
return jsonify({'message': '缺少用户名/邮箱或密码'}), 400
try:
user = User.find_by_username_or_email(username_or_email)
if user is None:
current_app.logger.warning(f'User not found: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
if not user.check_password(password):
current_app.logger.warning(f'Invalid password for user: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
current_app.logger.info(f'User authenticated: {username_or_email}')
current_app.logger.info(f'Is new user: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}')
# 检查是否是新用户或未更改初始密码
if user.login_info.is_new_user or not user.login_info.has_changed_initial_password:
current_app.logger.info(f'New user or initial password not changed: {username_or_email}')
return jsonify({
'message': '首次登录,请修改密码',
'require_password_change': True
}), 403
# 检查密码是否过期
if user.needs_password_change():
current_app.logger.info(f'Password expired for user: {username_or_email}')
return jsonify({
'message': '密码已过期,请修改密码',
'require_password_change': True
}), 403
login_user(user)
user.login_info.update_login_info()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'User {user.username} logged in successfully')
# 获取用户详细信息
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()], # 添加这行
'details': user_details,
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify({
'message': '登录成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user_info': user_info
}), 200
except Exception as e:
current_app.logger.error(f'Login error: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '登录失败,请稍后再试'}), 500
@bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
current_user_id = get_jwt_identity()
access_token = create_access_token(identity=current_user_id)
return jsonify({'access_token': access_token}), 200
@bp.route('/logout', methods=['POST'])
@login_required
def logout_api():
current_app.logger.info(f'User {current_user.username} logged out')
logout_user()
return jsonify({'message': '登出成功'}), 200
@bp.route('/profile', methods=['GET'])
@login_required
def profile():
current_app.logger.info(f'Profile requested for user {current_user.username}')
try:
profile_data = {
'username': current_user.username,
'login_info': {
'register_time': current_user.login_info.register_time.isoformat(),
'login_count': current_user.login_info.login_count,
'last_login_time': current_user.login_info.last_login_time.isoformat() if current_user.login_info.last_login_time else None,
'is_new_user': current_user.login_info.is_new_user
}
}
current_app.logger.info(f'Profile data retrieved for user {current_user.username}')
return jsonify(profile_data)
except Exception as e:
current_app.logger.error(f'Profile retrieval error for user {current_user.username}: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500
@bp.route('/change_password', methods=['POST'])
@jwt_required()
def change_password():
current_app.logger.info('Received change password request')
current_user_id = get_jwt_identity()
data = request.get_json()
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
try:
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
current_app.logger.info(f'Password changed successfully for user {user.username}')
return jsonify({'message': '密码修改成功'}), 200
except Exception as e:
current_app.logger.error(f'Password change error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': '密码修改失败,请稍后再试'}), 500
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
try:
# 验证 JWT token
verify_jwt_in_request()
# 获取当前用户的身份通常是用户ID
current_user_id = get_jwt_identity()
# 从数据库中获取用户对象
current_user = User.query.get(current_user_id)
if not current_user:
return jsonify({'message': '找不到用户'}), 404
except Exception as e:
return jsonify({'message': '无效的令牌'}), 401
# 将当前用户对象传递给被装饰的函数
return f(current_user, *args, **kwargs)
return decorated
@bp.route('/change_initial_password', methods=['POST'])
def change_initial_password():
data = request.get_json()
if not data:
return jsonify({'message': '缺少请求数据'}), 400
username_or_email = data.get('username_or_email') or data.get('username')
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([username_or_email, old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
user = User.find_by_username_or_email(username_or_email)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
if user.login_info.has_changed_initial_password:
return jsonify({'message': '初始密码已经被修改过'}), 400
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
user.login_info.is_new_user = False
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'Initial password changed for user: {user.username}')
current_app.logger.info(f'New user status: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}')
return jsonify({
'message': '密码修改成功,现在可以登录',
'access_token': access_token,
'refresh_token': refresh_token
}), 200
@bp.route('/current_user', methods=['GET'])
@jwt_required()
def get_current_user():
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
try:
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'details': user_details,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()],
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify(user_info), 200
except Exception as e:
current_app.logger.error(f'Error retrieving current user info: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from flask_login import login_user, logout_user, login_required, current_user
from app import db
from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory
from urllib.parse import urlparse # 替换 werkzeug.urls import
import traceback
import random
import string
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \
verify_jwt_in_request
from functools import wraps
from datetime import datetime, timedelta
from app.authorization import Authorization, department_and_role_required
bp = Blueprint('auth', __name__)
@bp.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@bp.route('/register', methods=['POST'])
def register():
current_app.logger.info('Received registration request')
data = request.get_json()
current_app.logger.info(f'Registration data: {data}')
email = data.get('email')
if email:
existing_user = User.find_by_username_or_email(email)
if existing_user:
return jsonify({'message': '该邮箱已被注册'}), 400
required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender']
missing_fields = [field for field in required_fields if field not in data or not data[field]]
if missing_fields:
current_app.logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}')
return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400
try:
# 生成用户名和密码
username = generate_username(data['first_name'], data['last_name'])
password = generate_password()
current_app.logger.info(f'Generated username: {username}')
user = User(username=username)
user.set_password(password)
db.session.add(user)
# 设置用户详细信息
for field in UserField.query.all():
if field.name in data:
user.set_detail(field.name, data[field.name])
current_app.logger.info(f'Set user detail: {field.name} = {data[field.name]}')
# 设置新用户标记
user.login_info.is_new_user = True
user.login_info.has_changed_initial_password = False
db.session.commit()
current_app.logger.info(f'User {username} registered successfully')
return jsonify({
'message': '注册成功',
'username': username,
'password': password
}), 201
except Exception as e:
current_app.logger.error(f'Registration error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500
def generate_username(first_name, last_name):
base_username = f"{first_name.lower()} {last_name.lower()}"
username = base_username
counter = 1
while User.query.filter_by(username=username).first():
username = f"{base_username} {counter}"
counter += 1
return username
def generate_password():
characters = string.ascii_letters + string.digits + string.punctuation
return ''.join(random.choice(characters) for i in range(16))
@bp.route('/login', methods=['POST'])
def login_api():
current_app.logger.info('Received login request')
data = request.get_json()
if not data:
current_app.logger.warning('Invalid request data')
return jsonify({'message': '无效的请求数据'}), 400
username_or_email = data.get('username')
password = data.get('password')
current_app.logger.info(f'Login attempt for user: {username_or_email}')
if not username_or_email or not password:
current_app.logger.warning('Missing username/email or password')
return jsonify({'message': '缺少用户名/邮箱或密码'}), 400
try:
user = User.find_by_username_or_email(username_or_email)
if user is None:
current_app.logger.warning(f'User not found: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
if not user.check_password(password):
current_app.logger.warning(f'Invalid password for user: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
current_app.logger.info(f'User authenticated: {username_or_email}')
current_app.logger.info(f'Is new user: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}')
# 检查是否是新用户或未更改初始密码
if user.login_info.is_new_user or not user.login_info.has_changed_initial_password:
current_app.logger.info(f'New user or initial password not changed: {username_or_email}')
return jsonify({
'message': '首次登录,请修改密码',
'require_password_change': True
}), 403
# 检查密码是否过期
if user.password_history:
days_since_last_change = (datetime.utcnow() - user.password_history.last_password_change).days
if days_since_last_change >= 90:
current_app.logger.info(f'Password expired for user: {username_or_email}')
return jsonify({
'message': '密码已过期,请修改密码',
'require_password_change': True
}), 403
elif days_since_last_change >= 75:
expiry_date = user.password_history.last_password_change + timedelta(days=90)
warning_message = f'您的密码将在 {expiry_date.strftime("%Y-%m-%d")} 过期,请尽快修改密码'
current_app.logger.info(f'Password expiry warning for user: {username_or_email}')
else:
warning_message = None
else:
warning_message = None
login_user(user)
user.login_info.update_login_info()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'User {user.username} logged in successfully')
# 获取用户详细信息
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()],
'details': user_details,
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
response_data = {
'message': '登录成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user_info': user_info
}
if warning_message:
response_data['warning'] = warning_message
return jsonify(response_data), 200
except Exception as e:
current_app.logger.error(f'Login error: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '登录失败,请稍后再试'}), 500
@bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
current_user_id = get_jwt_identity()
access_token = create_access_token(identity=current_user_id)
return jsonify({'access_token': access_token}), 200
@bp.route('/logout', methods=['POST'])
@login_required
def logout_api():
current_app.logger.info(f'User {current_user.username} logged out')
logout_user()
return jsonify({'message': '登出成功'}), 200
@bp.route('/profile', methods=['GET'])
@login_required
def profile():
current_app.logger.info(f'Profile requested for user {current_user.username}')
try:
profile_data = {
'username': current_user.username,
'login_info': {
'register_time': current_user.login_info.register_time.isoformat(),
'login_count': current_user.login_info.login_count,
'last_login_time': current_user.login_info.last_login_time.isoformat() if current_user.login_info.last_login_time else None,
'is_new_user': current_user.login_info.is_new_user
}
}
current_app.logger.info(f'Profile data retrieved for user {current_user.username}')
return jsonify(profile_data)
except Exception as e:
current_app.logger.error(f'Profile retrieval error for user {current_user.username}: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500
@bp.route('/change_password', methods=['POST'])
@jwt_required()
def change_password():
current_app.logger.info('Received change password request')
current_user_id = get_jwt_identity()
data = request.get_json()
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
try:
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
current_app.logger.info(f'Password changed successfully for user {user.username}')
return jsonify({'message': '密码修改成功'}), 200
except Exception as e:
current_app.logger.error(f'Password change error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': '密码修改失败,请稍后再试'}), 500
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
try:
# 验证 JWT token
verify_jwt_in_request()
# 获取当前用户的身份通常是用户ID
current_user_id = get_jwt_identity()
# 从数据库中获取用户对象
current_user = User.query.get(current_user_id)
if not current_user:
return jsonify({'message': '找不到用户'}), 404
except Exception as e:
return jsonify({'message': '无效的令牌'}), 401
# 将当前用户对象传递给被装饰的函数
return f(current_user, *args, **kwargs)
return decorated
@bp.route('/change_initial_password', methods=['POST'])
def change_initial_password():
data = request.get_json()
if not data:
return jsonify({'message': '缺少请求数据'}), 400
username_or_email = data.get('username_or_email') or data.get('username')
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([username_or_email, old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
user = User.find_by_username_or_email(username_or_email)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
if user.login_info.has_changed_initial_password:
return jsonify({'message': '初始密码已经被修改过'}), 400
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
user.login_info.is_new_user = False
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'Initial password changed for user: {user.username}')
current_app.logger.info(f'New user status: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}')
return jsonify({
'message': '密码修改成功,现在可以登录',
'access_token': access_token,
'refresh_token': refresh_token
}), 200
@bp.route('/current_user', methods=['GET'])
@jwt_required()
def get_current_user():
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
try:
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'details': user_details,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()],
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify(user_info), 200
except Exception as e:
current_app.logger.error(f'Error retrieving current user info: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500
@bp.route('/force_password_change', methods=['POST'])
@jwt_required()
@department_and_role_required('信息技术', ['Global Administrator', 'frontline staff'])
def force_password_change():
data = request.get_json()
target_user_id = data.get('user_id')
new_password = data.get('new_password')
if not target_user_id or not new_password:
return jsonify({'message': '缺少必要的字段'}), 400
target_user = User.query.get(target_user_id)
if not target_user:
return jsonify({'message': '目标用户不存在'}), 404
try:
target_user.set_password(new_password)
target_user.login_info.has_changed_initial_password = False
target_user.login_info.is_new_user = False
# 更新密码修改历史
if target_user.password_history:
target_user.password_history.update_password_change()
else:
target_user.password_history = UserPasswordHistory(user=target_user)
db.session.commit()
current_app.logger.info(f'Password forcibly changed for user: {target_user.username}')
return jsonify({'message': '密码已成功强制修改'}), 200
except Exception as e:
current_app.logger.error(f'Force password change error: {str(e)}')
db.session.rollback()
return jsonify({'message': '强制修改密码失败,请稍后再试'}), 500