From be8628dceb6a62d28362fcb8eb177fd3149ae43f Mon Sep 17 00:00:00 2001 From: wangwei Date: Mon, 14 Oct 2024 14:14:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20APP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- APP/extensions.py | 7 + APP/models.py | 553 +++++++++++++++++++++++++++++++++++++++++++++ APP/permissions.py | 95 ++++++++ APP/roles.py | 138 +++++++++++ APP/users.py | 57 +++++ 5 files changed, 850 insertions(+) create mode 100644 APP/extensions.py create mode 100644 APP/models.py create mode 100644 APP/permissions.py create mode 100644 APP/roles.py create mode 100644 APP/users.py diff --git a/APP/extensions.py b/APP/extensions.py new file mode 100644 index 0000000..88521ac --- /dev/null +++ b/APP/extensions.py @@ -0,0 +1,7 @@ +from flask_sqlalchemy import SQLAlchemy +from flask_login import LoginManager +from flask_jwt_extended import JWTManager + +db = SQLAlchemy() +login_manager = LoginManager() +jwt = JWTManager() diff --git a/APP/models.py b/APP/models.py new file mode 100644 index 0000000..09e1f44 --- /dev/null +++ b/APP/models.py @@ -0,0 +1,553 @@ +from app import db, login_manager +from flask_login import UserMixin +from werkzeug.security import generate_password_hash, check_password_hash +from datetime import datetime, timedelta +from sqlalchemy.orm import relationship + +class UserField(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + description = db.Column(db.String(255)) + is_required = db.Column(db.Boolean, default=True) + + @staticmethod + def init_default_fields(): + default_fields = [ + {'name': 'first_name', 'description': '名', 'is_required': True}, + {'name': 'last_name', 'description': '姓', 'is_required': True}, + {'name': 'id_number', 'description': '身份证号码', 'is_required': True}, + {'name': 'phone', 'description': '电话', 'is_required': True}, + {'name': 'gender', 'description': '性别', 'is_required': True}, + {'name': 'email', 'description': '邮箱', 'is_required': False}, + {'name': 'photo', 'description': '照片', 'is_required': False}, + ] + for field in default_fields: + if not UserField.query.filter_by(name=field['name']).first(): + new_field = UserField(name=field['name'], description=field['description'], is_required=field['is_required']) + db.session.add(new_field) + db.session.commit() + +class User(UserMixin, db.Model): + id = db.Column(db.Integer, primary_key=True) + username = db.Column(db.String(64), index=True, unique=True, nullable=False) + roles = db.relationship('Role', secondary='user_roles', back_populates='users') + details = db.relationship('UserDetail', back_populates='user', cascade='all, delete-orphan') + login_info = db.relationship('UserLoginInfo', uselist=False, back_populates='user', cascade='all, delete-orphan') + password = db.relationship('UserPassword', uselist=False, back_populates='user', cascade='all, delete-orphan') + primary_department_id = db.Column(db.Integer, db.ForeignKey('department.id')) + primary_department = db.relationship('Department', foreign_keys=[primary_department_id]) + secondary_departments = db.relationship('Department', secondary='user_departments', back_populates='users', lazy='dynamic') + password_history = db.relationship('UserPasswordHistory', uselist=False, back_populates='user', cascade='all, delete-orphan') + + def set_password(self, password): + if self.password: + self.password.set_password(password) + else: + self.password = UserPassword(user=self) + self.password.set_password(password) + + # 更新密码修改历史 + if self.password_history: + self.password_history.update_password_change() + else: + self.password_history = UserPasswordHistory(user=self) + + def needs_password_change(self): + if self.login_info.is_new_user or not self.login_info.has_changed_initial_password: + return True + if not self.password_history: + return True + return self.password_history.is_password_expired() + + def set_detail(self, field_name, value): + field = UserField.query.filter_by(name=field_name).first() + if not field: + return False + detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first() + if detail: + detail.value = value + else: + detail = UserDetail(user_id=self.id, field_id=field.id, value=value) + db.session.add(detail) + db.session.commit() + return True + + def get_detail(self, field_name): + field = UserField.query.filter_by(name=field_name).first() + if not field: + return None + detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first() + return detail.value if detail else None + + @staticmethod + def create_user_field(name, description=None, is_required=True): + field = UserField(name=name, description=description, is_required=is_required) + db.session.add(field) + db.session.commit() + return field + + def __init__(self, *args, **kwargs): + super(User, self).__init__(*args, **kwargs) + self.login_info = UserLoginInfo(user=self) + + @classmethod + def find_by_username_or_email(cls, login): + return cls.query.filter( + db.or_( + cls.username == login, + cls.details.any( + db.and_( + UserDetail.field.has(UserField.name == 'email'), + UserDetail.value == login + ) + ) + ) + ).first() + + def get_all_permissions(self): + """获取用户的所有权限,包括角色权限和部门权限""" + all_permissions = set() + for role in self.roles: + all_permissions.update(role.permissions) + all_permissions.update(self.primary_department.get_effective_permissions()) + for dept in self.secondary_departments: + all_permissions.update(dept.get_effective_permissions()) + return list(all_permissions) + + def check_password(self, password): + if self.password: + return self.password.check_password(password) + return False + +class UserDetail(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + field_id = db.Column(db.Integer, db.ForeignKey('user_field.id'), nullable=False) + value = db.Column(db.String(255)) + user = db.relationship('User', back_populates='details') + field = db.relationship('UserField') + +class Role(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + parent_id = db.Column(db.Integer, db.ForeignKey('role.id')) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + users = relationship('User', secondary='user_roles', back_populates='roles') + permissions = relationship('Permission', secondary='role_permissions', back_populates='roles') + details = relationship('RoleDetail', backref='role', lazy='dynamic') + parent = relationship('Role', remote_side=[id], backref=db.backref('children', lazy='dynamic')) + + def __repr__(self): + return f'' + + def set_detail(self, field_name, value): + detail = RoleDetail.query.filter_by(role_id=self.id, field_id=RoleField.query.filter_by(name=field_name).first().id).first() + if detail: + detail.value = value + else: + field = RoleField.query.filter_by(name=field_name).first() + if field: + detail = RoleDetail(role_id=self.id, field_id=field.id, value=value) + db.session.add(detail) + db.session.commit() + + def get_detail(self, field_name): + field = RoleField.query.filter_by(name=field_name).first() + if field: + detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first() + return detail.value if detail else None + return None + + @staticmethod + def init_default_roles(): + default_roles = [ + {"name": "Global Administrator", "description": "Has all system permissions and can manage all functions and data"}, + {"name": "Global Readonly", "description": "All data in the system can be viewed but not modified."}, + {"name": "Global NO Permissions", "description": "Basic user, no special permissions."}, + {"name": "frontline staff", "description": "Front-level employees responsible for daily operations."}, + {"name": "manager", "description": "Middle managers responsible for managing teams and departments."}, + {"name": "director", "description": "Senior managers responsible for multiple departments or large projects."}, + {"name": "CEO", "description": "The company's top management is responsible for overall strategy and decision-making."} + ] + for role_data in default_roles: + existing_role = Role.query.filter_by(name=role_data['name']).first() + if not existing_role: + new_role = Role(name=role_data['name']) + db.session.add(new_role) + db.session.flush() # 获取新创建的角色ID + new_role.set_detail('description', role_data['description']) + else: + existing_role.set_detail('description', role_data['description']) + db.session.commit() + + @classmethod + def create_from_template(cls, template_id, name=None): + template = RoleTemplate.query.get(template_id) + if not template: + raise ValueError("Template not found") + + role = cls(name=name or template.name) + role.permissions = template.permissions.copy() + db.session.add(role) + db.session.commit() + return role + +class Permission(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + roles = relationship('Role', secondary='role_permissions', back_populates='permissions') + details = relationship('PermissionDetail', backref='permission', lazy='dynamic') + departments = db.relationship('Department', secondary='department_permissions', back_populates='permissions') + role_templates = db.relationship('RoleTemplate', secondary='role_template_permissions', back_populates='permissions') + + def __repr__(self): + return f'' + + def set_detail(self, field_name, value): + detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=PermissionField.query.filter_by(name=field_name).first().id).first() + if detail: + detail.value = value + else: + field = PermissionField.query.filter_by(name=field_name).first() + if field: + detail = PermissionDetail(permission_id=self.id, field_id=field.id, value=value) + db.session.add(detail) + db.session.commit() + + def get_detail(self, field_name): + field = PermissionField.query.filter_by(name=field_name).first() + if field: + detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first() + return detail.value if detail else None + return None + + @staticmethod + def init_default_permissions(): + default_permissions = [ + # 基本操作权限 + {"name": "Create", "description": "Create a new record or resource", "code": "CREATE"}, + {"name": "Delete", "description": "Delete an existing record or resource", "code": "DELETE"}, + {"name": "Read", "description": "View or read a record or resource", "code": "READ"}, + {"name": "Update", "description": "Modify an existing record or resource", "code": "UPDATE"}, + {"name": "Approval", "description": "Review and approve actions or changes", "code": "APPROVE"}, + + # 系统级权限 + {"name": "User Management", "description": "Manage system users", "code": "MANAGE_USERS"}, + {"name": "Role Management", "description": "Management system roles", "code": "MANAGE_ROLES"}, + {"name": "Department Management", "description": "Manage company departments", "code": "MANAGE_DEPARTMENTS"}, + {"name": "Permission Management", "description": "Manage system permissions", "code": "MANAGE_PERMISSIONS"}, + {"name": "Log View", "description": "View system log", "code": "VIEW_LOGS"}, + {"name": "Data Export", "description": "Export system data", "code": "EXPORT_DATA"}, + {"name": "System Setting", "description": "Change system setting", "code": "SYSTEM_SETTINGS"}, + ] + for perm_data in default_permissions: + existing_perm = Permission.query.filter_by(name=perm_data['name']).first() + if not existing_perm: + new_perm = Permission(name=perm_data['name']) + db.session.add(new_perm) + db.session.flush() # 获取新创建的权限ID + new_perm.set_detail('description', perm_data['description']) + new_perm.set_detail('code', perm_data['code']) + else: + existing_perm.set_detail('description', perm_data['description']) + existing_perm.set_detail('code', perm_data['code']) + db.session.commit() + +user_roles = db.Table('user_roles', + db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), + db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True) +) + +role_permissions = db.Table('role_permissions', + db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True), + db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True) +) + +class UserLoginInfo(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False) + register_time = db.Column(db.DateTime, default=datetime.utcnow) + login_count = db.Column(db.Integer, default=0) + last_login_time = db.Column(db.DateTime) + is_new_user = db.Column(db.Boolean, default=True) + has_changed_initial_password = db.Column(db.Boolean, default=False) + user = db.relationship('User', back_populates='login_info') + + def update_login_info(self): + self.login_count += 1 + self.last_login_time = datetime.utcnow() + self.is_new_user = False + db.session.commit() + +class UserPassword(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False) + password_hash = db.Column(db.String(255)) # 将长度从 128 增加到 255 + user = db.relationship('User', back_populates='password') + + def set_password(self, password): + self.password_hash = generate_password_hash(password) + + def check_password(self, password): + return check_password_hash(self.password_hash, password) + +@login_manager.user_loader +def load_user(user_id): + return User.query.get(int(user_id)) + +# 部门表 +class Department(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), index=True, unique=True, nullable=False) + parent_id = db.Column(db.Integer, db.ForeignKey('department.id')) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + details = db.relationship('DepartmentDetail', backref='department', lazy='dynamic') + parent = db.relationship('Department', remote_side=[id], backref=db.backref('children', lazy='dynamic')) + permissions = db.relationship('Permission', secondary='department_permissions', back_populates='departments') + users = db.relationship('User', secondary='user_departments', back_populates='secondary_departments', lazy='dynamic') + + def __repr__(self): + return f'' + + def set_detail(self, field_name, value): + field = DepartmentField.query.filter_by(name=field_name).first() + if not field: + field = DepartmentField(name=field_name) + db.session.add(field) + db.session.flush() # 这会给新创建的 field 分配一个 id + + detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first() + if detail: + detail.value = value + else: + detail = DepartmentDetail(department_id=self.id, field_id=field.id, value=value) + db.session.add(detail) + + # 不要在这里提交,让调用者决定何时提交 + # db.session.commit() + + def get_detail(self, field_name): + field = DepartmentField.query.filter_by(name=field_name).first() + if field: + detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first() + return detail.value if detail else None + return None + + def get_all_children(self): + children = self.children.all() + all_children = list(children) + for child in children: + all_children.extend(child.get_all_children()) + return all_children + + def get_ancestors(self): + ancestors = [] + parent = self.parent + while parent: + ancestors.append(parent) + parent = parent.parent + return ancestors[::-1] # 返回反转后的列表,使得最顶层的祖先在前 + + @classmethod + def get_root_departments(cls): + return cls.query.filter_by(parent_id=None).all() + + @staticmethod + def init_default_departments(): + default_departments = [ + "信息技术", + "人事", + "财务", + "行政", + "采购", + "仓储", + "生产", + "物流", + "法务", + "市场", + "销售", + "客服" + ] + for dept_name in default_departments: + if not Department.query.filter_by(name=dept_name).first(): + new_dept = Department(name=dept_name) + db.session.add(new_dept) + db.session.commit() + + @staticmethod + def init_default_fields(): + default_fields = [ + {'name': 'department_code', 'description': '部门代码'}, + {'name': 'manager', 'description': '部门经理'}, + {'name': 'phone', 'description': '部门电话'}, + {'name': 'email', 'description': '部门邮箱'}, + {'name': 'location', 'description': '部门位置'}, + ] + for field in default_fields: + if not DepartmentField.query.filter_by(name=field['name']).first(): + new_field = DepartmentField(name=field['name'], description=field['description']) + db.session.add(new_field) + db.session.commit() + + def get_effective_permissions(self): + """获取部门的有效权限,包括自身的权限和父部门的权限""" + all_permissions = set(self.permissions) + parent = self.parent + while parent: + all_permissions.update(parent.permissions) + parent = parent.parent + return list(all_permissions) + +# 新增关联表 +department_permissions = db.Table('department_permissions', + db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True), + db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True) +) + +# 部门字段表 +class DepartmentField(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + description = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f'' + +# 部门详情表 +class DepartmentDetail(db.Model): + id = db.Column(db.Integer, primary_key=True) + department_id = db.Column(db.Integer, db.ForeignKey('department.id'), nullable=False) + field_id = db.Column(db.Integer, db.ForeignKey('department_field.id'), nullable=False) + value = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + field = db.relationship('DepartmentField', backref='details') + + def __repr__(self): + return f'' + +# 角色字段表 +class RoleField(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + description = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f'' + + @staticmethod + def init_default_fields(): + default_fields = [ + {'name': 'role_code', 'description': '角色代码'}, + {'name': 'description', 'description': '角色描述'}, + {'name': 'level', 'description': '角色级别'}, + {'name': 'permissions', 'description': '角色权限'}, + ] + for field in default_fields: + if not RoleField.query.filter_by(name=field['name']).first(): + new_field = RoleField(name=field['name'], description=field['description']) + db.session.add(new_field) + db.session.commit() + +# 角色详情表 +class RoleDetail(db.Model): + id = db.Column(db.Integer, primary_key=True) + role_id = db.Column(db.Integer, db.ForeignKey('role.id'), nullable=False) + field_id = db.Column(db.Integer, db.ForeignKey('role_field.id'), nullable=False) + value = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + field = relationship('RoleField', backref='details') + + def __repr__(self): + return f'' + +# 权限字段表 +class PermissionField(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + description = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f'' + + @staticmethod + def init_default_fields(): + default_fields = [ + {'name': 'description', 'description': '权限描述'}, + {'name': 'code', 'description': '权限代码'}, + {'name': 'category', 'description': '权限类别'}, + ] + for field in default_fields: + if not PermissionField.query.filter_by(name=field['name']).first(): + new_field = PermissionField(name=field['name'], description=field['description']) + db.session.add(new_field) + db.session.commit() + +# 权限详情表 +class PermissionDetail(db.Model): + id = db.Column(db.Integer, primary_key=True) + permission_id = db.Column(db.Integer, db.ForeignKey('permission.id'), nullable=False) + field_id = db.Column(db.Integer, db.ForeignKey('permission_field.id'), nullable=False) + value = db.Column(db.String(256)) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + field = relationship('PermissionField', backref='details') + + def __repr__(self): + return f'' + +# 新增关联表 +user_departments = db.Table('user_departments', + db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), + db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True) +) + +class RoleTemplate(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(64), unique=True, nullable=False) + description = db.Column(db.String(256)) + permissions = db.relationship('Permission', secondary='role_template_permissions', back_populates='role_templates') + + @classmethod + def create_template(cls, name, description, permissions): + template = cls(name=name, description=description) + template.permissions = permissions + db.session.add(template) + db.session.commit() + return template + +role_template_permissions = db.Table('role_template_permissions', + db.Column('role_template_id', db.Integer, db.ForeignKey('role_template.id'), primary_key=True), + db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True) +) + +class UserPasswordHistory(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False) + last_password_change = db.Column(db.DateTime, default=datetime.utcnow) + user = db.relationship('User', back_populates='password_history') + + def is_password_expired(self): + # 检查密码是否已经超过三个月未更改 + return datetime.utcnow() - self.last_password_change > timedelta(days=90) + + def update_password_change(self): + self.last_password_change = datetime.utcnow() + db.session.commit() \ No newline at end of file diff --git a/APP/permissions.py b/APP/permissions.py new file mode 100644 index 0000000..fc5d784 --- /dev/null +++ b/APP/permissions.py @@ -0,0 +1,95 @@ +from flask import Blueprint, request, jsonify +from app.models import Permission, Role, db +from sqlalchemy.exc import IntegrityError + +bp = Blueprint('permissions', __name__) + +@bp.route('', methods=['GET']) +def get_permissions(): + permissions = Permission.query.all() + permissions_list = [{ + 'id': permission.id, + 'name': permission.name, + 'description': permission.get_detail('description'), + 'created_at': permission.created_at.isoformat() if permission.created_at else None + } for permission in permissions] + return jsonify(permissions_list), 200 + +@bp.route('/', methods=['GET']) +def get_permission_details(permission_id): + permission = Permission.query.get(permission_id) + if not permission: + return jsonify({'message': '权限不存在'}), 404 + + roles = [{'id': r.id, 'name': r.name} for r in permission.roles] + + permission_details = { + 'id': permission.id, + 'name': permission.name, + 'description': permission.get_detail('description'), + 'created_at': permission.created_at.isoformat() if permission.created_at else None, + 'updated_at': permission.updated_at.isoformat() if permission.updated_at else None, + 'roles': roles + } + + return jsonify(permission_details), 200 + +@bp.route('', methods=['POST']) +def create_permission(): + data = request.get_json() + + if not data or 'name' not in data: + return jsonify({'message': '权限名称是必需的'}), 400 + + name = data['name'] + + # 检查权限名称是否已存在 + if Permission.query.filter_by(name=name).first(): + return jsonify({'message': '权限名称已存在'}), 400 + + new_permission = Permission(name=name) + db.session.add(new_permission) + + # 如果提供了描述,设置它 + if 'description' in data: + new_permission.set_detail('description', data['description']) + + try: + db.session.commit() + except IntegrityError: + db.session.rollback() + return jsonify({'message': '创建权限失败,可能是由于数据完整性问题'}), 500 + + return jsonify({'message': '权限创建成功', 'id': new_permission.id}), 201 + +@bp.route('/add_to_role', methods=['POST']) +def add_permission_to_role(): + data = request.get_json() + + if not data or 'permission_id' not in data or 'role_id' not in data: + return jsonify({'message': '权限ID和角色ID都是必需的'}), 400 + + permission_id = data['permission_id'] + role_id = data['role_id'] + + permission = Permission.query.get(permission_id) + if not permission: + return jsonify({'message': '权限不存在'}), 404 + + role = Role.query.get(role_id) + if not role: + return jsonify({'message': '角色不存在'}), 404 + + if permission not in role.permissions: + role.permissions.append(permission) + else: + return jsonify({'message': '该角色已经拥有此权限'}), 400 + + try: + db.session.commit() + return jsonify({'message': '权限成功添加到角色'}), 200 + except IntegrityError: + db.session.rollback() + return jsonify({'message': '添加权限到角色失败,可能是由于数据完整性问题'}), 500 + +# 你可以在这里添加更多的权限相关路由... diff --git a/APP/roles.py b/APP/roles.py new file mode 100644 index 0000000..4377d3f --- /dev/null +++ b/APP/roles.py @@ -0,0 +1,138 @@ +from flask import Blueprint, request, jsonify +from app.models import Role, User, db, RoleTemplate, Permission +from sqlalchemy.exc import IntegrityError +from datetime import datetime + +bp = Blueprint('roles', __name__) + +@bp.route('/add_user', methods=['POST']) +def add_user_to_role(role_id): + data = request.get_json() + + if not data or 'user_id' not in data: + return jsonify({'message': '用户ID是必需的'}), 400 + + user_id = data['user_id'] + + role = Role.query.get(role_id) + if not role: + return jsonify({'message': '角色不存在'}), 404 + + user = User.query.get(user_id) + if not user: + return jsonify({'message': '用户不存在'}), 404 + + if role not in user.roles: + user.roles.append(role) + else: + return jsonify({'message': '用户已经在该角色中'}), 400 + + try: + db.session.commit() + return jsonify({'message': '用户成功添加到角色'}), 200 + except IntegrityError: + db.session.rollback() + return jsonify({'message': '添加用户到角色失败,可能是由于数据完整性问题'}), 500 + +# 你可以在这里添加更多的角色相关路由... + +@bp.route('', methods=['POST']) +def create_role(): + data = request.get_json() + + if not data or 'name' not in data: + return jsonify({'message': '角色名称是必需的'}), 400 + + name = data['name'] + + # 检查角色名称是否已存在 + if Role.query.filter_by(name=name).first(): + return jsonify({'message': '角色名称已存在'}), 400 + + new_role = Role(name=name) + db.session.add(new_role) + + # 如果提供了描述,设置它 + if 'description' in data: + new_role.set_detail('description', data['description']) + + try: + db.session.commit() + except IntegrityError: + db.session.rollback() + return jsonify({'message': '创建角色失败,可能是由于数据完整性问题'}), 500 + + return jsonify({'message': '角色创建成功', 'id': new_role.id}), 201 + +@bp.route('/', methods=['GET']) +def get_role(role_id): + role = Role.query.get(role_id) + if not role: + return jsonify({'message': '角色不存在'}), 404 + + return jsonify({ + 'id': role.id, + 'name': role.name, + 'description': role.get_detail('description'), + 'users': [user.id for user in role.users], + 'parent_id': role.parent_id + }), 200 + +@bp.route('/templates', methods=['POST']) +def create_role_template(): + data = request.get_json() + if not data or 'name' not in data or 'permissions' not in data: + return jsonify({'message': '缺少必要的字段'}), 400 + + template = RoleTemplate.create_template( + name=data['name'], + description=data.get('description'), + permissions=[Permission.query.get(p_id) for p_id in data['permissions']] + ) + return jsonify({'message': '角色模板创建成功', 'id': template.id}), 201 + +@bp.route('/from_template', methods=['POST']) +def create_role_from_template(): + data = request.get_json() + if not data or 'template_id' not in data: + return jsonify({'message': '缺少必要的字段'}), 400 + + try: + role = Role.create_from_template( + template_id=data['template_id'], + name=data.get('name') + ) + return jsonify({'message': '角色创建成功', 'id': role.id}), 201 + except ValueError as e: + return jsonify({'message': str(e)}), 400 + +@bp.route('', methods=['GET']) +def get_roles(): + roles = Role.query.all() + roles_list = [{ + 'id': role.id, + 'name': role.name, + 'description': role.get_detail('description'), + 'created_at': role.created_at.isoformat() if role.created_at else None + } for role in roles] + return jsonify(roles_list), 200 + +@bp.route('/', methods=['GET']) +def get_role_details(role_id): + role = Role.query.get(role_id) + if not role: + return jsonify({'message': '角色不存在'}), 404 + + permissions = [{'id': p.id, 'name': p.name, 'description': p.get_detail('description')} for p in role.permissions] + users = [{'id': u.id, 'username': u.username} for u in role.users] + + role_details = { + 'id': role.id, + 'name': role.name, + 'description': role.get_detail('description'), + 'created_at': role.created_at.isoformat() if role.created_at else None, + 'permissions': permissions, + 'users': users + } + + return jsonify(role_details), 200 diff --git a/APP/users.py b/APP/users.py new file mode 100644 index 0000000..6592fd5 --- /dev/null +++ b/APP/users.py @@ -0,0 +1,57 @@ +from flask import Blueprint, jsonify, request +from app.models import User, UserDetail, UserField +from app import db + +bp = Blueprint('users', __name__) + +@bp.route('', methods=['GET']) +def get_users(): + page = request.args.get('page', 1, type=int) + per_page = request.args.get('per_page', 10, type=int) + users = User.query.paginate(page=page, per_page=per_page, error_out=False) + + users_list = [{ + 'id': user.id, + 'username': user.username, + 'primary_department': user.primary_department.name if user.primary_department else None, + 'roles': [role.name for role in user.roles] + } for user in users.items] + + return jsonify({ + 'users': users_list, + 'total': users.total, + 'pages': users.pages, + 'current_page': page + }), 200 + +@bp.route('/', methods=['GET']) +def get_user_details(user_id): + user = User.query.get(user_id) + if not user: + return jsonify({'message': '用户不存在'}), 404 + + user_fields = UserField.query.all() + user_details = {} + for field in user_fields: + detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first() + user_details[field.name] = detail.value if detail else None + + user_info = { + 'id': user.id, + 'username': user.username, + 'primary_department': user.primary_department.name if user.primary_department else None, + 'secondary_departments': [dept.name for dept in user.secondary_departments], + 'roles': [role.name for role in user.roles], + 'details': user_details, + 'login_info': { + 'register_time': user.login_info.register_time.isoformat(), + 'login_count': user.login_info.login_count, + 'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None, + 'is_new_user': user.login_info.is_new_user, + 'has_changed_initial_password': user.login_info.has_changed_initial_password + } + } + + return jsonify(user_info), 200 + +# 可以在这里添加更多用户相关的路由...