上传文件至 APP

This commit is contained in:
wangwei 2024-10-14 14:12:37 +08:00
parent 706f6f5776
commit b52da71197
5 changed files with 710 additions and 0 deletions

93
APP/__init__.py Normal file
View File

@ -0,0 +1,93 @@
from flask import Flask, current_app
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from config import Config
import logging
from logging.handlers import RotatingFileHandler
import os
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
jwt = JWTManager()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
jwt.init_app(app)
# 设置日志记录器
if not app.debug:
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = RotatingFileHandler('logs/app.log', maxBytes=10240, backupCount=10, delay=True)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Application startup')
from app import models
from app.models import RoleTemplate, Permission
def init_database():
db.create_all()
models.UserField.init_default_fields()
models.Department.init_default_departments()
models.Department.init_default_fields()
models.RoleField.init_default_fields()
models.Role.init_default_roles()
models.PermissionField.init_default_fields()
models.Permission.init_default_permissions()
def init_role_templates():
templates = [
{
'name': '管理员',
'description': '系统管理员,拥有所有权限',
'permissions': ['用户管理', '角色管理', '部门管理', '权限管理']
},
{
'name': '普通用户',
'description': '普通用户,拥有基本权限',
'permissions': ['查看个人信息', '修改个人信息']
},
]
for template in templates:
if not RoleTemplate.query.filter_by(name=template['name']).first():
permissions = [Permission.query.filter_by(name=p).first() for p in template['permissions']]
RoleTemplate.create_template(
name=template['name'],
description=template['description'],
permissions=[p for p in permissions if p]
)
with app.app_context():
init_database()
init_role_templates()
from app.auth import bp as auth_bp
app.register_blueprint(auth_bp)
from app.departments import bp as departments_bp
app.register_blueprint(departments_bp, url_prefix='/departments')
from app.roles import bp as roles_bp
app.register_blueprint(roles_bp, url_prefix='/roles')
from app.permissions import bp as permissions_bp
app.register_blueprint(permissions_bp, url_prefix='/permissions')
from app.users import bp as users_bp
app.register_blueprint(users_bp, url_prefix='/users')
return app

356
APP/auth.py Normal file
View File

@ -0,0 +1,356 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from flask_login import login_user, logout_user, login_required, current_user
from app import db
from app.models import User, UserField, UserLoginInfo, UserPassword, UserDetail, UserPasswordHistory
from urllib.parse import urlparse # 替换 werkzeug.urls import
import traceback
import random
import string
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity, \
verify_jwt_in_request
from functools import wraps
from datetime import datetime, timedelta
bp = Blueprint('auth', __name__)
@bp.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@bp.route('/register', methods=['POST'])
def register():
current_app.logger.info('Received registration request')
data = request.get_json()
current_app.logger.info(f'Registration data: {data}')
email = data.get('email')
if email:
existing_user = User.find_by_username_or_email(email)
if existing_user:
return jsonify({'message': '该邮箱已被注册'}), 400
required_fields = ['first_name', 'last_name', 'id_number', 'phone', 'gender']
missing_fields = [field for field in required_fields if field not in data or not data[field]]
if missing_fields:
current_app.logger.warning(f'Registration failed: Missing required fields: {", ".join(missing_fields)}')
return jsonify({'message': f'以下字段是必填的: {", ".join(missing_fields)}'}), 400
try:
# 生成用户名和密码
username = generate_username(data['first_name'], data['last_name'])
password = generate_password()
current_app.logger.info(f'Generated username: {username}')
user = User(username=username)
user.set_password(password)
db.session.add(user)
# 设置用户详细信息
for field in UserField.query.all():
if field.name in data:
user.set_detail(field.name, data[field.name])
current_app.logger.info(f'Set user detail: {field.name} = {data[field.name]}')
# 设置新用户标记
user.login_info.is_new_user = True
user.login_info.has_changed_initial_password = False
db.session.commit()
current_app.logger.info(f'User {username} registered successfully')
return jsonify({
'message': '注册成功',
'username': username,
'password': password
}), 201
except Exception as e:
current_app.logger.error(f'Registration error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': f'注册失败,错误: {str(e)}'}), 500
def generate_username(first_name, last_name):
base_username = f"{first_name.lower()} {last_name.lower()}"
username = base_username
counter = 1
while User.query.filter_by(username=username).first():
username = f"{base_username} {counter}"
counter += 1
return username
def generate_password():
characters = string.ascii_letters + string.digits + string.punctuation
return ''.join(random.choice(characters) for i in range(16))
@bp.route('/login', methods=['POST'])
def login_api():
current_app.logger.info('Received login request')
data = request.get_json()
if not data:
current_app.logger.warning('Invalid request data')
return jsonify({'message': '无效的请求数据'}), 400
username_or_email = data.get('username')
password = data.get('password')
current_app.logger.info(f'Login attempt for user: {username_or_email}')
if not username_or_email or not password:
current_app.logger.warning('Missing username/email or password')
return jsonify({'message': '缺少用户名/邮箱或密码'}), 400
try:
user = User.find_by_username_or_email(username_or_email)
if user is None:
current_app.logger.warning(f'User not found: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
if not user.check_password(password):
current_app.logger.warning(f'Invalid password for user: {username_or_email}')
return jsonify({'message': '无效的用户名/邮箱或密码'}), 401
current_app.logger.info(f'User authenticated: {username_or_email}')
current_app.logger.info(f'Is new user: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password: {user.login_info.has_changed_initial_password}')
# 检查是否是新用户或未更改初始密码
if user.login_info.is_new_user or not user.login_info.has_changed_initial_password:
current_app.logger.info(f'New user or initial password not changed: {username_or_email}')
return jsonify({
'message': '首次登录,请修改密码',
'require_password_change': True
}), 403
# 检查密码是否过期
if user.needs_password_change():
current_app.logger.info(f'Password expired for user: {username_or_email}')
return jsonify({
'message': '密码已过期,请修改密码',
'require_password_change': True
}), 403
login_user(user)
user.login_info.update_login_info()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'User {user.username} logged in successfully')
# 获取用户详细信息
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()], # 添加这行
'details': user_details,
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify({
'message': '登录成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user_info': user_info
}), 200
except Exception as e:
current_app.logger.error(f'Login error: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '登录失败,请稍后再试'}), 500
@bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
current_user_id = get_jwt_identity()
access_token = create_access_token(identity=current_user_id)
return jsonify({'access_token': access_token}), 200
@bp.route('/logout', methods=['POST'])
@login_required
def logout_api():
current_app.logger.info(f'User {current_user.username} logged out')
logout_user()
return jsonify({'message': '登出成功'}), 200
@bp.route('/profile', methods=['GET'])
@login_required
def profile():
current_app.logger.info(f'Profile requested for user {current_user.username}')
try:
profile_data = {
'username': current_user.username,
'login_info': {
'register_time': current_user.login_info.register_time.isoformat(),
'login_count': current_user.login_info.login_count,
'last_login_time': current_user.login_info.last_login_time.isoformat() if current_user.login_info.last_login_time else None,
'is_new_user': current_user.login_info.is_new_user
}
}
current_app.logger.info(f'Profile data retrieved for user {current_user.username}')
return jsonify(profile_data)
except Exception as e:
current_app.logger.error(f'Profile retrieval error for user {current_user.username}: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户资料失败,请稍后再试'}), 500
@bp.route('/change_password', methods=['POST'])
@jwt_required()
def change_password():
current_app.logger.info('Received change password request')
current_user_id = get_jwt_identity()
data = request.get_json()
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
try:
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
current_app.logger.info(f'Password changed successfully for user {user.username}')
return jsonify({'message': '密码修改成功'}), 200
except Exception as e:
current_app.logger.error(f'Password change error: {str(e)}')
current_app.logger.error(traceback.format_exc())
db.session.rollback()
return jsonify({'message': '密码修改失败,请稍后再试'}), 500
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
try:
# 验证 JWT token
verify_jwt_in_request()
# 获取当前用户的身份通常是用户ID
current_user_id = get_jwt_identity()
# 从数据库中获取用户对象
current_user = User.query.get(current_user_id)
if not current_user:
return jsonify({'message': '找不到用户'}), 404
except Exception as e:
return jsonify({'message': '无效的令牌'}), 401
# 将当前用户对象传递给被装饰的函数
return f(current_user, *args, **kwargs)
return decorated
@bp.route('/change_initial_password', methods=['POST'])
def change_initial_password():
data = request.get_json()
if not data:
return jsonify({'message': '缺少请求数据'}), 400
username_or_email = data.get('username_or_email') or data.get('username')
old_password = data.get('old_password')
new_password = data.get('new_password')
if not all([username_or_email, old_password, new_password]):
return jsonify({'message': '缺少必要的字段'}), 400
user = User.find_by_username_or_email(username_or_email)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not user.check_password(old_password):
return jsonify({'message': '旧密码不正确'}), 401
if user.login_info.has_changed_initial_password:
return jsonify({'message': '初始密码已经被修改过'}), 400
user.set_password(new_password)
user.login_info.has_changed_initial_password = True
user.login_info.is_new_user = False
# 更新密码修改历史
if user.password_history:
user.password_history.update_password_change()
else:
user.password_history = UserPasswordHistory(user=user)
db.session.commit()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
current_app.logger.info(f'Initial password changed for user: {user.username}')
current_app.logger.info(f'New user status: {user.login_info.is_new_user}')
current_app.logger.info(f'Has changed initial password status: {user.login_info.has_changed_initial_password}')
return jsonify({
'message': '密码修改成功,现在可以登录',
'access_token': access_token,
'refresh_token': refresh_token
}), 200
@bp.route('/current_user', methods=['GET'])
@jwt_required()
def get_current_user():
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
try:
user_fields = UserField.query.all()
user_details = {}
for field in user_fields:
detail = UserDetail.query.filter_by(user_id=user.id, field_id=field.id).first()
user_details[field.name] = detail.value if detail else None
user_info = {
'id': user.id,
'username': user.username,
'details': user_details,
'primary_department': user.primary_department.name if user.primary_department else None,
'secondary_departments': [dept.name for dept in user.secondary_departments],
'roles': [role.name for role in user.roles],
'permissions': [perm.name for perm in user.get_all_permissions()],
'login_info': {
'register_time': user.login_info.register_time.isoformat(),
'login_count': user.login_info.login_count,
'last_login_time': user.login_info.last_login_time.isoformat() if user.login_info.last_login_time else None,
'is_new_user': user.login_info.is_new_user,
'has_changed_initial_password': user.login_info.has_changed_initial_password
}
}
return jsonify(user_info), 200
except Exception as e:
current_app.logger.error(f'Error retrieving current user info: {str(e)}')
current_app.logger.error(traceback.format_exc())
return jsonify({'message': '获取用户信息失败,请稍后再试'}), 500

95
APP/authorization.py Normal file
View File

@ -0,0 +1,95 @@
from functools import wraps
from flask import jsonify, request, current_app
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from app.models import User, Permission
from .extensions import db
class Authorization:
@staticmethod
def check_permission(required_permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from .models import User # 在函数内部导入以避免循环导入
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if required_permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_role(required_role):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_roles = [role.name for role in user.roles]
if required_role not in user_roles:
return jsonify({"message": "Role not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def check_department(required_department):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_departments = [user.primary_department.name] + [dept.name for dept in user.secondary_departments]
if required_department not in user_departments:
return jsonify({"message": "Department not authorized"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def api_permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
verify_jwt_in_request()
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({"message": "User not found"}), 404
user_permissions = user.get_all_permissions()
if permission not in user_permissions:
return jsonify({"message": "Permission denied"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
def init_permissions():
from .models import Permission # 在函数内部导入以避免循环导入
# 这个函数可以在应用启动时调用,用于初始化或更新权限
# 可以从配置文件、数据库或其他来源读取权限定义
permissions = [
"create_user", "edit_user", "delete_user",
"create_role", "edit_role", "delete_role",
"create_department", "edit_department", "delete_department",
# ... 其他权限 ...
]
for perm_name in permissions:
perm = Permission.query.filter_by(name=perm_name).first()
if not perm:
new_perm = Permission(name=perm_name)
db.session.add(new_perm)
db.session.commit()
# 可以添加更多的辅助函数,比如检查复杂的权限组合等

16
APP/decorators.py Normal file
View File

@ -0,0 +1,16 @@
from functools import wraps
from flask import abort
from flask_login import current_user
def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
abort(403)
user_permissions = current_user.get_all_permissions()
if permission not in [p.name for p in user_permissions]:
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator

150
APP/departments.py Normal file
View File

@ -0,0 +1,150 @@
from flask import Blueprint, request, jsonify
from app.models import Department, User, db
from sqlalchemy.exc import IntegrityError
bp = Blueprint('departments', __name__)
@bp.route('', methods=['POST'])
def create_department():
data = request.get_json()
if not data or 'name' not in data:
return jsonify({'message': '部门名称是必需的'}), 400
name = data['name']
# 检查部门名称是否已存在
if Department.query.filter_by(name=name).first():
return jsonify({'message': '部门名称已存在'}), 400
new_department = Department(name=name)
db.session.add(new_department)
db.session.flush() # 这会给 new_department 分配一个 id
# 如果提供了其他字段,设置它们
for field in ['department_code', 'manager', 'phone', 'email', 'location']:
if field in data:
new_department.set_detail(field, data[field])
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
return jsonify({'message': '创建部门失败,可能是由于数据完整性问题'}), 500
return jsonify({'message': '部门创建成功', 'id': new_department.id}), 201
@bp.route('/<int:parent_id>/children', methods=['POST'])
def create_child_department(parent_id):
parent_department = Department.query.get(parent_id)
if not parent_department:
return jsonify({'message': '父部门不存在'}), 404
data = request.get_json()
if not data or 'name' not in data:
return jsonify({'message': '子部门名称是必需的'}), 400
name = data['name']
# 检查部门名称是否已存在
if Department.query.filter_by(name=name).first():
return jsonify({'message': '部门名称已存在'}), 400
new_department = Department(name=name, parent_id=parent_id)
db.session.add(new_department)
db.session.flush() # 这会给 new_department 分配一个 id
# 如果提供了其他字段,设置它们
for field in ['department_code', 'manager', 'phone', 'email', 'location']:
if field in data:
new_department.set_detail(field, data[field])
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
return jsonify({'message': '创建子部门失败,可能是由于数据完整性问题'}), 500
return jsonify({'message': '子部门创建成功', 'id': new_department.id}), 201
@bp.route('/<int:department_id>/add_user', methods=['POST'])
def add_user_to_department(department_id):
data = request.get_json()
if not data or 'user_id' not in data:
return jsonify({'message': '用户ID是必需的'}), 400
user_id = data['user_id']
department = Department.query.get(department_id)
if not department:
return jsonify({'message': '部门不存在'}), 404
user = User.query.get(user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if 'is_primary' in data and data['is_primary']:
# 如果是主要部门,直接设置
user.primary_department = department
else:
# 如果不是主要部门,添加到次要部门
if department not in user.secondary_departments:
user.secondary_departments.append(department)
try:
db.session.commit()
return jsonify({'message': '用户成功添加到部门'}), 200
except IntegrityError:
db.session.rollback()
return jsonify({'message': '添加用户到部门失败,可能是由于数据完整性问题'}), 500
@bp.route('', methods=['GET'])
def get_departments():
departments = Department.query.all()
departments_list = [{
'id': dept.id,
'name': dept.name,
'parent_id': dept.parent_id,
'created_at': dept.created_at.isoformat() if dept.created_at else None,
'updated_at': dept.updated_at.isoformat() if dept.updated_at else None
} for dept in departments]
return jsonify(departments_list), 200
@bp.route('/<int:department_id>', methods=['GET'])
def get_department_details(department_id):
department = Department.query.get(department_id)
if not department:
return jsonify({'message': '部门不存在'}), 404
# 获取主要部门的用户
primary_users = User.query.filter_by(primary_department_id=department.id).all()
# 获取次要部门的用户
secondary_users = department.users.all()
# 合并用户列表并去重
all_users = list(set(primary_users + secondary_users))
users = [{'id': u.id, 'username': u.username} for u in all_users]
children = [{'id': c.id, 'name': c.name} for c in department.children]
department_details = {
'id': department.id,
'name': department.name,
'parent_id': department.parent_id,
'created_at': department.created_at.isoformat() if department.created_at else None,
'updated_at': department.updated_at.isoformat() if department.updated_at else None,
'department_code': department.get_detail('department_code'),
'manager': department.get_detail('manager'),
'phone': department.get_detail('phone'),
'email': department.get_detail('email'),
'location': department.get_detail('location'),
'users': users,
'children': children
}
return jsonify(department_details), 200
# 你可以在这里添加更多的部门相关路由...