disknext/service/redis/challenge_store.py
于小丘 800c85bf8d feat: implement WebAuthn credential registration, login verification, and management
Complete the WebAuthn/Passkey flow that was previously stubbed out:
- Add ChallengeStore (Redis + TTLCache fallback) for challenge lifecycle
- Add RP config helper to extract rp_id/origin from site settings
- Fix registration start (exclude_credentials, user_id, challenge storage)
- Implement registration finish (verify + create UserAuthn & AuthIdentity)
- Add authentication options endpoint for Discoverable Credentials login
- Fix passkey login to use challenge_token and base64url encoding
- Add credential management endpoints (list/rename/delete)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:56:46 +08:00

69 lines
1.9 KiB
Python
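"""
One-time storage for WebAuthn challenges.

Supports Redis (preferred, using the atomic GETDEL command) and an in-memory
TTLCache (fallback).
A challenge expires 5 minutes after it is stored and is deleted as soon as it
is retrieved (replay protection).
"""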
from typing import ClassVar

from cachetools import TTLCache
from loguru import logger as l

from . import RedisManager

# Challenge expiry time (seconds)
_CHALLENGE_TTL: int = 300
class ChallengeStore:
    """
    One-time storage manager for WebAuthn challenges.

    Selects the storage backend automatically based on Redis availability:
    - Redis available: use the atomic Redis GETDEL command
    - Redis unavailable: use an in-memory TTLCache (single instance only)

    Key conventions:
    - Registration: ``reg:{user_id}``
    - Login: ``auth:{challenge_token}``
    """

    _memory_cache: ClassVar[TTLCache[str, bytes]] = TTLCache(
        maxsize=10000,
        ttl=_CHALLENGE_TTL,
    )
    """In-memory cache used as the fallback backend."""

    @classmethod
    async def store(cls, key: str, challenge: bytes) -> None:
        """
        Store a challenge with a TTL of 5 minutes.

        :param key: storage key (e.g. ``reg:{user_id}`` or ``auth:{token}``)
        :param challenge: challenge bytes
        """
        client = RedisManager.get_client()
        if client is not None:
            redis_key = f"webauthn_challenge:{key}"
            await client.set(redis_key, challenge, ex=_CHALLENGE_TTL)
        else:
            cls._memory_cache[key] = challenge

    @classmethod
    async def retrieve_and_delete(cls, key: str) -> bytes | None:
        """
        Retrieve a challenge and delete it in one step (replay protection).

        :param key: storage key
        :return: challenge bytes, or None if the key has expired or does not exist
        """
        client = RedisManager.get_client()
        if client is not None:
            redis_key = f"webauthn_challenge:{key}"
            result: bytes | None = await client.getdel(redis_key)
            return result
        else:
            return cls._memory_cache.pop(key, None)
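
The one-time semantics of the fallback path can be illustrated with a small stdlib-only sketch. It is not the module's implementation: `InMemoryChallengeStore` and `demo` are hypothetical names, a plain dict with monotonic-clock expiry stands in for `cachetools.TTLCache`, and the key `reg:42` uses a made-up user id that only follows the `reg:{user_id}` convention described above.

```python
from __future__ import annotations

import asyncio
import secrets
import time


class InMemoryChallengeStore:
    """Hypothetical stand-in for the in-memory fallback (stdlib only)."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._ttl = ttl_seconds
        # key -> (expiry on the monotonic clock, challenge bytes)
        self._items: dict[str, tuple[float, bytes]] = {}

    async def store(self, key: str, challenge: bytes) -> None:
        self._items[key] = (time.monotonic() + self._ttl, challenge)

    async def retrieve_and_delete(self, key: str) -> bytes | None:
        # pop() removes the entry whether or not it has expired,
        # so a challenge can be consumed at most once.
        entry = self._items.pop(key, None)
        if entry is None:
            return None
        expires_at, challenge = entry
        return challenge if time.monotonic() < expires_at else None


async def demo() -> tuple[bool, bytes | None]:
    store = InMemoryChallengeStore(ttl_seconds=300)
    challenge = secrets.token_bytes(32)
    key = "reg:42"  # assumed example key; 42 is a made-up user id
    await store.store(key, challenge)
    first = await store.retrieve_and_delete(key)   # returns the challenge
    second = await store.retrieve_and_delete(key)  # already consumed
    return first == challenge, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A second retrieval of the same key yields `None`, which is what lets a verification handler reject a replayed response. Like the TTLCache fallback, this sketch only works within a single process.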