Files
disknext/routers/wopi/files/__init__.py
于小丘 ccadfe57cd
All checks were successful
Test / test (push) Successful in 1m45s
feat: migrate ORM base to sqlmodel-ext, add file viewers and WOPI integration
- Migrate SQLModel base classes, mixins, and database management to
  external sqlmodel-ext package; remove sqlmodels/base/, sqlmodels/mixin/,
  and sqlmodels/database.py
- Add file viewer/editor system with WOPI protocol support for
  collaborative editing (OnlyOffice, Collabora)
- Add enterprise edition license verification module (ee/)
- Add Dockerfile multi-stage build with Cython compilation support
- Add new dependencies: sqlmodel-ext, cryptography, whatthepatch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 14:23:17 +08:00

204 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WOPI 文件操作端点
实现 WOPI 协议的核心文件操作接口:
- CheckFileInfo: 获取文件元数据
- GetFile: 下载文件内容
- PutFile: 上传/更新文件内容
"""
from uuid import UUID
from fastapi import APIRouter, Query, Request, Response
from fastapi.responses import JSONResponse
from loguru import logger as l
from middleware.dependencies import SessionDep
from sqlmodels import Object, PhysicalFile, Policy, PolicyType, User, WopiFileInfo
from service.storage import LocalStorageService
from utils import http_exceptions
from utils.JWT.wopi_token import verify_wopi_token
# Router grouping the WOPI file endpoints; every route declared below is
# registered under the "/files" prefix and tagged "wopi".
wopi_files_router = APIRouter(prefix="/files", tags=["wopi"])
@wopi_files_router.get(
    path="/{file_id}",
    summary="WOPI CheckFileInfo",
    description="返回文件的元数据信息。",
)
async def check_file_info(
    session: SessionDep,
    file_id: UUID,
    access_token: str = Query(...),
) -> JSONResponse:
    """
    WOPI CheckFileInfo endpoint.

    Authentication: WOPI ``access_token`` passed as a query parameter.

    The token is decoded with ``verify_wopi_token`` and must reference the
    same ``file_id`` as the URL. The file object and the token's user are then
    loaded, a ``WopiFileInfo`` is built from them, and its ``to_wopi_dict()``
    output is returned as JSON (the original docstring describes that output
    as WOPI-spec PascalCase JSON).

    Errors are reported through the ``http_exceptions.raise_*`` helpers:
    unauthorized for a missing/mismatched token, not-found when the object
    does not exist or is not a file.
    """
    # Validate the token and make sure it was issued for this very file.
    token = verify_wopi_token(access_token)
    if not token or token.file_id != file_id:
        http_exceptions.raise_unauthorized("WOPI token 无效或文件不匹配")

    # Load the target object; anything that is not a file is treated as missing.
    target: Object | None = await Object.get(session, Object.id == file_id)
    if not target or not target.is_file:
        http_exceptions.raise_not_found("文件不存在")

    # Resolve a display name: nickname, then e-mail, then the raw user id.
    account: User | None = await User.get(session, User.id == token.user_id)
    fallback_name = str(token.user_id)
    if account:
        display_name = account.nickname or account.email or fallback_name
    else:
        display_name = fallback_name

    # The last-modified timestamp doubles as the file version string.
    modified_at = target.updated_at
    version_tag = modified_at.isoformat() if modified_at else ""

    writable = token.can_write
    file_info = WopiFileInfo(
        base_file_name=target.name,
        size=target.size or 0,
        owner_id=str(target.owner_id),
        user_id=str(token.user_id),
        user_friendly_name=display_name,
        version=version_tag,
        user_can_write=writable,
        read_only=not writable,
        supports_update=writable,
    )
    return JSONResponse(content=file_info.to_wopi_dict())
@wopi_files_router.get(
    path="/{file_id}/contents",
    summary="WOPI GetFile",
    description="返回文件的二进制内容。",
)
async def get_file(
    session: SessionDep,
    file_id: UUID,
    access_token: str = Query(...),
) -> Response:
    """
    WOPI GetFile endpoint.

    Authentication: WOPI ``access_token`` passed as a query parameter.

    Returns the raw bytes of the file as ``application/octet-stream`` and
    reports the file's ``updated_at`` timestamp (ISO format, or an empty
    string when unset) in the ``X-WOPI-ItemVersion`` response header.

    Only policies of type ``PolicyType.LOCAL`` are served; any other policy
    type is rejected through ``http_exceptions.raise_not_implemented``.
    Other failures use the matching ``http_exceptions.raise_*`` helper
    (unauthorized, not-found, internal-error).
    """
    # Validate the token and make sure it was issued for this very file.
    token = verify_wopi_token(access_token)
    if not token or token.file_id != file_id:
        http_exceptions.raise_unauthorized("WOPI token 无效或文件不匹配")

    # Load the target object; anything that is not a file is treated as missing.
    target: Object | None = await Object.get(session, Object.id == file_id)
    if not target or not target.is_file:
        http_exceptions.raise_not_found("文件不存在")

    # The physical file record carries the on-disk storage path.
    blob: PhysicalFile | None = await target.awaitable_attrs.physical_file
    if not blob or not blob.storage_path:
        http_exceptions.raise_internal_error("文件存储路径丢失")

    # The storage policy decides which backend holds the bytes.
    policy: Policy | None = await Policy.get(session, Policy.id == target.policy_id)
    if not policy:
        http_exceptions.raise_internal_error("存储策略不存在")

    # Only the local backend is supported at the moment.
    if policy.type != PolicyType.LOCAL:
        http_exceptions.raise_not_implemented("S3 存储暂未实现")

    storage = LocalStorageService(policy)
    disk_path = blob.storage_path
    if not await storage.file_exists(disk_path):
        http_exceptions.raise_not_found("物理文件不存在")

    import aiofiles

    # The whole file is read into memory before being sent back.
    async with aiofiles.open(disk_path, 'rb') as handle:
        payload_bytes = await handle.read()

    modified_at = target.updated_at
    version_tag = modified_at.isoformat() if modified_at else ""
    return Response(
        content=payload_bytes,
        media_type="application/octet-stream",
        headers={"X-WOPI-ItemVersion": version_tag},
    )
@wopi_files_router.post(
    path='/{file_id}/contents',
    summary='WOPI PutFile',
    description='更新文件内容。',
)
async def put_file(
    session: SessionDep,
    request: Request,
    file_id: UUID,
    access_token: str = Query(...),
) -> JSONResponse:
    """
    WOPI PutFile endpoint.

    Authentication: WOPI ``access_token`` passed as a query parameter; the
    token must also carry write permission.

    The request body is taken as the new binary content of the file and
    overwrites the stored file in place. Afterwards the sizes recorded on the
    object and on its physical file are updated, the owner's storage usage is
    adjusted by the size difference, and everything is committed in a single
    transaction.

    The new version (the object's ``updated_at`` in ISO format, or an empty
    string when unset) is returned both in the ``X-WOPI-ItemVersion`` response
    header, matching what GetFile emits, and in the JSON body under
    ``ItemVersion`` for existing clients.

    Only policies of type ``PolicyType.LOCAL`` are supported; any other policy
    type is rejected through ``http_exceptions.raise_not_implemented``. Other
    failures use the matching ``http_exceptions.raise_*`` helper
    (unauthorized, forbidden, not-found, internal-error).
    """
    # Validate the token and make sure it was issued for this very file.
    payload = verify_wopi_token(access_token)
    if not payload or payload.file_id != file_id:
        http_exceptions.raise_unauthorized("WOPI token 无效或文件不匹配")

    if not payload.can_write:
        http_exceptions.raise_forbidden("没有写入权限")

    # Load the target object; anything that is not a file is treated as missing.
    file_obj: Object | None = await Object.get(session, Object.id == file_id)
    if not file_obj or not file_obj.is_file:
        http_exceptions.raise_not_found("文件不存在")

    # The physical file record carries the on-disk storage path.
    physical_file: PhysicalFile | None = await file_obj.awaitable_attrs.physical_file
    if not physical_file or not physical_file.storage_path:
        http_exceptions.raise_internal_error("文件存储路径丢失")

    # The storage policy decides which backend holds the bytes.
    policy: Policy | None = await Policy.get(session, Policy.id == file_obj.policy_id)
    if not policy:
        http_exceptions.raise_internal_error("存储策略不存在")

    # Reject unsupported backends *before* buffering the upload, so a request
    # that can never succeed does not pull the whole body into memory first.
    if policy.type != PolicyType.LOCAL:
        http_exceptions.raise_not_implemented("S3 存储暂未实现")

    # Read the full request body (the new file content).
    content = await request.body()

    import aiofiles

    # NOTE(review): the file is overwritten in place and before the DB commit
    # below; if the write or the commit fails, the on-disk content and the
    # recorded sizes can end up out of sync.
    async with aiofiles.open(physical_file.storage_path, 'wb') as f:
        await f.write(content)

    # Update the logical file size.
    new_size = len(content)
    old_size = file_obj.size or 0
    file_obj.size = new_size
    file_obj = await file_obj.save(session, commit=False)

    # Update the physical file size.
    physical_file.size = new_size
    await physical_file.save(session, commit=False)

    # Adjust the owner's storage usage by the size delta.
    size_diff = new_size - old_size
    if size_diff != 0:
        from service.storage import adjust_user_storage
        await adjust_user_storage(session, file_obj.owner_id, size_diff, commit=False)

    await session.commit()

    l.info(f"WOPI PutFile: file_id={file_id}, new_size={new_size}")

    item_version = file_obj.updated_at.isoformat() if file_obj.updated_at else ""
    return JSONResponse(
        content={"ItemVersion": item_version},
        status_code=200,
        # WOPI defines X-WOPI-ItemVersion as the PutFile response header for
        # the new version; the JSON body key is kept for backward compatibility.
        headers={"X-WOPI-ItemVersion": item_version},
    )