149 lines
6.0 KiB
Python
149 lines
6.0 KiB
Python
from functools import wraps
|
|
from flask import jsonify, request, current_app
|
|
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
|
|
from app.models import User, Permission, Department, Role
|
|
from .extensions import db
|
|
|
|
def department_and_role_required(department_name, role_names):
    """Restrict a view to users of one department who hold an accepted role.

    The decorated view runs only when the user's primary department is named
    ``department_name`` and the user holds at least one of ``role_names``.

    Args:
        department_name: Name the user's primary department must have.
        role_names: Accepted role names. A single string is treated as one
            role name, not as a sequence of characters.

    Returns:
        A decorator for a view function. On each call the wrapped view
        first runs ``verify_jwt_in_request()`` and loads the ``User`` for
        the JWT identity. It answers ``404`` with ``"User not found"`` when
        no such user exists and ``403`` with ``"Permission denied"`` when
        the department or role check fails.
    """
    # Normalise once at decoration time. Iterating a bare string would
    # compare single characters against role names (and so always deny),
    # and a one-shot iterable would be exhausted after the first request.
    if isinstance(role_names, str):
        accepted_roles = frozenset((role_names,))
    else:
        accepted_roles = frozenset(role_names)

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            verify_jwt_in_request()
            user = User.query.get(get_jwt_identity())
            if not user:
                return jsonify({"message": "User not found"}), 404

            department = user.primary_department
            in_department = bool(department) and department.name == department_name
            if in_department and any(role.name in accepted_roles for role in user.roles):
                return f(*args, **kwargs)
            return jsonify({"message": "Permission denied"}), 403

        return decorated_function

    return decorator
|
|
|
|
class Authorization:
    """Namespace of view decorators that gate access on the JWT user.

    Every public method is a decorator factory. The wrapped view, on each
    call:

    1. runs ``verify_jwt_in_request()``,
    2. loads the ``User`` whose id is the JWT identity and answers ``404``
       with ``"User not found"`` when there is none,
    3. applies the decorator's own check and answers ``403`` with that
       decorator's message when it fails,
    4. otherwise calls the original view and returns its result.
    """

    @staticmethod
    def _guard(is_allowed, denied_message):
        """Build a decorator from a per-user predicate.

        Args:
            is_allowed: Callable taking the loaded ``User`` and returning a
                truthy value when the request may proceed.
            denied_message: Text of the ``"message"`` field in the ``403``
                response sent when ``is_allowed`` is falsy.

        Returns:
            A decorator that wraps a view function with the shared
            verify / load / check sequence.
        """

        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                # get_jwt_identity() is only meaningful once the JWT has
                # been verified, so every decorator verifies it itself.
                verify_jwt_in_request()
                user = User.query.get(get_jwt_identity())
                if not user:
                    return jsonify({"message": "User not found"}), 404
                if not is_allowed(user):
                    return jsonify({"message": denied_message}), 403
                return f(*args, **kwargs)

            return decorated_function

        return decorator

    @staticmethod
    def check_permission(required_permission):
        """Require ``required_permission`` in ``user.get_all_permissions()``.

        Failure response: ``403`` ``"Permission denied"``.
        """

        def has_permission(user):
            return required_permission in user.get_all_permissions()

        return Authorization._guard(has_permission, "Permission denied")

    @staticmethod
    def check_role(required_role):
        """Require that one of the user's roles is named ``required_role``.

        Failure response: ``403`` ``"Role not authorized"``.
        """

        def has_role(user):
            return required_role in [role.name for role in user.roles]

        return Authorization._guard(has_role, "Role not authorized")

    @staticmethod
    def check_department(required_department):
        """Require membership of ``required_department``.

        Both the primary department and the secondary departments count.
        Failure response: ``403`` ``"Department not authorized"``.
        """

        def in_department(user):
            department_names = [dept.name for dept in user.secondary_departments]
            # A user may have no primary department; skip it instead of
            # raising AttributeError on ``None.name``.
            if user.primary_department:
                department_names.append(user.primary_department.name)
            return required_department in department_names

        return Authorization._guard(in_department, "Department not authorized")

    @staticmethod
    def api_permission_required(permission):
        """Require ``permission`` in ``user.get_all_permissions()``.

        Performs the same check as ``check_permission`` and has the same
        failure response: ``403`` ``"Permission denied"``.
        """
        return Authorization.check_permission(permission)

    @staticmethod
    def department_required(department_name):
        """Require that the user's primary department is ``department_name``.

        Secondary departments are not considered.
        Failure response: ``403`` ``"Permission denied"``.
        """

        def is_primary_department(user):
            department = user.primary_department
            return bool(department) and department.name == department_name

        return Authorization._guard(is_primary_department, "Permission denied")

    @staticmethod
    def role_required(role_names):
        """Require that the user holds at least one of ``role_names``.

        Args:
            role_names: Accepted role names. A single string is treated as
                one role name, not as a sequence of characters.

        Failure response: ``403`` ``"Permission denied"``.
        """
        # Normalise once at decoration time so a bare string is not iterated
        # character by character and a one-shot iterable is not exhausted by
        # the first request.
        if isinstance(role_names, str):
            accepted_roles = frozenset((role_names,))
        else:
            accepted_roles = frozenset(role_names)

        def has_any_role(user):
            return any(role.name in accepted_roles for role in user.roles)

        return Authorization._guard(has_any_role, "Permission denied")
|
|
|
|
|
|
def init_permissions():
    """Create a ``Permission`` row for every built-in name that lacks one.

    Can be called at application start-up to initialise or update the
    permission table. Names that already have a row are left untouched.

    Raises:
        Exception: Whatever the database layer raises while adding or
            committing. The session is rolled back before the error is
            re-raised.
    """
    from .models import Permission  # imported here to avoid a circular import

    # The definitions could instead be read from a config file, the
    # database, or another source.
    permissions = [
        "create_user", "edit_user", "delete_user",
        "create_role", "edit_role", "delete_role",
        "create_department", "edit_department", "delete_department",
        # ... other permissions ...
    ]

    # One IN query instead of one lookup per permission name.
    existing_names = {
        perm.name
        for perm in Permission.query.filter(Permission.name.in_(permissions))
    }
    # dict.fromkeys drops duplicate names while keeping their order, so a
    # name listed twice is still inserted only once.
    missing_names = [
        name for name in dict.fromkeys(permissions) if name not in existing_names
    ]

    try:
        db.session.add_all(Permission(name=name) for name in missing_names)
        db.session.commit()
    except Exception:
        # Leave the session usable for the caller instead of stuck in a
        # failed transaction.
        db.session.rollback()
        raise
|
|
|
|
# More helper functions can be added here, e.g. for checking complex permission combinations.
|