From e97b456c4d732ba30220771d25b8defc97838aac Mon Sep 17 00:00:00 2001 From: wangwei Date: Mon, 14 Oct 2024 15:38:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20APP/authorization.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- APP/authorization.py | 243 ++++++++++++++++++++++++++----------------- 1 file changed, 148 insertions(+), 95 deletions(-) diff --git a/APP/authorization.py b/APP/authorization.py index c8df309..4c3673b 100644 --- a/APP/authorization.py +++ b/APP/authorization.py @@ -1,95 +1,148 @@ -from functools import wraps -from flask import jsonify, request, current_app -from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request -from app.models import User, Permission -from .extensions import db - -class Authorization: - @staticmethod - def check_permission(required_permission): - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - from .models import User # 在函数内部导入以避免循环导入 - current_user_id = get_jwt_identity() - user = User.query.get(current_user_id) - if not user: - return jsonify({"message": "User not found"}), 404 - - user_permissions = user.get_all_permissions() - if required_permission not in user_permissions: - return jsonify({"message": "Permission denied"}), 403 - return f(*args, **kwargs) - return decorated_function - return decorator - - @staticmethod - def check_role(required_role): - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - current_user_id = get_jwt_identity() - user = User.query.get(current_user_id) - if not user: - return jsonify({"message": "User not found"}), 404 - - user_roles = [role.name for role in user.roles] - if required_role not in user_roles: - return jsonify({"message": "Role not authorized"}), 403 - return f(*args, **kwargs) - return decorated_function - return decorator - - @staticmethod - def check_department(required_department): - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - current_user_id = get_jwt_identity() - user = User.query.get(current_user_id) - if not user: - return jsonify({"message": "User not found"}), 404 - - user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments] - if required_department not in user_departments: - return jsonify({"message": "Department not authorized"}), 403 - return f(*args, **kwargs) - return decorated_function - return decorator - - @staticmethod - def api_permission_required(permission): - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - verify_jwt_in_request() - current_user_id = get_jwt_identity() - user = User.query.get(current_user_id) - if not user: - return jsonify({"message": "User not found"}), 404 - - user_permissions = user.get_all_permissions() - if permission not in user_permissions: - return jsonify({"message": "Permission denied"}), 403 - return f(*args, **kwargs) - return decorated_function - return decorator - -def init_permissions(): - from .models import Permission # 在函数内部导入以避免循环导入 - # 这个函数可以在应用启动时调用,用于初始化或更新权限 - # 可以从配置文件、数据库或其他来源读取权限定义 - permissions = [ - "create_user", "edit_user", "delete_user", - "create_role", "edit_role", "delete_role", - "create_department", "edit_department", "delete_department", - # ... 其他权限 ... - ] - for perm_name in permissions: - perm = Permission.query.filter_by(name=perm_name).first() - if not perm: - new_perm = Permission(name=perm_name) - db.session.add(new_perm) - db.session.commit() - -# 可以添加更多的辅助函数,比如检查复杂的权限组合等 +from functools import wraps +from flask import jsonify, request, current_app +from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request +from app.models import User, Permission, Department, Role +from .extensions import db + +def department_and_role_required(department_name, role_names): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + verify_jwt_in_request() + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + if user.primary_department and user.primary_department.name == department_name: + user_roles = [role.name for role in user.roles] + if any(role in user_roles for role in role_names): + return f(*args, **kwargs) + return jsonify({"message": "Permission denied"}), 403 + return decorated_function + return decorator + +class Authorization: + @staticmethod + def check_permission(required_permission): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_permissions = user.get_all_permissions() + if required_permission not in user_permissions: + return jsonify({"message": "Permission denied"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def check_role(required_role): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_roles = [role.name for role in user.roles] + if required_role not in user_roles: + return jsonify({"message": "Role not authorized"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def check_department(required_department): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments] + if required_department not in user_departments: + return jsonify({"message": "Department not authorized"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def api_permission_required(permission): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + verify_jwt_in_request() + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_permissions = user.get_all_permissions() + if permission not in user_permissions: + return jsonify({"message": "Permission denied"}), 403 + return f(*args, **kwargs) + return decorated_function + return decorator + + @staticmethod + def department_required(department_name): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + verify_jwt_in_request() + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + if user.primary_department and user.primary_department.name == department_name: + return f(*args, **kwargs) + return jsonify({"message": "Permission denied"}), 403 + return decorated_function + return decorator + + @staticmethod + def role_required(role_names): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + verify_jwt_in_request() + current_user_id = get_jwt_identity() + user = User.query.get(current_user_id) + if not user: + return jsonify({"message": "User not found"}), 404 + + user_roles = [role.name for role in user.roles] + if any(role in user_roles for role in role_names): + return f(*args, **kwargs) + return jsonify({"message": "Permission denied"}), 403 + return decorated_function + return decorator + + +def init_permissions(): + from .models import Permission # 在函数内部导入以避免循环导入 + # 这个函数可以在应用启动时调用,用于初始化或更新权限 + # 可以从配置文件、数据库或其他来源读取权限定义 + permissions = [ + "create_user", "edit_user", "delete_user", + "create_role", "edit_role", "delete_role", + "create_department", "edit_department", "delete_department", + # ... 其他权限 ... + ] + for perm_name in permissions: + perm = Permission.query.filter_by(name=perm_name).first() + if not perm: + new_perm = Permission(name=perm_name) + db.session.add(new_perm) + db.session.commit() + +# 可以添加更多的辅助函数,比如检查复杂的权限组合等