407 lines
16 KiB
Python
407 lines
16 KiB
Python
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
|
||
from flask_login import login_user, logout_user, login_required, current_user
|
||
from app import db
|
||
from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory
|
||
from urllib.parse import urlparse # 替换 werkzeug.urls import
|
||
import traceback
|
||
import random
|
||
import string
|
||
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \
|
||
verify_jwt_in_request
|
||
from functools import wraps
|
||
from datetime import datetime, timedelta
|
||
from app.authorization import Authorization, department_and_role_required
|
||
|
||
bp = Blueprint('auth', __name__)
|
||
|
||
@bp.route('/logout')
|
||
def logout():
|
||
logout_user()
|
||
return redirect(url_for('index'))
|
||
|
||
@bp.route('/register', methods=['POST'])
|
||
def register():
|
||
current_app.logger.info('Received registration request')
|
||
data = request.get_json()
|
||
current_app.logger.info(f'Registration data: {data}')
|
||
|
||
email = data.get('email')
|
||
if email:
|
||
existing_user = User.find_by_username_or_email(email)
|
||
if existing_user:
|
||
return jsonify({'message': '该邮箱已被注册'}), 400
|
||
|
||
required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender']
|
||
missing_fields = [field for field in required_fields if field not in data or not data[field]]
|
||
|
||
if missing_fields:
|
||
current_app.logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}')
|
||
return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400
|
||
|
||
try:
|
||
# 生成用户名和密码
|
||
username = generate_username(data['first_name'], data['last_name'])
|
||
password = generate_password()
|
||
current_app.logger.info(f'Generated username: {username}')
|
||
|
||
user = User(username=username)
|
||
user.set_password(password)
|
||
db.session.add(user)
|
||
|
||
# 设置用户详细信息
|
||
for field in UserField.query.all():
|
||
if field.name in data:
|
||
user.set_detail(field.name, data[field.name])
|
||
current_app.logger.info(f'Set user detail: {field.name} = {data[field.name]}')
|
||
|
||
# 设置新用户标记
|
||
user.login_info.is_new_user = True
|
||
user.login_info.has_changed_initial_password = False
|
||
|
||
db.session.commit()
|
||
current_app.logger.info(f'User {username} registered successfully')
|
||
return jsonify({
|
||
'message': '注册成功',
|
||
'username': username,
|
||
'password': password
|
||
}), 201
|
||
except Exception as e:
|
||
current_app.logger.error(f'Registration error: {str(e)}')
|
||
current_app.logger.error(traceback.format_exc())
|
||
db.session.rollback()
|
||
return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500
|
||
|
||
def generate_username(first_name, last_name):
|
||
base_username = f"{first_name.lower()} {last_name.lower()}"
|
||
username = base_username
|
||
counter = 1
|
||
while User.query.filter_by(username=username).first():
|
||
username = f"{base_username} {counter}"
|
||
counter += 1
|
||
return username
|
||
|
||
def generate_password():
|
||
characters = string.ascii_letters + string.digits + string.punctuation
|
||
return ''.join(random.choice(characters) for i in range(16))
|
||
|
||
|
||
@bp.route('/login', methods=['POST'])
|
||
def login_api():
|
||
current_app.logger.info('Received login request')
|
||
data = request.get_json()
|
||
|
||
if not data:
|
||
current_app.logger.warning('Invalid request data')
|
||
return jsonify({'message': '无效的请求数据'}), 400
|
||
|
||
username_or_email = data.get('username')
|
||
password = data.get('password')
|
||
|
||
current_app.logger.info(f'Login attempt for user: {username_or_email}')
|
||
|
||
if not username_or_email or not password:
|
||
current_app.logger.warning('Missing username/email or password')
|
||
return jsonify({'message': '缺少用户名/邮箱或密码'}), 400
|
||
|
||
try:
|
||
user = User.find_by_username_or_email(username_or_email)
|
||
if user is None:
|
||
current_app.logger.warning(f'User not found: {username_or_email}')
|
||
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
|
||
|
||
if not user.check_password(password):
|
||
current_app.logger.warning(f'Invalid password for user: {username_or_email}')
|
||
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
|
||
|
||
current_app.logger.info(f'User authenticated: {username_or_email}')
|
||
current_app.logger.info(f'Is new user: {user.login_info.is_new_user}')
|
||
current_app.logger.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}')
|
||
|
||
# 检查是否是新用户或未更改初始密码
|
||
if user.login_info.is_new_user or not user.login_info.has_changed_initial_password:
|
||
current_app.logger.info(f'New user or initial password not changed: {username_or_email}')
|
||
return jsonify({
|
||
'message': '首次登录,请修改密码',
|
||
'require_password_change': True
|
||
}), 403
|
||
|
||
# 检查密码是否过期
|
||
if user.password_history:
|
||
days_since_last_change = (datetime.utcnow() - user.password_history.last_password_change).days
|
||
if days_since_last_change >= 90:
|
||
current_app.logger.info(f'Password expired for user: {username_or_email}')
|
||
return jsonify({
|
||
'message': '密码已过期,请修改密码',
|
||
'require_password_change': True
|
||
}), 403
|
||
elif days_since_last_change >= 75:
|
||
expiry_date = user.password_history.last_password_change + timedelta(days=90)
|
||
warning_message = f'您的密码将在 {expiry_date.strftime("%Y-%m-%d")} 过期,请尽快修改密码'
|
||
current_app.logger.info(f'Password expiry warning for user: {username_or_email}')
|
||
else:
|
||
warning_message = None
|
||
else:
|
||
warning_message = None
|
||
|
||
login_user(user)
|
||
user.login_info.update_login_info()
|
||
|
||
access_token = create_access_token(identity=user.id)
|
||
refresh_token = create_refresh_token(identity=user.id)
|
||
|
||
current_app.logger.info(f'User {user.username} logged in successfully')
|
||
|
||
# 获取用户详细信息
|
||
user_fields = UserField.query.all()
|
||
user_details = {}
|
||
for field in user_fields:
|
||
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
|
||
user_details[field.name] = detail.value if detail else None
|
||
|
||
user_info = {
|
||
'id': user.id,
|
||
'username': user.username,
|
||
'primary_department': user.primary_department.name if user.primary_department else None,
|
||
'secondary_departments': [dept.name for dept in user.secondary_departments],
|
||
'roles': [role.name for role in user.roles],
|
||
'permissions': [perm.name for perm in user.get_all_permissions()],
|
||
'details': user_details,
|
||
'login_info': {
|
||
'register_time': user.login_info.register_time.isoformat(),
|
||
'login_count': user.login_info.login_count,
|
||
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
|
||
'is_new_user': user.login_info.is_new_user,
|
||
'has_changed_initial_password': user.login_info.has_changed_initial_password
|
||
}
|
||
}
|
||
|
||
response_data = {
|
||
'message': '登录成功',
|
||
'access_token': access_token,
|
||
'refresh_token': refresh_token,
|
||
'user_info': user_info
|
||
}
|
||
|
||
if warning_message:
|
||
response_data['warning'] = warning_message
|
||
|
||
return jsonify(response_data), 200
|
||
except Exception as e:
|
||
current_app.logger.error(f'Login error: {str(e)}')
|
||
current_app.logger.error(traceback.format_exc())
|
||
return jsonify({'message': '登录失败,请稍后再试'}), 500
|
||
|
||
@bp.route('/refresh', methods=['POST'])
|
||
@jwt_required(refresh=True)
|
||
def refresh():
|
||
current_user_id = get_jwt_identity()
|
||
access_token = create_access_token(identity=current_user_id)
|
||
return jsonify({'access_token': access_token}), 200
|
||
|
||
@bp.route('/logout', methods=['POST'])
|
||
@login_required
|
||
def logout_api():
|
||
current_app.logger.info(f'User {current_user.username} logged out')
|
||
logout_user()
|
||
return jsonify({'message': '登出成功'}), 200
|
||
|
||
@bp.route('/profile', methods=['GET'])
|
||
@login_required
|
||
def profile():
|
||
current_app.logger.info(f'Profile requested for user {current_user.username}')
|
||
try:
|
||
profile_data = {
|
||
'username': current_user.username,
|
||
'login_info': {
|
||
'register_time': current_user.login_info.register_time.isoformat(),
|
||
'login_count': current_user.login_info.login_count,
|
||
'last_login_time': current_user.login_info.last_login_time.isoformat() if current_user.login_info.last_login_time else None,
|
||
'is_new_user': current_user.login_info.is_new_user
|
||
}
|
||
}
|
||
current_app.logger.info(f'Profile data retrieved for user {current_user.username}')
|
||
return jsonify(profile_data)
|
||
except Exception as e:
|
||
current_app.logger.error(f'Profile retrieval error for user {current_user.username}: {str(e)}')
|
||
current_app.logger.error(traceback.format_exc())
|
||
return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500
|
||
|
||
@bp.route('/change_password', methods=['POST'])
|
||
@jwt_required()
|
||
def change_password():
|
||
current_app.logger.info('Received change password request')
|
||
current_user_id = get_jwt_identity()
|
||
data = request.get_json()
|
||
old_password = data.get('old_password')
|
||
new_password = data.get('new_password')
|
||
|
||
if not all([old_password, new_password]):
|
||
return jsonify({'message': '缺少必要的字段'}), 400
|
||
|
||
try:
|
||
user = User.query.get(current_user_id)
|
||
if not user:
|
||
return jsonify({'message': '用户不存在'}), 404
|
||
|
||
if not user.check_password(old_password):
|
||
return jsonify({'message': '旧密码不正确'}), 401
|
||
|
||
user.set_password(new_password)
|
||
user.login_info.has_changed_initial_password = True
|
||
|
||
# 更新密码修改历史
|
||
if user.password_history:
|
||
user.password_history.update_password_change()
|
||
else:
|
||
user.password_history = UserPasswordHistory(user=user)
|
||
|
||
db.session.commit()
|
||
|
||
current_app.logger.info(f'Password changed successfully for user {user.username}')
|
||
return jsonify({'message': '密码修改成功'}), 200
|
||
except Exception as e:
|
||
current_app.logger.error(f'Password change error: {str(e)}')
|
||
current_app.logger.error(traceback.format_exc())
|
||
db.session.rollback()
|
||
return jsonify({'message': '密码修改失败,请稍后再试'}), 500
|
||
|
||
def token_required(f):
|
||
@wraps(f)
|
||
def decorated(*args, **kwargs):
|
||
try:
|
||
# 验证 JWT token
|
||
verify_jwt_in_request()
|
||
# 获取当前用户的身份(通常是用户ID)
|
||
current_user_id = get_jwt_identity()
|
||
# 从数据库中获取用户对象
|
||
current_user = User.query.get(current_user_id)
|
||
if not current_user:
|
||
return jsonify({'message': '找不到用户'}), 404
|
||
except Exception as e:
|
||
return jsonify({'message': '无效的令牌'}), 401
|
||
# 将当前用户对象传递给被装饰的函数
|
||
return f(current_user, *args, **kwargs)
|
||
return decorated
|
||
|
||
@bp.route('/change_initial_password', methods=['POST'])
|
||
def change_initial_password():
|
||
data = request.get_json()
|
||
|
||
if not data:
|
||
return jsonify({'message': '缺少请求数据'}), 400
|
||
|
||
username_or_email = data.get('username_or_email') or data.get('username')
|
||
old_password = data.get('old_password')
|
||
new_password = data.get('new_password')
|
||
|
||
if not all([username_or_email, old_password, new_password]):
|
||
return jsonify({'message': '缺少必要的字段'}), 400
|
||
|
||
user = User.find_by_username_or_email(username_or_email)
|
||
if not user:
|
||
return jsonify({'message': '用户不存在'}), 404
|
||
|
||
if not user.check_password(old_password):
|
||
return jsonify({'message': '旧密码不正确'}), 401
|
||
|
||
if user.login_info.has_changed_initial_password:
|
||
return jsonify({'message': '初始密码已经被修改过'}), 400
|
||
|
||
user.set_password(new_password)
|
||
user.login_info.has_changed_initial_password = True
|
||
user.login_info.is_new_user = False
|
||
|
||
# 更新密码修改历史
|
||
if user.password_history:
|
||
user.password_history.update_password_change()
|
||
else:
|
||
user.password_history = UserPasswordHistory(user=user)
|
||
|
||
db.session.commit()
|
||
|
||
access_token = create_access_token(identity=user.id)
|
||
refresh_token = create_refresh_token(identity=user.id)
|
||
|
||
current_app.logger.info(f'Initial password changed for user: {user.username}')
|
||
current_app.logger.info(f'New user status: {user.login_info.is_new_user}')
|
||
current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}')
|
||
|
||
return jsonify({
|
||
'message': '密码修改成功,现在可以登录',
|
||
'access_token': access_token,
|
||
'refresh_token': refresh_token
|
||
}), 200
|
||
|
||
@bp.route('/current_user', methods=['GET'])
|
||
@jwt_required()
|
||
def get_current_user():
|
||
current_user_id = get_jwt_identity()
|
||
user = User.query.get(current_user_id)
|
||
|
||
if not user:
|
||
return jsonify({'message': '用户不存在'}), 404
|
||
|
||
try:
|
||
user_fields = UserField.query.all()
|
||
user_details = {}
|
||
for field in user_fields:
|
||
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
|
||
user_details[field.name] = detail.value if detail else None
|
||
|
||
user_info = {
|
||
'id': user.id,
|
||
'username': user.username,
|
||
'details': user_details,
|
||
'primary_department': user.primary_department.name if user.primary_department else None,
|
||
'secondary_departments': [dept.name for dept in user.secondary_departments],
|
||
'roles': [role.name for role in user.roles],
|
||
'permissions': [perm.name for perm in user.get_all_permissions()],
|
||
'login_info': {
|
||
'register_time': user.login_info.register_time.isoformat(),
|
||
'login_count': user.login_info.login_count,
|
||
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
|
||
'is_new_user': user.login_info.is_new_user,
|
||
'has_changed_initial_password': user.login_info.has_changed_initial_password
|
||
}
|
||
}
|
||
|
||
return jsonify(user_info), 200
|
||
except Exception as e:
|
||
current_app.logger.error(f'Error retrieving current user info: {str(e)}')
|
||
current_app.logger.error(traceback.format_exc())
|
||
return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500
|
||
|
||
@bp.route('/force_password_change', methods=['POST'])
|
||
@jwt_required()
|
||
@department_and_role_required('信息技术', ['Global Administrator', 'frontline staff'])
|
||
def force_password_change():
|
||
data = request.get_json()
|
||
target_user_id = data.get('user_id')
|
||
new_password = data.get('new_password')
|
||
|
||
if not target_user_id or not new_password:
|
||
return jsonify({'message': '缺少必要的字段'}), 400
|
||
|
||
target_user = User.query.get(target_user_id)
|
||
if not target_user:
|
||
return jsonify({'message': '目标用户不存在'}), 404
|
||
|
||
try:
|
||
target_user.set_password(new_password)
|
||
target_user.login_info.has_changed_initial_password = False
|
||
target_user.login_info.is_new_user = False
|
||
|
||
# 更新密码修改历史
|
||
if target_user.password_history:
|
||
target_user.password_history.update_password_change()
|
||
else:
|
||
target_user.password_history = UserPasswordHistory(user=target_user)
|
||
|
||
db.session.commit()
|
||
|
||
current_app.logger.info(f'Password forcibly changed for user: {target_user.username}')
|
||
return jsonify({'message': '密码已成功强制修改'}), 200
|
||
except Exception as e:
|
||
current_app.logger.error(f'Force password change error: {str(e)}')
|
||
db.session.rollback()
|
||
return jsonify({'message': '强制修改密码失败,请稍后再试'}), 500 |