Files
disknext/middleware/auth.py
于小丘 b12aad4e73 feat: Enhance file management and user features
- Add file deduplication mechanism based on PhysicalFile reference counting.
- Implement chunked upload support for large files with resumable uploads.
- Update sharing page to automatically render README and preview content.
- Integrate Redis for caching and token storage (optional).
- Refactor project structure to include new models for download tasks, nodes, and tasks.
- Introduce user filtering parameters for admin user management.
- Add CORS middleware for handling cross-origin requests.
- Improve error messages for authentication failures.
- Update user model to include two-factor authentication key management.
- Enhance API documentation and response models for clarity.
- Implement admin checks for user management and permissions.
2026-01-13 15:29:52 +08:00

69 lines
1.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Annotated
from uuid import UUID
from fastapi import Depends
import jwt
from models.user import User
from utils import JWT
from .dependencies import SessionDep
from utils import http_exceptions
async def auth_required(
    session: SessionDep,
    token: Annotated[str, Depends(JWT.oauth2_scheme)],
) -> User:
    """
    Require a logged-in user and return it.

    Decodes ``token`` as an HS256 JWT signed with ``JWT.SECRET_KEY``, reads
    the ``sub`` claim as the user's UUID and loads the matching ``User``.

    :param session: database session passed to ``User.get``
    :param token: JWT string supplied by ``JWT.oauth2_scheme``
    :return: the ``User`` whose ``id`` equals the token's ``sub`` claim
    :raises: whatever ``http_exceptions.raise_unauthorized`` raises
        (presumably an HTTP 401 -- confirm against ``utils.http_exceptions``)
        when the token is invalid, the ``sub`` claim is missing or not a
        UUID string, or no such user exists
    """
    try:
        payload = jwt.decode(token, JWT.SECRET_KEY, algorithms=["HS256"])
        subject = payload.get("sub")

        if subject is None:
            http_exceptions.raise_unauthorized("账号或密码错误")

        # A correctly signed token can still carry a malformed subject.
        # UUID() raises ValueError for a bad string and AttributeError /
        # TypeError for a non-string claim; none of those are
        # jwt.InvalidTokenError, so without this guard they would escape as
        # an unhandled server error instead of an authentication failure.
        try:
            user_id = UUID(subject)
        except (AttributeError, TypeError, ValueError):
            http_exceptions.raise_unauthorized("凭据过期或无效")

        # Fetch the user record from the database.
        user = await User.get(session, User.id == user_id)
        if not user:
            http_exceptions.raise_unauthorized("账号或密码错误")

        return user

    except jwt.InvalidTokenError:
        http_exceptions.raise_unauthorized("凭据过期或无效")
async def admin_required(
    user: Annotated[User, Depends(auth_required)],
) -> User:
    """
    Let the request through only when the user is an administrator.

    Usage:
        >>> APIRouter(dependencies=[Depends(admin_required)])

    :param user: the user resolved by the ``auth_required`` dependency
    :return: the same ``user`` when its group's ``admin`` attribute is truthy
    :raises: the result of ``http_exceptions.raise_forbidden("Admin Required")``
        otherwise
    """
    # The group is awaited through ``awaitable_attrs`` before it is read.
    user_group = await user.awaitable_attrs.group

    # Guard clause: reject everyone whose group is not flagged as admin.
    if not user_group.admin:
        raise http_exceptions.raise_forbidden("Admin Required")

    return user
def verify_download_token(token: str) -> tuple[str, UUID, UUID] | None:
    """
    Validate a download token and return ``(jti, file_id, owner_id)``.

    The token must decode as an HS256 JWT signed with ``JWT.SECRET_KEY``,
    have a ``type`` claim equal to ``"download"``, a non-empty ``jti`` claim,
    and ``file_id`` / ``owner_id`` claims that parse as UUIDs.

    :param token: JWT string
    :return: ``(jti, file_id, owner_id)``, or ``None`` when any check fails
    """
    # Keep the try body down to the one call that raises JWT errors.
    try:
        payload = jwt.decode(token, JWT.SECRET_KEY, algorithms=["HS256"])
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        return None

    if payload.get("type") != "download":
        return None

    jti = payload.get("jti")
    if not jti:
        return None

    # A validly signed token may still lack these claims or hold values that
    # are not UUID strings. The contract is "None on failure", so treat a
    # missing key (KeyError), a malformed string (ValueError) and a
    # non-string claim (AttributeError / TypeError) as a failed verification
    # rather than letting the exception propagate to the caller.
    try:
        file_id = UUID(payload["file_id"])
        owner_id = UUID(payload["owner_id"])
    except (KeyError, ValueError, AttributeError, TypeError):
        return None

    return jti, file_id, owner_id