Refactor import statements for ResponseBase in API routers

- Updated import statements in the following files to import ResponseBase directly from models instead of models.response:
  - routers/api/v1/share/__init__.py
  - routers/api/v1/site/__init__.py
  - routers/api/v1/slave/__init__.py
  - routers/api/v1/tag/__init__.py
  - routers/api/v1/user/__init__.py
  - routers/api/v1/vas/__init__.py
  - routers/api/v1/webdav/__init__.py

Enhance user registration and related endpoints in user router

- Changed return type annotations from models.response.ResponseBase to models.ResponseBase in multiple functions.
- Updated return statements to reflect the new import structure.
- Improved documentation for clarity.

Add PhysicalFile model and storage service implementation

- Introduced PhysicalFile model to represent actual files on disk with reference counting logic.
- Created storage service module with local storage implementation, including file operations and error handling.
- Defined exceptions for storage operations to improve error handling.
- Implemented naming rule parser for generating file and directory names based on templates.

Update dependency management in uv.lock

- Added aiofiles version 25.1.0 to the project dependencies.
This commit is contained in:
2025-12-23 12:20:06 +08:00
parent 96bf447426
commit 446d219aca
26 changed files with 2155 additions and 399 deletions

View File

@@ -0,0 +1,20 @@
"""
存储服务模块
提供文件存储相关的服务,包括:
- 本地存储服务
- 命名规则解析器
- 存储异常定义
"""
from .exceptions import (
DirectoryCreationError,
FileReadError,
FileWriteError,
InvalidPathError,
StorageException,
StorageFileNotFoundError,
UploadSessionExpiredError,
UploadSessionNotFoundError,
)
from .local_storage import LocalStorageService
from .naming_rule import NamingContext, NamingRuleParser

View File

@@ -0,0 +1,45 @@
"""
存储服务异常定义
定义存储操作相关的异常类型,用于精确的错误处理和诊断。
"""
class StorageException(Exception):
"""存储服务基础异常"""
pass
class DirectoryCreationError(StorageException):
"""目录创建失败"""
pass
class StorageFileNotFoundError(StorageException):
"""文件不存在"""
pass
class FileWriteError(StorageException):
"""文件写入失败"""
pass
class FileReadError(StorageException):
"""文件读取失败"""
pass
class UploadSessionNotFoundError(StorageException):
"""上传会话不存在"""
pass
class UploadSessionExpiredError(StorageException):
"""上传会话已过期"""
pass
class InvalidPathError(StorageException):
"""无效的路径"""
pass

View File

@@ -0,0 +1,388 @@
"""
本地存储服务
负责本地文件系统的物理操作:
- 目录创建
- 文件写入/读取/删除
- 文件移动(软删除到 .trash
所有 IO 操作都使用 aiofiles 确保异步执行。
"""
from pathlib import Path
from uuid import UUID
import aiofiles
import aiofiles.os
from loguru import logger as l
from models.policy import Policy
from .exceptions import (
DirectoryCreationError,
FileReadError,
FileWriteError,
InvalidPathError,
StorageException,
StorageFileNotFoundError,
)
from .naming_rule import NamingContext, NamingRuleParser
class LocalStorageService:
"""
本地存储服务
实现本地文件系统的异步文件操作。
所有 IO 操作都使用 aiofiles 确保异步执行。
使用示例::
service = LocalStorageService(policy)
await service.ensure_base_directory()
dir_path, storage_name, full_path = await service.generate_file_path(
user_id=user.id,
original_filename="document.pdf",
)
await service.write_file(full_path, content)
"""
def __init__(self, policy: Policy):
"""
初始化本地存储服务
:param policy: 存储策略配置
:raises StorageException: 本地存储策略未指定 server 路径时抛出
"""
if not policy.server:
raise StorageException("本地存储策略必须指定 server 路径")
self._policy = policy
self._base_path = Path(policy.server).resolve()
@property
def base_path(self) -> Path:
"""存储根目录"""
return self._base_path
# ==================== 目录操作 ====================
async def ensure_base_directory(self) -> None:
"""
确保存储根目录存在
创建策略时调用,确保物理目录已创建。
:raises DirectoryCreationError: 目录创建失败时抛出
"""
try:
await aiofiles.os.makedirs(str(self._base_path), exist_ok=True)
l.info(f"已确保存储目录存在: {self._base_path}")
except OSError as e:
raise DirectoryCreationError(f"无法创建存储目录 {self._base_path}: {e}")
async def ensure_directory(self, relative_path: str) -> Path:
"""
确保相对路径的目录存在
:param relative_path: 相对于存储根目录的路径
:return: 完整的目录路径
:raises DirectoryCreationError: 目录创建失败时抛出
"""
try:
full_path = self._base_path / relative_path
await aiofiles.os.makedirs(str(full_path), exist_ok=True)
return full_path
except OSError as e:
raise DirectoryCreationError(f"无法创建目录 {relative_path}: {e}")
async def ensure_trash_directory(self, user_id: UUID) -> Path:
"""
确保用户的回收站目录存在
回收站路径格式: {storage_root}/{user_id}/.trash
:param user_id: 用户UUID
:return: 回收站目录路径
:raises DirectoryCreationError: 目录创建失败时抛出
"""
trash_path = self._base_path / str(user_id) / ".trash"
try:
await aiofiles.os.makedirs(str(trash_path), exist_ok=True)
return trash_path
except OSError as e:
raise DirectoryCreationError(f"无法创建回收站目录: {e}")
# ==================== 路径生成 ====================
async def generate_file_path(
self,
user_id: UUID,
original_filename: str,
) -> tuple[str, str, str]:
"""
根据命名规则生成文件存储路径
:param user_id: 用户UUID
:param original_filename: 原始文件名
:return: (相对目录路径, 存储文件名, 完整物理路径)
"""
context = NamingContext(
user_id=user_id,
original_filename=original_filename,
)
# 解析目录规则
dir_path = ""
if self._policy.dir_name_rule:
dir_path = NamingRuleParser.parse(self._policy.dir_name_rule, context)
# 解析文件名规则
if self._policy.auto_rename and self._policy.file_name_rule:
storage_name = NamingRuleParser.parse(self._policy.file_name_rule, context)
# 确保有扩展名
if '.' in original_filename and '.' not in storage_name:
ext = original_filename.rsplit('.', 1)[1]
storage_name = f"{storage_name}.{ext}"
else:
storage_name = original_filename
# 确保目录存在
if dir_path:
full_dir = await self.ensure_directory(dir_path)
else:
full_dir = self._base_path
full_path = str(full_dir / storage_name)
return dir_path, storage_name, full_path
# ==================== 文件写入 ====================
async def write_file(self, path: str, content: bytes) -> int:
"""
写入文件内容
:param path: 完整文件路径
:param content: 文件内容
:return: 写入的字节数
:raises FileWriteError: 写入失败时抛出
"""
try:
async with aiofiles.open(path, 'wb') as f:
await f.write(content)
return len(content)
except OSError as e:
raise FileWriteError(f"写入文件失败 {path}: {e}")
async def write_file_chunk(
self,
path: str,
content: bytes,
offset: int,
) -> int:
"""
写入文件分片
:param path: 完整文件路径
:param content: 分片内容
:param offset: 写入偏移量
:return: 写入的字节数
:raises FileWriteError: 写入失败时抛出
"""
try:
# 检查文件是否存在,决定打开模式
is_exists = await self.file_exists(path)
mode = 'r+b' if is_exists else 'wb'
async with aiofiles.open(path, mode) as f:
await f.seek(offset)
await f.write(content)
return len(content)
except OSError as e:
raise FileWriteError(f"写入文件分片失败 {path}: {e}")
async def create_empty_file(self, path: str) -> None:
"""
创建空白文件
:param path: 完整文件路径
:raises FileWriteError: 创建失败时抛出
"""
try:
async with aiofiles.open(path, 'wb'):
pass # 创建空文件
except OSError as e:
raise FileWriteError(f"创建空文件失败 {path}: {e}")
# ==================== 文件读取 ====================
async def read_file(self, path: str) -> bytes:
"""
读取完整文件
:param path: 完整文件路径
:return: 文件内容
:raises StorageFileNotFoundError: 文件不存在时抛出
:raises FileReadError: 读取失败时抛出
"""
if not await self.file_exists(path):
raise StorageFileNotFoundError(f"文件不存在: {path}")
try:
async with aiofiles.open(path, 'rb') as f:
return await f.read()
except OSError as e:
raise FileReadError(f"读取文件失败 {path}: {e}")
async def get_file_size(self, path: str) -> int:
"""
获取文件大小
:param path: 完整文件路径
:return: 文件大小(字节)
:raises StorageFileNotFoundError: 文件不存在时抛出
"""
if not await self.file_exists(path):
raise StorageFileNotFoundError(f"文件不存在: {path}")
stat = await aiofiles.os.stat(path)
return stat.st_size
async def file_exists(self, path: str) -> bool:
"""
检查文件是否存在
:param path: 完整文件路径
:return: 是否存在
"""
return await aiofiles.os.path.exists(path)
# ==================== 文件删除和移动 ====================
async def delete_file(self, path: str) -> None:
"""
删除文件(物理删除)
:param path: 完整文件路径
"""
if await self.file_exists(path):
try:
await aiofiles.os.remove(path)
l.debug(f"已删除文件: {path}")
except OSError as e:
l.warning(f"删除文件失败 {path}: {e}")
async def move_to_trash(
self,
source_path: str,
user_id: UUID,
object_id: UUID,
) -> str:
"""
将文件移动到回收站(软删除)
回收站中的文件名格式: {object_uuid}_{original_filename}
:param source_path: 源文件完整路径
:param user_id: 用户UUID
:param object_id: 对象UUID用于生成唯一的回收站文件名
:return: 回收站中的文件路径
:raises StorageFileNotFoundError: 源文件不存在时抛出
"""
if not await self.file_exists(source_path):
raise StorageFileNotFoundError(f"源文件不存在: {source_path}")
# 确保回收站目录存在
trash_dir = await self.ensure_trash_directory(user_id)
# 使用 object_id 作为回收站文件名前缀,避免冲突
source_filename = Path(source_path).name
trash_filename = f"{object_id}_{source_filename}"
trash_path = trash_dir / trash_filename
# 移动文件
try:
await aiofiles.os.rename(source_path, str(trash_path))
l.info(f"文件已移动到回收站: {source_path} -> {trash_path}")
return str(trash_path)
except OSError as e:
raise StorageException(f"移动文件到回收站失败: {e}")
async def restore_from_trash(
self,
trash_path: str,
restore_path: str,
) -> None:
"""
从回收站恢复文件
:param trash_path: 回收站中的文件路径
:param restore_path: 恢复目标路径
:raises StorageFileNotFoundError: 回收站文件不存在时抛出
"""
if not await self.file_exists(trash_path):
raise StorageFileNotFoundError(f"回收站文件不存在: {trash_path}")
# 确保目标目录存在
restore_dir = Path(restore_path).parent
await aiofiles.os.makedirs(str(restore_dir), exist_ok=True)
try:
await aiofiles.os.rename(trash_path, restore_path)
l.info(f"文件已从回收站恢复: {trash_path} -> {restore_path}")
except OSError as e:
raise StorageException(f"从回收站恢复文件失败: {e}")
async def empty_trash(self, user_id: UUID) -> int:
"""
清空用户回收站
:param user_id: 用户UUID
:return: 删除的文件数量
"""
trash_dir = self._base_path / str(user_id) / ".trash"
if not await aiofiles.os.path.exists(str(trash_dir)):
return 0
deleted_count = 0
try:
entries = await aiofiles.os.listdir(str(trash_dir))
for entry in entries:
file_path = trash_dir / entry
if await aiofiles.os.path.isfile(str(file_path)):
await aiofiles.os.remove(str(file_path))
deleted_count += 1
l.info(f"已清空用户 {user_id} 的回收站,删除 {deleted_count} 个文件")
except OSError as e:
l.warning(f"清空回收站时出错: {e}")
return deleted_count
# ==================== 路径验证 ====================
def validate_path(self, path: str) -> bool:
"""
验证路径是否在存储根目录下(防止路径遍历攻击)
:param path: 要验证的路径
:return: 路径是否有效
"""
try:
resolved = Path(path).resolve()
return str(resolved).startswith(str(self._base_path))
except (ValueError, OSError):
return False
def get_relative_path(self, full_path: str) -> str:
"""
获取相对于存储根目录的相对路径
:param full_path: 完整路径
:return: 相对路径
:raises InvalidPathError: 路径不在存储根目录下时抛出
"""
if not self.validate_path(full_path):
raise InvalidPathError(f"路径不在存储根目录下: {full_path}")
resolved = Path(full_path).resolve()
return str(resolved.relative_to(self._base_path))

View File

@@ -0,0 +1,144 @@
"""
命名规则解析器
将包含占位符的规则模板转换为实际的文件名/目录路径。
支持的占位符:
- {date}: 当前日期 YYYY-MM-DD
- {timestamp}: Unix 时间戳
- {year}: 年份 YYYY
- {month}: 月份 MM
- {day}: 日期 DD
- {hour}: 小时 HH
- {minute}: 分钟 MM
- {randomkey16}: 16位随机字符串
- {originname}: 原始文件名(不含扩展名)
- {ext}: 文件扩展名(不含点)
- {uid}: 用户UUID
- {uuid}: 新生成的UUID
"""
import re
import secrets
import string
from datetime import datetime
from uuid import UUID, uuid4
from models.base import SQLModelBase
class NamingContext(SQLModelBase):
"""
命名上下文
包含生成文件名/目录名所需的所有信息。
"""
user_id: UUID
"""用户UUID"""
original_filename: str
"""原始文件名(包含扩展名)"""
timestamp: datetime | None = None
"""时间戳,默认为当前时间"""
class NamingRuleParser:
"""
命名规则解析器
将包含占位符的规则模板转换为实际的文件名/目录路径。
使用示例::
context = NamingContext(
user_id=UUID("..."),
original_filename="document.pdf",
)
dir_path = NamingRuleParser.parse("{date}/{randomkey16}", context)
# -> "2025-12-23/a1b2c3d4e5f6g7h8"
file_name = NamingRuleParser.parse("{randomkey16}_{originname}.{ext}", context)
# -> "x9y8z7w6v5u4t3s2_document.pdf"
"""
# 支持的占位符正则
_PLACEHOLDER_PATTERN = re.compile(r'\{(\w+)\}')
# 随机字符集
_RANDOM_CHARS = string.ascii_lowercase + string.digits
@classmethod
def parse(cls, rule: str, context: NamingContext) -> str:
"""
解析命名规则,替换所有占位符
:param rule: 命名规则模板,如 "{date}/{randomkey16}"
:param context: 命名上下文
:return: 解析后的实际路径/文件名
"""
timestamp = context.timestamp or datetime.now()
# 解析原始文件名
origin_name, ext = cls._split_filename(context.original_filename)
# 占位符替换映射
replacements: dict[str, str] = {
'date': timestamp.strftime('%Y-%m-%d'),
'timestamp': str(int(timestamp.timestamp())),
'year': timestamp.strftime('%Y'),
'month': timestamp.strftime('%m'),
'day': timestamp.strftime('%d'),
'hour': timestamp.strftime('%H'),
'minute': timestamp.strftime('%M'),
'randomkey16': cls._generate_random_key(16),
'originname': origin_name,
'ext': ext,
'uid': str(context.user_id),
'uuid': str(uuid4()),
}
def replace_placeholder(match: re.Match[str]) -> str:
placeholder = match.group(1)
return replacements.get(placeholder, match.group(0))
return cls._PLACEHOLDER_PATTERN.sub(replace_placeholder, rule)
@classmethod
def _split_filename(cls, filename: str) -> tuple[str, str]:
"""
分离文件名和扩展名
:param filename: 完整文件名
:return: (文件名不含扩展名, 扩展名不含点)
"""
if '.' in filename:
parts = filename.rsplit('.', 1)
return parts[0], parts[1]
return filename, ''
@classmethod
def _generate_random_key(cls, length: int) -> str:
"""
生成随机字符串
:param length: 字符串长度
:return: 随机字符串
"""
return ''.join(secrets.choice(cls._RANDOM_CHARS) for _ in range(length))
@classmethod
def validate_rule(cls, rule: str) -> bool:
"""
验证命名规则是否有效
:param rule: 命名规则模板
:return: 是否有效
"""
valid_placeholders = {
'date', 'timestamp', 'year', 'month', 'day', 'hour', 'minute',
'randomkey16', 'originname', 'ext', 'uid', 'uuid',
}
placeholders = cls._PLACEHOLDER_PATTERN.findall(rule)
return all(p in valid_placeholders for p in placeholders)