# OA/APP/models.py
# 2024-10-14 14:14:23 +08:00

# 553 lines
# 25 KiB
# Python

from app import db, login_manager
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
from sqlalchemy.orm import relationship
class UserField(db.Model):
    """Definition of one custom attribute that can be recorded for a user.

    Rows in ``UserDetail`` reference this table through ``field_id`` to
    store the per-user value of the attribute.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(255))
    is_required = db.Column(db.Boolean, default=True)

    @staticmethod
    def init_default_fields():
        """Insert the built-in user fields that are not in the table yet.

        A field is considered present when a row with the same ``name``
        exists; such rows are left untouched. The session is committed
        once after all missing fields have been added.
        """
        default_fields = [
            {'name': 'first_name', 'description': '', 'is_required': True},
            {'name': 'last_name', 'description': '', 'is_required': True},
            # National ID number
            {'name': 'id_number', 'description': '身份证号码', 'is_required': True},
            # Phone
            {'name': 'phone', 'description': '电话', 'is_required': True},
            # Gender
            {'name': 'gender', 'description': '性别', 'is_required': True},
            # Email
            {'name': 'email', 'description': '邮箱', 'is_required': False},
            # Photo
            {'name': 'photo', 'description': '照片', 'is_required': False},
        ]
        for spec in default_fields:
            already_present = UserField.query.filter_by(name=spec['name']).first()
            if already_present:
                continue
            db.session.add(UserField(**spec))
        db.session.commit()
class User(UserMixin, db.Model):
    """Application account with its roles, departments and credentials.

    Custom attributes are stored as ``UserDetail`` rows keyed by
    ``UserField``. The password hash, the login statistics and the
    password-change timestamp each live in their own one-to-one table.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True, nullable=False)
    roles = db.relationship('Role', secondary='user_roles', back_populates='users')
    details = db.relationship('UserDetail', back_populates='user', cascade='all, delete-orphan')
    login_info = db.relationship('UserLoginInfo', uselist=False, back_populates='user', cascade='all, delete-orphan')
    password = db.relationship('UserPassword', uselist=False, back_populates='user', cascade='all, delete-orphan')
    # The primary department is optional: the foreign key is nullable.
    primary_department_id = db.Column(db.Integer, db.ForeignKey('department.id'))
    primary_department = db.relationship('Department', foreign_keys=[primary_department_id])
    secondary_departments = db.relationship('Department', secondary='user_departments', back_populates='users', lazy='dynamic')
    password_history = db.relationship('UserPasswordHistory', uselist=False, back_populates='user', cascade='all, delete-orphan')

    def __init__(self, *args, **kwargs):
        """Create the user together with a fresh ``UserLoginInfo`` record."""
        super().__init__(*args, **kwargs)
        self.login_info = UserLoginInfo(user=self)

    def set_password(self, password):
        """Hash and store ``password`` and record when it was changed.

        When a password-history row already exists its timestamp is
        refreshed through ``update_password_change()``, which commits the
        session. Otherwise a new history row is attached to the user and
        nothing is committed here.
        """
        if not self.password:
            self.password = UserPassword(user=self)
        self.password.set_password(password)

        # Update the password-change history.
        if self.password_history:
            self.password_history.update_password_change()
        else:
            # Stamp the time explicitly: the column default is only applied
            # on INSERT, so an unflushed row would otherwise carry None and
            # break the expiry check.
            self.password_history = UserPasswordHistory(
                user=self, last_password_change=datetime.utcnow()
            )

    def needs_password_change(self):
        """Return True when the user must choose a new password.

        That is the case when the login record is missing, the user is
        new, the initial password was never changed, no password history
        exists, or the history reports the password as expired.
        """
        info = self.login_info
        # Users loaded from the database bypass __init__, so a missing
        # login-info row shows up here as None.
        if info is None or info.is_new_user or not info.has_changed_initial_password:
            return True
        if not self.password_history:
            return True
        return self.password_history.is_password_expired()

    def set_detail(self, field_name, value):
        """Store ``value`` for the custom field ``field_name`` and commit.

        Returns:
            False when no ``UserField`` with that name exists (nothing is
            written), True after the value has been saved.
        """
        field = UserField.query.filter_by(name=field_name).first()
        if not field:
            return False
        detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
        if detail:
            detail.value = value
        else:
            db.session.add(UserDetail(user_id=self.id, field_id=field.id, value=value))
        db.session.commit()
        return True

    def get_detail(self, field_name):
        """Return the stored value of ``field_name``, or None.

        None is returned both when the field is unknown and when the user
        has no value recorded for it.
        """
        field = UserField.query.filter_by(name=field_name).first()
        if not field:
            return None
        detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
        return detail.value if detail else None

    @staticmethod
    def create_user_field(name, description=None, is_required=True):
        """Create, commit and return a new ``UserField``."""
        field = UserField(name=name, description=description, is_required=is_required)
        db.session.add(field)
        db.session.commit()
        return field

    @classmethod
    def find_by_username_or_email(cls, login):
        """Return the first user whose username or 'email' detail equals ``login``.

        Returns None when nothing matches.
        """
        email_matches = cls.details.any(
            db.and_(
                UserDetail.field.has(UserField.name == 'email'),
                UserDetail.value == login
            )
        )
        return cls.query.filter(db.or_(cls.username == login, email_matches)).first()

    def get_all_permissions(self):
        """Return every permission granted through roles and departments.

        Combines the permissions of all roles with the effective
        permissions of the primary department (if one is assigned) and of
        each secondary department. Duplicates are removed; the order of
        the returned list is unspecified.
        """
        all_permissions = set()
        for role in self.roles:
            all_permissions.update(role.permissions)
        # A user is not required to have a primary department.
        if self.primary_department is not None:
            all_permissions.update(self.primary_department.get_effective_permissions())
        for dept in self.secondary_departments:
            all_permissions.update(dept.get_effective_permissions())
        return list(all_permissions)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        A user without a password record never matches.
        """
        if self.password:
            return self.password.check_password(password)
        return False
class UserDetail(db.Model):
    """Value of one custom field (``UserField``) for one user."""

    id = db.Column(db.Integer, primary_key=True)
    # Owning user and the field definition this value belongs to.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    field_id = db.Column(db.Integer, db.ForeignKey('user_field.id'), nullable=False)
    value = db.Column(db.String(255))

    user = db.relationship('User', back_populates='details')
    field = db.relationship('UserField')
class Role(db.Model):
    """Named role that groups permissions and can be assigned to users.

    Roles may be nested through ``parent_id``. Extra attributes are kept
    as ``RoleDetail`` rows keyed by ``RoleField``.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('role.id'))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    users = relationship('User', secondary='user_roles', back_populates='roles')
    permissions = relationship('Permission', secondary='role_permissions', back_populates='roles')
    details = relationship('RoleDetail', backref='role', lazy='dynamic')
    parent = relationship('Role', remote_side=[id], backref=db.backref('children', lazy='dynamic'))

    def __repr__(self):
        return f'<Role {self.name}>'

    def set_detail(self, field_name, value):
        """Store ``value`` for the role field ``field_name`` and commit.

        Returns:
            False when no ``RoleField`` with that name exists (nothing is
            written or committed), True after the value has been saved.
        """
        # Resolve the field once and bail out when it is unknown, instead
        # of dereferencing the result of ``.first()`` unconditionally.
        field = RoleField.query.filter_by(name=field_name).first()
        if field is None:
            return False
        detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first()
        if detail:
            detail.value = value
        else:
            db.session.add(RoleDetail(role_id=self.id, field_id=field.id, value=value))
        db.session.commit()
        return True

    def get_detail(self, field_name):
        """Return the stored value of ``field_name``, or None.

        None is returned both when the field is unknown and when the role
        has no value recorded for it.
        """
        field = RoleField.query.filter_by(name=field_name).first()
        if field is None:
            return None
        detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first()
        return detail.value if detail else None

    @staticmethod
    def init_default_roles():
        """Create the built-in roles and (re)write their descriptions.

        Missing roles are inserted; for every role, new or existing, the
        'description' detail is set to the built-in text. The description
        is only stored when a ``RoleField`` named 'description' exists.
        """
        default_roles = [
            {"name": "Global Administrator", "description": "Has all system permissions and can manage all functions and data"},
            {"name": "Global Readonly", "description": "All data in the system can be viewed but not modified."},
            {"name": "Global NO Permissions", "description": "Basic user, no special permissions."},
            {"name": "frontline staff", "description": "Front-level employees responsible for daily operations."},
            {"name": "manager", "description": "Middle managers responsible for managing teams and departments."},
            {"name": "director", "description": "Senior managers responsible for multiple departments or large projects."},
            {"name": "CEO", "description": "The company's top management is responsible for overall strategy and decision-making."}
        ]
        for role_data in default_roles:
            role = Role.query.filter_by(name=role_data['name']).first()
            if role is None:
                role = Role(name=role_data['name'])
                db.session.add(role)
                db.session.flush()  # obtain the id of the newly created role
            role.set_detail('description', role_data['description'])
        db.session.commit()

    @classmethod
    def create_from_template(cls, template_id, name=None):
        """Create, commit and return a role copied from a ``RoleTemplate``.

        Args:
            template_id: primary key of the template to copy.
            name: name of the new role; defaults to the template's name.

        Raises:
            ValueError: when no template with ``template_id`` exists.
        """
        template = RoleTemplate.query.get(template_id)
        if not template:
            raise ValueError("Template not found")
        role = cls(name=name or template.name)
        # Copy the list so the role does not share the template's collection.
        role.permissions = list(template.permissions)
        db.session.add(role)
        db.session.commit()
        return role
class Permission(db.Model):
    """Single permission that can be granted via roles, departments or role templates.

    Extra attributes (description, code, ...) are kept as
    ``PermissionDetail`` rows keyed by ``PermissionField``.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    roles = relationship('Role', secondary='role_permissions', back_populates='permissions')
    details = relationship('PermissionDetail', backref='permission', lazy='dynamic')
    departments = db.relationship('Department', secondary='department_permissions', back_populates='permissions')
    role_templates = db.relationship('RoleTemplate', secondary='role_template_permissions', back_populates='permissions')

    def __repr__(self):
        return f'<Permission {self.name}>'

    def set_detail(self, field_name, value):
        """Store ``value`` for the permission field ``field_name`` and commit.

        Returns:
            False when no ``PermissionField`` with that name exists
            (nothing is written or committed), True after the value has
            been saved.
        """
        # Resolve the field once and bail out when it is unknown, instead
        # of dereferencing the result of ``.first()`` unconditionally.
        field = PermissionField.query.filter_by(name=field_name).first()
        if field is None:
            return False
        detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first()
        if detail:
            detail.value = value
        else:
            db.session.add(PermissionDetail(permission_id=self.id, field_id=field.id, value=value))
        db.session.commit()
        return True

    def get_detail(self, field_name):
        """Return the stored value of ``field_name``, or None.

        None is returned both when the field is unknown and when the
        permission has no value recorded for it.
        """
        field = PermissionField.query.filter_by(name=field_name).first()
        if field is None:
            return None
        detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first()
        return detail.value if detail else None

    @staticmethod
    def init_default_permissions():
        """Create the built-in permissions and (re)write their details.

        Missing permissions are inserted; for every permission, new or
        existing, the 'description' and 'code' details are set to the
        built-in values. A detail is only stored when a
        ``PermissionField`` with the matching name exists.
        """
        default_permissions = [
            # Basic operation permissions
            {"name": "Create", "description": "Create a new record or resource", "code": "CREATE"},
            {"name": "Delete", "description": "Delete an existing record or resource", "code": "DELETE"},
            {"name": "Read", "description": "View or read a record or resource", "code": "READ"},
            {"name": "Update", "description": "Modify an existing record or resource", "code": "UPDATE"},
            {"name": "Approval", "description": "Review and approve actions or changes", "code": "APPROVE"},
            # System-level permissions
            {"name": "User Management", "description": "Manage system users", "code": "MANAGE_USERS"},
            {"name": "Role Management", "description": "Management system roles", "code": "MANAGE_ROLES"},
            {"name": "Department Management", "description": "Manage company departments", "code": "MANAGE_DEPARTMENTS"},
            {"name": "Permission Management", "description": "Manage system permissions", "code": "MANAGE_PERMISSIONS"},
            {"name": "Log View", "description": "View system log", "code": "VIEW_LOGS"},
            {"name": "Data Export", "description": "Export system data", "code": "EXPORT_DATA"},
            {"name": "System Setting", "description": "Change system setting", "code": "SYSTEM_SETTINGS"},
        ]
        for perm_data in default_permissions:
            perm = Permission.query.filter_by(name=perm_data['name']).first()
            if perm is None:
                perm = Permission(name=perm_data['name'])
                db.session.add(perm)
                db.session.flush()  # obtain the id of the newly created permission
            perm.set_detail('description', perm_data['description'])
            perm.set_detail('code', perm_data['code'])
        db.session.commit()
# Association table for the many-to-many link between users and roles.
user_roles = db.Table(
    'user_roles',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True),
)
# Association table for the many-to-many link between roles and permissions.
role_permissions = db.Table(
    'role_permissions',
    db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
class UserLoginInfo(db.Model):
    """Per-user login statistics and first-login flags (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    register_time = db.Column(db.DateTime, default=datetime.utcnow)
    login_count = db.Column(db.Integer, default=0)
    last_login_time = db.Column(db.DateTime)
    is_new_user = db.Column(db.Boolean, default=True)
    has_changed_initial_password = db.Column(db.Boolean, default=False)
    user = db.relationship('User', back_populates='login_info')

    def update_login_info(self):
        """Record a successful login and commit the session.

        Increments ``login_count``, stamps ``last_login_time`` with the
        current UTC time and clears ``is_new_user``.
        """
        # The column default (0) is only applied on INSERT, so an unflushed
        # object or a NULL row still carries None here.
        self.login_count = (self.login_count or 0) + 1
        self.last_login_time = datetime.utcnow()
        self.is_new_user = False
        db.session.commit()
class UserPassword(db.Model):
    """Password hash of a user (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    password_hash = db.Column(db.String(255))  # length increased from 128 to 255
    user = db.relationship('User', back_populates='password')

    def set_password(self, password):
        """Hash ``password`` and store the hash; does not commit."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        A record without a hash (the column is nullable) never matches.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: return the ``User`` for ``user_id`` or None.

    An id that cannot be converted to an integer yields None instead of
    raising.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
# Department table
class Department(db.Model):
    """Organisational unit arranged in a tree through ``parent_id``.

    A department carries its own permissions and inherits those of its
    ancestors. Extra attributes are kept as ``DepartmentDetail`` rows
    keyed by ``DepartmentField``.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), index=True, unique=True, nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('department.id'))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    details = db.relationship('DepartmentDetail', backref='department', lazy='dynamic')
    parent = db.relationship('Department', remote_side=[id], backref=db.backref('children', lazy='dynamic'))
    permissions = db.relationship('Permission', secondary='department_permissions', back_populates='departments')
    users = db.relationship('User', secondary='user_departments', back_populates='secondary_departments', lazy='dynamic')

    def __repr__(self):
        return f'<Department {self.name}>'

    def set_detail(self, field_name, value):
        """Set the custom attribute ``field_name`` to ``value``.

        A ``DepartmentField`` with that name is created when none exists.
        The session is flushed for a new field but never committed here;
        the caller decides when to commit.
        """
        field = DepartmentField.query.filter_by(name=field_name).first()
        if not field:
            field = DepartmentField(name=field_name)
            db.session.add(field)
            db.session.flush()  # assigns an id to the newly created field
        detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
        if detail:
            detail.value = value
        else:
            db.session.add(DepartmentDetail(department_id=self.id, field_id=field.id, value=value))

    def get_detail(self, field_name):
        """Return the stored value of ``field_name``, or None.

        None is returned both when the field is unknown and when the
        department has no value recorded for it.
        """
        field = DepartmentField.query.filter_by(name=field_name).first()
        if not field:
            return None
        detail = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
        return detail.value if detail else None

    def get_all_children(self):
        """Return all descendants: direct children first, then theirs.

        Each department appears at most once, so a cyclic ``parent_id``
        chain cannot cause endless recursion.
        """
        return self._collect_descendants({self})

    def _collect_descendants(self, visited):
        """Return descendants not yet in ``visited`` and add them to it."""
        fresh = [child for child in self.children.all() if child not in visited]
        visited.update(fresh)
        collected = list(fresh)
        for child in fresh:
            collected.extend(child._collect_descendants(visited))
        return collected

    def get_ancestors(self):
        """Return the ancestors ordered from the top-most one down to the parent.

        The walk stops when it reaches a department it has already seen,
        so a cyclic ``parent_id`` chain cannot loop forever.
        """
        ancestors = []
        visited = {self}
        node = self.parent
        while node is not None and node not in visited:
            visited.add(node)
            ancestors.append(node)
            node = node.parent
        ancestors.reverse()  # top-most ancestor first
        return ancestors

    @classmethod
    def get_root_departments(cls):
        """Return all departments that have no parent."""
        return cls.query.filter_by(parent_id=None).all()

    @staticmethod
    def init_default_departments():
        """Insert the built-in departments that do not exist yet and commit."""
        default_departments = [
            "信息技术",  # Information technology
            "人事",  # Human resources
            "财务",  # Finance
            "行政",  # Administration
            "采购",  # Procurement
            "仓储",  # Warehousing
            "生产",  # Production
            "物流",  # Logistics
            "法务",  # Legal
            "市场",  # Marketing
            "销售",  # Sales
            "客服"  # Customer service
        ]
        for dept_name in default_departments:
            if not Department.query.filter_by(name=dept_name).first():
                db.session.add(Department(name=dept_name))
        db.session.commit()

    @staticmethod
    def init_default_fields():
        """Insert the built-in department fields that do not exist yet and commit."""
        default_fields = [
            {'name': 'department_code', 'description': '部门代码'},  # Department code
            {'name': 'manager', 'description': '部门经理'},  # Department manager
            {'name': 'phone', 'description': '部门电话'},  # Department phone
            {'name': 'email', 'description': '部门邮箱'},  # Department email
            {'name': 'location', 'description': '部门位置'},  # Department location
        ]
        for field in default_fields:
            if not DepartmentField.query.filter_by(name=field['name']).first():
                db.session.add(DepartmentField(name=field['name'], description=field['description']))
        db.session.commit()

    def get_effective_permissions(self):
        """Return this department's permissions plus those of all its ancestors.

        Duplicates are removed; the order of the returned list is
        unspecified.
        """
        all_permissions = set(self.permissions)
        for ancestor in self.get_ancestors():
            all_permissions.update(ancestor.permissions)
        return list(all_permissions)
# Association table for the many-to-many link between departments and permissions.
department_permissions = db.Table(
    'department_permissions',
    db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
# Department field table
class DepartmentField(db.Model):
    """Definition of one custom attribute that can be recorded for a department."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    # Timestamps are filled from datetime.utcnow on insert / update.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<DepartmentField {self.name}>'
# Department detail table
class DepartmentDetail(db.Model):
    """Value of one custom field (``DepartmentField``) for one department."""

    id = db.Column(db.Integer, primary_key=True)
    department_id = db.Column(db.Integer, db.ForeignKey('department.id'), nullable=False)
    field_id = db.Column(db.Integer, db.ForeignKey('department_field.id'), nullable=False)
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    field = db.relationship('DepartmentField', backref='details')

    def __repr__(self):
        # ``field`` can be None (e.g. an object built from ``field_id`` only
        # and not yet loaded); fall back to the raw id so repr never raises.
        field_label = self.field.name if self.field is not None else f'field#{self.field_id}'
        return f'<DepartmentDetail {self.department_id} - {field_label}: {self.value}>'
# Role field table
class RoleField(db.Model):
    """Definition of one custom attribute that can be recorded for a role."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<RoleField {self.name}>'

    @staticmethod
    def init_default_fields():
        """Insert the built-in role fields that are not in the table yet.

        Rows are matched by ``name``; existing ones are left untouched.
        The session is committed once at the end.
        """
        default_fields = [
            {'name': 'role_code', 'description': '角色代码'},  # Role code
            {'name': 'description', 'description': '角色描述'},  # Role description
            {'name': 'level', 'description': '角色级别'},  # Role level
            {'name': 'permissions', 'description': '角色权限'},  # Role permissions
        ]
        for spec in default_fields:
            already_present = RoleField.query.filter_by(name=spec['name']).first()
            if already_present:
                continue
            db.session.add(RoleField(**spec))
        db.session.commit()
# Role detail table
class RoleDetail(db.Model):
    """Value of one custom field (``RoleField``) for one role."""

    id = db.Column(db.Integer, primary_key=True)
    role_id = db.Column(db.Integer, db.ForeignKey('role.id'), nullable=False)
    field_id = db.Column(db.Integer, db.ForeignKey('role_field.id'), nullable=False)
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    field = relationship('RoleField', backref='details')

    def __repr__(self):
        # ``field`` can be None (e.g. an object built from ``field_id`` only
        # and not yet loaded); fall back to the raw id so repr never raises.
        field_label = self.field.name if self.field is not None else f'field#{self.field_id}'
        return f'<RoleDetail {self.role_id} - {field_label}: {self.value}>'
# Permission field table
class PermissionField(db.Model):
    """Definition of one custom attribute that can be recorded for a permission."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<PermissionField {self.name}>'

    @staticmethod
    def init_default_fields():
        """Insert the built-in permission fields that are not in the table yet.

        Rows are matched by ``name``; existing ones are left untouched.
        The session is committed once at the end.
        """
        default_fields = [
            {'name': 'description', 'description': '权限描述'},  # Permission description
            {'name': 'code', 'description': '权限代码'},  # Permission code
            {'name': 'category', 'description': '权限类别'},  # Permission category
        ]
        for spec in default_fields:
            already_present = PermissionField.query.filter_by(name=spec['name']).first()
            if already_present:
                continue
            db.session.add(PermissionField(**spec))
        db.session.commit()
# Permission detail table
class PermissionDetail(db.Model):
    """Value of one custom field (``PermissionField``) for one permission."""

    id = db.Column(db.Integer, primary_key=True)
    permission_id = db.Column(db.Integer, db.ForeignKey('permission.id'), nullable=False)
    field_id = db.Column(db.Integer, db.ForeignKey('permission_field.id'), nullable=False)
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    field = relationship('PermissionField', backref='details')

    def __repr__(self):
        # ``field`` can be None (e.g. an object built from ``field_id`` only
        # and not yet loaded); fall back to the raw id so repr never raises.
        field_label = self.field.name if self.field is not None else f'field#{self.field_id}'
        return f'<PermissionDetail {self.permission_id} - {field_label}: {self.value}>'
# Association table for the many-to-many link between users and their
# secondary departments.
user_departments = db.Table(
    'user_departments',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True),
)
class RoleTemplate(db.Model):
    """Reusable, named set of permissions from which roles can be created."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    permissions = db.relationship('Permission', secondary='role_template_permissions', back_populates='role_templates')

    @classmethod
    def create_template(cls, name, description, permissions):
        """Create, commit and return a template holding ``permissions``.

        Args:
            name: unique template name.
            description: free-text description.
            permissions: collection of ``Permission`` objects assigned to
                the template's ``permissions`` relationship.
        """
        new_template = cls(name=name, description=description, permissions=permissions)
        db.session.add(new_template)
        db.session.commit()
        return new_template
# Association table for the many-to-many link between role templates and permissions.
role_template_permissions = db.Table(
    'role_template_permissions',
    db.Column('role_template_id', db.Integer, db.ForeignKey('role_template.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
class UserPasswordHistory(db.Model):
    """Timestamp of a user's most recent password change (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    last_password_change = db.Column(db.DateTime, default=datetime.utcnow)
    user = db.relationship('User', back_populates='password_history')

    def is_password_expired(self, max_age_days=90):
        """Return True when the password is older than ``max_age_days``.

        Args:
            max_age_days: maximum allowed password age in days; defaults
                to 90 (roughly three months).

        A record without a change timestamp is reported as expired, since
        its age cannot be determined.
        """
        if self.last_password_change is None:
            return True
        password_age = datetime.utcnow() - self.last_password_change
        return password_age > timedelta(days=max_age_days)

    def update_password_change(self):
        """Stamp the current UTC time as the last change and commit the session."""
        self.last_password_change = datetime.utcnow()
        db.session.commit()