清理项目配置文件,移除不再使用的.idea文件和更新文档中的Python版本要求

This commit is contained in:
2025-12-17 16:15:47 +08:00
parent 8ce34440d8
commit 35efbdf000
27 changed files with 123 additions and 221 deletions

View File

@@ -1,19 +1,18 @@
from typing import Annotated, Literal
from typing import Annotated
from fastapi import Depends
from fastapi import HTTPException
import JWT
import jwt
from jwt import InvalidTokenError
from model import database
from sqlmodel.ext.asyncio.session import AsyncSession
from model import User
from model.user import UserTypeEnum
from .user import get_current_user
from pkg import utils
from model import User
from model import database
# 验证是否为管理员
async def is_admin(
token: Annotated[str, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(database.Database.get_session)],
) -> Literal[True]:
) -> User:
'''
验证是否为管理员。
@@ -21,14 +20,25 @@ async def is_admin(
>>> APIRouter(dependencies=[Depends(is_admin)])
'''
not_admin_exception = HTTPException(
status_code=403,
detail="Admin access required",
headers={"WWW-Authenticate": "Bearer"},
)
user = await get_current_user(token, session)
if user.role == UserTypeEnum.normal_user:
utils.raise_forbidden("Admin access required")
else:
return user
async def is_super_admin(
token: Annotated[str, Depends(is_admin)],
session: Annotated[AsyncSession, Depends(database.Database.get_session)],
) -> User:
'''
验证是否为超级管理员。
使用方法:
>>> APIRouter(dependencies=[Depends(is_super_admin)])
'''
user = await get_current_user(token, session)
if not user.is_admin:
raise not_admin_exception
if user.role != UserTypeEnum.super_admin:
utils.raise_forbidden("Super admin access required")
else:
return True
return user

View File

@@ -2,16 +2,14 @@ from typing import Annotated
import jwt
from fastapi import Depends
from fastapi import HTTPException
from jwt import InvalidTokenError
from sqlmodel.ext.asyncio.session import AsyncSession
import JWT
from model import User
from model.database import Database
from pkg import utils
# 验证是否为管理员
async def get_current_user(
token: Annotated[str, Depends(JWT.oauth2_scheme)],
session: Annotated[AsyncSession, Depends(Database.get_session)],
@@ -19,18 +17,13 @@ async def get_current_user(
"""
验证用户身份并返回当前用户信息。
"""
not_login_exception = HTTPException(
status_code=401,
detail="Login required",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, await JWT.get_secret_key(), algorithms=[JWT.ALGORITHM])
username = payload.get("sub")
stored_account = await User.get(session, User.email == username)
if username is None or stored_account.email != username:
raise not_login_exception
utils.raise_unauthorized("Login required")
return stored_account
except InvalidTokenError:
raise not_login_exception
utils.raise_unauthorized("Login required")