From b52da71197c41a67114a17abae89164b3481428e Mon Sep 17 00:00:00 2001 From: wangwei Date: Mon, 14 Oct 2024 14:12:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20APP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- APP/__init__.py | 93 +++++++++++ APP/auth.py | 356 +++++++++++++++++++++++++++++++++++++++++++ APP/authorization.py | 95 ++++++++++++ APP/decorators.py | 16 ++ APP/departments.py | 150 ++++++++++++++++++ 5 files changed, 710 insertions(+) create mode 100644 APP/__init__.py create mode 100644 APP/auth.py create mode 100644 APP/authorization.py create mode 100644 APP/decorators.py create mode 100644 APP/departments.py diff --git a/APP/__init__.py b/APP/__init__.py new file mode 100644 index 0000000..12bbc26 --- /dev/null +++ b/APP/__init__.py @@ -0,0 +1,93 @@ +from flask import Flask, current_app +from flask_sqlalchemy import SQLAlchemy +from flask_login import LoginManager +from flask_migrate import Migrate +from flask_jwt_extended import JWTManager +from config import Config +import logging +from logging.handlers import RotatingFileHandler +import os + +db = SQLAlchemy() +migrate = Migrate() +login_manager = LoginManager() +jwt = JWTManager() + +def create_app(config_class=Config): + app = Flask(__name__) + app.config.from_object(config_class) + + db.init_app(app) + migrate.init_app(app, db) + login_manager.init_app(app) + jwt.init_app(app) + + # 设置日志记录器 + if not app.debug: + if not os.path.exists('logs'): + os.mkdir('logs') + file_handler = RotatingFileHandler('logs/app.log', maxBytes=10240, backupCount=10, delay=True) + file_handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')) + file_handler.setLevel(logging.INFO) + app.logger.addHandler(file_handler) + + app.logger.setLevel(logging.INFO) + app.logger.info('Application startup') + + from app import models + from app.models import RoleTemplate, Permission + + def init_database(): + db.create_all() + models.UserField.init_default_fields() + models.Department.init_default_departments() + models.Department.init_default_fields() + models.RoleField.init_default_fields() + models.Role.init_default_roles() + models.PermissionField.init_default_fields() + models.Permission.init_default_permissions() + + def init_role_templates(): + templates = [ + { + 'name': '管理员', + 'description': '系统管理员,拥有所有权限', + 'permissions': ['用户管理', '角色管理', '部门管理', '权限管理'] + }, + { + 'name': '普通用户', + 'description': '普通用户,拥有基本权限', + 'permissions': ['查看个人信息', '修改个人信息'] + }, + ] + + for template in templates: + if not RoleTemplate.query.filter_by(name=template['name']).first(): + permissions = [Permission.query.filter_by(name=p).first() for p in template['permissions']] + RoleTemplate.create_template( + name=template['name'], + description=template['description'], + permissions=[p for p in permissions if p] + ) + + with app.app_context(): + init_database() + init_role_templates() + + from app.auth import bp as auth_bp + app.register_blueprint(auth_bp) + + from app.departments import bp as departments_bp + app.register_blueprint(departments_bp, url_prefix='/departments') + + from app.roles import bp as roles_bp + app.register_blueprint(roles_bp, url_prefix='/roles') + + from app.permissions import bp as permissions_bp + app.register_blueprint(permissions_bp, url_prefix='/permissions') + + from app.users import bp as users_bp + app.register_blueprint(users_bp, url_prefix='/users') + + return app diff --git a/APP/auth.py b/APP/auth.py new file mode 100644 index 0000000..fb6bc5d --- /dev/null +++ b/APP/auth.py @@ -0,0 +1,356 @@ +from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app +from flask_login import login_user, logout_user, login_required, current_user +from app import db +from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory +from urllib.parse import urlparse # 替换 werkzeug.urls import +import traceback +import random +import string +from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \ + verify_jwt_in_request +from functools import wraps +from datetime import datetime, timedelta + +bp = Blueprint('auth', __name__) + +@bp.route('/logout') +def logout(): + logout_user() + return redirect(url_for('index')) + +@bp.route('/register', methods=['POST']) +def register(): + current_app.logger.info('Received registration request') + data = request.get_json() + current_app.logger.info(f'Registration data: {data}') + + email = data.get('email') + if email: + existing_user = User.find_by_username_or_email(email) + if existing_user: + return jsonify({'message': '该邮箱已被注册'}), 400 + + required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender'] + missing_fields = [field for field in required_fields if field not in data or not data[field]] + + if missing_fields: + current_app.logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}') + return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400 + + try: + # 生成用户名和密码 + username = generate_username(data['first_name'], data['last_name']) + password = generate_password() + current_app.logger.info(f'Generated username: {username}') + + user = User(username=username) + user.set_password(password) + db.session.add(user) + + # 设置用户详细信息 + for field in UserField.query.all(): + if field.name in data: + user.set_detail(field.name, data[field.name]) + current_app.logger.info(f'Set user detail: {field.name} = {data[field.name]}') + + # 设置新用户标记 + user.login_info.is_new_user = True + user.login_info.has_changed_initial_password = False + + db.session.commit() + current_app.logger.info(f'User {username} registered successfully') + return jsonify({ + 'message': '注册成功', + 'username': username, + 'password': password + }), 201 + except Exception as e: + current_app.logger.error(f'Registration error: {str(e)}') + current_app.logger.error(traceback.format_exc()) + db.session.rollback() + return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500 + +def generate_username(first_name, last_name): + base_username = f"{first_name.lower()} {last_name.lower()}" + username = base_username + counter = 1 + while User.query.filter_by(username=username).first(): + username = f"{base_username} {counter}" + counter += 1 + return username + +def generate_password(): + characters = string.ascii_letters + string.digits + string.punctuation + return ''.join(random.choice(characters) for i in range(16)) + + +@bp.route('/login', methods=['POST']) +def login_api(): + current_app.logger.info('Received login request') + data = request.get_json() + + if not data: + current_app.logger.warning('Invalid request data') + return jsonify({'message': '无效的请求数据'}), 400 + + username_or_email = data.get('username') + password = data.get('password') + + current_app.logger.info(f'Login attempt for user: {username_or_email}') + + if not username_or_email or not password: + current_app.logger.warning('Missing username/email or password') + return jsonify({'message': '缺少用户名/邮箱或密码'}), 400 + + try: + user = User.find_by_username_or_email(username_or_email) + if user is None: + current_app.logger.warning(f'User not found: {username_or_email}') + return jsonify({'message': '无效的用户名/邮箱或密码'}), 401 + + if not user.check_password(password): + current_app.logger.warning(f'Invalid password for user: {username_or_email}') + return jsonify({'message': '无效的用户名/邮箱或密码'}), 401 + + current_app.logger.info(f'User authenticated: {username_or_email}') + current_app.logger.info(f'Is new user: {user.login_info.is_new_user}') + current_app.logger.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}') + + # 检查是否是新用户或未更改初始密码 + if user.login_info.is_new_user or not user.login_info.has_changed_initial_password: + current_app.logger.info(f'New user or initial password not changed: {username_or_email}') + return jsonify({ + 'message': '首次登录,请修改密码', + 'require_password_change': True + }), 403 + + # 检查密码是否过期 + if user.needs_password_change(): + current_app.logger.info(f'Password expired for user: {username_or_email}') + return jsonify({ + 'message': '密码已过期,请修改密码', + 'require_password_change': True + }), 403 + + login_user(user) + user.login_info.update_login_info() + + access_token = create_access_token(identity=user.id) + refresh_token = create_refresh_token(identity=user.id) + + current_app.logger.info(f'User {user.username} logged in successfully') + + # 获取用户详细信息 + user_fields = UserField.query.all() + user_details = {} + for field in user_fields: + detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first() + user_details[field.name] = detail.value if detail else None + + user_info = { + 'id': user.id, + 'username': user.username, + 'primary_department': user.primary_department.name if user.primary_department else None, + 'secondary_departments': [dept.name for dept in user.secondary_departments], + 'roles': [role.name for role in user.roles], + 'permissions': [perm.name for perm in user.get_all_permissions()], # 添加这行 + 'details': user_details, + 'login_info': { + 'register_time': user.login_info.register_time.isoformat(), + 'login_count': user.login_info.login_count, + 'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None, + 'is_new_user': user.login_info.is_new_user, + 'has_changed_initial_password': user.login_info.has_changed_initial_password + } + } + + return jsonify({ + 'message': '登录成功', + 'access_token': access_token, + 'refresh_token': refresh_token, + 'user_info': user_info + }), 200 + except Exception as e: + current_app.logger.error(f'Login error: {str(e)}') + current_app.logger.error(traceback.format_exc()) + return jsonify({'message': '登录失败,请稍后再试'}), 500 + +@bp.route('/refresh', methods=['POST']) +@jwt_required(refresh=True) +def refresh(): + current_user_id = get_jwt_identity() + access_token = create_access_token(identity=current_user_id) + return jsonify({'access_token': access_token}), 200 + +@bp.route('/logout', methods=['POST']) +@login_required +def logout_api(): + current_app.logger.info(f'User {current_user.username} logged out') + logout_user() + return jsonify({'message': '登出成功'}), 200 + +@bp.route('/profile', methods=['GET']) +@login_required +def profile(): + current_app.logger.info(f'Profile requested for user {current_user.username}') + try: + profile_data = { + 'username': current_user.username, + 'login_info': { + 'register_time': current_user.login_info.register_time.isoformat(), + 'login_count': current_user.login_info.login_count, + 'last_login_time': current_user.login_info.last_login_time.isoformat() if current_user.login_info.last_login_time else None, + 'is_new_user': current_user.login_info.is_new_user + } + } + current_app.logger.info(f'Profile data retrieved for user {current_user.username}') + return jsonify(profile_data) + except Exception as e: + current_app.logger.error(f'Profile retrieval error for user {current_user.username}: {str(e)}') + current_app.logger.error(traceback.format_exc()) + return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500 + +@bp.route('/change_password', methods=['POST']) +@jwt_required() +def change_password(): + current_app.logger.info('Received change password request') + current_user_id = get_jwt_identity() + data = request.get_json() + old_password = data.get('old_password') + new_password = data.get('new_password') + + if not all([old_password, new_password]): + return jsonify({'message': '缺少必要的字段'}), 400 + + try: + user = User.query.get(current_user_id) + if not user: + return jsonify({'message': '用户不存在'}), 404 + + if not user.check_password(old_password): + return jsonify({'message': '旧密码不正确'}), 401 + + user.set_password(new_password) + user.login_info.has_changed_initial_password = True + + # 更新密码修改历史 + if user.password_history: + user.password_history.update_password_change() + else: + user.password_history = UserPasswordHistory(user=user) + + db.session.commit() + + current_app.logger.info(f'Password changed successfully for user {user.username}') + return jsonify({'message': '密码修改成功'}), 200 + except Exception as e: + current_app.logger.error(f'Password change error: {str(e)}') + current_app.logger.error(traceback.format_exc()) + db.session.rollback() + return jsonify({'message': '密码修改失败,请稍后再试'}), 500 + +def token_required(f): + @wraps(f) + def decorated(*args, **kwargs): + try: + # 验证 JWT token + verify_jwt_in_request() + # 获取当前用户的身份(通常是用户ID) + current_user_id = get_jwt_identity() + # 从数据库中获取用户对象 + current_user = User.query.get(current_user_id) + if not current_user: + return jsonify({'message': '找不到用户'}), 404 + except Exception as e: + return jsonify({'message': '无效的令牌'}), 401 + # 将当前用户对象传递给被装饰的函数 + return f(current_user, *args, **kwargs) + return decorated + +@bp.route('/change_initial_password', methods=['POST']) +def change_initial_password(): + data = request.get_json() + + if not data: + return jsonify({'message': '缺少请求数据'}), 400 + + username_or_email = data.get('username_or_email') or data.get('username') + old_password = data.get('old_password') + new_password = data.get('new_password') + + if not all([username_or_email, old_password, new_password]): + return jsonify({'message': '缺少必要的字段'}), 400 + + user = User.find_by_username_or_email(username_or_email) + if not user: + return jsonify({'message': '用户不存在'}), 404 + + if not user.check_password(old_password): + return jsonify({'message': '旧密码不正确'}), 401 + + if user.login_info.has_changed_initial_password: + return jsonify({'message': '初始密码已经被修改过'}), 400 + + user.set_password(new_password) + user.login_info.has_changed_initial_password = True + user.login_info.is_new_user = False + + # 更新密码修改历史 + if user.password_history: + user.password_history.update_password_change() + else: + user.password_history = UserPasswordHistory(user=user) + + db.session.commit() + + access_token = create_access_token(identity=user.id) + refresh_token = create_refresh_token(identity=user.id) + + current_app.logger.info(f'Initial password changed for user: {user.username}') + current_app.logger.info(f'New user status: {user.login_info.is_new_user}') + current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}') + + return jsonify({ + 'message': '密码修改成功,现在可以登录', + 'access_token': access_token, + 'refresh_token': refresh_token + }), 200 + +@bp.route('/current_user', methods=['GET']) +@jwt_required() +def get_current_user(): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + + if not user: + return jsonify({'message': '用户不存在'}), 404 + + try: + user_fields = UserField.query.all() + user_details = {} + for field in user_fields: + detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first() + user_details[field.name] = detail.value if detail else None + + user_info = { + 'id': user.id, + 'username': user.username, + 'details': user_details, + 'primary_department': user.primary_department.name if user.primary_department else None, + 'secondary_departments': [dept.name for dept in user.secondary_departments], + 'roles': [role.name for role in user.roles], + 'permissions': [perm.name for perm in user.get_all_permissions()], + 'login_info': { + 'register_time': user.login_info.register_time.isoformat(), + 'login_count': user.login_info.login_count, + 'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None, + 'is_new_user': user.login_info.is_new_user, + 'has_changed_initial_password': user.login_info.has_changed_initial_password + } + } + + return jsonify(user_info), 200 + except Exception as e: + current_app.logger.error(f'Error retrieving current user info: {str(e)}') + current_app.logger.error(traceback.format_exc()) + return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500 diff --git a/APP/authorization.py b/APP/authorization.py new file mode 100644 index 0000000..c8df309 --- /dev/null +++ b/APP/authorization.py @@ -0,0 +1,95 @@ +from functools import wraps +from flask import jsonify, request, current_app +from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request +from app.models import User, Permission +from .extensions import db + +class Authorization: + @staticmethod + def check_permission(required_permission): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + from .models import User # 在函数内部导入以避免循环导入 + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_permissions = user.get_all_permissions() + if required_permission not in user_permissions: + return jsonify({"message": "Permission denied"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def check_role(required_role): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_roles = [role.name for role in user.roles] + if required_role not in user_roles: + return jsonify({"message": "Role not authorized"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def check_department(required_department): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments] + if required_department not in user_departments: + return jsonify({"message": "Department not authorized"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def api_permission_required(permission): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + verify_jwt_in_request() + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_permissions = user.get_all_permissions() + if permission not in user_permissions: + return jsonify({"message": "Permission denied"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + +def init_permissions(): + from .models import Permission # 在函数内部导入以避免循环导入 + # 这个函数可以在应用启动时调用,用于初始化或更新权限 + # 可以从配置文件、数据库或其他来源读取权限定义 + permissions = [ + "create_user", "edit_user", "delete_user", + "create_role", "edit_role", "delete_role", + "create_department", "edit_department", "delete_department", + # ... 其他权限 ... + ] + for perm_name in permissions: + perm = Permission.query.filter_by(name=perm_name).first() + if not perm: + new_perm = Permission(name=perm_name) + db.session.add(new_perm) + db.session.commit() + +# 可以添加更多的辅助函数,比如检查复杂的权限组合等 diff --git a/APP/decorators.py b/APP/decorators.py new file mode 100644 index 0000000..f029a50 --- /dev/null +++ b/APP/decorators.py @@ -0,0 +1,16 @@ +from functools import wraps +from flask import abort +from flask_login import current_user + +def permission_required(permission): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + abort(403) + user_permissions = current_user.get_all_permissions() + if permission not in [p.name for p in user_permissions]: + abort(403) + return f(*args, **kwargs) + return decorated_function + return decorator diff --git a/APP/departments.py b/APP/departments.py new file mode 100644 index 0000000..64c29e2 --- /dev/null +++ b/APP/departments.py @@ -0,0 +1,150 @@ +from flask import Blueprint, request, jsonify +from app.models import Department, User, db +from sqlalchemy.exc import IntegrityError + +bp = Blueprint('departments', __name__) + +@bp.route('', methods=['POST']) +def create_department(): + data = request.get_json() + + if not data or 'name' not in data: + return jsonify({'message': '部门名称是必需的'}), 400 + + name = data['name'] + + # 检查部门名称是否已存在 + if Department.query.filter_by(name=name).first(): + return jsonify({'message': '部门名称已存在'}), 400 + + new_department = Department(name=name) + db.session.add(new_department) + db.session.flush() # 这会给 new_department 分配一个 id + + # 如果提供了其他字段,设置它们 + for field in ['department_code', 'manager', 'phone', 'email', 'location']: + if field in data: + new_department.set_detail(field, data[field]) + + try: + db.session.commit() + except IntegrityError: + db.session.rollback() + return jsonify({'message': '创建部门失败,可能是由于数据完整性问题'}), 500 + + return jsonify({'message': '部门创建成功', 'id': new_department.id}), 201 + +@bp.route('//children', methods=['POST']) +def create_child_department(parent_id): + parent_department = Department.query.get(parent_id) + if not parent_department: + return jsonify({'message': '父部门不存在'}), 404 + + data = request.get_json() + + if not data or 'name' not in data: + return jsonify({'message': '子部门名称是必需的'}), 400 + + name = data['name'] + + # 检查部门名称是否已存在 + if Department.query.filter_by(name=name).first(): + return jsonify({'message': '部门名称已存在'}), 400 + + new_department = Department(name=name, parent_id=parent_id) + db.session.add(new_department) + db.session.flush() # 这会给 new_department 分配一个 id + + # 如果提供了其他字段,设置它们 + for field in ['department_code', 'manager', 'phone', 'email', 'location']: + if field in data: + new_department.set_detail(field, data[field]) + + try: + db.session.commit() + except IntegrityError: + db.session.rollback() + return jsonify({'message': '创建子部门失败,可能是由于数据完整性问题'}), 500 + + return jsonify({'message': '子部门创建成功', 'id': new_department.id}), 201 + +@bp.route('//add_user', methods=['POST']) +def add_user_to_department(department_id): + data = request.get_json() + + if not data or 'user_id' not in data: + return jsonify({'message': '用户ID是必需的'}), 400 + + user_id = data['user_id'] + + department = Department.query.get(department_id) + if not department: + return jsonify({'message': '部门不存在'}), 404 + + user = User.query.get(user_id) + if not user: + return jsonify({'message': '用户不存在'}), 404 + + if 'is_primary' in data and data['is_primary']: + # 如果是主要部门,直接设置 + user.primary_department = department + else: + # 如果不是主要部门,添加到次要部门 + if department not in user.secondary_departments: + user.secondary_departments.append(department) + + try: + db.session.commit() + return jsonify({'message': '用户成功添加到部门'}), 200 + except IntegrityError: + db.session.rollback() + return jsonify({'message': '添加用户到部门失败,可能是由于数据完整性问题'}), 500 + +@bp.route('', methods=['GET']) +def get_departments(): + departments = Department.query.all() + departments_list = [{ + 'id': dept.id, + 'name': dept.name, + 'parent_id': dept.parent_id, + 'created_at': dept.created_at.isoformat() if dept.created_at else None, + 'updated_at': dept.updated_at.isoformat() if dept.updated_at else None + } for dept in departments] + return jsonify(departments_list), 200 + +@bp.route('/', methods=['GET']) +def get_department_details(department_id): + department = Department.query.get(department_id) + if not department: + return jsonify({'message': '部门不存在'}), 404 + + # 获取主要部门的用户 + primary_users = User.query.filter_by(primary_department_id=department.id).all() + + # 获取次要部门的用户 + secondary_users = department.users.all() + + # 合并用户列表并去重 + all_users = list(set(primary_users + secondary_users)) + + users = [{'id': u.id, 'username': u.username} for u in all_users] + children = [{'id': c.id, 'name': c.name} for c in department.children] + + department_details = { + 'id': department.id, + 'name': department.name, + 'parent_id': department.parent_id, + 'created_at': department.created_at.isoformat() if department.created_at else None, + 'updated_at': department.updated_at.isoformat() if department.updated_at else None, + 'department_code': department.get_detail('department_code'), + 'manager': department.get_detail('manager'), + 'phone': department.get_detail('phone'), + 'email': department.get_detail('email'), + 'location': department.get_detail('location'), + 'users': users, + 'children': children + } + + return jsonify(department_details), 200 + +# 你可以在这里添加更多的部门相关路由...