From 800c85bf8daba818cfd526b11c2a0e56d60c5156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E5=B0=8F=E4=B8=98?= Date: Fri, 13 Feb 2026 12:56:46 +0800 Subject: [PATCH] feat: implement WebAuthn credential registration, login verification, and management Complete the WebAuthn/Passkey flow that was previously stubbed out: - Add ChallengeStore (Redis + TTLCache fallback) for challenge lifecycle - Add RP config helper to extract rp_id/origin from site settings - Fix registration start (exclude_credentials, user_id, challenge storage) - Implement registration finish (verify + create UserAuthn & AuthIdentity) - Add authentication options endpoint for Discoverable Credentials login - Fix passkey login to use challenge_token and base64url encoding - Add credential management endpoints (list/rename/delete) Co-Authored-By: Claude Opus 4.6 --- routers/api/v1/user/__init__.py | 164 ++++++++++++++++++++--- routers/api/v1/user/settings/__init__.py | 104 ++++++++++++++ service/redis/challenge_store.py | 68 ++++++++++ service/user/login.py | 61 +++++---- service/webauthn.py | 41 ++++++ sqlmodels/__init__.py | 7 +- sqlmodels/user.py | 4 +- sqlmodels/user_authn.py | 61 +++++++-- 8 files changed, 451 insertions(+), 59 deletions(-) create mode 100644 service/redis/challenge_store.py create mode 100644 service/webauthn.py diff --git a/routers/api/v1/user/__init__.py b/routers/api/v1/user/__init__.py index cb89015..335346c 100644 --- a/routers/api/v1/user/__init__.py +++ b/routers/api/v1/user/__init__.py @@ -1,20 +1,30 @@ from typing import Annotated, Literal from uuid import UUID, uuid4 +import json + import jwt from fastapi import APIRouter, Depends, HTTPException from itsdangerous import URLSafeTimedSerializer from loguru import logger -from webauthn import generate_registration_options -from webauthn.helpers import options_to_json_dict +from webauthn import ( + generate_authentication_options, + generate_registration_options, + verify_registration_response, +) +from webauthn.helpers import bytes_to_base64url, options_to_json +from webauthn.helpers.structs import PublicKeyCredentialDescriptor import service import sqlmodels from middleware.auth import auth_required from middleware.dependencies import SessionDep, require_captcha from service.captcha import CaptchaScene +from service.redis.challenge_store import ChallengeStore +from service.webauthn import get_rp_config from sqlmodels.auth_identity import AuthIdentity, AuthProviderType from sqlmodels.user import UserStatus +from sqlmodels.user_authn import UserAuthn from utils import JWT, Password, http_exceptions from .settings import user_settings_router @@ -445,7 +455,7 @@ async def router_user_storage( async def router_user_authn_start( session: SessionDep, user: Annotated[sqlmodels.user.User, Depends(auth_required)], -) -> sqlmodels.ResponseBase: +) -> dict: """ Passkey 注册初始化(需要登录) @@ -456,43 +466,159 @@ async def router_user_authn_start( """ authn_setting = await sqlmodels.Setting.get( session, - (sqlmodels.Setting.type == "authn") & (sqlmodels.Setting.name == "authn_enabled") + (sqlmodels.Setting.type == sqlmodels.SettingsType.AUTHN) + & (sqlmodels.Setting.name == "authn_enabled"), ) if not authn_setting or authn_setting.value != "1": - raise HTTPException(status_code=400, detail="Passkey 未启用") + http_exceptions.raise_bad_request("Passkey 未启用") - site_url_setting = await sqlmodels.Setting.get( + rp_id, rp_name, _origin = await get_rp_config(session) + + # 查询用户已注册凭证,用于 exclude_credentials + existing_authns: list[UserAuthn] = await UserAuthn.get( session, - (sqlmodels.Setting.type == "basic") & (sqlmodels.Setting.name == "siteURL") - ) - site_title_setting = await sqlmodels.Setting.get( - session, - (sqlmodels.Setting.type == "basic") & (sqlmodels.Setting.name == "siteTitle") + UserAuthn.user_id == user.id, + fetch_mode="all", ) + exclude_credentials: list[PublicKeyCredentialDescriptor] = [ + PublicKeyCredentialDescriptor( + id=authn.credential_id, + transports=authn.transports.split(",") if authn.transports else [], + ) + for authn in existing_authns + ] options = generate_registration_options( - rp_id=site_url_setting.value if site_url_setting else "", - rp_name=site_title_setting.value if site_title_setting else "", + rp_id=rp_id, + rp_name=rp_name, + user_id=user.id.bytes, user_name=user.email or str(user.id), user_display_name=user.nickname or user.email or str(user.id), + exclude_credentials=exclude_credentials if exclude_credentials else None, ) - return sqlmodels.ResponseBase(data=options_to_json_dict(options)) + # 存储 challenge + await ChallengeStore.store(f"reg:{user.id}", options.challenge) + + return json.loads(options_to_json(options)) + @user_router.put( path='/authn/finish', summary='注册 Passkey 凭证(完成)', description='Finish Passkey registration for a user.', dependencies=[Depends(auth_required)], + status_code=201, ) -def router_user_authn_finish() -> sqlmodels.ResponseBase: +async def router_user_authn_finish( + session: SessionDep, + user: Annotated[sqlmodels.user.User, Depends(auth_required)], + request: sqlmodels.AuthnFinishRequest, +) -> sqlmodels.AuthnDetailResponse: """ Passkey 注册完成(需要登录) 接收前端 navigator.credentials.create() 返回的凭证数据, - 创建 UserAuthn 行 + AuthIdentity(provider=passkey)。 + 验证后创建 UserAuthn 行 + AuthIdentity(provider=passkey)。 - Returns: - dict: A dictionary containing Passkey registration information. + 请求体: + - credential: navigator.credentials.create() 返回的 JSON 字符串 + - name: 凭证名称(可选) + + 错误处理: + - 400: challenge 已过期或无效 / 验证失败 """ - http_exceptions.raise_not_implemented() + # 取出 challenge(一次性) + challenge: bytes | None = await ChallengeStore.retrieve_and_delete(f"reg:{user.id}") + if challenge is None: + http_exceptions.raise_bad_request("注册会话已过期,请重新开始") + + rp_id, _rp_name, origin = await get_rp_config(session) + + # 验证注册响应 + try: + verification = verify_registration_response( + credential=request.credential, + expected_challenge=challenge, + expected_rp_id=rp_id, + expected_origin=origin, + ) + except Exception as e: + logger.warning(f"WebAuthn 注册验证失败: {e}") + http_exceptions.raise_bad_request("Passkey 验证失败") + + # 编码为 base64url 存储 + credential_id_b64: str = bytes_to_base64url(verification.credential_id) + credential_public_key_b64: str = bytes_to_base64url(verification.credential_public_key) + + # 提取 transports + credential_dict: dict = json.loads(request.credential) + response_dict: dict = credential_dict.get("response", {}) + transports_list: list[str] = response_dict.get("transports", []) + transports_str: str | None = ",".join(transports_list) if transports_list else None + + # 创建 UserAuthn 记录 + authn = UserAuthn( + credential_id=credential_id_b64, + credential_public_key=credential_public_key_b64, + sign_count=verification.sign_count, + credential_device_type=verification.credential_device_type, + credential_backed_up=verification.credential_backed_up, + transports=transports_str, + name=request.name, + user_id=user.id, + ) + authn = await authn.save(session) + + # 创建 AuthIdentity(provider=passkey,identifier=credential_id_b64) + identity = AuthIdentity( + provider=AuthProviderType.PASSKEY, + identifier=credential_id_b64, + is_primary=False, + is_verified=True, + user_id=user.id, + ) + await identity.save(session) + + return authn.to_detail_response() + + +@user_router.post( + path='/authn/options', + summary='获取 Passkey 登录 options(无需登录)', + description='Generate authentication options for Passkey login.', +) +async def router_user_authn_options( + session: SessionDep, +) -> dict: + """ + 获取 Passkey 登录的 authentication options(无需登录) + + 前端调用此端点获取 options 后使用 navigator.credentials.get() 处理。 + 使用 Discoverable Credentials 模式(空 allow_credentials), + 由浏览器/平台决定展示哪些凭证。 + + 返回值包含 ``challenge_token`` 字段,前端在登录请求中作为 ``identifier`` 传入。 + + 错误处理: + - 400: Passkey 未启用 + """ + authn_setting = await sqlmodels.Setting.get( + session, + (sqlmodels.Setting.type == sqlmodels.SettingsType.AUTHN) + & (sqlmodels.Setting.name == "authn_enabled"), + ) + if not authn_setting or authn_setting.value != "1": + http_exceptions.raise_bad_request("Passkey 未启用") + + rp_id, _rp_name, _origin = await get_rp_config(session) + + options = generate_authentication_options(rp_id=rp_id) + + # 生成 challenge_token 用于关联 challenge + challenge_token: str = str(uuid4()) + await ChallengeStore.store(f"auth:{challenge_token}", options.challenge) + + result: dict = json.loads(options_to_json(options)) + result["challenge_token"] = challenge_token + return result diff --git a/routers/api/v1/user/settings/__init__.py b/routers/api/v1/user/settings/__init__.py index 534b6bb..1883463 100644 --- a/routers/api/v1/user/settings/__init__.py +++ b/routers/api/v1/user/settings/__init__.py @@ -11,8 +11,10 @@ from sqlmodels import ( BUILTIN_DEFAULT_COLORS, ThemePreset, UserThemeUpdateRequest, SettingOption, UserSettingUpdateRequest, AuthIdentity, AuthIdentityResponse, AuthProviderType, BindIdentityRequest, + AuthnDetailResponse, AuthnRenameRequest, ) from sqlmodels.color import ThemeColorsBase +from sqlmodels.user_authn import UserAuthn from utils import JWT, Password, http_exceptions from utils.password.pwd import PasswordStatus, TwoFactorResponse, TwoFactorVerifyRequest @@ -449,3 +451,105 @@ async def router_user_settings_unbind_identity( http_exceptions.raise_bad_request("站长要求必须绑定手机号,不能解绑") await AuthIdentity.delete(session, identity) + + +# ==================== WebAuthn 凭证管理 ==================== + +@user_settings_router.get( + path='/authns', + summary='列出用户所有 WebAuthn 凭证', +) +async def router_user_settings_authns( + session: SessionDep, + user: Annotated[sqlmodels.user.User, Depends(auth_required)], +) -> list[AuthnDetailResponse]: + """ + 列出当前用户所有已注册的 WebAuthn 凭证 + + 返回: + - 凭证列表,包含 credential_id、name、device_type 等 + """ + authns: list[UserAuthn] = await UserAuthn.get( + session, + UserAuthn.user_id == user.id, + fetch_mode="all", + ) + return [authn.to_detail_response() for authn in authns] + + +@user_settings_router.patch( + path='/authn/{authn_id}', + summary='重命名 WebAuthn 凭证', +) +async def router_user_settings_rename_authn( + session: SessionDep, + user: Annotated[sqlmodels.user.User, Depends(auth_required)], + authn_id: int, + request: AuthnRenameRequest, +) -> AuthnDetailResponse: + """ + 重命名一个 WebAuthn 凭证 + + 错误处理: + - 404: 凭证不存在或不属于当前用户 + """ + authn: UserAuthn | None = await UserAuthn.get( + session, + (UserAuthn.id == authn_id) & (UserAuthn.user_id == user.id), + ) + if not authn: + http_exceptions.raise_not_found("WebAuthn 凭证不存在") + + authn.name = request.name + authn = await authn.save(session) + return authn.to_detail_response() + + +@user_settings_router.delete( + path='/authn/{authn_id}', + summary='删除 WebAuthn 凭证', + status_code=status.HTTP_204_NO_CONTENT, +) +async def router_user_settings_delete_authn( + session: SessionDep, + user: Annotated[sqlmodels.user.User, Depends(auth_required)], + authn_id: int, +) -> None: + """ + 删除一个 WebAuthn 凭证 + + 同时删除对应的 AuthIdentity(provider=passkey) 记录。 + 如果这是用户最后一个认证身份,拒绝删除。 + + 错误处理: + - 404: 凭证不存在或不属于当前用户 + - 400: 不能删除最后一个认证身份 + """ + authn: UserAuthn | None = await UserAuthn.get( + session, + (UserAuthn.id == authn_id) & (UserAuthn.user_id == user.id), + ) + if not authn: + http_exceptions.raise_not_found("WebAuthn 凭证不存在") + + # 检查是否为最后一个认证身份 + all_identities: list[AuthIdentity] = await AuthIdentity.get( + session, + AuthIdentity.user_id == user.id, + fetch_mode="all", + ) + if len(all_identities) <= 1: + http_exceptions.raise_bad_request("不能删除最后一个认证身份") + + # 删除对应的 AuthIdentity + passkey_identity: AuthIdentity | None = await AuthIdentity.get( + session, + (AuthIdentity.provider == AuthProviderType.PASSKEY) + & (AuthIdentity.identifier == authn.credential_id) + & (AuthIdentity.user_id == user.id), + ) + if passkey_identity: + await AuthIdentity.delete(session, passkey_identity, commit=False) + + # 删除 UserAuthn + await UserAuthn.delete(session, authn) diff --git a/service/redis/challenge_store.py b/service/redis/challenge_store.py new file mode 100644 index 0000000..2736c2d --- /dev/null +++ b/service/redis/challenge_store.py @@ -0,0 +1,68 @@ +""" +WebAuthn Challenge 一次性存储 + +支持 Redis(首选,使用 GETDEL 原子操作)和内存 TTLCache(降级)。 +Challenge 存储后 5 分钟过期,取出即删除(防重放)。 +""" +from typing import ClassVar + +from cachetools import TTLCache +from loguru import logger as l + +from . import RedisManager + +# Challenge 过期时间(秒) +_CHALLENGE_TTL: int = 300 + + +class ChallengeStore: + """ + WebAuthn Challenge 一次性存储管理器 + + 根据 Redis 可用性自动选择存储后端: + - Redis 可用:使用 Redis GETDEL 原子操作 + - Redis 不可用:使用内存 TTLCache(仅单实例) + + Key 约定: + - 注册: ``reg:{user_id}`` + - 登录: ``auth:{challenge_token}`` + """ + + _memory_cache: ClassVar[TTLCache[str, bytes]] = TTLCache( + maxsize=10000, + ttl=_CHALLENGE_TTL, + ) + """内存缓存降级方案""" + + @classmethod + async def store(cls, key: str, challenge: bytes) -> None: + """ + 存储 challenge,TTL 5 分钟。 + + :param key: 存储键(如 ``reg:{user_id}`` 或 ``auth:{token}``) + :param challenge: challenge 字节数据 + """ + client = RedisManager.get_client() + + if client is not None: + redis_key = f"webauthn_challenge:{key}" + await client.set(redis_key, challenge, ex=_CHALLENGE_TTL) + else: + cls._memory_cache[key] = challenge + + @classmethod + async def retrieve_and_delete(cls, key: str) -> bytes | None: + """ + 一次性取出并删除 challenge(防重放)。 + + :param key: 存储键 + :return: challenge 字节数据,过期或不存在时返回 None + """ + client = RedisManager.get_client() + + if client is not None: + redis_key = f"webauthn_challenge:{key}" + result: bytes | None = await client.getdel(redis_key) + return result + else: + return cls._memory_cache.pop(key, None) diff --git a/service/user/login.py b/service/user/login.py index 7515f3b..aace36c 100644 --- a/service/user/login.py +++ b/service/user/login.py @@ -276,52 +276,55 @@ async def _login_passkey( request: UnifiedLoginRequest, ) -> User: """ - Passkey/WebAuthn 登录 + Passkey/WebAuthn 登录(Discoverable Credentials 模式) - identifier 为 credential_id,credential 为 JSON 格式的 authenticator assertion response。 + identifier 为 challenge_token(前端从 ``POST /authn/options`` 获取), + credential 为 JSON 格式的 authenticator assertion response。 """ from webauthn import verify_authentication_response - from webauthn.helpers.structs import AuthenticationCredential + from webauthn.helpers import base64url_to_bytes + + from service.redis.challenge_store import ChallengeStore + from service.webauthn import get_rp_config + from sqlmodels.user_authn import UserAuthn if not request.credential: http_exceptions.raise_bad_request("WebAuthn assertion response 不能为空") - # 查找 AuthIdentity - identity: AuthIdentity | None = await AuthIdentity.get( - session, - (AuthIdentity.provider == AuthProviderType.PASSKEY) - & (AuthIdentity.identifier == request.identifier), - ) - if not identity: - http_exceptions.raise_unauthorized("Passkey 凭证未注册") + if not request.identifier: + http_exceptions.raise_bad_request("challenge_token 不能为空") - # 加载对应的 UserAuthn 记录 - from sqlmodels.user_authn import UserAuthn + # 从 ChallengeStore 取出 challenge(一次性,防重放) + challenge: bytes | None = await ChallengeStore.retrieve_and_delete(f"auth:{request.identifier}") + if challenge is None: + http_exceptions.raise_unauthorized("登录会话已过期,请重新获取 options") + + # 从 assertion JSON 中解析 credential_id(Discoverable Credentials 模式) + import orjson + credential_dict: dict = orjson.loads(request.credential) + credential_id_b64: str | None = credential_dict.get("id") + if not credential_id_b64: + http_exceptions.raise_bad_request("缺少凭证 ID") + + # 查找 UserAuthn 记录 authn: UserAuthn | None = await UserAuthn.get( session, - UserAuthn.credential_id == request.identifier, + UserAuthn.credential_id == credential_id_b64, ) if not authn: - http_exceptions.raise_unauthorized("Passkey 凭证数据不存在") + http_exceptions.raise_unauthorized("Passkey 凭证未注册") - # 获取 RP ID - site_url_setting = await Setting.get( - session, - (Setting.type == SettingsType.BASIC) & (Setting.name == "siteURL"), - ) - rp_id = site_url_setting.value if site_url_setting else "localhost" + # 获取 RP 配置 + rp_id, _rp_name, origin = await get_rp_config(session) # 验证 WebAuthn assertion - import orjson - credential = AuthenticationCredential.model_validate(orjson.loads(request.credential)) - try: verification = verify_authentication_response( - credential=credential, + credential=request.credential, expected_rp_id=rp_id, - expected_origin=f"https://{rp_id}", - expected_challenge=b"", # TODO: 从 session/cache 中获取 challenge - credential_public_key=bytes.fromhex(authn.credential_public_key), + expected_origin=origin, + expected_challenge=challenge, + credential_public_key=base64url_to_bytes(authn.credential_public_key), credential_current_sign_count=authn.sign_count, ) except Exception as e: @@ -333,7 +336,7 @@ async def _login_passkey( await authn.save(session) # 加载用户 - user: User = await User.get(session, User.id == identity.user_id, load=User.group) + user: User = await User.get(session, User.id == authn.user_id, load=User.group) if not user: http_exceptions.raise_unauthorized("用户不存在") if user.status != UserStatus.ACTIVE: diff --git a/service/webauthn.py b/service/webauthn.py new file mode 100644 index 0000000..7b5e28d --- /dev/null +++ b/service/webauthn.py @@ -0,0 +1,41 @@ +""" +WebAuthn RP(Relying Party)配置辅助模块 + +从数据库 Setting 中读取 siteURL / siteTitle, +解析出 rp_id、rp_name、origin,供注册/登录流程复用。 +""" +from urllib.parse import urlparse + +from sqlmodel.ext.asyncio.session import AsyncSession + +from sqlmodels.setting import Setting, SettingsType + + +async def get_rp_config(session: AsyncSession) -> tuple[str, str, str]: + """ + 获取 WebAuthn RP 配置。 + + :param session: 数据库会话 + :return: ``(rp_id, rp_name, origin)`` 元组 + + - ``rp_id``: 站点域名(从 siteURL 解析,如 ``example.com``) + - ``rp_name``: 站点标题 + - ``origin``: 完整 origin(如 ``https://example.com``) + """ + site_url_setting: Setting | None = await Setting.get( + session, + (Setting.type == SettingsType.BASIC) & (Setting.name == "siteURL"), + ) + site_title_setting: Setting | None = await Setting.get( + session, + (Setting.type == SettingsType.BASIC) & (Setting.name == "siteTitle"), + ) + + site_url: str = site_url_setting.value if site_url_setting and site_url_setting.value else "https://localhost" + rp_name: str = site_title_setting.value if site_title_setting and site_title_setting.value else "DiskNext" + + parsed = urlparse(site_url) + rp_id: str = parsed.hostname or "localhost" + origin: str = f"{parsed.scheme}://{parsed.netloc}" if parsed.netloc else site_url + + return rp_id, rp_name, origin diff --git a/sqlmodels/__init__.py b/sqlmodels/__init__.py index c0b1246..eb0a34e 100644 --- a/sqlmodels/__init__.py +++ b/sqlmodels/__init__.py @@ -30,7 +30,12 @@ from .user import ( UserCalibrateResponse, UserAdminDetailResponse, ) -from .user_authn import AuthnResponse, UserAuthn +from .user_authn import ( + AuthnDetailResponse, + AuthnFinishRequest, + AuthnRenameRequest, + UserAuthn, +) from .color import ChromaticColor, NeutralColor, ThemeColorsBase, BUILTIN_DEFAULT_COLORS from .theme_preset import ( ThemePreset, ThemePresetBase, diff --git a/sqlmodels/user.py b/sqlmodels/user.py index 9eacde9..77fdb86 100644 --- a/sqlmodels/user.py +++ b/sqlmodels/user.py @@ -296,7 +296,7 @@ class UserSettingResponse(SQLModelBase): timezone: int """时区""" - authn: "list[AuthnResponse] | None" = None + authn: "list[AuthnDetailResponse] | None" = None """认证信息""" group_expires: datetime | None = None @@ -448,7 +448,7 @@ class UserAdminDetailResponse(UserPublic): # 前向引用导入 from .group import GroupClaims, GroupResponse # noqa: E402 -from .user_authn import AuthnResponse # noqa: E402 +from .user_authn import AuthnDetailResponse # noqa: E402 # 更新前向引用 JWTPayload.model_rebuild() diff --git a/sqlmodels/user_authn.py b/sqlmodels/user_authn.py index 306243b..eac498e 100644 --- a/sqlmodels/user_authn.py +++ b/sqlmodels/user_authn.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import TYPE_CHECKING from uuid import UUID @@ -13,14 +14,46 @@ if TYPE_CHECKING: # ==================== DTO 模型 ==================== -class AuthnResponse(SQLModelBase): - """WebAuthn 响应 DTO""" +class AuthnFinishRequest(SQLModelBase): + """WebAuthn 注册完成请求 DTO""" - id: str - """凭证ID""" + credential: str + """前端 navigator.credentials.create() 返回的 JSON 字符串""" - fingerprint: str - """凭证指纹""" + name: str | None = None + """用户自定义的凭证名称""" + + +class AuthnDetailResponse(SQLModelBase): + """WebAuthn 凭证详情响应 DTO""" + + id: int + """凭证数据库 ID""" + + credential_id: str + """凭证 ID(Base64URL 编码)""" + + name: str | None = None + """用户自定义的凭证名称""" + + credential_device_type: str + """凭证设备类型""" + + credential_backed_up: bool + """凭证是否已备份""" + + transports: str | None = None + """支持的传输方式""" + + created_at: datetime + """创建时间""" + + +class AuthnRenameRequest(SQLModelBase): + """WebAuthn 凭证重命名请求 DTO""" + + name: str = Field(max_length=100) + """新的凭证名称""" # ==================== 数据库模型 ==================== @@ -29,10 +62,10 @@ class UserAuthn(SQLModelBase, TableBaseMixin): """用户 WebAuthn 凭证模型,与 User 为多对一关系""" credential_id: str = Field(max_length=255, unique=True, index=True) - """凭证 ID,Base64 编码""" + """凭证 ID,Base64URL 编码""" credential_public_key: str = Field(sa_column=Column(Text)) - """凭证公钥,Base64 编码""" + """凭证公钥,Base64URL 编码""" sign_count: int = Field(default=0, ge=0) """签名计数器,用于防重放攻击""" @@ -59,3 +92,15 @@ class UserAuthn(SQLModelBase, TableBaseMixin): # 关系 user: "User" = Relationship(back_populates="authns") + + def to_detail_response(self) -> AuthnDetailResponse: + """转换为详情响应 DTO""" + return AuthnDetailResponse( + id=self.id, + credential_id=self.credential_id, + name=self.name, + credential_device_type=self.credential_device_type, + credential_backed_up=self.credential_backed_up, + transports=self.transports, + created_at=self.created_at, + )