上传文件至 APP

This commit is contained in:
wangwei 2024-10-14 14:14:23 +08:00
parent 4394f04ea3
commit be8628dceb
5 changed files with 850 additions and 0 deletions

7
APP/extensions.py Normal file
View File

@ -0,0 +1,7 @@
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_jwt_extended import JWTManager
db = SQLAlchemy()
login_manager = LoginManager()
jwt = JWTManager()

553
APP/models.py Normal file
View File

@ -0,0 +1,553 @@
from app import db, login_manager
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
from sqlalchemy.orm import relationship
class UserField(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
description = db.Column(db.String(255))
is_required = db.Column(db.Boolean, default=True)
@staticmethod
def init_default_fields():
default_fields = [
{'name': 'first_name', 'description': '', 'is_required': True},
{'name': 'last_name', 'description': '', 'is_required': True},
{'name': 'id_number', 'description': '身份证号码', 'is_required': True},
{'name': 'phone', 'description': '电话', 'is_required': True},
{'name': 'gender', 'description': '性别', 'is_required': True},
{'name': 'email', 'description': '邮箱', 'is_required': False},
{'name': 'photo', 'description': '照片', 'is_required': False},
]
for field in default_fields:
if not UserField.query.filter_by(name=field['name']).first():
new_field = UserField(name=field['name'], description=field['description'], is_required=field['is_required'])
db.session.add(new_field)
db.session.commit()
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True, unique=True, nullable=False)
roles = db.relationship('Role', secondary='user_roles', back_populates='users')
details = db.relationship('UserDetail', back_populates='user', cascade='all, delete-orphan')
login_info = db.relationship('UserLoginInfo', uselist=False, back_populates='user', cascade='all, delete-orphan')
password = db.relationship('UserPassword', uselist=False, back_populates='user', cascade='all, delete-orphan')
primary_department_id = db.Column(db.Integer, db.ForeignKey('department.id'))
primary_department = db.relationship('Department', foreign_keys=[primary_department_id])
secondary_departments = db.relationship('Department', secondary='user_departments', back_populates='users', lazy='dynamic')
password_history = db.relationship('UserPasswordHistory', uselist=False, back_populates='user', cascade='all, delete-orphan')
def set_password(self, password):
if self.password:
self.password.set_password(password)
else:
self.password = UserPassword(user=self)
self.password.set_password(password)
# 更新密码修改历史
if self.password_history:
self.password_history.update_password_change()
else:
self.password_history = UserPasswordHistory(user=self)
def needs_password_change(self):
if self.login_info.is_new_user or not self.login_info.has_changed_initial_password:
return True
if not self.password_history:
return True
return self.password_history.is_password_expired()
def set_detail(self, field_name, value):
field = UserField.query.filter_by(name=field_name).first()
if not field:
return False
detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
if detail:
detail.value = value
else:
detail = UserDetail(user_id=self.id, field_id=field.id, value=value)
db.session.add(detail)
db.session.commit()
return True
def get_detail(self, field_name):
field = UserField.query.filter_by(name=field_name).first()
if not field:
return None
detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
return detail.value if detail else None
@staticmethod
def create_user_field(name, description=None, is_required=True):
field = UserField(name=name, description=description, is_required=is_required)
db.session.add(field)
db.session.commit()
return field
def __init__(self, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self.login_info = UserLoginInfo(user=self)
@classmethod
def find_by_username_or_email(cls, login):
return cls.query.filter(
db.or_(
cls.username == login,
cls.details.any(
db.and_(
UserDetail.field.has(UserField.name == 'email'),
UserDetail.value == login
)
)
)
).first()
def get_all_permissions(self):
"""获取用户的所有权限,包括角色权限和部门权限"""
all_permissions = set()
for role in self.roles:
all_permissions.update(role.permissions)
all_permissions.update(self.primary_department.get_effective_permissions())
for dept in self.secondary_departments:
all_permissions.update(dept.get_effective_permissions())
return list(all_permissions)
def check_password(self, password):
if self.password:
return self.password.check_password(password)
return False
class UserDetail(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
field_id = db.Column(db.Integer, db.ForeignKey('user_field.id'), nullable=False)
value = db.Column(db.String(255))
user = db.relationship('User', back_populates='details')
field = db.relationship('UserField')
class Role(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
parent_id = db.Column(db.Integer, db.ForeignKey('role.id'))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
users = relationship('User', secondary='user_roles', back_populates='roles')
permissions = relationship('Permission', secondary='role_permissions', back_populates='roles')
details = relationship('RoleDetail', backref='role', lazy='dynamic')
parent = relationship('Role', remote_side=[id], backref=db.backref('children', lazy='dynamic'))
def __repr__(self):
return f'<Role {self.name}>'
def set_detail(self, field_name, value):
detail = RoleDetail.query.filter_by(role_id=self.id, field_id=RoleField.query.filter_by(name=field_name).first().id).first()
if detail:
detail.value = value
else:
field = RoleField.query.filter_by(name=field_name).first()
if field:
detail = RoleDetail(role_id=self.id, field_id=field.id, value=value)
db.session.add(detail)
db.session.commit()
def get_detail(self, field_name):
field = RoleField.query.filter_by(name=field_name).first()
if field:
detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first()
return detail.value if detail else None
return None
@staticmethod
def init_default_roles():
default_roles = [
{"name": "Global Administrator", "description": "Has all system permissions and can manage all functions and data"},
{"name": "Global Readonly", "description": "All data in the system can be viewed but not modified."},
{"name": "Global NO Permissions", "description": "Basic user, no special permissions."},
{"name": "frontline staff", "description": "Front-level employees responsible for daily operations."},
{"name": "manager", "description": "Middle managers responsible for managing teams and departments."},
{"name": "director", "description": "Senior managers responsible for multiple departments or large projects."},
{"name": "CEO", "description": "The company's top management is responsible for overall strategy and decision-making."}
]
for role_data in default_roles:
existing_role = Role.query.filter_by(name=role_data['name']).first()
if not existing_role:
new_role = Role(name=role_data['name'])
db.session.add(new_role)
db.session.flush() # 获取新创建的角色ID
new_role.set_detail('description', role_data['description'])
else:
existing_role.set_detail('description', role_data['description'])
db.session.commit()
@classmethod
def create_from_template(cls, template_id, name=None):
template = RoleTemplate.query.get(template_id)
if not template:
raise ValueError("Template not found")
role = cls(name=name or template.name)
role.permissions = template.permissions.copy()
db.session.add(role)
db.session.commit()
return role
class Permission(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
roles = relationship('Role', secondary='role_permissions', back_populates='permissions')
details = relationship('PermissionDetail', backref='permission', lazy='dynamic')
departments = db.relationship('Department', secondary='department_permissions', back_populates='permissions')
role_templates = db.relationship('RoleTemplate', secondary='role_template_permissions', back_populates='permissions')
def __repr__(self):
return f'<Permission {self.name}>'
def set_detail(self, field_name, value):
detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=PermissionField.query.filter_by(name=field_name).first().id).first()
if detail:
detail.value = value
else:
field = PermissionField.query.filter_by(name=field_name).first()
if field:
detail = PermissionDetail(permission_id=self.id, field_id=field.id, value=value)
db.session.add(detail)
db.session.commit()
def get_detail(self, field_name):
field = PermissionField.query.filter_by(name=field_name).first()
if field:
detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first()
return detail.value if detail else None
return None
@staticmethod
def init_default_permissions():
default_permissions = [
# 基本操作权限
{"name": "Create", "description": "Create a new record or resource", "code": "CREATE"},
{"name": "Delete", "description": "Delete an existing record or resource", "code": "DELETE"},
{"name": "Read", "description": "View or read a record or resource", "code": "READ"},
{"name": "Update", "description": "Modify an existing record or resource", "code": "UPDATE"},
{"name": "Approval", "description": "Review and approve actions or changes", "code": "APPROVE"},
# 系统级权限
{"name": "User Management", "description": "Manage system users", "code": "MANAGE_USERS"},
{"name": "Role Management", "description": "Management system roles", "code": "MANAGE_ROLES"},
{"name": "Department Management", "description": "Manage company departments", "code": "MANAGE_DEPARTMENTS"},
{"name": "Permission Management", "description": "Manage system permissions", "code": "MANAGE_PERMISSIONS"},
{"name": "Log View", "description": "View system log", "code": "VIEW_LOGS"},
{"name": "Data Export", "description": "Export system data", "code": "EXPORT_DATA"},
{"name": "System Setting", "description": "Change system setting", "code": "SYSTEM_SETTINGS"},
]
for perm_data in default_permissions:
existing_perm = Permission.query.filter_by(name=perm_data['name']).first()
if not existing_perm:
new_perm = Permission(name=perm_data['name'])
db.session.add(new_perm)
db.session.flush() # 获取新创建的权限ID
new_perm.set_detail('description', perm_data['description'])
new_perm.set_detail('code', perm_data['code'])
else:
existing_perm.set_detail('description', perm_data['description'])
existing_perm.set_detail('code', perm_data['code'])
db.session.commit()
user_roles = db.Table('user_roles',
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True)
)
role_permissions = db.Table('role_permissions',
db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True),
db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True)
)
class UserLoginInfo(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
register_time = db.Column(db.DateTime, default=datetime.utcnow)
login_count = db.Column(db.Integer, default=0)
last_login_time = db.Column(db.DateTime)
is_new_user = db.Column(db.Boolean, default=True)
has_changed_initial_password = db.Column(db.Boolean, default=False)
user = db.relationship('User', back_populates='login_info')
def update_login_info(self):
self.login_count += 1
self.last_login_time = datetime.utcnow()
self.is_new_user = False
db.session.commit()
class UserPassword(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
password_hash = db.Column(db.String(255)) # 将长度从 128 增加到 255
user = db.relationship('User', back_populates='password')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# 部门表
class Department(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), index=True, unique=True, nullable=False)
parent_id = db.Column(db.Integer, db.ForeignKey('department.id'))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
details = db.relationship('DepartmentDetail', backref='department', lazy='dynamic')
parent = db.relationship('Department', remote_side=[id], backref=db.backref('children', lazy='dynamic'))
permissions = db.relationship('Permission', secondary='department_permissions', back_populates='departments')
users = db.relationship('User', secondary='user_departments', back_populates='secondary_departments', lazy='dynamic')
def __repr__(self):
return f'<Department {self.name}>'
def set_detail(self, field_name, value):
field = DepartmentField.query.filter_by(name=field_name).first()
if not field:
field = DepartmentField(name=field_name)
db.session.add(field)
db.session.flush() # 这会给新创建的 field 分配一个 id
detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
if detail:
detail.value = value
else:
detail = DepartmentDetail(department_id=self.id, field_id=field.id, value=value)
db.session.add(detail)
# 不要在这里提交,让调用者决定何时提交
# db.session.commit()
def get_detail(self, field_name):
field = DepartmentField.query.filter_by(name=field_name).first()
if field:
detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
return detail.value if detail else None
return None
def get_all_children(self):
children = self.children.all()
all_children = list(children)
for child in children:
all_children.extend(child.get_all_children())
return all_children
def get_ancestors(self):
ancestors = []
parent = self.parent
while parent:
ancestors.append(parent)
parent = parent.parent
return ancestors[::-1] # 返回反转后的列表,使得最顶层的祖先在前
@classmethod
def get_root_departments(cls):
return cls.query.filter_by(parent_id=None).all()
@staticmethod
def init_default_departments():
default_departments = [
"信息技术",
"人事",
"财务",
"行政",
"采购",
"仓储",
"生产",
"物流",
"法务",
"市场",
"销售",
"客服"
]
for dept_name in default_departments:
if not Department.query.filter_by(name=dept_name).first():
new_dept = Department(name=dept_name)
db.session.add(new_dept)
db.session.commit()
@staticmethod
def init_default_fields():
default_fields = [
{'name': 'department_code', 'description': '部门代码'},
{'name': 'manager', 'description': '部门经理'},
{'name': 'phone', 'description': '部门电话'},
{'name': 'email', 'description': '部门邮箱'},
{'name': 'location', 'description': '部门位置'},
]
for field in default_fields:
if not DepartmentField.query.filter_by(name=field['name']).first():
new_field = DepartmentField(name=field['name'], description=field['description'])
db.session.add(new_field)
db.session.commit()
def get_effective_permissions(self):
"""获取部门的有效权限,包括自身的权限和父部门的权限"""
all_permissions = set(self.permissions)
parent = self.parent
while parent:
all_permissions.update(parent.permissions)
parent = parent.parent
return list(all_permissions)
# 新增关联表
department_permissions = db.Table('department_permissions',
db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True),
db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True)
)
# 部门字段表
class DepartmentField(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
description = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f'<DepartmentField {self.name}>'
# 部门详情表
class DepartmentDetail(db.Model):
id = db.Column(db.Integer, primary_key=True)
department_id = db.Column(db.Integer, db.ForeignKey('department.id'), nullable=False)
field_id = db.Column(db.Integer, db.ForeignKey('department_field.id'), nullable=False)
value = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
field = db.relationship('DepartmentField', backref='details')
def __repr__(self):
return f'<DepartmentDetail {self.department_id} - {self.field.name}: {self.value}>'
# 角色字段表
class RoleField(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
description = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f'<RoleField {self.name}>'
@staticmethod
def init_default_fields():
default_fields = [
{'name': 'role_code', 'description': '角色代码'},
{'name': 'description', 'description': '角色描述'},
{'name': 'level', 'description': '角色级别'},
{'name': 'permissions', 'description': '角色权限'},
]
for field in default_fields:
if not RoleField.query.filter_by(name=field['name']).first():
new_field = RoleField(name=field['name'], description=field['description'])
db.session.add(new_field)
db.session.commit()
# 角色详情表
class RoleDetail(db.Model):
id = db.Column(db.Integer, primary_key=True)
role_id = db.Column(db.Integer, db.ForeignKey('role.id'), nullable=False)
field_id = db.Column(db.Integer, db.ForeignKey('role_field.id'), nullable=False)
value = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
field = relationship('RoleField', backref='details')
def __repr__(self):
return f'<RoleDetail {self.role_id} - {self.field.name}: {self.value}>'
# 权限字段表
class PermissionField(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
description = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f'<PermissionField {self.name}>'
@staticmethod
def init_default_fields():
default_fields = [
{'name': 'description', 'description': '权限描述'},
{'name': 'code', 'description': '权限代码'},
{'name': 'category', 'description': '权限类别'},
]
for field in default_fields:
if not PermissionField.query.filter_by(name=field['name']).first():
new_field = PermissionField(name=field['name'], description=field['description'])
db.session.add(new_field)
db.session.commit()
# 权限详情表
class PermissionDetail(db.Model):
id = db.Column(db.Integer, primary_key=True)
permission_id = db.Column(db.Integer, db.ForeignKey('permission.id'), nullable=False)
field_id = db.Column(db.Integer, db.ForeignKey('permission_field.id'), nullable=False)
value = db.Column(db.String(256))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
field = relationship('PermissionField', backref='details')
def __repr__(self):
return f'<PermissionDetail {self.permission_id} - {self.field.name}: {self.value}>'
# 新增关联表
user_departments = db.Table('user_departments',
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True)
)
class RoleTemplate(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, nullable=False)
description = db.Column(db.String(256))
permissions = db.relationship('Permission', secondary='role_template_permissions', back_populates='role_templates')
@classmethod
def create_template(cls, name, description, permissions):
template = cls(name=name, description=description)
template.permissions = permissions
db.session.add(template)
db.session.commit()
return template
role_template_permissions = db.Table('role_template_permissions',
db.Column('role_template_id', db.Integer, db.ForeignKey('role_template.id'), primary_key=True),
db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True)
)
class UserPasswordHistory(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
last_password_change = db.Column(db.DateTime, default=datetime.utcnow)
user = db.relationship('User', back_populates='password_history')
def is_password_expired(self):
# 检查密码是否已经超过三个月未更改
return datetime.utcnow() - self.last_password_change > timedelta(days=90)
def update_password_change(self):
self.last_password_change = datetime.utcnow()
db.session.commit()

95
APP/permissions.py Normal file
View File

@ -0,0 +1,95 @@
from flask import Blueprint, request, jsonify
from app.models import Permission, Role, db
from sqlalchemy.exc import IntegrityError
bp = Blueprint('permissions', __name__)
@bp.route('', methods=['GET'])
def get_permissions():
permissions = Permission.query.all()
permissions_list = [{
'id': permission.id,
'name': permission.name,
'description': permission.get_detail('description'),
'created_at': permission.created_at.isoformat() if permission.created_at else None
} for permission in permissions]
return jsonify(permissions_list), 200
@bp.route('/<int:permission_id>', methods=['GET'])
def get_permission_details(permission_id):
permission = Permission.query.get(permission_id)
if not permission:
return jsonify({'message': '权限不存在'}), 404
roles = [{'id': r.id, 'name': r.name} for r in permission.roles]
permission_details = {
'id': permission.id,
'name': permission.name,
'description': permission.get_detail('description'),
'created_at': permission.created_at.isoformat() if permission.created_at else None,
'updated_at': permission.updated_at.isoformat() if permission.updated_at else None,
'roles': roles
}
return jsonify(permission_details), 200
@bp.route('', methods=['POST'])
def create_permission():
data = request.get_json()
if not data or 'name' not in data:
return jsonify({'message': '权限名称是必需的'}), 400
name = data['name']
# 检查权限名称是否已存在
if Permission.query.filter_by(name=name).first():
return jsonify({'message': '权限名称已存在'}), 400
new_permission = Permission(name=name)
db.session.add(new_permission)
# 如果提供了描述,设置它
if 'description' in data:
new_permission.set_detail('description', data['description'])
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
return jsonify({'message': '创建权限失败,可能是由于数据完整性问题'}), 500
return jsonify({'message': '权限创建成功', 'id': new_permission.id}), 201
@bp.route('/add_to_role', methods=['POST'])
def add_permission_to_role():
data = request.get_json()
if not data or 'permission_id' not in data or 'role_id' not in data:
return jsonify({'message': '权限ID和角色ID都是必需的'}), 400
permission_id = data['permission_id']
role_id = data['role_id']
permission = Permission.query.get(permission_id)
if not permission:
return jsonify({'message': '权限不存在'}), 404
role = Role.query.get(role_id)
if not role:
return jsonify({'message': '角色不存在'}), 404
if permission not in role.permissions:
role.permissions.append(permission)
else:
return jsonify({'message': '该角色已经拥有此权限'}), 400
try:
db.session.commit()
return jsonify({'message': '权限成功添加到角色'}), 200
except IntegrityError:
db.session.rollback()
return jsonify({'message': '添加权限到角色失败,可能是由于数据完整性问题'}), 500
# 你可以在这里添加更多的权限相关路由...

138
APP/roles.py Normal file
View File

@ -0,0 +1,138 @@
from flask import Blueprint, request, jsonify
from app.models import Role, User, db, RoleTemplate, Permission
from sqlalchemy.exc import IntegrityError
from datetime import datetime
bp = Blueprint('roles', __name__)
@bp.route('<int:role_id>/add_user', methods=['POST'])
def add_user_to_role(role_id):
data = request.get_json()
if not data or 'user_id' not in data:
return jsonify({'message': '用户ID是必需的'}), 400
user_id = data['user_id']
role = Role.query.get(role_id)
if not role:
return jsonify({'message': '角色不存在'}), 404
user = User.query.get(user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if role not in user.roles:
user.roles.append(role)
else:
return jsonify({'message': '用户已经在该角色中'}), 400
try:
db.session.commit()
return jsonify({'message': '用户成功添加到角色'}), 200
except IntegrityError:
db.session.rollback()
return jsonify({'message': '添加用户到角色失败,可能是由于数据完整性问题'}), 500
# 你可以在这里添加更多的角色相关路由...
@bp.route('', methods=['POST'])
def create_role():
data = request.get_json()
if not data or 'name' not in data:
return jsonify({'message': '角色名称是必需的'}), 400
name = data['name']
# 检查角色名称是否已存在
if Role.query.filter_by(name=name).first():
return jsonify({'message': '角色名称已存在'}), 400
new_role = Role(name=name)
db.session.add(new_role)
# 如果提供了描述,设置它
if 'description' in data:
new_role.set_detail('description', data['description'])
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
return jsonify({'message': '创建角色失败,可能是由于数据完整性问题'}), 500
return jsonify({'message': '角色创建成功', 'id': new_role.id}), 201
@bp.route('/<int:role_id>', methods=['GET'])
def get_role(role_id):
role = Role.query.get(role_id)
if not role:
return jsonify({'message': '角色不存在'}), 404
return jsonify({
'id': role.id,
'name': role.name,
'description': role.get_detail('description'),
'users': [user.id for user in role.users],
'parent_id': role.parent_id
}), 200
@bp.route('/templates', methods=['POST'])
def create_role_template():
data = request.get_json()
if not data or 'name' not in data or 'permissions' not in data:
return jsonify({'message': '缺少必要的字段'}), 400
template = RoleTemplate.create_template(
name=data['name'],
description=data.get('description'),
permissions=[Permission.query.get(p_id) for p_id in data['permissions']]
)
return jsonify({'message': '角色模板创建成功', 'id': template.id}), 201
@bp.route('/from_template', methods=['POST'])
def create_role_from_template():
data = request.get_json()
if not data or 'template_id' not in data:
return jsonify({'message': '缺少必要的字段'}), 400
try:
role = Role.create_from_template(
template_id=data['template_id'],
name=data.get('name')
)
return jsonify({'message': '角色创建成功', 'id': role.id}), 201
except ValueError as e:
return jsonify({'message': str(e)}), 400
@bp.route('', methods=['GET'])
def get_roles():
roles = Role.query.all()
roles_list = [{
'id': role.id,
'name': role.name,
'description': role.get_detail('description'),
'created_at': role.created_at.isoformat() if role.created_at else None
} for role in roles]
return jsonify(roles_list), 200
@bp.route('/<int:role_id>', methods=['GET'])
def get_role_details(role_id):
role = Role.query.get(role_id)
if not role:
return jsonify({'message': '角色不存在'}), 404
permissions = [{'id': p.id, 'name': p.name, 'description': p.get_detail('description')} for p in role.permissions]
users = [{'id': u.id, 'username': u.username} for u in role.users]
role_details = {
'id': role.id,
'name': role.name,
'description': role.get_detail('description'),
'created_at': role.created_at.isoformat() if role.created_at else None,
'permissions': permissions,
'users': users
}
return jsonify(role_details), 200

57
APP/users.py Normal file
View File

@ -0,0 +1,57 @@
from flask import Blueprint, jsonify, request
from app.models import User, UserDetail, UserField
from app import db
bp = Blueprint('users', __name__)
@bp.route('', methods=['GET'])
def get_users():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
users = User.query.paginate(page=page, per_page=per_page, error_out=False)
users_list = [{
'id': user.id,
'username': user.username,
'primary_department': user.primary_department.name if user.primary_department else None,
'roles': [role.name for role in user.roles]
} for user in users.items]
return jsonify({
'users': users_list,
'total': users.total,
'pages': users.pages,
'current_page': page
}), 200
@bp.route('/<int:user_id>', methods=['GET'])
def get_user_details(user_id):
user = User.query.get(user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'details': user_details,
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify(user_info), 200
# 可以在这里添加更多用户相关的路由...