From 20753f1725db334d508095e8a28016886ea38d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E5=B0=8F=E4=B8=98?= Date: Fri, 19 Dec 2025 14:30:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=B8=A4=E6=AD=A5?= =?UTF-8?q?=E9=AA=8C=E8=AF=81=E5=8A=9F=E8=83=BD=EF=BC=8C=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E5=86=97=E4=BD=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/user.py | 3 +- pkg/__init__.py | 1 + pkg/password/pwd.py | 73 ++++++++++++++++++++++++++++++++++++- routers/controllers/user.py | 25 ++----------- service/user/__init__.py | 3 +- service/user/login.py | 3 +- service/user/totp.py | 13 ------- 7 files changed, 79 insertions(+), 42 deletions(-) delete mode 100644 service/user/totp.py diff --git a/models/user.py b/models/user.py index cbb37ac..97cb8dc 100644 --- a/models/user.py +++ b/models/user.py @@ -307,5 +307,4 @@ class User(UserBase, TableBase, table=True): def to_public(self) -> "UserPublic": """转换为公开 DTO,排除敏感字段""" - return UserPublic.model_validate(self) - \ No newline at end of file + return UserPublic.model_validate(self) \ No newline at end of file diff --git a/pkg/__init__.py b/pkg/__init__.py index e69de29..48cbfe3 100644 --- a/pkg/__init__.py +++ b/pkg/__init__.py @@ -0,0 +1 @@ +from .password.pwd import Password, PasswordStatus \ No newline at end of file diff --git a/pkg/password/pwd.py b/pkg/password/pwd.py index 3196dc8..102ad5b 100644 --- a/pkg/password/pwd.py +++ b/pkg/password/pwd.py @@ -3,6 +3,12 @@ from loguru import logger from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError from enum import StrEnum +import pyotp +from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired +from pydantic import BaseModel, Field + +from pkg.JWT.JWT import SECRET_KEY +from pkg.conf import appmeta _ph = PasswordHasher() @@ -18,6 +24,24 @@ class PasswordStatus(StrEnum): EXPIRED = "expired" """密码哈希已过时,建议重新哈希""" +class TwoFactorBase(BaseModel): + """两步验证请求 DTO""" + + setup_token: str + """用于验证的令牌""" + +class TwoFactorResponse(TwoFactorBase): + """两步验证-请求启用时的响应 DTO""" + + uri: str + """用于生成二维码的 URI""" + +class TwoFactorVerifyRequest(TwoFactorBase): + """两步验证-验证请求 DTO""" + + code: int = Field(..., ge=100000, le=999999) + """6 位验证码""" + class Password: """密码处理工具类,包含密码生成、哈希和验证功能""" @@ -76,4 +100,51 @@ class Password: except VerifyMismatchError: # 这是预期的异常,当密码不匹配时触发。 return PasswordStatus.INVALID - # 其他异常(如哈希格式错误)应该传播,让调用方感知系统问题 \ No newline at end of file + # 其他异常(如哈希格式错误)应该传播,让调用方感知系统问题 + + @staticmethod + async def generate_totp( + username: str + ) -> TwoFactorResponse: + """ + 生成 TOTP 密钥和对应的 URI,用于两步验证。 + + :return: 包含 TOTP 密钥和 URI 的元组 + """ + + serializer = URLSafeTimedSerializer(SECRET_KEY) + + secret = pyotp.random_base32() + + setup_token = serializer.dumps( + secret, + salt="2fa-setup-salt" + ) + + otp_uri = pyotp.totp.TOTP(secret).provisioning_uri( + name=username, + issuer_name=appmeta.APP_NAME + ) + + return TwoFactorResponse( + uri=otp_uri, + setup_token=setup_token + ) + + @staticmethod + def verify_totp( + secret: str, + code: str + ) -> PasswordStatus: + """ + 验证 TOTP 验证码。 + + :param secret: TOTP 密钥(Base32 编码) + :param code: 用户输入的 6 位验证码 + :return: 验证是否成功 + """ + totp = pyotp.TOTP(secret) + if totp.verify(code): + return PasswordStatus.VALID + else: + return PasswordStatus.INVALID \ No newline at end of file diff --git a/routers/controllers/user.py b/routers/controllers/user.py index 38f2e04..67e2056 100644 --- a/routers/controllers/user.py +++ b/routers/controllers/user.py @@ -13,6 +13,7 @@ import service from middleware.auth import AuthRequired from middleware.dependencies import SessionDep from pkg.JWT.JWT import SECRET_KEY +from pkg import Password user_router = APIRouter( prefix="/user", @@ -445,7 +446,6 @@ def router_user_settings_patch(option: str) -> models.response.ResponseModel: dependencies=[Depends(AuthRequired)], ) async def router_user_settings_2fa( - session: SessionDep, user: Annotated[models.user.User, Depends(AuthRequired)], ) -> models.response.ResponseModel: """ @@ -455,27 +455,8 @@ async def router_user_settings_2fa( dict: A dictionary containing two-factor authentication setup information. """ - serializer = URLSafeTimedSerializer(SECRET_KEY) - - secret = pyotp.random_base32() - - setup_token = serializer.dumps( - secret, - salt="2fa-setup-salt" - ) - - site_Name = await models.Setting.get(session, (models.Setting.type == models.SettingsType.BASIC) & (models.Setting.name == "siteName")) - - otp_uri = pyotp.totp.TOTP(secret).provisioning_uri( - name=user.username, - issuer_name=site_Name.value - ) - return models.response.ResponseModel( - data={ - "setup_token": setup_token, - "otp_uri": otp_uri, - } + data=await Password.generate_totp(user.username) ) @user_settings_router.post( @@ -508,7 +489,7 @@ async def router_user_settings_2fa_enable( raise HTTPException(status_code=400, detail="Invalid token") # 2. 验证用户输入的 6 位验证码 - if not service.user.verify_totp(secret, code): + if not Password.verify_totp(secret, code): raise HTTPException(status_code=400, detail="Invalid OTP code") # 3. 将 secret 存储到用户的数据库记录中,启用 2FA diff --git a/service/user/__init__.py b/service/user/__init__.py index e72eaf5..751474d 100644 --- a/service/user/__init__.py +++ b/service/user/__init__.py @@ -1,2 +1 @@ -from .login import Login -from .totp import verify_totp \ No newline at end of file +from .login import Login \ No newline at end of file diff --git a/service/user/login.py b/service/user/login.py index 578dcfa..422475f 100644 --- a/service/user/login.py +++ b/service/user/login.py @@ -5,7 +5,6 @@ from sqlmodel.ext.asyncio.session import AsyncSession from models import LoginRequest, TokenResponse, User from pkg.JWT.JWT import create_access_token, create_refresh_token -from .totp import verify_totp async def Login( @@ -61,7 +60,7 @@ async def Login( return "2fa_required" # 验证 OTP 码 - if not verify_totp(current_user.two_factor, login_request.two_fa_code): + if not Password.verify_totp(current_user.two_factor, login_request.two_fa_code): log.debug(f"Invalid 2FA code for user: {login_request.username}") return "2fa_invalid" diff --git a/service/user/totp.py b/service/user/totp.py deleted file mode 100644 index 6afd906..0000000 --- a/service/user/totp.py +++ /dev/null @@ -1,13 +0,0 @@ -import pyotp - - -def verify_totp(secret: str, code: str) -> bool: - """ - 验证 TOTP 验证码。 - - :param secret: TOTP 密钥(Base32 编码) - :param code: 用户输入的 6 位验证码 - :return: 验证是否成功 - """ - totp = pyotp.TOTP(secret) - return totp.verify(code)