Files
disknext/service/storage/object.py
于小丘 6c96c43bea
Some checks failed
Test / test (push) Failing after 3m47s
refactor: 统一 sqlmodel_ext 用法至官方推荐模式
- 替换 Field(max_length=X) 为 StrX/TextX 类型别名(21 个 sqlmodels 文件)
- 替换 get + 404 检查为 get_exist_one()(17 个路由文件,约 50 处)
- 替换 save + session.refresh 为 save(load=...)
- 替换 session.add + commit 为 save()(dav/provider.py)
- 更新所有依赖至最新版本

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 11:13:16 +08:00

506 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from uuid import UUID
from loguru import logger as l
from sqlalchemy import update as sql_update
from sqlalchemy.sql.functions import func
from middleware.dependencies import SessionDep
from .local_storage import LocalStorageService
from .s3_storage import S3StorageService
from sqlmodels import (
Object,
PhysicalFile,
Policy,
PolicyType,
User,
)
async def adjust_user_storage(
session: SessionDep,
user_id: UUID,
delta: int,
commit: bool = True,
) -> None:
"""
原子更新用户已用存储空间
使用 SQL UPDATE SET storage = GREATEST(0, storage + delta) 避免竞态条件。
:param session: 数据库会话
:param user_id: 用户UUID
:param delta: 变化量(正数增加,负数减少)
:param commit: 是否立即提交
"""
if delta == 0:
return
stmt = (
sql_update(User)
.where(User.id == user_id)
.values(storage=func.greatest(0, User.storage + delta))
)
await session.execute(stmt)
if commit:
await session.commit()
l.debug(f"用户 {user_id} 存储配额变更: {'+' if delta > 0 else ''}{delta} bytes")
# ==================== 软删除 ====================
async def soft_delete_objects(
session: SessionDep,
objects: list[Object],
) -> int:
"""
软删除对象列表
只标记顶层对象:设置 deleted_at、保存原 parent_id 到 deleted_original_parent_id、
将 parent_id 置 NULL 脱离文件树。子对象保持不变,物理文件不移动。
:param session: 数据库会话
:param objects: 待软删除的对象列表
:return: 软删除的对象数量
"""
deleted_count = 0
now = datetime.now()
for obj in objects:
obj.deleted_at = now
obj.deleted_original_parent_id = obj.parent_id
obj.parent_id = None
await obj.save(session, commit=False, refresh=False)
deleted_count += 1
await session.commit()
return deleted_count
# ==================== 恢复 ====================
async def _resolve_name_conflict(
session: SessionDep,
user_id: UUID,
parent_id: UUID,
name: str,
) -> str:
"""
解决同名冲突,返回不冲突的名称
命名规则:原名称 → 原名称 (1) → 原名称 (2) → ...
对于有扩展名的文件name.ext → name (1).ext → name (2).ext → ...
:param session: 数据库会话
:param user_id: 用户UUID
:param parent_id: 父目录UUID
:param name: 原始名称
:return: 不冲突的名称
"""
existing = await Object.get(
session,
(Object.owner_id == user_id) &
(Object.parent_id == parent_id) &
(Object.name == name) &
(Object.deleted_at == None)
)
if not existing:
return name
# 分离文件名和扩展名
if '.' in name:
base, ext = name.rsplit('.', 1)
ext = f".{ext}"
else:
base = name
ext = ""
counter = 1
while True:
new_name = f"{base} ({counter}){ext}"
existing = await Object.get(
session,
(Object.owner_id == user_id) &
(Object.parent_id == parent_id) &
(Object.name == new_name) &
(Object.deleted_at == None)
)
if not existing:
return new_name
counter += 1
async def restore_objects(
session: SessionDep,
objects: list[Object],
user_id: UUID,
) -> int:
"""
从回收站恢复对象
检查原父目录是否存在且未删除:
- 存在 → 恢复到原位置
- 不存在 → 恢复到用户根目录
处理同名冲突(自动重命名)。
:param session: 数据库会话
:param objects: 待恢复的对象列表(必须是回收站中的顶层对象)
:param user_id: 用户UUID
:return: 恢复的对象数量
"""
root = await Object.get_root(session, user_id)
if not root:
raise ValueError("用户根目录不存在")
restored_count = 0
for obj in objects:
if not obj.deleted_at:
continue
# 确定恢复目标目录
target_parent_id = root.id
if obj.deleted_original_parent_id:
original_parent = await Object.get(
session,
(Object.id == obj.deleted_original_parent_id) & (Object.deleted_at == None)
)
if original_parent:
target_parent_id = original_parent.id
# 解决同名冲突
resolved_name = await _resolve_name_conflict(
session, user_id, target_parent_id, obj.name
)
# 恢复对象
obj.parent_id = target_parent_id
obj.deleted_at = None
obj.deleted_original_parent_id = None
if resolved_name != obj.name:
obj.name = resolved_name
await obj.save(session, commit=False, refresh=False)
restored_count += 1
await session.commit()
return restored_count
# ==================== 永久删除 ====================
async def _collect_file_entries_all(
session: SessionDep,
user_id: UUID,
root: Object,
) -> tuple[list[tuple[UUID, str, UUID]], int, int]:
"""
BFS 收集子树中所有文件的物理文件信息(包含已删除和未删除的子对象)
只执行 SELECT 查询,不触发 commitORM 对象始终有效。
:param session: 数据库会话
:param user_id: 用户UUID
:param root: 根对象
:return: (文件条目列表[(obj_id, name, physical_file_id)], 总对象数, 总文件大小)
"""
file_entries: list[tuple[UUID, str, UUID]] = []
total_count = 1
total_file_size = 0
# 根对象本身是文件
if root.is_file and root.physical_file_id:
file_entries.append((root.id, root.name, root.physical_file_id))
total_file_size += root.size
# BFS 遍历子目录(使用 get_all_children 包含所有子对象)
if root.is_folder:
queue: list[UUID] = [root.id]
while queue:
parent_id = queue.pop(0)
children = await Object.get_all_children(session, user_id, parent_id)
for child in children:
total_count += 1
if child.is_file and child.physical_file_id:
file_entries.append((child.id, child.name, child.physical_file_id))
total_file_size += child.size
elif child.is_folder:
queue.append(child.id)
return file_entries, total_count, total_file_size
async def permanently_delete_objects(
session: SessionDep,
objects: list[Object],
user_id: UUID,
) -> int:
"""
永久删除回收站中的对象
验证对象在回收站中deleted_at IS NOT NULL
BFS 收集所有子文件的 PhysicalFile 信息,
处理引用计数,引用为 0 时物理删除文件,
最后硬删除根 ObjectCASCADE 自动清理子对象)。
:param session: 数据库会话
:param objects: 待永久删除的对象列表
:param user_id: 用户UUID
:return: 永久删除的对象数量
"""
total_deleted = 0
for obj in objects:
if not obj.deleted_at:
l.warning(f"对象 {obj.id} 不在回收站中,跳过永久删除")
continue
root_id = obj.id
file_entries, obj_count, total_file_size = await _collect_file_entries_all(
session, user_id, obj
)
# 处理 PhysicalFile 引用计数
for obj_id, obj_name, physical_file_id in file_entries:
physical_file = await PhysicalFile.get(session, PhysicalFile.id == physical_file_id)
if not physical_file:
continue
physical_file.decrement_reference()
if physical_file.can_be_deleted:
# 物理删除文件
policy = await Policy.get(session, Policy.id == physical_file.policy_id)
if policy:
try:
if policy.type == PolicyType.LOCAL:
storage_service = LocalStorageService(policy)
await storage_service.delete_file(physical_file.storage_path)
elif policy.type == PolicyType.S3:
s3_service = await S3StorageService.from_policy(policy)
await s3_service.delete_file(physical_file.storage_path)
l.debug(f"物理文件已删除: {obj_name}")
except Exception as e:
l.warning(f"物理删除文件失败: {obj_name}, 错误: {e}")
await PhysicalFile.delete(session, physical_file, commit=False)
l.debug(f"物理文件记录已删除: {physical_file.storage_path}")
else:
physical_file = await physical_file.save(session, commit=False)
l.debug(f"物理文件仍有 {physical_file.reference_count} 个引用: {physical_file.storage_path}")
# 更新用户存储配额
if total_file_size > 0:
await adjust_user_storage(session, user_id, -total_file_size, commit=False)
# 硬删除根对象CASCADE 自动删除所有子对象(不立即提交,避免其余对象过期)
await Object.delete(session, condition=Object.id == root_id, commit=False)
total_deleted += obj_count
# 统一提交所有变更
await session.commit()
return total_deleted
# ==================== 旧接口(保持向后兼容) ====================
async def _collect_file_entries(
session: SessionDep,
user_id: UUID,
root: Object,
) -> tuple[list[tuple[UUID, str, UUID]], int, int]:
"""
BFS 收集子树中所有文件的物理文件信息
只执行 SELECT 查询,不触发 commitORM 对象始终有效。
:param session: 数据库会话
:param user_id: 用户UUID
:param root: 根对象
:return: (文件条目列表[(obj_id, name, physical_file_id)], 总对象数, 总文件大小)
"""
file_entries: list[tuple[UUID, str, UUID]] = []
total_count = 1
total_file_size = 0
# 根对象本身是文件
if root.is_file and root.physical_file_id:
file_entries.append((root.id, root.name, root.physical_file_id))
total_file_size += root.size
# BFS 遍历子目录
if root.is_folder:
queue: list[UUID] = [root.id]
while queue:
parent_id = queue.pop(0)
children = await Object.get_children(session, user_id, parent_id)
for child in children:
total_count += 1
if child.is_file and child.physical_file_id:
file_entries.append((child.id, child.name, child.physical_file_id))
total_file_size += child.size
elif child.is_folder:
queue.append(child.id)
return file_entries, total_count, total_file_size
async def delete_object_recursive(
session: SessionDep,
obj: Object,
user_id: UUID,
) -> int:
"""
删除对象及其所有子对象(硬删除)
两阶段策略:
1. BFS 只读收集所有文件的 PhysicalFile 信息
2. 批量处理引用计数commit=False最后删除根对象触发 CASCADE
:param session: 数据库会话
:param obj: 要删除的对象
:param user_id: 用户UUID
:return: 删除的对象数量
"""
# 阶段一:只读收集(不触发任何 commit
root_id = obj.id
file_entries, total_count, total_file_size = await _collect_file_entries(session, user_id, obj)
# 阶段二:批量处理 PhysicalFile 引用(全部 commit=False
for obj_id, obj_name, physical_file_id in file_entries:
physical_file = await PhysicalFile.get(session, PhysicalFile.id == physical_file_id)
if not physical_file:
continue
physical_file.decrement_reference()
if physical_file.can_be_deleted:
# 物理删除文件
policy = await Policy.get(session, Policy.id == physical_file.policy_id)
if policy:
try:
if policy.type == PolicyType.LOCAL:
storage_service = LocalStorageService(policy)
await storage_service.delete_file(physical_file.storage_path)
elif policy.type == PolicyType.S3:
options = await policy.awaitable_attrs.options
s3_service = S3StorageService(
policy,
region=options.s3_region if options else 'us-east-1',
is_path_style=options.s3_path_style if options else False,
)
await s3_service.delete_file(physical_file.storage_path)
l.debug(f"物理文件已删除: {obj_name}")
except Exception as e:
l.warning(f"物理删除文件失败: {obj_name}, 错误: {e}")
await PhysicalFile.delete(session, physical_file, commit=False)
l.debug(f"物理文件记录已删除: {physical_file.storage_path}")
else:
physical_file = await physical_file.save(session, commit=False)
l.debug(f"物理文件仍有 {physical_file.reference_count} 个引用: {physical_file.storage_path}")
# 阶段三:更新用户存储配额(与删除在同一事务中)
if total_file_size > 0:
await adjust_user_storage(session, user_id, -total_file_size, commit=False)
# 阶段四:删除根对象,数据库 CASCADE 自动删除所有子对象
# commit=True默认一次性提交所有 PhysicalFile 变更 + Object 删除 + 配额更新
await Object.delete(session, condition=Object.id == root_id)
return total_count
# ==================== 复制 ====================
async def _copy_object_recursive(
session: SessionDep,
src: Object,
dst_parent_id: UUID,
user_id: UUID,
) -> tuple[int, list[UUID], int]:
"""
递归复制对象(内部实现)
:param session: 数据库会话
:param src: 源对象
:param dst_parent_id: 目标父目录UUID
:param user_id: 用户UUID
:return: (复制数量, 新对象UUID列表, 复制的总文件大小)
"""
copied_count = 0
new_ids: list[UUID] = []
total_copied_size = 0
# 在 save() 之前保存需要的属性值,避免 commit 后对象过期导致懒加载失败
src_is_folder = src.is_folder
src_is_file = src.is_file
src_id = src.id
src_size = src.size
src_physical_file_id = src.physical_file_id
# 创建新的 Object 记录
new_obj = Object(
name=src.name,
type=src.type,
size=src.size,
password=src.password,
parent_id=dst_parent_id,
owner_id=user_id,
policy_id=src.policy_id,
physical_file_id=src.physical_file_id,
)
# 如果是文件,增加物理文件引用计数
if src_is_file and src_physical_file_id:
physical_file = await PhysicalFile.get(session, PhysicalFile.id == src_physical_file_id)
if physical_file:
physical_file.increment_reference()
physical_file = await physical_file.save(session)
total_copied_size += src_size
new_obj = await new_obj.save(session)
copied_count += 1
new_ids.append(new_obj.id)
# 如果是目录,递归复制子对象
if src_is_folder:
children = await Object.get_children(session, user_id, src_id)
for child in children:
child_count, child_ids, child_size = await _copy_object_recursive(
session, child, new_obj.id, user_id
)
copied_count += child_count
new_ids.extend(child_ids)
total_copied_size += child_size
return copied_count, new_ids, total_copied_size
async def copy_object_recursive(
session: SessionDep,
src: Object,
dst_parent_id: UUID,
user_id: UUID,
) -> tuple[int, list[UUID], int]:
"""
递归复制对象
对于文件:
- 增加 PhysicalFile 引用计数
- 创建新的 Object 记录指向同一 PhysicalFile
对于目录:
- 创建新目录
- 递归复制所有子对象
:param session: 数据库会话
:param src: 源对象
:param dst_parent_id: 目标父目录UUID
:param user_id: 用户UUID
:return: (复制数量, 新对象UUID列表, 复制的总文件大小)
"""
return await _copy_object_recursive(session, src, dst_parent_id, user_id)