feat: implement WebAuthn credential registration, login verification, and management
Complete the WebAuthn/Passkey flow that was previously stubbed out: - Add ChallengeStore (Redis + TTLCache fallback) for challenge lifecycle - Add RP config helper to extract rp_id/origin from site settings - Fix registration start (exclude_credentials, user_id, challenge storage) - Implement registration finish (verify + create UserAuthn & AuthIdentity) - Add authentication options endpoint for Discoverable Credentials login - Fix passkey login to use challenge_token and base64url encoding - Add credential management endpoints (list/rename/delete) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,20 +1,30 @@
|
||||
from typing import Annotated, Literal
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import json
|
||||
|
||||
import jwt
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from itsdangerous import URLSafeTimedSerializer
|
||||
from loguru import logger
|
||||
from webauthn import generate_registration_options
|
||||
from webauthn.helpers import options_to_json_dict
|
||||
from webauthn import (
|
||||
generate_authentication_options,
|
||||
generate_registration_options,
|
||||
verify_registration_response,
|
||||
)
|
||||
from webauthn.helpers import bytes_to_base64url, options_to_json
|
||||
from webauthn.helpers.structs import PublicKeyCredentialDescriptor
|
||||
|
||||
import service
|
||||
import sqlmodels
|
||||
from middleware.auth import auth_required
|
||||
from middleware.dependencies import SessionDep, require_captcha
|
||||
from service.captcha import CaptchaScene
|
||||
from service.redis.challenge_store import ChallengeStore
|
||||
from service.webauthn import get_rp_config
|
||||
from sqlmodels.auth_identity import AuthIdentity, AuthProviderType
|
||||
from sqlmodels.user import UserStatus
|
||||
from sqlmodels.user_authn import UserAuthn
|
||||
from utils import JWT, Password, http_exceptions
|
||||
from .settings import user_settings_router
|
||||
|
||||
@@ -445,7 +455,7 @@ async def router_user_storage(
|
||||
async def router_user_authn_start(
|
||||
session: SessionDep,
|
||||
user: Annotated[sqlmodels.user.User, Depends(auth_required)],
|
||||
) -> sqlmodels.ResponseBase:
|
||||
) -> dict:
|
||||
"""
|
||||
Passkey 注册初始化(需要登录)
|
||||
|
||||
@@ -456,43 +466,159 @@ async def router_user_authn_start(
|
||||
"""
|
||||
authn_setting = await sqlmodels.Setting.get(
|
||||
session,
|
||||
(sqlmodels.Setting.type == "authn") & (sqlmodels.Setting.name == "authn_enabled")
|
||||
(sqlmodels.Setting.type == sqlmodels.SettingsType.AUTHN)
|
||||
& (sqlmodels.Setting.name == "authn_enabled"),
|
||||
)
|
||||
if not authn_setting or authn_setting.value != "1":
|
||||
raise HTTPException(status_code=400, detail="Passkey 未启用")
|
||||
http_exceptions.raise_bad_request("Passkey 未启用")
|
||||
|
||||
site_url_setting = await sqlmodels.Setting.get(
|
||||
rp_id, rp_name, _origin = await get_rp_config(session)
|
||||
|
||||
# 查询用户已注册凭证,用于 exclude_credentials
|
||||
existing_authns: list[UserAuthn] = await UserAuthn.get(
|
||||
session,
|
||||
(sqlmodels.Setting.type == "basic") & (sqlmodels.Setting.name == "siteURL")
|
||||
)
|
||||
site_title_setting = await sqlmodels.Setting.get(
|
||||
session,
|
||||
(sqlmodels.Setting.type == "basic") & (sqlmodels.Setting.name == "siteTitle")
|
||||
UserAuthn.user_id == user.id,
|
||||
fetch_mode="all",
|
||||
)
|
||||
exclude_credentials: list[PublicKeyCredentialDescriptor] = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
id=authn.credential_id,
|
||||
transports=authn.transports.split(",") if authn.transports else [],
|
||||
)
|
||||
for authn in existing_authns
|
||||
]
|
||||
|
||||
options = generate_registration_options(
|
||||
rp_id=site_url_setting.value if site_url_setting else "",
|
||||
rp_name=site_title_setting.value if site_title_setting else "",
|
||||
rp_id=rp_id,
|
||||
rp_name=rp_name,
|
||||
user_id=user.id.bytes,
|
||||
user_name=user.email or str(user.id),
|
||||
user_display_name=user.nickname or user.email or str(user.id),
|
||||
exclude_credentials=exclude_credentials if exclude_credentials else None,
|
||||
)
|
||||
|
||||
return sqlmodels.ResponseBase(data=options_to_json_dict(options))
|
||||
# 存储 challenge
|
||||
await ChallengeStore.store(f"reg:{user.id}", options.challenge)
|
||||
|
||||
return json.loads(options_to_json(options))
|
||||
|
||||
|
||||
@user_router.put(
|
||||
path='/authn/finish',
|
||||
summary='注册 Passkey 凭证(完成)',
|
||||
description='Finish Passkey registration for a user.',
|
||||
dependencies=[Depends(auth_required)],
|
||||
status_code=201,
|
||||
)
|
||||
def router_user_authn_finish() -> sqlmodels.ResponseBase:
|
||||
async def router_user_authn_finish(
|
||||
session: SessionDep,
|
||||
user: Annotated[sqlmodels.user.User, Depends(auth_required)],
|
||||
request: sqlmodels.AuthnFinishRequest,
|
||||
) -> sqlmodels.AuthnDetailResponse:
|
||||
"""
|
||||
Passkey 注册完成(需要登录)
|
||||
|
||||
接收前端 navigator.credentials.create() 返回的凭证数据,
|
||||
创建 UserAuthn 行 + AuthIdentity(provider=passkey)。
|
||||
验证后创建 UserAuthn 行 + AuthIdentity(provider=passkey)。
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing Passkey registration information.
|
||||
请求体:
|
||||
- credential: navigator.credentials.create() 返回的 JSON 字符串
|
||||
- name: 凭证名称(可选)
|
||||
|
||||
错误处理:
|
||||
- 400: challenge 已过期或无效 / 验证失败
|
||||
"""
|
||||
http_exceptions.raise_not_implemented()
|
||||
# 取出 challenge(一次性)
|
||||
challenge: bytes | None = await ChallengeStore.retrieve_and_delete(f"reg:{user.id}")
|
||||
if challenge is None:
|
||||
http_exceptions.raise_bad_request("注册会话已过期,请重新开始")
|
||||
|
||||
rp_id, _rp_name, origin = await get_rp_config(session)
|
||||
|
||||
# 验证注册响应
|
||||
try:
|
||||
verification = verify_registration_response(
|
||||
credential=request.credential,
|
||||
expected_challenge=challenge,
|
||||
expected_rp_id=rp_id,
|
||||
expected_origin=origin,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"WebAuthn 注册验证失败: {e}")
|
||||
http_exceptions.raise_bad_request("Passkey 验证失败")
|
||||
|
||||
# 编码为 base64url 存储
|
||||
credential_id_b64: str = bytes_to_base64url(verification.credential_id)
|
||||
credential_public_key_b64: str = bytes_to_base64url(verification.credential_public_key)
|
||||
|
||||
# 提取 transports
|
||||
credential_dict: dict = json.loads(request.credential)
|
||||
response_dict: dict = credential_dict.get("response", {})
|
||||
transports_list: list[str] = response_dict.get("transports", [])
|
||||
transports_str: str | None = ",".join(transports_list) if transports_list else None
|
||||
|
||||
# 创建 UserAuthn 记录
|
||||
authn = UserAuthn(
|
||||
credential_id=credential_id_b64,
|
||||
credential_public_key=credential_public_key_b64,
|
||||
sign_count=verification.sign_count,
|
||||
credential_device_type=verification.credential_device_type,
|
||||
credential_backed_up=verification.credential_backed_up,
|
||||
transports=transports_str,
|
||||
name=request.name,
|
||||
user_id=user.id,
|
||||
)
|
||||
authn = await authn.save(session)
|
||||
|
||||
# 创建 AuthIdentity(provider=passkey,identifier=credential_id_b64)
|
||||
identity = AuthIdentity(
|
||||
provider=AuthProviderType.PASSKEY,
|
||||
identifier=credential_id_b64,
|
||||
is_primary=False,
|
||||
is_verified=True,
|
||||
user_id=user.id,
|
||||
)
|
||||
await identity.save(session)
|
||||
|
||||
return authn.to_detail_response()
|
||||
|
||||
|
||||
@user_router.post(
|
||||
path='/authn/options',
|
||||
summary='获取 Passkey 登录 options(无需登录)',
|
||||
description='Generate authentication options for Passkey login.',
|
||||
)
|
||||
async def router_user_authn_options(
|
||||
session: SessionDep,
|
||||
) -> dict:
|
||||
"""
|
||||
获取 Passkey 登录的 authentication options(无需登录)
|
||||
|
||||
前端调用此端点获取 options 后使用 navigator.credentials.get() 处理。
|
||||
使用 Discoverable Credentials 模式(空 allow_credentials),
|
||||
由浏览器/平台决定展示哪些凭证。
|
||||
|
||||
返回值包含 ``challenge_token`` 字段,前端在登录请求中作为 ``identifier`` 传入。
|
||||
|
||||
错误处理:
|
||||
- 400: Passkey 未启用
|
||||
"""
|
||||
authn_setting = await sqlmodels.Setting.get(
|
||||
session,
|
||||
(sqlmodels.Setting.type == sqlmodels.SettingsType.AUTHN)
|
||||
& (sqlmodels.Setting.name == "authn_enabled"),
|
||||
)
|
||||
if not authn_setting or authn_setting.value != "1":
|
||||
http_exceptions.raise_bad_request("Passkey 未启用")
|
||||
|
||||
rp_id, _rp_name, _origin = await get_rp_config(session)
|
||||
|
||||
options = generate_authentication_options(rp_id=rp_id)
|
||||
|
||||
# 生成 challenge_token 用于关联 challenge
|
||||
challenge_token: str = str(uuid4())
|
||||
await ChallengeStore.store(f"auth:{challenge_token}", options.challenge)
|
||||
|
||||
result: dict = json.loads(options_to_json(options))
|
||||
result["challenge_token"] = challenge_token
|
||||
return result
|
||||
|
||||
@@ -11,8 +11,10 @@ from sqlmodels import (
|
||||
BUILTIN_DEFAULT_COLORS, ThemePreset, UserThemeUpdateRequest,
|
||||
SettingOption, UserSettingUpdateRequest,
|
||||
AuthIdentity, AuthIdentityResponse, AuthProviderType, BindIdentityRequest,
|
||||
AuthnDetailResponse, AuthnRenameRequest,
|
||||
)
|
||||
from sqlmodels.color import ThemeColorsBase
|
||||
from sqlmodels.user_authn import UserAuthn
|
||||
from utils import JWT, Password, http_exceptions
|
||||
from utils.password.pwd import PasswordStatus, TwoFactorResponse, TwoFactorVerifyRequest
|
||||
|
||||
@@ -449,3 +451,105 @@ async def router_user_settings_unbind_identity(
|
||||
http_exceptions.raise_bad_request("站长要求必须绑定手机号,不能解绑")
|
||||
|
||||
await AuthIdentity.delete(session, identity)
|
||||
|
||||
|
||||
# ==================== WebAuthn 凭证管理 ====================
|
||||
|
||||
@user_settings_router.get(
|
||||
path='/authns',
|
||||
summary='列出用户所有 WebAuthn 凭证',
|
||||
)
|
||||
async def router_user_settings_authns(
|
||||
session: SessionDep,
|
||||
user: Annotated[sqlmodels.user.User, Depends(auth_required)],
|
||||
) -> list[AuthnDetailResponse]:
|
||||
"""
|
||||
列出当前用户所有已注册的 WebAuthn 凭证
|
||||
|
||||
返回:
|
||||
- 凭证列表,包含 credential_id、name、device_type 等
|
||||
"""
|
||||
authns: list[UserAuthn] = await UserAuthn.get(
|
||||
session,
|
||||
UserAuthn.user_id == user.id,
|
||||
fetch_mode="all",
|
||||
)
|
||||
return [authn.to_detail_response() for authn in authns]
|
||||
|
||||
|
||||
@user_settings_router.patch(
|
||||
path='/authn/{authn_id}',
|
||||
summary='重命名 WebAuthn 凭证',
|
||||
)
|
||||
async def router_user_settings_rename_authn(
|
||||
session: SessionDep,
|
||||
user: Annotated[sqlmodels.user.User, Depends(auth_required)],
|
||||
authn_id: int,
|
||||
request: AuthnRenameRequest,
|
||||
) -> AuthnDetailResponse:
|
||||
"""
|
||||
重命名一个 WebAuthn 凭证
|
||||
|
||||
错误处理:
|
||||
- 404: 凭证不存在或不属于当前用户
|
||||
"""
|
||||
authn: UserAuthn | None = await UserAuthn.get(
|
||||
session,
|
||||
(UserAuthn.id == authn_id) & (UserAuthn.user_id == user.id),
|
||||
)
|
||||
if not authn:
|
||||
http_exceptions.raise_not_found("WebAuthn 凭证不存在")
|
||||
|
||||
authn.name = request.name
|
||||
authn = await authn.save(session)
|
||||
return authn.to_detail_response()
|
||||
|
||||
|
||||
@user_settings_router.delete(
|
||||
path='/authn/{authn_id}',
|
||||
summary='删除 WebAuthn 凭证',
|
||||
status_code=status.HTTP_204_NO_CONTENT,
|
||||
)
|
||||
async def router_user_settings_delete_authn(
|
||||
session: SessionDep,
|
||||
user: Annotated[sqlmodels.user.User, Depends(auth_required)],
|
||||
authn_id: int,
|
||||
) -> None:
|
||||
"""
|
||||
删除一个 WebAuthn 凭证
|
||||
|
||||
同时删除对应的 AuthIdentity(provider=passkey) 记录。
|
||||
如果这是用户最后一个认证身份,拒绝删除。
|
||||
|
||||
错误处理:
|
||||
- 404: 凭证不存在或不属于当前用户
|
||||
- 400: 不能删除最后一个认证身份
|
||||
"""
|
||||
authn: UserAuthn | None = await UserAuthn.get(
|
||||
session,
|
||||
(UserAuthn.id == authn_id) & (UserAuthn.user_id == user.id),
|
||||
)
|
||||
if not authn:
|
||||
http_exceptions.raise_not_found("WebAuthn 凭证不存在")
|
||||
|
||||
# 检查是否为最后一个认证身份
|
||||
all_identities: list[AuthIdentity] = await AuthIdentity.get(
|
||||
session,
|
||||
AuthIdentity.user_id == user.id,
|
||||
fetch_mode="all",
|
||||
)
|
||||
if len(all_identities) <= 1:
|
||||
http_exceptions.raise_bad_request("不能删除最后一个认证身份")
|
||||
|
||||
# 删除对应的 AuthIdentity
|
||||
passkey_identity: AuthIdentity | None = await AuthIdentity.get(
|
||||
session,
|
||||
(AuthIdentity.provider == AuthProviderType.PASSKEY)
|
||||
& (AuthIdentity.identifier == authn.credential_id)
|
||||
& (AuthIdentity.user_id == user.id),
|
||||
)
|
||||
if passkey_identity:
|
||||
await AuthIdentity.delete(session, passkey_identity, commit=False)
|
||||
|
||||
# 删除 UserAuthn
|
||||
await UserAuthn.delete(session, authn)
|
||||
|
||||
68
service/redis/challenge_store.py
Normal file
68
service/redis/challenge_store.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
WebAuthn Challenge 一次性存储
|
||||
|
||||
支持 Redis(首选,使用 GETDEL 原子操作)和内存 TTLCache(降级)。
|
||||
Challenge 存储后 5 分钟过期,取出即删除(防重放)。
|
||||
"""
|
||||
from typing import ClassVar
|
||||
|
||||
from cachetools import TTLCache
|
||||
from loguru import logger as l
|
||||
|
||||
from . import RedisManager
|
||||
|
||||
# Challenge 过期时间(秒)
|
||||
_CHALLENGE_TTL: int = 300
|
||||
|
||||
|
||||
class ChallengeStore:
|
||||
"""
|
||||
WebAuthn Challenge 一次性存储管理器
|
||||
|
||||
根据 Redis 可用性自动选择存储后端:
|
||||
- Redis 可用:使用 Redis GETDEL 原子操作
|
||||
- Redis 不可用:使用内存 TTLCache(仅单实例)
|
||||
|
||||
Key 约定:
|
||||
- 注册: ``reg:{user_id}``
|
||||
- 登录: ``auth:{challenge_token}``
|
||||
"""
|
||||
|
||||
_memory_cache: ClassVar[TTLCache[str, bytes]] = TTLCache(
|
||||
maxsize=10000,
|
||||
ttl=_CHALLENGE_TTL,
|
||||
)
|
||||
"""内存缓存降级方案"""
|
||||
|
||||
@classmethod
|
||||
async def store(cls, key: str, challenge: bytes) -> None:
|
||||
"""
|
||||
存储 challenge,TTL 5 分钟。
|
||||
|
||||
:param key: 存储键(如 ``reg:{user_id}`` 或 ``auth:{token}``)
|
||||
:param challenge: challenge 字节数据
|
||||
"""
|
||||
client = RedisManager.get_client()
|
||||
|
||||
if client is not None:
|
||||
redis_key = f"webauthn_challenge:{key}"
|
||||
await client.set(redis_key, challenge, ex=_CHALLENGE_TTL)
|
||||
else:
|
||||
cls._memory_cache[key] = challenge
|
||||
|
||||
@classmethod
|
||||
async def retrieve_and_delete(cls, key: str) -> bytes | None:
|
||||
"""
|
||||
一次性取出并删除 challenge(防重放)。
|
||||
|
||||
:param key: 存储键
|
||||
:return: challenge 字节数据,过期或不存在时返回 None
|
||||
"""
|
||||
client = RedisManager.get_client()
|
||||
|
||||
if client is not None:
|
||||
redis_key = f"webauthn_challenge:{key}"
|
||||
result: bytes | None = await client.getdel(redis_key)
|
||||
return result
|
||||
else:
|
||||
return cls._memory_cache.pop(key, None)
|
||||
@@ -276,52 +276,55 @@ async def _login_passkey(
|
||||
request: UnifiedLoginRequest,
|
||||
) -> User:
|
||||
"""
|
||||
Passkey/WebAuthn 登录
|
||||
Passkey/WebAuthn 登录(Discoverable Credentials 模式)
|
||||
|
||||
identifier 为 credential_id,credential 为 JSON 格式的 authenticator assertion response。
|
||||
identifier 为 challenge_token(前端从 ``POST /authn/options`` 获取),
|
||||
credential 为 JSON 格式的 authenticator assertion response。
|
||||
"""
|
||||
from webauthn import verify_authentication_response
|
||||
from webauthn.helpers.structs import AuthenticationCredential
|
||||
from webauthn.helpers import base64url_to_bytes
|
||||
|
||||
from service.redis.challenge_store import ChallengeStore
|
||||
from service.webauthn import get_rp_config
|
||||
from sqlmodels.user_authn import UserAuthn
|
||||
|
||||
if not request.credential:
|
||||
http_exceptions.raise_bad_request("WebAuthn assertion response 不能为空")
|
||||
|
||||
# 查找 AuthIdentity
|
||||
identity: AuthIdentity | None = await AuthIdentity.get(
|
||||
session,
|
||||
(AuthIdentity.provider == AuthProviderType.PASSKEY)
|
||||
& (AuthIdentity.identifier == request.identifier),
|
||||
)
|
||||
if not identity:
|
||||
http_exceptions.raise_unauthorized("Passkey 凭证未注册")
|
||||
if not request.identifier:
|
||||
http_exceptions.raise_bad_request("challenge_token 不能为空")
|
||||
|
||||
# 加载对应的 UserAuthn 记录
|
||||
from sqlmodels.user_authn import UserAuthn
|
||||
# 从 ChallengeStore 取出 challenge(一次性,防重放)
|
||||
challenge: bytes | None = await ChallengeStore.retrieve_and_delete(f"auth:{request.identifier}")
|
||||
if challenge is None:
|
||||
http_exceptions.raise_unauthorized("登录会话已过期,请重新获取 options")
|
||||
|
||||
# 从 assertion JSON 中解析 credential_id(Discoverable Credentials 模式)
|
||||
import orjson
|
||||
credential_dict: dict = orjson.loads(request.credential)
|
||||
credential_id_b64: str | None = credential_dict.get("id")
|
||||
if not credential_id_b64:
|
||||
http_exceptions.raise_bad_request("缺少凭证 ID")
|
||||
|
||||
# 查找 UserAuthn 记录
|
||||
authn: UserAuthn | None = await UserAuthn.get(
|
||||
session,
|
||||
UserAuthn.credential_id == request.identifier,
|
||||
UserAuthn.credential_id == credential_id_b64,
|
||||
)
|
||||
if not authn:
|
||||
http_exceptions.raise_unauthorized("Passkey 凭证数据不存在")
|
||||
http_exceptions.raise_unauthorized("Passkey 凭证未注册")
|
||||
|
||||
# 获取 RP ID
|
||||
site_url_setting = await Setting.get(
|
||||
session,
|
||||
(Setting.type == SettingsType.BASIC) & (Setting.name == "siteURL"),
|
||||
)
|
||||
rp_id = site_url_setting.value if site_url_setting else "localhost"
|
||||
# 获取 RP 配置
|
||||
rp_id, _rp_name, origin = await get_rp_config(session)
|
||||
|
||||
# 验证 WebAuthn assertion
|
||||
import orjson
|
||||
credential = AuthenticationCredential.model_validate(orjson.loads(request.credential))
|
||||
|
||||
try:
|
||||
verification = verify_authentication_response(
|
||||
credential=credential,
|
||||
credential=request.credential,
|
||||
expected_rp_id=rp_id,
|
||||
expected_origin=f"https://{rp_id}",
|
||||
expected_challenge=b"", # TODO: 从 session/cache 中获取 challenge
|
||||
credential_public_key=bytes.fromhex(authn.credential_public_key),
|
||||
expected_origin=origin,
|
||||
expected_challenge=challenge,
|
||||
credential_public_key=base64url_to_bytes(authn.credential_public_key),
|
||||
credential_current_sign_count=authn.sign_count,
|
||||
)
|
||||
except Exception as e:
|
||||
@@ -333,7 +336,7 @@ async def _login_passkey(
|
||||
await authn.save(session)
|
||||
|
||||
# 加载用户
|
||||
user: User = await User.get(session, User.id == identity.user_id, load=User.group)
|
||||
user: User = await User.get(session, User.id == authn.user_id, load=User.group)
|
||||
if not user:
|
||||
http_exceptions.raise_unauthorized("用户不存在")
|
||||
if user.status != UserStatus.ACTIVE:
|
||||
|
||||
41
service/webauthn.py
Normal file
41
service/webauthn.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
WebAuthn RP(Relying Party)配置辅助模块
|
||||
|
||||
从数据库 Setting 中读取 siteURL / siteTitle,
|
||||
解析出 rp_id、rp_name、origin,供注册/登录流程复用。
|
||||
"""
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from sqlmodels.setting import Setting, SettingsType
|
||||
|
||||
|
||||
async def get_rp_config(session: AsyncSession) -> tuple[str, str, str]:
|
||||
"""
|
||||
获取 WebAuthn RP 配置。
|
||||
|
||||
:param session: 数据库会话
|
||||
:return: ``(rp_id, rp_name, origin)`` 元组
|
||||
|
||||
- ``rp_id``: 站点域名(从 siteURL 解析,如 ``example.com``)
|
||||
- ``rp_name``: 站点标题
|
||||
- ``origin``: 完整 origin(如 ``https://example.com``)
|
||||
"""
|
||||
site_url_setting: Setting | None = await Setting.get(
|
||||
session,
|
||||
(Setting.type == SettingsType.BASIC) & (Setting.name == "siteURL"),
|
||||
)
|
||||
site_title_setting: Setting | None = await Setting.get(
|
||||
session,
|
||||
(Setting.type == SettingsType.BASIC) & (Setting.name == "siteTitle"),
|
||||
)
|
||||
|
||||
site_url: str = site_url_setting.value if site_url_setting and site_url_setting.value else "https://localhost"
|
||||
rp_name: str = site_title_setting.value if site_title_setting and site_title_setting.value else "DiskNext"
|
||||
|
||||
parsed = urlparse(site_url)
|
||||
rp_id: str = parsed.hostname or "localhost"
|
||||
origin: str = f"{parsed.scheme}://{parsed.netloc}" if parsed.netloc else site_url
|
||||
|
||||
return rp_id, rp_name, origin
|
||||
@@ -30,7 +30,12 @@ from .user import (
|
||||
UserCalibrateResponse,
|
||||
UserAdminDetailResponse,
|
||||
)
|
||||
from .user_authn import AuthnResponse, UserAuthn
|
||||
from .user_authn import (
|
||||
AuthnDetailResponse,
|
||||
AuthnFinishRequest,
|
||||
AuthnRenameRequest,
|
||||
UserAuthn,
|
||||
)
|
||||
from .color import ChromaticColor, NeutralColor, ThemeColorsBase, BUILTIN_DEFAULT_COLORS
|
||||
from .theme_preset import (
|
||||
ThemePreset, ThemePresetBase,
|
||||
|
||||
@@ -296,7 +296,7 @@ class UserSettingResponse(SQLModelBase):
|
||||
timezone: int
|
||||
"""时区"""
|
||||
|
||||
authn: "list[AuthnResponse] | None" = None
|
||||
authn: "list[AuthnDetailResponse] | None" = None
|
||||
"""认证信息"""
|
||||
|
||||
group_expires: datetime | None = None
|
||||
@@ -448,7 +448,7 @@ class UserAdminDetailResponse(UserPublic):
|
||||
|
||||
# 前向引用导入
|
||||
from .group import GroupClaims, GroupResponse # noqa: E402
|
||||
from .user_authn import AuthnResponse # noqa: E402
|
||||
from .user_authn import AuthnDetailResponse # noqa: E402
|
||||
|
||||
# 更新前向引用
|
||||
JWTPayload.model_rebuild()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING
|
||||
from uuid import UUID
|
||||
|
||||
@@ -13,14 +14,46 @@ if TYPE_CHECKING:
|
||||
|
||||
# ==================== DTO 模型 ====================
|
||||
|
||||
class AuthnResponse(SQLModelBase):
|
||||
"""WebAuthn 响应 DTO"""
|
||||
class AuthnFinishRequest(SQLModelBase):
|
||||
"""WebAuthn 注册完成请求 DTO"""
|
||||
|
||||
id: str
|
||||
"""凭证ID"""
|
||||
credential: str
|
||||
"""前端 navigator.credentials.create() 返回的 JSON 字符串"""
|
||||
|
||||
fingerprint: str
|
||||
"""凭证指纹"""
|
||||
name: str | None = None
|
||||
"""用户自定义的凭证名称"""
|
||||
|
||||
|
||||
class AuthnDetailResponse(SQLModelBase):
|
||||
"""WebAuthn 凭证详情响应 DTO"""
|
||||
|
||||
id: int
|
||||
"""凭证数据库 ID"""
|
||||
|
||||
credential_id: str
|
||||
"""凭证 ID(Base64URL 编码)"""
|
||||
|
||||
name: str | None = None
|
||||
"""用户自定义的凭证名称"""
|
||||
|
||||
credential_device_type: str
|
||||
"""凭证设备类型"""
|
||||
|
||||
credential_backed_up: bool
|
||||
"""凭证是否已备份"""
|
||||
|
||||
transports: str | None = None
|
||||
"""支持的传输方式"""
|
||||
|
||||
created_at: datetime
|
||||
"""创建时间"""
|
||||
|
||||
|
||||
class AuthnRenameRequest(SQLModelBase):
|
||||
"""WebAuthn 凭证重命名请求 DTO"""
|
||||
|
||||
name: str = Field(max_length=100)
|
||||
"""新的凭证名称"""
|
||||
|
||||
|
||||
# ==================== 数据库模型 ====================
|
||||
@@ -29,10 +62,10 @@ class UserAuthn(SQLModelBase, TableBaseMixin):
|
||||
"""用户 WebAuthn 凭证模型,与 User 为多对一关系"""
|
||||
|
||||
credential_id: str = Field(max_length=255, unique=True, index=True)
|
||||
"""凭证 ID,Base64 编码"""
|
||||
"""凭证 ID,Base64URL 编码"""
|
||||
|
||||
credential_public_key: str = Field(sa_column=Column(Text))
|
||||
"""凭证公钥,Base64 编码"""
|
||||
"""凭证公钥,Base64URL 编码"""
|
||||
|
||||
sign_count: int = Field(default=0, ge=0)
|
||||
"""签名计数器,用于防重放攻击"""
|
||||
@@ -59,3 +92,15 @@ class UserAuthn(SQLModelBase, TableBaseMixin):
|
||||
|
||||
# 关系
|
||||
user: "User" = Relationship(back_populates="authns")
|
||||
|
||||
def to_detail_response(self) -> AuthnDetailResponse:
|
||||
"""转换为详情响应 DTO"""
|
||||
return AuthnDetailResponse(
|
||||
id=self.id,
|
||||
credential_id=self.credential_id,
|
||||
name=self.name,
|
||||
credential_device_type=self.credential_device_type,
|
||||
credential_backed_up=self.credential_backed_up,
|
||||
transports=self.transports,
|
||||
created_at=self.created_at,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user