feat: 添加文档管理系统前后端基础功能

- 新增后端FastAPI应用,包含用户管理、文档管理和操作日志功能
- 实现JWT认证机制,支持用户注册、登录、登出操作
- 添加数据库模型定义,包括用户表、文档表和操作日志表
- 实现文档的增删改查功能及权限控制
- 添加管理员功能,支持用户管理和全局操作日志查看
- 新增前端界面,实现完整的用户交互体验
- 配置环境变量示例和Git忽略规则
- 编写详细的README文档,包含安装和使用说明
This commit is contained in:
starsac
2026-01-11 15:56:55 +08:00
commit bb9e208381
12 changed files with 2411 additions and 0 deletions

399
main.py Normal file
View File

@@ -0,0 +1,399 @@
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm.session import Session
from typing import List, Optional
import uvicorn
import os
from database import engine, SessionLocal, Base
from models import User, Document, UserOperationLog
from schemas import (
UserCreate, UserLogin, UserResponse, UserUpdate,
DocumentCreate, DocumentResponse, DocumentUpdate,
UserOperationLogResponse
)
from auth import create_access_token, verify_token, get_password_hash, verify_password
# 创建数据库表
Base.metadata.create_all(bind=engine)
app = FastAPI(
title="文档在线管理系统",
description="一个基于FastAPI的文档在线管理系统",
version="1.0.0"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 依赖注入 - 获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 依赖注入 - 获取当前用户
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
db: Session = Depends(get_db)
):
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的token"
)
user_id = payload.get("sub")
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在"
)
return user
def create_operation_log(db: Session, user_id: int, operation: str, details: str = ""):
"""创建用户操作记录"""
log = UserOperationLog(
user_id=user_id,
operation=operation,
details=details
)
db.add(log)
db.commit()
db.refresh(log)
return log
# 用户认证相关路由
@app.post("/auth/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
# 检查用户名是否已存在
existing_user = db.query(User).filter(User.username == user.username).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="用户名已存在"
)
# 检查邮箱是否已存在
existing_email = db.query(User).filter(User.email == user.email).first()
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="邮箱已被注册"
)
# 创建新用户
hashed_password = get_password_hash(user.password)
db_user = User(
username=user.username,
email=user.email,
hashed_password=hashed_password,
full_name=user.full_name,
is_admin=user.is_admin # 设置管理员权限
)
db.add(db_user)
db.commit()
db.refresh(db_user)
# 记录操作日志
create_operation_log(db, db_user.id, "用户注册", f"用户 {user.username} 注册成功")
return db_user
@app.post("/auth/login")
def login(user: UserLogin, db: Session = Depends(get_db)):
# 验证用户
db_user = db.query(User).filter(User.username == user.username).first()
if not db_user or not verify_password(user.password, db_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误"
)
# 生成token
access_token = create_access_token(data={"sub": str(db_user.id)})
# 记录操作日志
create_operation_log(db, db_user.id, "用户登录", "用户登录成功")
return {
"access_token": access_token,
"token_type": "bearer",
"user": UserResponse.from_orm(db_user)
}
@app.post("/auth/logout")
def logout(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
# 记录操作日志
create_operation_log(db, current_user.id, "用户登出", "用户登出成功")
return {"message": "登出成功"}
# 用户管理路由
@app.get("/users/me", response_model=UserResponse)
def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@app.put("/users/me", response_model=UserResponse)
def update_user_me(
user_update: UserUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
update_data = user_update.dict(exclude_unset=True)
# 如果更新密码,需要重新哈希
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(current_user, field, value)
db.commit()
db.refresh(current_user)
# 记录操作日志
create_operation_log(db, current_user.id, "更新用户信息", "用户更新个人信息")
return current_user
@app.get("/users", response_model=List[UserResponse])
def read_users(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 只有管理员可以查看所有用户
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限查看用户列表"
)
users = db.query(User).offset(skip).limit(limit).all()
# 记录操作日志
create_operation_log(db, current_user.id, "查看用户列表", "管理员查看所有用户")
return users
@app.delete("/users/{user_id}")
def delete_user(
user_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 只有管理员可以删除用户
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限删除用户"
)
if user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="不能删除自己的账户"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
db.delete(user)
db.commit()
# 记录操作日志
create_operation_log(db, current_user.id, "删除用户", f"删除用户 {user.username}")
return {"message": "用户删除成功"}
# 文档管理路由
@app.post("/documents", response_model=DocumentResponse)
def create_document(
document: DocumentCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
db_document = Document(
**document.dict(),
owner_id=current_user.id
)
db.add(db_document)
db.commit()
db.refresh(db_document)
# 记录操作日志
create_operation_log(db, current_user.id, "创建文档", f"创建文档: {document.title}")
return db_document
@app.get("/documents", response_model=List[DocumentResponse])
def read_documents(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 管理员可以查看所有文档,普通用户只能查看自己的文档
if current_user.is_admin:
documents = db.query(Document).offset(skip).limit(limit).all()
else:
documents = db.query(Document).filter(
Document.owner_id == current_user.id
).offset(skip).limit(limit).all()
# 记录操作日志
log_action = "查看所有文档列表" if current_user.is_admin else "查看自己的文档"
create_operation_log(db, current_user.id, "查看文档列表", log_action)
return documents
@app.get("/documents/{document_id}", response_model=DocumentResponse)
def read_document(
document_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 管理员可以查看任何文档,普通用户只能查看自己的文档
if current_user.is_admin:
document = db.query(Document).filter(Document.id == document_id).first()
else:
document = db.query(Document).filter(
Document.id == document_id,
Document.owner_id == current_user.id
).first()
if not document:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文档不存在"
)
# 记录操作日志
log_action = f"管理员查看文档" if current_user.is_admin else f"查看文档: {document.title}"
create_operation_log(db, current_user.id, "查看文档", log_action)
return document
@app.put("/documents/{document_id}", response_model=DocumentResponse)
def update_document(
document_id: int,
document_update: DocumentUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 管理员可以编辑任何文档,普通用户只能编辑自己的文档
if current_user.is_admin:
document = db.query(Document).filter(Document.id == document_id).first()
else:
document = db.query(Document).filter(
Document.id == document_id,
Document.owner_id == current_user.id
).first()
if not document:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文档不存在"
)
update_data = document_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(document, field, value)
db.commit()
db.refresh(document)
# 记录操作日志
log_action = f"管理员更新文档" if current_user.is_admin else f"更新文档: {document.title}"
create_operation_log(db, current_user.id, "更新文档", log_action)
return document
@app.delete("/documents/{document_id}")
def delete_document(
document_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 管理员可以删除任何文档,普通用户只能删除自己的文档
if current_user.is_admin:
document = db.query(Document).filter(Document.id == document_id).first()
else:
document = db.query(Document).filter(
Document.id == document_id,
Document.owner_id == current_user.id
).first()
if not document:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文档不存在"
)
db.delete(document)
db.commit()
# 记录操作日志
log_action = f"管理员删除文档" if current_user.is_admin else f"删除文档: {document.title}"
create_operation_log(db, current_user.id, "删除文档", log_action)
return {"message": "文档删除成功"}
# 操作日志路由
@app.get("/logs/my", response_model=List[UserOperationLogResponse])
def read_my_logs(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
logs = db.query(UserOperationLog).filter(
UserOperationLog.user_id == current_user.id
).order_by(UserOperationLog.created_at.desc()).offset(skip).limit(limit).all()
return logs
@app.get("/logs", response_model=List[UserOperationLogResponse])
def read_all_logs(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 只有管理员可以查看所有日志
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限查看所有操作日志"
)
logs = db.query(UserOperationLog).order_by(
UserOperationLog.created_at.desc()
).offset(skip).limit(limit).all()
return logs
@app.get("/")
def read_root():
return {"message": "文档在线管理系统API"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)