553 lines
25 KiB
Python
553 lines
25 KiB
Python
from app import db, login_manager
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.orm import relationship
|
|
|
|
class UserField(db.Model):
    """Definition of a named profile attribute that can be stored per user."""

    id = db.Column(db.Integer, primary_key=True)
    # Unique machine name of the field (e.g. 'email').
    name = db.Column(db.String(64), unique=True, nullable=False)
    # Human-readable label for the field.
    description = db.Column(db.String(255))
    is_required = db.Column(db.Boolean, default=True)

    @staticmethod
    def init_default_fields():
        """Add every built-in user field that does not exist yet, then commit."""
        # (name, description, is_required)
        seeds = (
            ('first_name', '名', True),
            ('last_name', '姓', True),
            ('id_number', '身份证号码', True),
            ('phone', '电话', True),
            ('gender', '性别', True),
            ('email', '邮箱', False),
            ('photo', '照片', False),
        )
        for field_name, label, required in seeds:
            if UserField.query.filter_by(name=field_name).first():
                # Already present; existing rows are left untouched.
                continue
            db.session.add(
                UserField(name=field_name, description=label, is_required=required)
            )
        db.session.commit()
|
|
|
|
class User(UserMixin, db.Model):
    """Application account linked to roles, departments and credential rows.

    The password hash, login statistics and password-change history live in
    the one-to-one ``UserPassword``, ``UserLoginInfo`` and
    ``UserPasswordHistory`` rows; profile attributes are stored as
    ``UserDetail`` rows keyed by ``UserField``.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True, nullable=False)
    roles = db.relationship('Role', secondary='user_roles', back_populates='users')
    details = db.relationship('UserDetail', back_populates='user', cascade='all, delete-orphan')
    login_info = db.relationship('UserLoginInfo', uselist=False, back_populates='user', cascade='all, delete-orphan')
    password = db.relationship('UserPassword', uselist=False, back_populates='user', cascade='all, delete-orphan')
    primary_department_id = db.Column(db.Integer, db.ForeignKey('department.id'))
    primary_department = db.relationship('Department', foreign_keys=[primary_department_id])
    secondary_departments = db.relationship('Department', secondary='user_departments', back_populates='users', lazy='dynamic')
    password_history = db.relationship('UserPasswordHistory', uselist=False, back_populates='user', cascade='all, delete-orphan')

    def set_password(self, password):
        """Store a new password hash and record when the password changed.

        When a history row already exists, ``update_password_change`` is
        called on it (that method commits the session). Otherwise a new
        history row is attached and left for the caller to commit.
        """
        if not self.password:
            self.password = UserPassword(user=self)
        self.password.set_password(password)

        # Update the password-change history.
        if self.password_history:
            self.password_history.update_password_change()
        else:
            # Set the timestamp explicitly: the column default is only applied
            # when the row is inserted, so an unflushed row would otherwise
            # carry None and break the expiry check.
            self.password_history = UserPasswordHistory(
                user=self, last_password_change=datetime.utcnow()
            )

    def needs_password_change(self):
        """Return True when the user must choose a new password.

        That is the case for a missing login-info row, a new user, a user who
        never changed the initial password, a missing password history, or an
        expired password.
        """
        info = self.login_info
        if info is None or info.is_new_user or not info.has_changed_initial_password:
            return True
        if not self.password_history:
            return True
        return self.password_history.is_password_expired()

    def set_detail(self, field_name, value):
        """Create or update the detail value for ``field_name`` and commit.

        Returns:
            False when no ``UserField`` named ``field_name`` exists,
            True after the value has been stored.
        """
        field = UserField.query.filter_by(name=field_name).first()
        if not field:
            return False
        detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
        if detail:
            detail.value = value
        else:
            detail = UserDetail(user_id=self.id, field_id=field.id, value=value)
            db.session.add(detail)
        db.session.commit()
        return True

    def get_detail(self, field_name):
        """Return the stored value for ``field_name``, or None if field or value is missing."""
        field = UserField.query.filter_by(name=field_name).first()
        if not field:
            return None
        detail = UserDetail.query.filter_by(user_id=self.id, field_id=field.id).first()
        return detail.value if detail else None

    @staticmethod
    def create_user_field(name, description=None, is_required=True):
        """Create, commit and return a new ``UserField``."""
        field = UserField(name=name, description=description, is_required=is_required)
        db.session.add(field)
        db.session.commit()
        return field

    def __init__(self, *args, **kwargs):
        """Build the user and attach a fresh ``UserLoginInfo`` row to it."""
        super().__init__(*args, **kwargs)
        self.login_info = UserLoginInfo(user=self)

    @classmethod
    def find_by_username_or_email(cls, login):
        """Return the first user whose username or 'email' detail equals ``login``."""
        return cls.query.filter(
            db.or_(
                cls.username == login,
                cls.details.any(
                    db.and_(
                        UserDetail.field.has(UserField.name == 'email'),
                        UserDetail.value == login
                    )
                )
            )
        ).first()

    def get_all_permissions(self):
        """Return all permissions of the user, from roles and from departments."""
        all_permissions = set()
        for role in self.roles:
            all_permissions.update(role.permissions)
        # primary_department_id is nullable, so the user may have no primary
        # department at all.
        if self.primary_department is not None:
            all_permissions.update(self.primary_department.get_effective_permissions())
        for dept in self.secondary_departments:
            all_permissions.update(dept.get_effective_permissions())
        return list(all_permissions)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash; False when no password row exists."""
        if self.password:
            return self.password.check_password(password)
        return False
|
|
|
|
class UserDetail(db.Model):
    """Value of one ``UserField`` for one user."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    field_id = db.Column(
        db.Integer,
        db.ForeignKey('user_field.id'),
        nullable=False,
    )
    value = db.Column(db.String(255))

    # Owning user (other side of User.details).
    user = db.relationship('User', back_populates='details')
    # Field definition this value belongs to.
    field = db.relationship('UserField')
|
|
|
|
class Role(db.Model):
    """Named role that groups permissions; roles can be nested via ``parent``."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('role.id'))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    users = relationship('User', secondary='user_roles', back_populates='roles')
    permissions = relationship('Permission', secondary='role_permissions', back_populates='roles')
    details = relationship('RoleDetail', backref='role', lazy='dynamic')
    parent = relationship('Role', remote_side=[id], backref=db.backref('children', lazy='dynamic'))

    def __repr__(self):
        return f'<Role {self.name}>'

    def set_detail(self, field_name, value):
        """Create or update the detail value for ``field_name`` and commit.

        If no ``RoleField`` named ``field_name`` exists, nothing is stored;
        the session is still committed so pending work of the caller is kept.
        """
        # Resolve the field once, before it is dereferenced: an unknown field
        # name must not raise AttributeError on ``None.id``.
        field = RoleField.query.filter_by(name=field_name).first()
        if field is not None:
            detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first()
            if detail:
                detail.value = value
            else:
                db.session.add(RoleDetail(role_id=self.id, field_id=field.id, value=value))
        db.session.commit()

    def get_detail(self, field_name):
        """Return the stored value for ``field_name``, or None if field or value is missing."""
        field = RoleField.query.filter_by(name=field_name).first()
        if field:
            detail = RoleDetail.query.filter_by(role_id=self.id, field_id=field.id).first()
            return detail.value if detail else None
        return None

    @staticmethod
    def init_default_roles():
        """Create the built-in roles if missing and (re)write their descriptions."""
        default_roles = [
            {"name": "Global Administrator", "description": "Has all system permissions and can manage all functions and data"},
            {"name": "Global Readonly", "description": "All data in the system can be viewed but not modified."},
            {"name": "Global NO Permissions", "description": "Basic user, no special permissions."},
            {"name": "frontline staff", "description": "Front-level employees responsible for daily operations."},
            {"name": "manager", "description": "Middle managers responsible for managing teams and departments."},
            {"name": "director", "description": "Senior managers responsible for multiple departments or large projects."},
            {"name": "CEO", "description": "The company's top management is responsible for overall strategy and decision-making."}
        ]
        for role_data in default_roles:
            role = Role.query.filter_by(name=role_data['name']).first()
            if role is None:
                role = Role(name=role_data['name'])
                db.session.add(role)
                db.session.flush()  # obtain the ID of the newly created role
            role.set_detail('description', role_data['description'])
        db.session.commit()

    @classmethod
    def create_from_template(cls, template_id, name=None):
        """Create and commit a role carrying a copy of a template's permissions.

        Args:
            template_id: primary key of the ``RoleTemplate`` to copy from.
            name: name of the new role; defaults to the template's name.

        Raises:
            ValueError: if no template with ``template_id`` exists.
        """
        template = RoleTemplate.query.get(template_id)
        if not template:
            raise ValueError("Template not found")

        role = cls(name=name or template.name)
        # Copy the collection so the role does not share the template's list.
        role.permissions = list(template.permissions)
        db.session.add(role)
        db.session.commit()
        return role
|
|
|
|
class Permission(db.Model):
    """Single permission that can be granted to roles, departments and role templates."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    roles = relationship('Role', secondary='role_permissions', back_populates='permissions')
    details = relationship('PermissionDetail', backref='permission', lazy='dynamic')
    departments = db.relationship('Department', secondary='department_permissions', back_populates='permissions')
    role_templates = db.relationship('RoleTemplate', secondary='role_template_permissions', back_populates='permissions')

    def __repr__(self):
        return f'<Permission {self.name}>'

    def set_detail(self, field_name, value):
        """Create or update the detail value for ``field_name`` and commit.

        If no ``PermissionField`` named ``field_name`` exists, nothing is
        stored; the session is still committed so pending work of the caller
        is kept.
        """
        # Resolve the field once, before it is dereferenced: an unknown field
        # name must not raise AttributeError on ``None.id``.
        field = PermissionField.query.filter_by(name=field_name).first()
        if field is not None:
            detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first()
            if detail:
                detail.value = value
            else:
                db.session.add(PermissionDetail(permission_id=self.id, field_id=field.id, value=value))
        db.session.commit()

    def get_detail(self, field_name):
        """Return the stored value for ``field_name``, or None if field or value is missing."""
        field = PermissionField.query.filter_by(name=field_name).first()
        if field:
            detail = PermissionDetail.query.filter_by(permission_id=self.id, field_id=field.id).first()
            return detail.value if detail else None
        return None

    @staticmethod
    def init_default_permissions():
        """Create the built-in permissions if missing and (re)write their description and code."""
        default_permissions = [
            # Basic operation permissions
            {"name": "Create", "description": "Create a new record or resource", "code": "CREATE"},
            {"name": "Delete", "description": "Delete an existing record or resource", "code": "DELETE"},
            {"name": "Read", "description": "View or read a record or resource", "code": "READ"},
            {"name": "Update", "description": "Modify an existing record or resource", "code": "UPDATE"},
            {"name": "Approval", "description": "Review and approve actions or changes", "code": "APPROVE"},

            # System-level permissions
            {"name": "User Management", "description": "Manage system users", "code": "MANAGE_USERS"},
            {"name": "Role Management", "description": "Management system roles", "code": "MANAGE_ROLES"},
            {"name": "Department Management", "description": "Manage company departments", "code": "MANAGE_DEPARTMENTS"},
            {"name": "Permission Management", "description": "Manage system permissions", "code": "MANAGE_PERMISSIONS"},
            {"name": "Log View", "description": "View system log", "code": "VIEW_LOGS"},
            {"name": "Data Export", "description": "Export system data", "code": "EXPORT_DATA"},
            {"name": "System Setting", "description": "Change system setting", "code": "SYSTEM_SETTINGS"},
        ]
        for perm_data in default_permissions:
            permission = Permission.query.filter_by(name=perm_data['name']).first()
            if permission is None:
                permission = Permission(name=perm_data['name'])
                db.session.add(permission)
                db.session.flush()  # obtain the ID of the newly created permission
            permission.set_detail('description', perm_data['description'])
            permission.set_detail('code', perm_data['code'])
        db.session.commit()
|
|
|
|
# Association table for the many-to-many link between users and roles.
user_roles = db.Table(
    'user_roles',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True),
)
|
|
|
|
# Association table for the many-to-many link between roles and permissions.
role_permissions = db.Table(
    'role_permissions',
    db.Column('role_id', db.Integer, db.ForeignKey('role.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
|
|
|
|
class UserLoginInfo(db.Model):
    """Per-user login statistics and first-login flags (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    register_time = db.Column(db.DateTime, default=datetime.utcnow)
    login_count = db.Column(db.Integer, default=0)
    last_login_time = db.Column(db.DateTime)
    is_new_user = db.Column(db.Boolean, default=True)
    has_changed_initial_password = db.Column(db.Boolean, default=False)
    user = db.relationship('User', back_populates='login_info')

    def update_login_info(self):
        """Record a login: bump the counter, stamp the time, clear the new-user flag, commit."""
        # login_count can still be None: the column default is only applied on
        # insert and the column is nullable, so treat a missing count as 0.
        self.login_count = (self.login_count or 0) + 1
        self.last_login_time = datetime.utcnow()
        self.is_new_user = False
        db.session.commit()
|
|
|
|
class UserPassword(db.Model):
    """Password hash of a user (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    password_hash = db.Column(db.String(255))  # length raised from 128 to 255
    user = db.relationship('User', back_populates='password')

    def set_password(self, password):
        """Hash ``password`` and store the hash; does not commit."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash.

        Returns False when no hash has been stored yet instead of handing
        None to ``check_password_hash``.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for a session-stored ID, or None if it is invalid or unknown."""
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A malformed ID must yield "no user" rather than an exception.
        return None
    return User.query.get(primary_key)
|
|
|
|
# Department table
class Department(db.Model):
    """Department in a tree (``parent``/``children``) with permissions and member users."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), index=True, unique=True, nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('department.id'))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    details = db.relationship('DepartmentDetail', backref='department', lazy='dynamic')
    parent = db.relationship('Department', remote_side=[id], backref=db.backref('children', lazy='dynamic'))
    permissions = db.relationship('Permission', secondary='department_permissions', back_populates='departments')
    users = db.relationship('User', secondary='user_departments', back_populates='secondary_departments', lazy='dynamic')

    def __repr__(self):
        return f'<Department {self.name}>'

    def set_detail(self, field_name, value):
        """Create or update the detail value for ``field_name``.

        A missing ``DepartmentField`` is created on the fly. Nothing is
        committed here; the caller decides when to commit.
        """
        field = DepartmentField.query.filter_by(name=field_name).first()
        if field is None:
            field = DepartmentField(name=field_name)
            db.session.add(field)
            db.session.flush()  # assigns an id to the newly created field

        existing = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
        if existing is None:
            db.session.add(
                DepartmentDetail(department_id=self.id, field_id=field.id, value=value)
            )
            return
        existing.value = value

    def get_detail(self, field_name):
        """Return the stored value for ``field_name``, or None if field or value is missing."""
        field = DepartmentField.query.filter_by(name=field_name).first()
        if not field:
            return None
        found = DepartmentDetail.query.filter_by(department_id=self.id, field_id=field.id).first()
        if not found:
            return None
        return found.value

    def get_all_children(self):
        """Return all descendants: direct children first, then each child's own descendants."""
        direct = self.children.all()
        descendants = list(direct)
        for child in direct:
            descendants += child.get_all_children()
        return descendants

    def get_ancestors(self):
        """Return the chain of parents, top-most ancestor first."""
        lineage = []
        node = self.parent
        while node is not None:
            # Insert at the front so the list ends up ordered root-first.
            lineage.insert(0, node)
            node = node.parent
        return lineage

    @classmethod
    def get_root_departments(cls):
        """Return all departments that have no parent."""
        roots = cls.query.filter_by(parent_id=None)
        return roots.all()

    @staticmethod
    def init_default_departments():
        """Add every built-in department that does not exist yet, then commit."""
        names = (
            "信息技术", "人事", "财务", "行政", "采购", "仓储",
            "生产", "物流", "法务", "市场", "销售", "客服",
        )
        for dept_name in names:
            if Department.query.filter_by(name=dept_name).first():
                continue
            db.session.add(Department(name=dept_name))
        db.session.commit()

    @staticmethod
    def init_default_fields():
        """Add every built-in department field that does not exist yet, then commit."""
        # (name, description)
        seeds = (
            ('department_code', '部门代码'),
            ('manager', '部门经理'),
            ('phone', '部门电话'),
            ('email', '部门邮箱'),
            ('location', '部门位置'),
        )
        for field_name, label in seeds:
            if DepartmentField.query.filter_by(name=field_name).first():
                continue
            db.session.add(DepartmentField(name=field_name, description=label))
        db.session.commit()

    def get_effective_permissions(self):
        """Return the department's own permissions plus those of all its ancestors."""
        collected = set(self.permissions)
        # get_ancestors() is root-first; walk it nearest-parent-first.
        for ancestor in reversed(self.get_ancestors()):
            collected.update(ancestor.permissions)
        return list(collected)
|
|
|
|
# Association table for the many-to-many link between departments and permissions.
department_permissions = db.Table(
    'department_permissions',
    db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
|
|
|
|
# Department field table
class DepartmentField(db.Model):
    """Definition of a named attribute that can be stored per department."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(
        db.String(64),
        unique=True,
        nullable=False,
    )
    description = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    def __repr__(self):
        return f'<DepartmentField {self.name}>'
|
|
|
|
# Department detail table
class DepartmentDetail(db.Model):
    """Value of one ``DepartmentField`` for one department."""

    id = db.Column(db.Integer, primary_key=True)
    department_id = db.Column(
        db.Integer,
        db.ForeignKey('department.id'),
        nullable=False,
    )
    field_id = db.Column(
        db.Integer,
        db.ForeignKey('department_field.id'),
        nullable=False,
    )
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Field definition; DepartmentField gains a ``details`` backref.
    field = db.relationship('DepartmentField', backref='details')

    def __repr__(self):
        return f'<DepartmentDetail {self.department_id} - {self.field.name}: {self.value}>'
|
|
|
|
# Role field table
class RoleField(db.Model):
    """Definition of a named attribute that can be stored per role."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<RoleField {self.name}>'

    @staticmethod
    def init_default_fields():
        """Add every built-in role field that does not exist yet, then commit."""
        # (name, description)
        seeds = (
            ('role_code', '角色代码'),
            ('description', '角色描述'),
            ('level', '角色级别'),
            ('permissions', '角色权限'),
        )
        for field_name, label in seeds:
            if RoleField.query.filter_by(name=field_name).first():
                continue
            db.session.add(RoleField(name=field_name, description=label))
        db.session.commit()
|
|
|
|
# Role detail table
class RoleDetail(db.Model):
    """Value of one ``RoleField`` for one role."""

    id = db.Column(db.Integer, primary_key=True)
    role_id = db.Column(
        db.Integer,
        db.ForeignKey('role.id'),
        nullable=False,
    )
    field_id = db.Column(
        db.Integer,
        db.ForeignKey('role_field.id'),
        nullable=False,
    )
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Field definition; RoleField gains a ``details`` backref.
    field = relationship('RoleField', backref='details')

    def __repr__(self):
        return f'<RoleDetail {self.role_id} - {self.field.name}: {self.value}>'
|
|
|
|
# Permission field table
class PermissionField(db.Model):
    """Definition of a named attribute that can be stored per permission."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<PermissionField {self.name}>'

    @staticmethod
    def init_default_fields():
        """Add every built-in permission field that does not exist yet, then commit."""
        # (name, description)
        seeds = (
            ('description', '权限描述'),
            ('code', '权限代码'),
            ('category', '权限类别'),
        )
        for field_name, label in seeds:
            if PermissionField.query.filter_by(name=field_name).first():
                continue
            db.session.add(PermissionField(name=field_name, description=label))
        db.session.commit()
|
|
|
|
# Permission detail table
class PermissionDetail(db.Model):
    """Value of one ``PermissionField`` for one permission."""

    id = db.Column(db.Integer, primary_key=True)
    permission_id = db.Column(
        db.Integer,
        db.ForeignKey('permission.id'),
        nullable=False,
    )
    field_id = db.Column(
        db.Integer,
        db.ForeignKey('permission_field.id'),
        nullable=False,
    )
    value = db.Column(db.String(256))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Field definition; PermissionField gains a ``details`` backref.
    field = relationship('PermissionField', backref='details')

    def __repr__(self):
        return f'<PermissionDetail {self.permission_id} - {self.field.name}: {self.value}>'
|
|
|
|
# Association table for the many-to-many link between users and departments.
user_departments = db.Table(
    'user_departments',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('department_id', db.Integer, db.ForeignKey('department.id'), primary_key=True),
)
|
|
|
|
class RoleTemplate(db.Model):
    """Named, reusable set of permissions from which roles can be created."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    description = db.Column(db.String(256))
    permissions = db.relationship(
        'Permission',
        secondary='role_template_permissions',
        back_populates='role_templates',
    )

    @classmethod
    def create_template(cls, name, description, permissions):
        """Create, commit and return a template holding the given permissions."""
        new_template = cls(name=name, description=description)
        new_template.permissions = permissions
        session = db.session
        session.add(new_template)
        session.commit()
        return new_template
|
|
|
|
# Association table for the many-to-many link between role templates and permissions.
role_template_permissions = db.Table(
    'role_template_permissions',
    db.Column('role_template_id', db.Integer, db.ForeignKey('role_template.id'), primary_key=True),
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id'), primary_key=True),
)
|
|
|
|
class UserPasswordHistory(db.Model):
    """Time of a user's most recent password change (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    last_password_change = db.Column(db.DateTime, default=datetime.utcnow)
    user = db.relationship('User', back_populates='password_history')

    def is_password_expired(self, max_age_days=90):
        """Return True if the password is older than ``max_age_days`` (default 90, about three months).

        A missing timestamp (row not inserted yet, or NULL in the database)
        counts as expired, because no password change is on record.
        """
        if self.last_password_change is None:
            return True
        age = datetime.utcnow() - self.last_password_change
        return age > timedelta(days=max_age_days)

    def update_password_change(self):
        """Stamp the current UTC time as the last password change and commit."""
        self.last_password_change = datetime.utcnow()
        db.session.commit()