feat: 添加两步验证功能,重构相关逻辑,移除冗余代码

This commit is contained in:
2025-12-19 14:30:13 +08:00
parent b7c5d5aec7
commit 20753f1725
7 changed files with 79 additions and 42 deletions

View File

@@ -308,4 +308,3 @@ class User(UserBase, TableBase, table=True):
def to_public(self) -> "UserPublic":
"""转换为公开 DTO排除敏感字段"""
return UserPublic.model_validate(self)

View File

@@ -0,0 +1 @@
from .password.pwd import Password, PasswordStatus

View File

@@ -3,6 +3,12 @@ from loguru import logger
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from enum import StrEnum
import pyotp
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from pydantic import BaseModel, Field
from pkg.JWT.JWT import SECRET_KEY
from pkg.conf import appmeta
_ph = PasswordHasher()
@@ -18,6 +24,24 @@ class PasswordStatus(StrEnum):
EXPIRED = "expired"
"""密码哈希已过时,建议重新哈希"""
class TwoFactorBase(BaseModel):
"""两步验证请求 DTO"""
setup_token: str
"""用于验证的令牌"""
class TwoFactorResponse(TwoFactorBase):
"""两步验证-请求启用时的响应 DTO"""
uri: str
"""用于生成二维码的 URI"""
class TwoFactorVerifyRequest(TwoFactorBase):
"""两步验证-验证请求 DTO"""
code: int = Field(..., ge=100000, le=999999)
"""6 位验证码"""
class Password:
"""密码处理工具类,包含密码生成、哈希和验证功能"""
@@ -77,3 +101,50 @@ class Password:
# 这是预期的异常,当密码不匹配时触发。
return PasswordStatus.INVALID
# 其他异常(如哈希格式错误)应该传播,让调用方感知系统问题
@staticmethod
async def generate_totp(
username: str
) -> TwoFactorResponse:
"""
生成 TOTP 密钥和对应的 URI用于两步验证。
:return: 包含 TOTP 密钥和 URI 的元组
"""
serializer = URLSafeTimedSerializer(SECRET_KEY)
secret = pyotp.random_base32()
setup_token = serializer.dumps(
secret,
salt="2fa-setup-salt"
)
otp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=username,
issuer_name=appmeta.APP_NAME
)
return TwoFactorResponse(
uri=otp_uri,
setup_token=setup_token
)
@staticmethod
def verify_totp(
secret: str,
code: str
) -> PasswordStatus:
"""
验证 TOTP 验证码。
:param secret: TOTP 密钥Base32 编码)
:param code: 用户输入的 6 位验证码
:return: 验证是否成功
"""
totp = pyotp.TOTP(secret)
if totp.verify(code):
return PasswordStatus.VALID
else:
return PasswordStatus.INVALID

View File

@@ -13,6 +13,7 @@ import service
from middleware.auth import AuthRequired
from middleware.dependencies import SessionDep
from pkg.JWT.JWT import SECRET_KEY
from pkg import Password
user_router = APIRouter(
prefix="/user",
@@ -445,7 +446,6 @@ def router_user_settings_patch(option: str) -> models.response.ResponseModel:
dependencies=[Depends(AuthRequired)],
)
async def router_user_settings_2fa(
session: SessionDep,
user: Annotated[models.user.User, Depends(AuthRequired)],
) -> models.response.ResponseModel:
"""
@@ -455,27 +455,8 @@ async def router_user_settings_2fa(
dict: A dictionary containing two-factor authentication setup information.
"""
serializer = URLSafeTimedSerializer(SECRET_KEY)
secret = pyotp.random_base32()
setup_token = serializer.dumps(
secret,
salt="2fa-setup-salt"
)
site_Name = await models.Setting.get(session, (models.Setting.type == models.SettingsType.BASIC) & (models.Setting.name == "siteName"))
otp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=user.username,
issuer_name=site_Name.value
)
return models.response.ResponseModel(
data={
"setup_token": setup_token,
"otp_uri": otp_uri,
}
data=await Password.generate_totp(user.username)
)
@user_settings_router.post(
@@ -508,7 +489,7 @@ async def router_user_settings_2fa_enable(
raise HTTPException(status_code=400, detail="Invalid token")
# 2. 验证用户输入的 6 位验证码
if not service.user.verify_totp(secret, code):
if not Password.verify_totp(secret, code):
raise HTTPException(status_code=400, detail="Invalid OTP code")
# 3. 将 secret 存储到用户的数据库记录中,启用 2FA

View File

@@ -1,2 +1 @@
from .login import Login
from .totp import verify_totp

View File

@@ -5,7 +5,6 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from models import LoginRequest, TokenResponse, User
from pkg.JWT.JWT import create_access_token, create_refresh_token
from .totp import verify_totp
async def Login(
@@ -61,7 +60,7 @@ async def Login(
return "2fa_required"
# 验证 OTP 码
if not verify_totp(current_user.two_factor, login_request.two_fa_code):
if not Password.verify_totp(current_user.two_factor, login_request.two_fa_code):
log.debug(f"Invalid 2FA code for user: {login_request.username}")
return "2fa_invalid"

View File

@@ -1,13 +0,0 @@
import pyotp
def verify_totp(secret: str, code: str) -> bool:
"""
验证 TOTP 验证码。
:param secret: TOTP 密钥Base32 编码)
:param code: 用户输入的 6 位验证码
:return: 验证是否成功
"""
totp = pyotp.TOTP(secret)
return totp.verify(code)