OA/APP/auth.py
2024-10-14 15:37:51 +08:00

407 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from flask_login import login_user, logout_user, login_required, current_user
from app import db
from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory
from urllib.parse import urlparse # 替换 werkzeug.urls import
import traceback
import random
import string
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \
verify_jwt_in_request
from functools import wraps
from datetime import datetime, timedelta
from app.authorization import Authorization, department_and_role_required
# Blueprint named 'auth'; the route handlers below are registered on it.
bp = Blueprint('auth', __name__)
@bp.route('/logout')
def logout():
    """Log the current session out and redirect to the 'index' endpoint."""
    logout_user()
    landing_url = url_for('index')
    return redirect(landing_url)
@bp.route('/register', methods=['POST'])
def register():
    """Register a new user from a JSON body and return generated credentials.

    The body must contain non-empty ``first_name``, ``last_name``,
    ``id_number``, ``phone`` and ``gender``. ``email`` is optional, but when
    present it must not resolve to an existing user. Every key whose name
    matches a ``UserField`` row is stored as a user detail.

    Returns:
        201 with the generated username and password on success;
        400 when the body is not a JSON object, the e-mail is already
        registered, or required fields are missing;
        500 (after rolling the session back) on any other failure.
    """
    logger = current_app.logger
    logger.info('Received registration request')
    data = request.get_json()
    # A missing or non-object body would otherwise crash on ``data.get``.
    if not isinstance(data, dict):
        logger.warning('Invalid request data')
        return jsonify({'message': '无效的请求数据'}), 400

    # Log only the submitted field names: the values include personal data
    # (ID number, phone) that should not end up in application logs.
    logger.info(f'Registration fields: {", ".join(sorted(data))}')

    email = data.get('email')
    if email and User.find_by_username_or_email(email):
        return jsonify({'message': '该邮箱已被注册'}), 400

    required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender']
    missing_fields = [field for field in required_fields if not data.get(field)]
    if missing_fields:
        logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}')
        return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400

    try:
        # Generate the username and the initial password.
        username = generate_username(data['first_name'], data['last_name'])
        password = generate_password()
        logger.info(f'Generated username: {username}')

        user = User(username=username)
        user.set_password(password)
        db.session.add(user)

        # Store every submitted value that corresponds to a known field.
        for field in UserField.query.all():
            if field.name in data:
                user.set_detail(field.name, data[field.name])
                logger.info(f'Set user detail: {field.name}')

        # Mark the account as new so the first login forces a password change.
        user.login_info.is_new_user = True
        user.login_info.has_changed_initial_password = False
        db.session.commit()

        logger.info(f'User {username} registered successfully')
        return jsonify({
            'message': '注册成功',
            'username': username,
            'password': password
        }), 201
    except Exception as e:
        logger.error(f'Registration error: {str(e)}')
        logger.error(traceback.format_exc())
        db.session.rollback()
        # NOTE(review): the raw exception text is returned to the client;
        # kept for compatibility, but consider a generic message instead.
        return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500
def generate_username(first_name, last_name):
    """Return a username not yet present in the ``User`` table.

    The stem is the lower-cased first and last name joined by a space. While
    a user with the candidate name exists, a numeric suffix (1, 2, ...) is
    appended to the stem and the lookup is repeated.
    """
    stem = f"{first_name.lower()} {last_name.lower()}"
    candidate = stem
    suffix = 0
    while User.query.filter_by(username=candidate).first():
        suffix += 1
        candidate = f"{stem} {suffix}"
    return candidate
def generate_password():
    """Return a random 16-character initial password.

    Characters are drawn from ASCII letters, digits and punctuation using the
    ``secrets`` module, because the result is a credential and must not come
    from the predictable general-purpose ``random`` generator.
    """
    # Imported locally so the block does not depend on the file-level imports.
    import secrets

    characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(characters) for _ in range(16))
@bp.route('/login', methods=['POST'])
def login_api():
    """Authenticate a user from a JSON body and issue JWT tokens.

    Reads ``username`` (a username or an e-mail) and ``password`` from the
    body.

    Returns:
        200 with access/refresh tokens and user information (plus a
        ``warning`` when the password is 75 or more days old);
        400 for an empty body or missing credentials;
        401 for an unknown user or a wrong password;
        403 with ``require_password_change`` when the account is new, the
        initial password was never changed, or the password is 90 or more
        days old;
        500 on any unexpected error.
    """
    log = current_app.logger
    log.info('Received login request')

    data = request.get_json()
    if not data:
        log.warning('Invalid request data')
        return jsonify({'message': '无效的请求数据'}), 400

    username_or_email = data.get('username')
    password = data.get('password')
    log.info(f'Login attempt for user: {username_or_email}')
    if not (username_or_email and password):
        log.warning('Missing username/email or password')
        return jsonify({'message': '缺少用户名/邮箱或密码'}), 400

    try:
        user = User.find_by_username_or_email(username_or_email)
        if user is None:
            log.warning(f'User not found: {username_or_email}')
            return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
        if not user.check_password(password):
            log.warning(f'Invalid password for user: {username_or_email}')
            return jsonify({'message': '无效的用户名/邮箱或密码'}), 401

        log.info(f'User authenticated: {username_or_email}')
        log.info(f'Is new user: {user.login_info.is_new_user}')
        log.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}')

        # New accounts and accounts still on the initial password may not log in.
        must_change = user.login_info.is_new_user or not user.login_info.has_changed_initial_password
        if must_change:
            log.info(f'New user or initial password not changed: {username_or_email}')
            return jsonify({
                'message': '首次登录,请修改密码',
                'require_password_change': True
            }), 403

        # Password ageing: reject at 90 days or more, warn from 75 days on.
        warning_message = None
        if user.password_history:
            last_change = user.password_history.last_password_change
            days_since_last_change = (datetime.utcnow() - last_change).days
            if days_since_last_change >= 90:
                log.info(f'Password expired for user: {username_or_email}')
                return jsonify({
                    'message': '密码已过期,请修改密码',
                    'require_password_change': True
                }), 403
            if days_since_last_change >= 75:
                expiry_date = user.password_history.last_password_change + timedelta(days=90)
                warning_message = f'您的密码将在 {expiry_date.strftime("%Y-%m-%d")} 过期,请尽快修改密码'
                log.info(f'Password expiry warning for user: {username_or_email}')

        login_user(user)
        user.login_info.update_login_info()
        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)
        log.info(f'User {user.username} logged in successfully')

        # Collect the stored value (or None) for every defined user field.
        user_details = {}
        for field in UserField.query.all():
            detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
            user_details[field.name] = None if not detail else detail.value

        user_info = {
            'id': user.id,
            'username': user.username,
            'primary_department': user.primary_department.name if user.primary_department else None,
            'secondary_departments': [dept.name for dept in user.secondary_departments],
            'roles': [role.name for role in user.roles],
            'permissions': [perm.name for perm in user.get_all_permissions()],
            'details': user_details,
            'login_info': {
                'register_time': user.login_info.register_time.isoformat(),
                'login_count': user.login_info.login_count,
                'last_login_time': (
                    user.login_info.last_login_time.isoformat()
                    if user.login_info.last_login_time else None
                ),
                'is_new_user': user.login_info.is_new_user,
                'has_changed_initial_password': user.login_info.has_changed_initial_password
            }
        }

        response_data = {
            'message': '登录成功',
            'access_token': access_token,
            'refresh_token': refresh_token,
            'user_info': user_info
        }
        if warning_message:
            response_data['warning'] = warning_message
        return jsonify(response_data), 200
    except Exception as e:
        log.error(f'Login error: {str(e)}')
        log.error(traceback.format_exc())
        return jsonify({'message': '登录失败,请稍后再试'}), 500
@bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token for the identity carried by a refresh token."""
    identity = get_jwt_identity()
    payload = {'access_token': create_access_token(identity=identity)}
    return jsonify(payload), 200
@bp.route('/logout', methods=['POST'])
@login_required
def logout_api():
    """Log the current session out and confirm with a JSON message (200)."""
    current_app.logger.info(f'User {current_user.username} logged out')
    logout_user()
    body = jsonify({'message': '登出成功'})
    return body, 200
@bp.route('/profile', methods=['GET'])
@login_required
def profile():
    """Return the logged-in user's username and login statistics as JSON.

    Responds with 500 and a generic message if reading the data fails.
    """
    log = current_app.logger
    log.info(f'Profile requested for user {current_user.username}')
    try:
        username = current_user.username
        info = current_user.login_info
        last_login = info.last_login_time
        login_section = {
            'register_time': info.register_time.isoformat(),
            'login_count': info.login_count,
            'last_login_time': last_login.isoformat() if last_login else None,
            'is_new_user': info.is_new_user
        }
        profile_data = {'username': username, 'login_info': login_section}
        log.info(f'Profile data retrieved for user {current_user.username}')
        return jsonify(profile_data)
    except Exception as e:
        log.error(f'Profile retrieval error for user {current_user.username}: {str(e)}')
        log.error(traceback.format_exc())
        return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500
@bp.route('/change_password', methods=['POST'])
@jwt_required()
def change_password():
    """Change the password of the user identified by the JWT.

    Expects a JSON object with ``old_password`` and ``new_password``.

    Returns:
        200 on success;
        400 when the body is not a JSON object or a field is missing;
        401 when the old password does not match;
        404 when the JWT identity does not resolve to a user;
        500 (after rolling the session back) on any other failure.
    """
    current_app.logger.info('Received change password request')
    current_user_id = get_jwt_identity()

    data = request.get_json()
    # A missing or non-object body would otherwise crash on ``data.get``
    # outside the try block and surface as an unhandled 500.
    if not isinstance(data, dict):
        return jsonify({'message': '缺少请求数据'}), 400

    old_password = data.get('old_password')
    new_password = data.get('new_password')
    if not all([old_password, new_password]):
        return jsonify({'message': '缺少必要的字段'}), 400

    try:
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({'message': '用户不存在'}), 404
        if not user.check_password(old_password):
            return jsonify({'message': '旧密码不正确'}), 401

        user.set_password(new_password)
        user.login_info.has_changed_initial_password = True

        # Record the change in the password history, creating it if absent.
        if user.password_history:
            user.password_history.update_password_change()
        else:
            user.password_history = UserPasswordHistory(user=user)

        db.session.commit()
        current_app.logger.info(f'Password changed successfully for user {user.username}')
        return jsonify({'message': '密码修改成功'}), 200
    except Exception as e:
        current_app.logger.error(f'Password change error: {str(e)}')
        current_app.logger.error(traceback.format_exc())
        db.session.rollback()
        return jsonify({'message': '密码修改失败,请稍后再试'}), 500
def token_required(f):
    """Decorator that requires a valid JWT and injects the matching user.

    The wrapped view is called as ``f(user, *args, **kwargs)`` where ``user``
    is the ``User`` row for the JWT identity.

    Responses produced by the decorator itself:
        401 when JWT verification fails;
        404 when the identity does not resolve to a user.

    Errors raised by the user lookup are no longer reported as an invalid
    token; they propagate to the application's normal error handling.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Only the JWT calls are guarded, so a database failure below is not
        # mislabelled as "invalid token".
        try:
            verify_jwt_in_request()
            current_user_id = get_jwt_identity()
        except Exception as e:
            current_app.logger.warning(f'Token verification failed: {str(e)}')
            return jsonify({'message': '无效的令牌'}), 401

        # Named ``user`` to avoid shadowing flask_login's ``current_user``.
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({'message': '找不到用户'}), 404

        # Pass the resolved user object to the decorated function.
        return f(user, *args, **kwargs)
    return decorated
@bp.route('/change_initial_password', methods=['POST'])
def change_initial_password():
    """Replace a user's initial password and issue JWT tokens.

    Unauthenticated endpoint. Expects a JSON object with ``username_or_email``
    (or ``username``), ``old_password`` and ``new_password``.

    Returns:
        200 with access/refresh tokens on success;
        400 for an empty/non-object body, missing fields, or when the initial
        password has already been changed;
        401 when the old password does not match;
        404 when the user does not exist;
        500 (after rolling the session back) on any other failure.
    """
    data = request.get_json()
    # Also reject non-object JSON, which would crash on ``data.get``.
    if not data or not isinstance(data, dict):
        return jsonify({'message': '缺少请求数据'}), 400

    username_or_email = data.get('username_or_email') or data.get('username')
    old_password = data.get('old_password')
    new_password = data.get('new_password')
    if not all([username_or_email, old_password, new_password]):
        return jsonify({'message': '缺少必要的字段'}), 400

    # Wrapped like the other password handlers so a failed write is rolled
    # back and reported, instead of leaving the session in a failed state.
    try:
        user = User.find_by_username_or_email(username_or_email)
        # NOTE(review): the distinct 404/401 answers let an unauthenticated
        # caller tell whether an account exists; kept for compatibility.
        if not user:
            return jsonify({'message': '用户不存在'}), 404
        if not user.check_password(old_password):
            return jsonify({'message': '旧密码不正确'}), 401
        if user.login_info.has_changed_initial_password:
            return jsonify({'message': '初始密码已经被修改过'}), 400

        user.set_password(new_password)
        user.login_info.has_changed_initial_password = True
        user.login_info.is_new_user = False

        # Record the change in the password history, creating it if absent.
        if user.password_history:
            user.password_history.update_password_change()
        else:
            user.password_history = UserPasswordHistory(user=user)

        db.session.commit()

        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)

        current_app.logger.info(f'Initial password changed for user: {user.username}')
        current_app.logger.info(f'New user status: {user.login_info.is_new_user}')
        current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}')

        return jsonify({
            'message': '密码修改成功,现在可以登录',
            'access_token': access_token,
            'refresh_token': refresh_token
        }), 200
    except Exception as e:
        current_app.logger.error(f'Initial password change error: {str(e)}')
        current_app.logger.error(traceback.format_exc())
        db.session.rollback()
        return jsonify({'message': '密码修改失败,请稍后再试'}), 500
@bp.route('/current_user', methods=['GET'])
@jwt_required()
def get_current_user():
    """Return details, departments, roles, permissions and login info for the JWT user.

    Responds with 404 when the JWT identity does not resolve to a user and
    with 500 and a generic message when assembling the data fails.
    """
    user = User.query.get(get_jwt_identity())
    if not user:
        return jsonify({'message': '用户不存在'}), 404

    try:
        # Collect the stored value (or None) for every defined user field.
        user_details = {}
        for field in UserField.query.all():
            detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
            user_details[field.name] = None if not detail else detail.value

        user_info = {
            'id': user.id,
            'username': user.username,
            'details': user_details,
            'primary_department': user.primary_department.name if user.primary_department else None,
            'secondary_departments': [dept.name for dept in user.secondary_departments],
            'roles': [role.name for role in user.roles],
            'permissions': [perm.name for perm in user.get_all_permissions()],
            'login_info': {
                'register_time': user.login_info.register_time.isoformat(),
                'login_count': user.login_info.login_count,
                'last_login_time': (
                    user.login_info.last_login_time.isoformat()
                    if user.login_info.last_login_time else None
                ),
                'is_new_user': user.login_info.is_new_user,
                'has_changed_initial_password': user.login_info.has_changed_initial_password
            }
        }
        return jsonify(user_info), 200
    except Exception as e:
        log = current_app.logger
        log.error(f'Error retrieving current user info: {str(e)}')
        log.error(traceback.format_exc())
        return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500
@bp.route('/force_password_change', methods=['POST'])
@jwt_required()
@department_and_role_required('信息技术', ['Global Administrator', 'frontline staff'])
def force_password_change():
    """Set another user's password on behalf of an authorised caller.

    Expects a JSON object with ``user_id`` and ``new_password``. The target's
    ``has_changed_initial_password`` flag is cleared and ``is_new_user`` is
    set to False.

    Returns:
        200 on success;
        400 when the body is not a JSON object or a field is missing;
        404 when the target user does not exist;
        500 (after rolling the session back) on any other failure.
    """
    data = request.get_json()
    # A missing or non-object body would otherwise crash on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({'message': '缺少请求数据'}), 400

    target_user_id = data.get('user_id')
    new_password = data.get('new_password')
    if not target_user_id or not new_password:
        return jsonify({'message': '缺少必要的字段'}), 400

    target_user = User.query.get(target_user_id)
    if not target_user:
        return jsonify({'message': '目标用户不存在'}), 404

    try:
        target_user.set_password(new_password)
        # Clearing this flag makes the password count as an unchanged initial one.
        target_user.login_info.has_changed_initial_password = False
        target_user.login_info.is_new_user = False

        # Record the change in the password history, creating it if absent.
        if target_user.password_history:
            target_user.password_history.update_password_change()
        else:
            target_user.password_history = UserPasswordHistory(user=target_user)

        db.session.commit()
        current_app.logger.info(f'Password forcibly changed for user: {target_user.username}')
        return jsonify({'message': '密码已成功强制修改'}), 200
    except Exception as e:
        current_app.logger.error(f'Force password change error: {str(e)}')
        # Log the traceback as the other handlers in this module do.
        current_app.logger.error(traceback.format_exc())
        db.session.rollback()
        return jsonify({'message': '强制修改密码失败,请稍后再试'}), 500