Files
disknext/middleware/auth.py

62 lines
1.5 KiB
Python

from typing import Annotated
from fastapi import Depends, HTTPException
from jwt import InvalidTokenError
import jwt
from models.user import User
from pkg.JWT import JWT
from .dependencies import SessionDep
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def AuthRequired(
session: SessionDep,
token: Annotated[str, Depends(JWT.oauth2_scheme)],
) -> User:
"""
AuthRequired 需要登录
"""
try:
payload = jwt.decode(token, JWT.SECRET_KEY, algorithms=["HS256"])
username = payload.get("sub")
if username is None:
raise credentials_exception
# 从数据库获取用户信息
user = await User.get(session, User.username == username)
if not user:
raise credentials_exception
return user
except InvalidTokenError:
raise credentials_exception
async def SignRequired(
session: SessionDep,
token: Annotated[str, Depends(JWT.oauth2_scheme)],
) -> User | None:
"""
SignAuthRequired 需要验证请求签名
"""
pass
async def AdminRequired(
user: Annotated[User, Depends(AuthRequired)],
) -> User:
"""
验证是否为管理员。
使用方法:
>>> APIRouter(dependencies=[Depends(AdminRequired)])
"""
group = await user.awaitable_attrs.group
if group.admin:
return user
raise HTTPException(status_code=403, detail="Admin Required")