更新 APP/authorization.py

This commit is contained in:
wangwei 2024-10-14 15:38:41 +08:00
parent 342a6c4f99
commit e97b456c4d

View File

@ -1,95 +1,148 @@
from functools import wraps
from flask import jsonify, request, current_app
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from app.models import User, Permission
from .extensions import db
class Authorization:
@staticmethod
def check_permission(required_permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from .models import User # 在函数内部导入以避免循环导入
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if required_permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_role(required_role):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_roles = [role.name for role in user.roles]
if required_role not in user_roles:
return jsonify({"message": "Role not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_department(required_department):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments]
if required_department not in user_departments:
return jsonify({"message": "Department not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def api_permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
def init_permissions():
from .models import Permission # 在函数内部导入以避免循环导入
# 这个函数可以在应用启动时调用,用于初始化或更新权限
# 可以从配置文件、数据库或其他来源读取权限定义
permissions = [
"create_user", "edit_user", "delete_user",
"create_role", "edit_role", "delete_role",
"create_department", "edit_department", "delete_department",
# ... 其他权限 ...
]
for perm_name in permissions:
perm = Permission.query.filter_by(name=perm_name).first()
if not perm:
new_perm = Permission(name=perm_name)
db.session.add(new_perm)
db.session.commit()
# 可以添加更多的辅助函数,比如检查复杂的权限组合等
from functools import wraps
from flask import jsonify, request, current_app
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from app.models import User, Permission, Department, Role
from .extensions import db
def department_and_role_required(department_name, role_names):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
if user.primary_department and user.primary_department.name == department_name:
user_roles = [role.name for role in user.roles]
if any(role in user_roles for role in role_names):
return f(*args, **kwargs)
return jsonify({"message": "Permission denied"}), 403
return decorated_function
return decorator
class Authorization:
@staticmethod
def check_permission(required_permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if required_permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_role(required_role):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_roles = [role.name for role in user.roles]
if required_role not in user_roles:
return jsonify({"message": "Role not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_department(required_department):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments]
if required_department not in user_departments:
return jsonify({"message": "Department not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def api_permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def department_required(department_name):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
if user.primary_department and user.primary_department.name == department_name:
return f(*args, **kwargs)
return jsonify({"message": "Permission denied"}), 403
return decorated_function
return decorator
@staticmethod
def role_required(role_names):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_roles = [role.name for role in user.roles]
if any(role in user_roles for role in role_names):
return f(*args, **kwargs)
return jsonify({"message": "Permission denied"}), 403
return decorated_function
return decorator
def init_permissions():
from .models import Permission # 在函数内部导入以避免循环导入
# 这个函数可以在应用启动时调用,用于初始化或更新权限
# 可以从配置文件、数据库或其他来源读取权限定义
permissions = [
"create_user", "edit_user", "delete_user",
"create_role", "edit_role", "delete_role",
"create_department", "edit_department", "delete_department",
# ... 其他权限 ...
]
for perm_name in permissions:
perm = Permission.query.filter_by(name=perm_name).first()
if not perm:
new_perm = Permission(name=perm_name)
db.session.add(new_perm)
db.session.commit()
# 可以添加更多的辅助函数,比如检查复杂的权限组合等