Files
disknext/routers/api/v1/webdav/__init__.py
于小丘 40b6a31c98
All checks were successful
Test / test (push) Successful in 2m14s
feat: implement WebDAV protocol support with WsgiDAV + account management API
Add complete WebDAV support: management REST API (CRUD accounts at /api/v1/webdav/accounts)
and DAV protocol endpoint (/dav) using WsgiDAV + a2wsgi bridge for client access via
HTTP Basic Auth. Includes Redis+TTLCache auth caching and integration tests (24 cases).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 15:19:29 +08:00

208 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends
from loguru import logger as l
from middleware.auth import auth_required
from middleware.dependencies import SessionDep
from sqlmodels import (
Object,
User,
WebDAV,
WebDAVAccountResponse,
WebDAVCreateRequest,
WebDAVUpdateRequest,
)
from service.redis.webdav_auth_cache import WebDAVAuthCache
from utils import http_exceptions
from utils.password.pwd import Password
# Router for the WebDAV account endpoints defined in this module; every
# route registered on it is served under the '/webdav' prefix.
webdav_router = APIRouter(prefix='/webdav', tags=["webdav"])
def _check_webdav_enabled(user: User) -> None:
    """Reject the request unless the user's group has WebDAV enabled.

    Reads ``user.group.web_dav_enabled``; when that flag is falsy,
    ``http_exceptions.raise_forbidden`` is called with the
    "WebDAV feature not enabled" message.
    """
    if user.group.web_dav_enabled:
        return
    http_exceptions.raise_forbidden("WebDAV 功能未启用")
def _to_response(account: WebDAV) -> WebDAVAccountResponse:
    """Convert a WebDAV database model into its response DTO.

    The password is not copied into the DTO, and both timestamps are
    rendered with ``str()``.
    """
    fields = {
        "id": account.id,
        "name": account.name,
        "root": account.root,
        "readonly": account.readonly,
        "use_proxy": account.use_proxy,
        "created_at": str(account.created_at),
        "updated_at": str(account.updated_at),
    }
    return WebDAVAccountResponse(**fields)
@webdav_router.get(
    path='/accounts',
    summary='获取账号列表',
)
async def list_accounts(
    session: SessionDep,
    user: Annotated[User, Depends(auth_required)],
) -> list[WebDAVAccountResponse]:
    """
    List every WebDAV account owned by the current user.

    Authentication: JWT Bearer Token (via the ``auth_required`` dependency).
    """
    _check_webdav_enabled(user)

    user_id: UUID = user.id
    owned_by_user = WebDAV.user_id == user_id
    rows: list[WebDAV] = await WebDAV.get(
        session,
        owned_by_user,
        fetch_mode="all",
    )
    return list(map(_to_response, rows))
@webdav_router.post(
    path='/accounts',
    summary='创建账号',
    status_code=201,
)
async def create_account(
    session: SessionDep,
    user: Annotated[User, Depends(auth_required)],
    request: WebDAVCreateRequest,
) -> WebDAVAccountResponse:
    """
    Create a WebDAV account for the current user.

    Authentication: JWT Bearer Token (via the ``auth_required`` dependency).

    Error handling:
    - 403: WebDAV feature is not enabled
    - 400: root path does not exist or is not a directory
    - 409: account name already exists
    """
    _check_webdav_enabled(user)
    user_id: UUID = user.id

    # The account name must be unique among this user's accounts.
    same_name_for_user = (WebDAV.name == request.name) & (WebDAV.user_id == user_id)
    if await WebDAV.get(session, same_name_for_user):
        http_exceptions.raise_conflict("账户名已存在")

    # The requested root has to resolve to an existing folder of this user.
    root_obj = await Object.get_by_path(session, user_id, request.root)
    if not (root_obj and root_obj.is_folder):
        http_exceptions.raise_bad_request("根目录路径不存在或不是目录")

    # Only the hash of the password is stored on the new record.
    new_account = WebDAV(
        name=request.name,
        password=Password.hash(request.password),
        root=request.root,
        readonly=request.readonly,
        use_proxy=request.use_proxy,
        user_id=user_id,
    )
    account = await new_account.save(session)

    l.info(f"用户 {user_id} 创建 WebDAV 账户: {account.name}")
    return _to_response(account)
@webdav_router.patch(
    path='/accounts/{account_id}',
    summary='更新账号',
)
async def update_account(
    session: SessionDep,
    user: Annotated[User, Depends(auth_required)],
    account_id: int,
    request: WebDAVUpdateRequest,
) -> WebDAVAccountResponse:
    """
    Update one of the current user's WebDAV accounts.

    Authentication: JWT Bearer Token (via the ``auth_required`` dependency).

    Error handling:
    - 403: WebDAV feature is not enabled
    - 404: account does not exist
    - 400: root path does not exist or is not a directory
    """
    _check_webdav_enabled(user)
    user_id: UUID = user.id

    # Filtering on both id and owner keeps users from touching foreign accounts.
    owned_account = (WebDAV.id == account_id) & (WebDAV.user_id == user_id)
    account = await WebDAV.get(session, owned_account)
    if not account:
        http_exceptions.raise_not_found("WebDAV 账户不存在")

    # A new root is validated only when the request actually supplies one.
    if request.root is not None:
        root_obj = await Object.get_by_path(session, user_id, request.root)
        if not (root_obj and root_obj.is_folder):
            http_exceptions.raise_bad_request("根目录路径不存在或不是目录")

    # The plaintext password is replaced by its hash on the request object
    # itself; per the original author's note, update() reads only the fields
    # that were set (model_dump(exclude_unset=True)).
    password_supplied = request.password is not None
    if password_supplied:
        request.password = Password.hash(request.password)

    account = await account.update(session, request)

    # Cached credentials are dropped only when the password changed.
    # NOTE(review): if the auth cache also holds root/readonly, confirm
    # whether changes to those fields need an invalidation as well.
    if password_supplied:
        await WebDAVAuthCache.invalidate_account(user_id, account.name)

    l.info(f"用户 {user_id} 更新 WebDAV 账户: {account.name}")
    return _to_response(account)
@webdav_router.delete(
    path='/accounts/{account_id}',
    summary='删除账号',
    status_code=204,
)
async def delete_account(
    session: SessionDep,
    user: Annotated[User, Depends(auth_required)],
    account_id: int,
) -> None:
    """
    Delete one of the current user's WebDAV accounts.

    Authentication: JWT Bearer Token (via the ``auth_required`` dependency).

    Error handling:
    - 403: WebDAV feature is not enabled
    - 404: account does not exist
    """
    _check_webdav_enabled(user)
    user_id: UUID = user.id

    # Filtering on both id and owner keeps users from deleting foreign accounts.
    owned_account = (WebDAV.id == account_id) & (WebDAV.user_id == user_id)
    account = await WebDAV.get(session, owned_account)
    if not account:
        http_exceptions.raise_not_found("WebDAV 账户不存在")

    # Remember the name before the row is deleted; it is needed afterwards
    # for the cache invalidation and the log line.
    account_name = account.name
    await WebDAV.delete(session, account)

    # Drop any cached authentication entry for the removed account.
    await WebDAVAuthCache.invalidate_account(user_id, account_name)

    l.info(f"用户 {user_id} 删除 WebDAV 账户: {account_name}")