Files
disknext/utils/JWT/JWT.py
于小丘 44a8959aa5 feat: 更新验证码请求模型,添加 Google reCAPTCHA 和 Cloudflare Turnstile 验证功能
refactor: 修改用户状态字段类型,优化用户模型
fix: 修复启动服务的错误提示信息
refactor: 统一认证依赖,替换为 AuthRequired
docs: 添加用户会话刷新接口
2025-12-25 10:26:45 +08:00

53 lines
1.7 KiB
Python

from datetime import datetime, timedelta, timezone
import jwt
from fastapi.security import OAuth2PasswordBearer
# OAuth2 password-flow bearer scheme. The URLs below are the endpoints
# advertised for obtaining and refreshing a token.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/api/v1/user/session",
    refreshUrl="/api/v1/user/session/refresh",
    scheme_name='获取 JWT Bearer 令牌',
    description='用于获取 JWT Bearer 令牌,需要以表单的形式提交',
)

# Key used to sign JWTs. Starts out empty and is filled in by
# load_secret_key() from the database.
SECRET_KEY: str = ''
async def load_secret_key() -> None:
    """
    Read the JWT signing key from the database.

    Looks up the setting whose type is ``"auth"`` and whose name is
    ``"secret_key"`` and stores its value in the module-level
    ``SECRET_KEY``. If the lookup yields nothing truthy, ``SECRET_KEY``
    is left unchanged.
    """
    global SECRET_KEY

    # Imported lazily to avoid a circular dependency.
    from models.database import get_session
    from models.setting import Setting

    async for db_session in get_session():
        key_filter = (Setting.type == "auth") & (Setting.name == "secret_key")
        secret_row = await Setting.get(db_session, key_filter)
        if not secret_row:
            continue
        SECRET_KEY = secret_row.value
# Access token
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, datetime]:
    """
    Create an HS256-signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. ``None`` selects the default
            of 3 hours; any other value, including ``timedelta(0)``, is
            used as given.

    Returns:
        A ``(token, expire)`` tuple: the encoded JWT and the
        timezone-aware UTC datetime stored in its ``exp`` claim.
    """
    # Compare against None instead of relying on truthiness: timedelta(0)
    # is falsy and would otherwise silently fall back to the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(hours=3)
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = {**data, "exp": expire}
    # NOTE(review): SECRET_KEY is '' until load_secret_key() has populated
    # it; confirm that always happens before tokens are issued.
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm='HS256')
    return encoded_jwt, expire
# Refresh token
def create_refresh_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, datetime]:
    """
    Create an HS256-signed JWT refresh token.

    The payload is ``data`` plus an ``exp`` claim and a ``token_type``
    claim set to ``"refresh"``.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. ``None`` selects the default
            of 30 days; any other value, including ``timedelta(0)``, is
            used as given.

    Returns:
        A ``(token, expire)`` tuple: the encoded JWT and the
        timezone-aware UTC datetime stored in its ``exp`` claim.
    """
    # Compare against None instead of relying on truthiness: timedelta(0)
    # is falsy and would otherwise silently fall back to the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(days=30)
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = {**data, "exp": expire, "token_type": "refresh"}
    # NOTE(review): SECRET_KEY is '' until load_secret_key() has populated
    # it; confirm that always happens before tokens are issued.
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm='HS256')
    return encoded_jwt, expire