优化路由结构

This commit is contained in:
2026-01-08 15:19:08 +08:00
parent 91208352f8
commit 01747cc3d7
9 changed files with 1628 additions and 1549 deletions

View File

@@ -0,0 +1,409 @@
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from loguru import logger as l
from sqlmodel import Field
from middleware.auth import admin_required
from middleware.dependencies import SessionDep
from models import (
Policy, PolicyType, ResponseBase,
Object, )
from models.base import SQLModelBase
from service.storage import DirectoryCreationError, LocalStorageService
admin_policy_router = APIRouter(
prefix='/policy',
tags=['admin', 'admin_policy']
)
class PolicyTestPathRequest(SQLModelBase):
"""测试本地路径请求 DTO"""
path: str = Field(max_length=512)
"""要测试的本地路径"""
class PolicyTestSlaveRequest(SQLModelBase):
"""测试从机通信请求 DTO"""
server: str = Field(max_length=255)
"""从机服务器地址"""
secret: str
"""从机通信密钥"""
class PolicyCreateRequest(SQLModelBase):
"""创建存储策略请求 DTO"""
name: str = Field(max_length=255)
"""策略名称"""
type: PolicyType
"""策略类型"""
server: str | None = Field(default=None, max_length=255)
"""服务器地址/本地路径(本地存储必填)"""
bucket_name: str | None = Field(default=None, max_length=255)
"""存储桶名称S3必填"""
is_private: bool = True
"""是否为私有空间"""
base_url: str | None = Field(default=None, max_length=255)
"""访问文件的基础URL"""
access_key: str | None = None
"""Access Key"""
secret_key: str | None = None
"""Secret Key"""
max_size: int = Field(default=0, ge=0)
"""允许上传的最大文件尺寸字节0表示不限制"""
auto_rename: bool = False
"""是否自动重命名"""
dir_name_rule: str | None = Field(default=None, max_length=255)
"""目录命名规则"""
file_name_rule: str | None = Field(default=None, max_length=255)
"""文件命名规则"""
is_origin_link_enable: bool = False
"""是否开启源链接访问"""
@admin_policy_router.get(
path='/list',
summary='列出存储策略',
description='List all storage policies',
dependencies=[Depends(admin_required)]
)
async def router_policy_list(
session: SessionDep,
page: int = 1,
page_size: int = 20,
) -> ResponseBase:
"""
获取所有存储策略列表。
:param session: 数据库会话
:param page: 页码
:param page_size: 每页数量
:return: 策略列表
"""
offset = (page - 1) * page_size
policies = await Policy.get(
session,
None,
fetch_mode="all",
offset=offset,
limit=page_size,
)
total = await Policy.count(session, None)
return ResponseBase(data={
"policies": [
{
"id": str(p.id),
"name": p.name,
"type": p.type.value,
"server": p.server,
"max_size": p.max_size,
"is_private": p.is_private,
}
for p in policies
],
"total": total,
})
@admin_policy_router.post(
path='/test/path',
summary='测试本地路径可用性',
description='Test local path availability',
dependencies=[Depends(admin_required)]
)
async def router_policy_test_path(
request: PolicyTestPathRequest,
) -> ResponseBase:
"""
测试本地存储路径是否可用。
:param request: 测试请求
:return: 测试结果
"""
import aiofiles.os
from pathlib import Path
path = Path(request.path).resolve()
# 检查路径是否存在
is_exists = await aiofiles.os.path.exists(str(path))
# 检查是否可写
is_writable = False
if is_exists:
test_file = path / ".write_test"
try:
async with aiofiles.open(str(test_file), 'w') as f:
await f.write("test")
await aiofiles.os.remove(str(test_file))
is_writable = True
except Exception:
pass
return ResponseBase(data={
"path": str(path),
"exists": is_exists,
"writable": is_writable,
})
@admin_policy_router.post(
path='/test/slave',
summary='测试从机通信',
description='Test slave node communication',
dependencies=[Depends(admin_required)]
)
async def router_policy_test_slave(
request: PolicyTestSlaveRequest,
) -> ResponseBase:
"""
测试从机RPC通信。
:param request: 测试请求
:return: 测试结果
"""
import aiohttp
try:
async with aiohttp.ClientSession() as client:
async with client.get(
f"{request.server}/api/slave/ping",
headers={"Authorization": request.secret},
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
return ResponseBase(data={"connected": True})
else:
return ResponseBase(
code=400,
msg=f"从机响应错误HTTP {resp.status}"
)
except Exception as e:
return ResponseBase(code=400, msg=f"连接失败: {str(e)}")
@admin_policy_router.post(
path='/',
summary='创建存储策略',
description='创建新的存储策略。对于本地存储策略,会自动创建物理目录。',
dependencies=[Depends(admin_required)]
)
async def router_policy_add_policy(
session: SessionDep,
request: PolicyCreateRequest,
) -> ResponseBase:
"""
创建存储策略端点
功能:
- 创建新的存储策略配置
- 对于 LOCAL 类型,自动创建物理目录
认证:
- 需要管理员权限
:param session: 数据库会话
:param request: 创建请求
:return: 创建结果
"""
# 验证本地存储策略必须指定 server 路径
if request.type == PolicyType.LOCAL:
if not request.server:
raise HTTPException(status_code=400, detail="本地存储策略必须指定 server 路径")
# 检查策略名称是否已存在
existing = await Policy.get(session, Policy.name == request.name)
if existing:
raise HTTPException(status_code=409, detail="策略名称已存在")
# 创建策略对象
policy = Policy(
name=request.name,
type=request.type,
server=request.server,
bucket_name=request.bucket_name,
is_private=request.is_private,
base_url=request.base_url,
access_key=request.access_key,
secret_key=request.secret_key,
max_size=request.max_size,
auto_rename=request.auto_rename,
dir_name_rule=request.dir_name_rule,
file_name_rule=request.file_name_rule,
is_origin_link_enable=request.is_origin_link_enable,
)
# 对于本地存储策略,创建物理目录
if policy.type == PolicyType.LOCAL:
try:
storage_service = LocalStorageService(policy)
await storage_service.ensure_base_directory()
l.info(f"已为本地存储策略 '{policy.name}' 创建目录: {policy.server}")
except DirectoryCreationError as e:
raise HTTPException(status_code=500, detail=f"创建存储目录失败: {e}")
# 保存到数据库
policy = await policy.save(session)
return ResponseBase(data={
"id": str(policy.id),
"name": policy.name,
"type": policy.type.value,
"server": policy.server,
})
@admin_policy_router.post(
path='/cors',
summary='创建跨域策略',
description='Create CORS policy for S3 storage',
dependencies=[Depends(admin_required)]
)
async def router_policy_add_cors() -> ResponseBase:
"""
创建CORS配置S3相关
此端点用于S3存储的跨域配置。
"""
# TODO: 实现S3 CORS配置
raise HTTPException(status_code=501, detail="S3 CORS配置暂未实现")
@admin_policy_router.post(
path='/scf',
summary='创建COS回调函数',
description='Create COS callback function',
dependencies=[Depends(admin_required)]
)
async def router_policy_add_scf() -> ResponseBase:
"""
创建COS回调函数。
此端点用于腾讯云COS的云函数回调配置。
"""
# TODO: 实现COS SCF配置
raise HTTPException(status_code=501, detail="COS回调函数配置暂未实现")
@admin_policy_router.get(
path='/{policy_id}/oauth',
summary='获取 OneDrive OAuth URL',
description='Get OneDrive OAuth URL',
dependencies=[Depends(admin_required)]
)
async def router_policy_onddrive_oauth(
session: SessionDep,
policy_id: UUID,
) -> ResponseBase:
"""
获取OneDrive OAuth授权URL。
:param session: 数据库会话
:param policy_id: 存储策略UUID
:return: OAuth URL
"""
policy = await Policy.get(session, Policy.id == policy_id)
if not policy:
raise HTTPException(status_code=404, detail="存储策略不存在")
# TODO: 实现OneDrive OAuth
raise HTTPException(status_code=501, detail="OneDrive OAuth暂未实现")
@admin_policy_router.get(
path='/{policy_id}',
summary='获取存储策略',
description='Get storage policy by ID',
dependencies=[Depends(admin_required)]
)
async def router_policy_get_policy(
session: SessionDep,
policy_id: UUID,
) -> ResponseBase:
"""
获取存储策略详情。
:param session: 数据库会话
:param policy_id: 存储策略UUID
:return: 策略详情
"""
policy = await Policy.get(session, Policy.id == policy_id, load=Policy.options)
if not policy:
raise HTTPException(status_code=404, detail="存储策略不存在")
# 获取使用此策略的用户组
groups = await policy.awaitable_attrs.groups
# 统计使用此策略的对象数量
object_count = await Object.count(session, Object.policy_id == policy_id)
return ResponseBase(data={
"id": str(policy.id),
"name": policy.name,
"type": policy.type.value,
"server": policy.server,
"bucket_name": policy.bucket_name,
"is_private": policy.is_private,
"base_url": policy.base_url,
"max_size": policy.max_size,
"auto_rename": policy.auto_rename,
"dir_name_rule": policy.dir_name_rule,
"file_name_rule": policy.file_name_rule,
"is_origin_link_enable": policy.is_origin_link_enable,
"options": policy.options.model_dump() if policy.options else None,
"groups": [{"id": str(g.id), "name": g.name} for g in groups],
"object_count": object_count,
})
@admin_policy_router.delete(
path='/{policy_id}',
summary='删除存储策略',
description='Delete storage policy by ID',
dependencies=[Depends(admin_required)]
)
async def router_policy_delete_policy(
session: SessionDep,
policy_id: UUID,
) -> ResponseBase:
"""
删除存储策略。
注意: 如果有文件使用此策略,会拒绝删除。
:param session: 数据库会话
:param policy_id: 存储策略UUID
:return: 删除结果
"""
policy = await Policy.get(session, Policy.id == policy_id)
if not policy:
raise HTTPException(status_code=404, detail="存储策略不存在")
# 检查是否有文件使用此策略
file_count = await Object.count(session, Object.policy_id == policy_id)
if file_count > 0:
raise HTTPException(
status_code=400,
detail=f"无法删除,还有 {file_count} 个文件使用此策略"
)
policy_name = policy.name
await Policy.delete(session, policy)
l.info(f"管理员删除了存储策略: {policy_name}")
return ResponseBase(data={"deleted": True})