from fastapi import FastAPI, Depends, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm.session import Session from typing import List, Optional import uvicorn import os from database import engine, SessionLocal, Base from models import User, Document, UserOperationLog from schemas import ( UserCreate, UserLogin, UserResponse, UserUpdate, DocumentCreate, DocumentResponse, DocumentUpdate, UserOperationLogResponse ) from auth import create_access_token, verify_token, get_password_hash, verify_password # 创建数据库表 Base.metadata.create_all(bind=engine) app = FastAPI( title="文档在线管理系统", description="一个基于FastAPI的文档在线管理系统", version="1.0.0" ) # 配置CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 依赖注入 - 获取数据库会话 def get_db(): db = SessionLocal() try: yield db finally: db.close() # 依赖注入 - 获取当前用户 def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()), db: Session = Depends(get_db) ): token = credentials.credentials payload = verify_token(token) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的token" ) user_id = payload.get("sub") user = db.query(User).filter(User.id == user_id).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在" ) return user def create_operation_log(db: Session, user_id: int, operation: str, details: str = ""): """创建用户操作记录""" log = UserOperationLog( user_id=user_id, operation=operation, details=details ) db.add(log) db.commit() db.refresh(log) return log # 用户认证相关路由 @app.post("/auth/register", response_model=UserResponse) def register(user: UserCreate, db: Session = Depends(get_db)): # 检查用户名是否已存在 existing_user = db.query(User).filter(User.username == user.username).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="用户名已存在" ) # 检查邮箱是否已存在 existing_email = db.query(User).filter(User.email == user.email).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="邮箱已被注册" ) # 创建新用户 hashed_password = get_password_hash(user.password) db_user = User( username=user.username, email=user.email, hashed_password=hashed_password, full_name=user.full_name, is_admin=user.is_admin # 设置管理员权限 ) db.add(db_user) db.commit() db.refresh(db_user) # 记录操作日志 create_operation_log(db, db_user.id, "用户注册", f"用户 {user.username} 注册成功") return db_user @app.post("/auth/login") def login(user: UserLogin, db: Session = Depends(get_db)): # 验证用户 db_user = db.query(User).filter(User.username == user.username).first() if not db_user or not verify_password(user.password, db_user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误" ) # 生成token access_token = create_access_token(data={"sub": str(db_user.id)}) # 记录操作日志 create_operation_log(db, db_user.id, "用户登录", "用户登录成功") return { "access_token": access_token, "token_type": "bearer", "user": UserResponse.from_orm(db_user) } @app.post("/auth/logout") def logout(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): # 记录操作日志 create_operation_log(db, current_user.id, "用户登出", "用户登出成功") return {"message": "登出成功"} # 用户管理路由 @app.get("/users/me", response_model=UserResponse) def read_users_me(current_user: User = Depends(get_current_user)): return current_user @app.put("/users/me", response_model=UserResponse) def update_user_me( user_update: UserUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): update_data = user_update.dict(exclude_unset=True) # 如果更新密码,需要重新哈希 if "password" in update_data: update_data["hashed_password"] = get_password_hash(update_data.pop("password")) for field, value in update_data.items(): setattr(current_user, field, value) db.commit() db.refresh(current_user) # 记录操作日志 create_operation_log(db, current_user.id, "更新用户信息", "用户更新个人信息") return current_user @app.get("/users", response_model=List[UserResponse]) def read_users( skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 只有管理员可以查看所有用户 if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="没有权限查看用户列表" ) users = db.query(User).offset(skip).limit(limit).all() # 记录操作日志 create_operation_log(db, current_user.id, "查看用户列表", "管理员查看所有用户") return users @app.delete("/users/{user_id}") def delete_user( user_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 只有管理员可以删除用户 if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="没有权限删除用户" ) if user_id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="不能删除自己的账户" ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) db.delete(user) db.commit() # 记录操作日志 create_operation_log(db, current_user.id, "删除用户", f"删除用户 {user.username}") return {"message": "用户删除成功"} # 文档管理路由 @app.post("/documents", response_model=DocumentResponse) def create_document( document: DocumentCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): db_document = Document( **document.dict(), owner_id=current_user.id ) db.add(db_document) db.commit() db.refresh(db_document) # 记录操作日志 create_operation_log(db, current_user.id, "创建文档", f"创建文档: {document.title}") return db_document @app.get("/documents", response_model=List[DocumentResponse]) def read_documents( skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 管理员可以查看所有文档,普通用户只能查看自己的文档 if current_user.is_admin: documents = db.query(Document).offset(skip).limit(limit).all() else: documents = db.query(Document).filter( Document.owner_id == current_user.id ).offset(skip).limit(limit).all() # 记录操作日志 log_action = "查看所有文档列表" if current_user.is_admin else "查看自己的文档" create_operation_log(db, current_user.id, "查看文档列表", log_action) return documents @app.get("/documents/{document_id}", response_model=DocumentResponse) def read_document( document_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 管理员可以查看任何文档,普通用户只能查看自己的文档 if current_user.is_admin: document = db.query(Document).filter(Document.id == document_id).first() else: document = db.query(Document).filter( Document.id == document_id, Document.owner_id == current_user.id ).first() if not document: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="文档不存在" ) # 记录操作日志 log_action = f"管理员查看文档" if current_user.is_admin else f"查看文档: {document.title}" create_operation_log(db, current_user.id, "查看文档", log_action) return document @app.put("/documents/{document_id}", response_model=DocumentResponse) def update_document( document_id: int, document_update: DocumentUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 管理员可以编辑任何文档,普通用户只能编辑自己的文档 if current_user.is_admin: document = db.query(Document).filter(Document.id == document_id).first() else: document = db.query(Document).filter( Document.id == document_id, Document.owner_id == current_user.id ).first() if not document: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="文档不存在" ) update_data = document_update.dict(exclude_unset=True) for field, value in update_data.items(): setattr(document, field, value) db.commit() db.refresh(document) # 记录操作日志 log_action = f"管理员更新文档" if current_user.is_admin else f"更新文档: {document.title}" create_operation_log(db, current_user.id, "更新文档", log_action) return document @app.delete("/documents/{document_id}") def delete_document( document_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 管理员可以删除任何文档,普通用户只能删除自己的文档 if current_user.is_admin: document = db.query(Document).filter(Document.id == document_id).first() else: document = db.query(Document).filter( Document.id == document_id, Document.owner_id == current_user.id ).first() if not document: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="文档不存在" ) db.delete(document) db.commit() # 记录操作日志 log_action = f"管理员删除文档" if current_user.is_admin else f"删除文档: {document.title}" create_operation_log(db, current_user.id, "删除文档", log_action) return {"message": "文档删除成功"} # 操作日志路由 @app.get("/logs/my", response_model=List[UserOperationLogResponse]) def read_my_logs( skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): logs = db.query(UserOperationLog).filter( UserOperationLog.user_id == current_user.id ).order_by(UserOperationLog.created_at.desc()).offset(skip).limit(limit).all() return logs @app.get("/logs", response_model=List[UserOperationLogResponse]) def read_all_logs( skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # 只有管理员可以查看所有日志 if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="没有权限查看所有操作日志" ) logs = db.query(UserOperationLog).order_by( UserOperationLog.created_at.desc() ).offset(skip).limit(limit).all() return logs @app.get("/") def read_root(): return {"message": "文档在线管理系统API"} if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)