Some checks failed
Test / test (push) Failing after 3m47s
- 替换 Field(max_length=X) 为 StrX/TextX 类型别名(21 个 sqlmodels 文件) - 替换 get + 404 检查为 get_exist_one()(17 个路由文件,约 50 处) - 替换 save + session.refresh 为 save(load=...) - 替换 session.add + commit 为 save()(dav/provider.py) - 更新所有依赖至最新版本 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
506 lines
16 KiB
Python
506 lines
16 KiB
Python
from datetime import datetime
|
||
from uuid import UUID
|
||
|
||
from loguru import logger as l
|
||
from sqlalchemy import update as sql_update
|
||
from sqlalchemy.sql.functions import func
|
||
from middleware.dependencies import SessionDep
|
||
|
||
from .local_storage import LocalStorageService
|
||
from .s3_storage import S3StorageService
|
||
from sqlmodels import (
|
||
Object,
|
||
PhysicalFile,
|
||
Policy,
|
||
PolicyType,
|
||
User,
|
||
)
|
||
|
||
|
||
async def adjust_user_storage(
|
||
session: SessionDep,
|
||
user_id: UUID,
|
||
delta: int,
|
||
commit: bool = True,
|
||
) -> None:
|
||
"""
|
||
原子更新用户已用存储空间
|
||
|
||
使用 SQL UPDATE SET storage = GREATEST(0, storage + delta) 避免竞态条件。
|
||
|
||
:param session: 数据库会话
|
||
:param user_id: 用户UUID
|
||
:param delta: 变化量(正数增加,负数减少)
|
||
:param commit: 是否立即提交
|
||
"""
|
||
if delta == 0:
|
||
return
|
||
|
||
stmt = (
|
||
sql_update(User)
|
||
.where(User.id == user_id)
|
||
.values(storage=func.greatest(0, User.storage + delta))
|
||
)
|
||
await session.execute(stmt)
|
||
|
||
if commit:
|
||
await session.commit()
|
||
|
||
l.debug(f"用户 {user_id} 存储配额变更: {'+' if delta > 0 else ''}{delta} bytes")
|
||
|
||
|
||
# ==================== 软删除 ====================
|
||
|
||
async def soft_delete_objects(
|
||
session: SessionDep,
|
||
objects: list[Object],
|
||
) -> int:
|
||
"""
|
||
软删除对象列表
|
||
|
||
只标记顶层对象:设置 deleted_at、保存原 parent_id 到 deleted_original_parent_id、
|
||
将 parent_id 置 NULL 脱离文件树。子对象保持不变,物理文件不移动。
|
||
|
||
:param session: 数据库会话
|
||
:param objects: 待软删除的对象列表
|
||
:return: 软删除的对象数量
|
||
"""
|
||
deleted_count = 0
|
||
now = datetime.now()
|
||
|
||
for obj in objects:
|
||
obj.deleted_at = now
|
||
obj.deleted_original_parent_id = obj.parent_id
|
||
obj.parent_id = None
|
||
await obj.save(session, commit=False, refresh=False)
|
||
deleted_count += 1
|
||
|
||
await session.commit()
|
||
return deleted_count
|
||
|
||
|
||
# ==================== 恢复 ====================
|
||
|
||
async def _resolve_name_conflict(
|
||
session: SessionDep,
|
||
user_id: UUID,
|
||
parent_id: UUID,
|
||
name: str,
|
||
) -> str:
|
||
"""
|
||
解决同名冲突,返回不冲突的名称
|
||
|
||
命名规则:原名称 → 原名称 (1) → 原名称 (2) → ...
|
||
对于有扩展名的文件:name.ext → name (1).ext → name (2).ext → ...
|
||
|
||
:param session: 数据库会话
|
||
:param user_id: 用户UUID
|
||
:param parent_id: 父目录UUID
|
||
:param name: 原始名称
|
||
:return: 不冲突的名称
|
||
"""
|
||
existing = await Object.get(
|
||
session,
|
||
(Object.owner_id == user_id) &
|
||
(Object.parent_id == parent_id) &
|
||
(Object.name == name) &
|
||
(Object.deleted_at == None)
|
||
)
|
||
if not existing:
|
||
return name
|
||
|
||
# 分离文件名和扩展名
|
||
if '.' in name:
|
||
base, ext = name.rsplit('.', 1)
|
||
ext = f".{ext}"
|
||
else:
|
||
base = name
|
||
ext = ""
|
||
|
||
counter = 1
|
||
while True:
|
||
new_name = f"{base} ({counter}){ext}"
|
||
existing = await Object.get(
|
||
session,
|
||
(Object.owner_id == user_id) &
|
||
(Object.parent_id == parent_id) &
|
||
(Object.name == new_name) &
|
||
(Object.deleted_at == None)
|
||
)
|
||
if not existing:
|
||
return new_name
|
||
counter += 1
|
||
|
||
|
||
async def restore_objects(
|
||
session: SessionDep,
|
||
objects: list[Object],
|
||
user_id: UUID,
|
||
) -> int:
|
||
"""
|
||
从回收站恢复对象
|
||
|
||
检查原父目录是否存在且未删除:
|
||
- 存在 → 恢复到原位置
|
||
- 不存在 → 恢复到用户根目录
|
||
处理同名冲突(自动重命名)。
|
||
|
||
:param session: 数据库会话
|
||
:param objects: 待恢复的对象列表(必须是回收站中的顶层对象)
|
||
:param user_id: 用户UUID
|
||
:return: 恢复的对象数量
|
||
"""
|
||
root = await Object.get_root(session, user_id)
|
||
if not root:
|
||
raise ValueError("用户根目录不存在")
|
||
|
||
restored_count = 0
|
||
|
||
for obj in objects:
|
||
if not obj.deleted_at:
|
||
continue
|
||
|
||
# 确定恢复目标目录
|
||
target_parent_id = root.id
|
||
if obj.deleted_original_parent_id:
|
||
original_parent = await Object.get(
|
||
session,
|
||
(Object.id == obj.deleted_original_parent_id) & (Object.deleted_at == None)
|
||
)
|
||
if original_parent:
|
||
target_parent_id = original_parent.id
|
||
|
||
# 解决同名冲突
|
||
resolved_name = await _resolve_name_conflict(
|
||
session, user_id, target_parent_id, obj.name
|
||
)
|
||
|
||
# 恢复对象
|
||
obj.parent_id = target_parent_id
|
||
obj.deleted_at = None
|
||
obj.deleted_original_parent_id = None
|
||
if resolved_name != obj.name:
|
||
obj.name = resolved_name
|
||
await obj.save(session, commit=False, refresh=False)
|
||
restored_count += 1
|
||
|
||
await session.commit()
|
||
return restored_count
|
||
|
||
|
||
# ==================== 永久删除 ====================
|
||
|
||
async def _collect_file_entries_all(
|
||
session: SessionDep,
|
||
user_id: UUID,
|
||
root: Object,
|
||
) -> tuple[list[tuple[UUID, str, UUID]], int, int]:
|
||
"""
|
||
BFS 收集子树中所有文件的物理文件信息(包含已删除和未删除的子对象)
|
||
|
||
只执行 SELECT 查询,不触发 commit,ORM 对象始终有效。
|
||
|
||
:param session: 数据库会话
|
||
:param user_id: 用户UUID
|
||
:param root: 根对象
|
||
:return: (文件条目列表[(obj_id, name, physical_file_id)], 总对象数, 总文件大小)
|
||
"""
|
||
file_entries: list[tuple[UUID, str, UUID]] = []
|
||
total_count = 1
|
||
total_file_size = 0
|
||
|
||
# 根对象本身是文件
|
||
if root.is_file and root.physical_file_id:
|
||
file_entries.append((root.id, root.name, root.physical_file_id))
|
||
total_file_size += root.size
|
||
|
||
# BFS 遍历子目录(使用 get_all_children 包含所有子对象)
|
||
if root.is_folder:
|
||
queue: list[UUID] = [root.id]
|
||
while queue:
|
||
parent_id = queue.pop(0)
|
||
children = await Object.get_all_children(session, user_id, parent_id)
|
||
for child in children:
|
||
total_count += 1
|
||
if child.is_file and child.physical_file_id:
|
||
file_entries.append((child.id, child.name, child.physical_file_id))
|
||
total_file_size += child.size
|
||
elif child.is_folder:
|
||
queue.append(child.id)
|
||
|
||
return file_entries, total_count, total_file_size
|
||
|
||
|
||
async def permanently_delete_objects(
|
||
session: SessionDep,
|
||
objects: list[Object],
|
||
user_id: UUID,
|
||
) -> int:
|
||
"""
|
||
永久删除回收站中的对象
|
||
|
||
验证对象在回收站中(deleted_at IS NOT NULL),
|
||
BFS 收集所有子文件的 PhysicalFile 信息,
|
||
处理引用计数,引用为 0 时物理删除文件,
|
||
最后硬删除根 Object(CASCADE 自动清理子对象)。
|
||
|
||
:param session: 数据库会话
|
||
:param objects: 待永久删除的对象列表
|
||
:param user_id: 用户UUID
|
||
:return: 永久删除的对象数量
|
||
"""
|
||
total_deleted = 0
|
||
|
||
for obj in objects:
|
||
if not obj.deleted_at:
|
||
l.warning(f"对象 {obj.id} 不在回收站中,跳过永久删除")
|
||
continue
|
||
|
||
root_id = obj.id
|
||
file_entries, obj_count, total_file_size = await _collect_file_entries_all(
|
||
session, user_id, obj
|
||
)
|
||
|
||
# 处理 PhysicalFile 引用计数
|
||
for obj_id, obj_name, physical_file_id in file_entries:
|
||
physical_file = await PhysicalFile.get(session, PhysicalFile.id == physical_file_id)
|
||
if not physical_file:
|
||
continue
|
||
|
||
physical_file.decrement_reference()
|
||
|
||
if physical_file.can_be_deleted:
|
||
# 物理删除文件
|
||
policy = await Policy.get(session, Policy.id == physical_file.policy_id)
|
||
if policy:
|
||
try:
|
||
if policy.type == PolicyType.LOCAL:
|
||
storage_service = LocalStorageService(policy)
|
||
await storage_service.delete_file(physical_file.storage_path)
|
||
elif policy.type == PolicyType.S3:
|
||
s3_service = await S3StorageService.from_policy(policy)
|
||
await s3_service.delete_file(physical_file.storage_path)
|
||
l.debug(f"物理文件已删除: {obj_name}")
|
||
except Exception as e:
|
||
l.warning(f"物理删除文件失败: {obj_name}, 错误: {e}")
|
||
|
||
await PhysicalFile.delete(session, physical_file, commit=False)
|
||
l.debug(f"物理文件记录已删除: {physical_file.storage_path}")
|
||
else:
|
||
physical_file = await physical_file.save(session, commit=False)
|
||
l.debug(f"物理文件仍有 {physical_file.reference_count} 个引用: {physical_file.storage_path}")
|
||
|
||
# 更新用户存储配额
|
||
if total_file_size > 0:
|
||
await adjust_user_storage(session, user_id, -total_file_size, commit=False)
|
||
|
||
# 硬删除根对象,CASCADE 自动删除所有子对象(不立即提交,避免其余对象过期)
|
||
await Object.delete(session, condition=Object.id == root_id, commit=False)
|
||
|
||
total_deleted += obj_count
|
||
|
||
# 统一提交所有变更
|
||
await session.commit()
|
||
return total_deleted
|
||
|
||
|
||
# ==================== 旧接口(保持向后兼容) ====================
|
||
|
||
async def _collect_file_entries(
|
||
session: SessionDep,
|
||
user_id: UUID,
|
||
root: Object,
|
||
) -> tuple[list[tuple[UUID, str, UUID]], int, int]:
|
||
"""
|
||
BFS 收集子树中所有文件的物理文件信息
|
||
|
||
只执行 SELECT 查询,不触发 commit,ORM 对象始终有效。
|
||
|
||
:param session: 数据库会话
|
||
:param user_id: 用户UUID
|
||
:param root: 根对象
|
||
:return: (文件条目列表[(obj_id, name, physical_file_id)], 总对象数, 总文件大小)
|
||
"""
|
||
file_entries: list[tuple[UUID, str, UUID]] = []
|
||
total_count = 1
|
||
total_file_size = 0
|
||
|
||
# 根对象本身是文件
|
||
if root.is_file and root.physical_file_id:
|
||
file_entries.append((root.id, root.name, root.physical_file_id))
|
||
total_file_size += root.size
|
||
|
||
# BFS 遍历子目录
|
||
if root.is_folder:
|
||
queue: list[UUID] = [root.id]
|
||
while queue:
|
||
parent_id = queue.pop(0)
|
||
children = await Object.get_children(session, user_id, parent_id)
|
||
for child in children:
|
||
total_count += 1
|
||
if child.is_file and child.physical_file_id:
|
||
file_entries.append((child.id, child.name, child.physical_file_id))
|
||
total_file_size += child.size
|
||
elif child.is_folder:
|
||
queue.append(child.id)
|
||
|
||
return file_entries, total_count, total_file_size
|
||
|
||
|
||
async def delete_object_recursive(
|
||
session: SessionDep,
|
||
obj: Object,
|
||
user_id: UUID,
|
||
) -> int:
|
||
"""
|
||
删除对象及其所有子对象(硬删除)
|
||
|
||
两阶段策略:
|
||
1. BFS 只读收集所有文件的 PhysicalFile 信息
|
||
2. 批量处理引用计数(commit=False),最后删除根对象触发 CASCADE
|
||
|
||
:param session: 数据库会话
|
||
:param obj: 要删除的对象
|
||
:param user_id: 用户UUID
|
||
:return: 删除的对象数量
|
||
"""
|
||
# 阶段一:只读收集(不触发任何 commit)
|
||
root_id = obj.id
|
||
file_entries, total_count, total_file_size = await _collect_file_entries(session, user_id, obj)
|
||
|
||
# 阶段二:批量处理 PhysicalFile 引用(全部 commit=False)
|
||
for obj_id, obj_name, physical_file_id in file_entries:
|
||
physical_file = await PhysicalFile.get(session, PhysicalFile.id == physical_file_id)
|
||
if not physical_file:
|
||
continue
|
||
|
||
physical_file.decrement_reference()
|
||
|
||
if physical_file.can_be_deleted:
|
||
# 物理删除文件
|
||
policy = await Policy.get(session, Policy.id == physical_file.policy_id)
|
||
if policy:
|
||
try:
|
||
if policy.type == PolicyType.LOCAL:
|
||
storage_service = LocalStorageService(policy)
|
||
await storage_service.delete_file(physical_file.storage_path)
|
||
elif policy.type == PolicyType.S3:
|
||
options = await policy.awaitable_attrs.options
|
||
s3_service = S3StorageService(
|
||
policy,
|
||
region=options.s3_region if options else 'us-east-1',
|
||
is_path_style=options.s3_path_style if options else False,
|
||
)
|
||
await s3_service.delete_file(physical_file.storage_path)
|
||
l.debug(f"物理文件已删除: {obj_name}")
|
||
except Exception as e:
|
||
l.warning(f"物理删除文件失败: {obj_name}, 错误: {e}")
|
||
|
||
await PhysicalFile.delete(session, physical_file, commit=False)
|
||
l.debug(f"物理文件记录已删除: {physical_file.storage_path}")
|
||
else:
|
||
physical_file = await physical_file.save(session, commit=False)
|
||
l.debug(f"物理文件仍有 {physical_file.reference_count} 个引用: {physical_file.storage_path}")
|
||
|
||
# 阶段三:更新用户存储配额(与删除在同一事务中)
|
||
if total_file_size > 0:
|
||
await adjust_user_storage(session, user_id, -total_file_size, commit=False)
|
||
|
||
# 阶段四:删除根对象,数据库 CASCADE 自动删除所有子对象
|
||
# commit=True(默认),一次性提交所有 PhysicalFile 变更 + Object 删除 + 配额更新
|
||
await Object.delete(session, condition=Object.id == root_id)
|
||
|
||
return total_count
|
||
|
||
|
||
# ==================== 复制 ====================
|
||
|
||
async def _copy_object_recursive(
|
||
session: SessionDep,
|
||
src: Object,
|
||
dst_parent_id: UUID,
|
||
user_id: UUID,
|
||
) -> tuple[int, list[UUID], int]:
|
||
"""
|
||
递归复制对象(内部实现)
|
||
|
||
:param session: 数据库会话
|
||
:param src: 源对象
|
||
:param dst_parent_id: 目标父目录UUID
|
||
:param user_id: 用户UUID
|
||
:return: (复制数量, 新对象UUID列表, 复制的总文件大小)
|
||
"""
|
||
copied_count = 0
|
||
new_ids: list[UUID] = []
|
||
total_copied_size = 0
|
||
|
||
# 在 save() 之前保存需要的属性值,避免 commit 后对象过期导致懒加载失败
|
||
src_is_folder = src.is_folder
|
||
src_is_file = src.is_file
|
||
src_id = src.id
|
||
src_size = src.size
|
||
src_physical_file_id = src.physical_file_id
|
||
|
||
# 创建新的 Object 记录
|
||
new_obj = Object(
|
||
name=src.name,
|
||
type=src.type,
|
||
size=src.size,
|
||
password=src.password,
|
||
parent_id=dst_parent_id,
|
||
owner_id=user_id,
|
||
policy_id=src.policy_id,
|
||
physical_file_id=src.physical_file_id,
|
||
)
|
||
|
||
# 如果是文件,增加物理文件引用计数
|
||
if src_is_file and src_physical_file_id:
|
||
physical_file = await PhysicalFile.get(session, PhysicalFile.id == src_physical_file_id)
|
||
if physical_file:
|
||
physical_file.increment_reference()
|
||
physical_file = await physical_file.save(session)
|
||
total_copied_size += src_size
|
||
|
||
new_obj = await new_obj.save(session)
|
||
copied_count += 1
|
||
new_ids.append(new_obj.id)
|
||
|
||
# 如果是目录,递归复制子对象
|
||
if src_is_folder:
|
||
children = await Object.get_children(session, user_id, src_id)
|
||
for child in children:
|
||
child_count, child_ids, child_size = await _copy_object_recursive(
|
||
session, child, new_obj.id, user_id
|
||
)
|
||
copied_count += child_count
|
||
new_ids.extend(child_ids)
|
||
total_copied_size += child_size
|
||
|
||
return copied_count, new_ids, total_copied_size
|
||
|
||
|
||
async def copy_object_recursive(
|
||
session: SessionDep,
|
||
src: Object,
|
||
dst_parent_id: UUID,
|
||
user_id: UUID,
|
||
) -> tuple[int, list[UUID], int]:
|
||
"""
|
||
递归复制对象
|
||
|
||
对于文件:
|
||
- 增加 PhysicalFile 引用计数
|
||
- 创建新的 Object 记录指向同一 PhysicalFile
|
||
|
||
对于目录:
|
||
- 创建新目录
|
||
- 递归复制所有子对象
|
||
|
||
:param session: 数据库会话
|
||
:param src: 源对象
|
||
:param dst_parent_id: 目标父目录UUID
|
||
:param user_id: 用户UUID
|
||
:return: (复制数量, 新对象UUID列表, 复制的总文件大小)
|
||
"""
|
||
return await _copy_object_recursive(session, src, dst_parent_id, user_id)
|