Files
disknext/routers/api/v1/admin/__init__.py

324 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, status
from loguru import logger as l
from middleware.auth import admin_required
from middleware.dependencies import SessionDep
from models import (
User, ResponseBase,
Setting, Object, ObjectType, Share, AdminSummaryResponse, MetricsSummary, LicenseInfo, VersionInfo,
)
from models.base import SQLModelBase
from models.setting import (
SettingItem, SettingsListResponse, SettingsUpdateRequest, SettingsUpdateResponse,
)
from models.setting import SettingsType
from utils import http_exceptions
from utils.conf import appmeta
from .file import admin_file_router
from .group import admin_group_router
from .policy import admin_policy_router
from .share import admin_share_router
from .task import admin_task_router
from .user import admin_user_router
from .vas import admin_vas_router
class Aria2TestRequest(SQLModelBase):
    """Request DTO for testing an Aria2 RPC connection."""

    # URL of the Aria2 RPC endpoint that the connection test posts to.
    rpc_url: str
    """RPC 地址"""

    # Optional RPC secret. Defaults to None, in which case the test request
    # is sent without a token parameter.
    secret: str | None = None
    """RPC 密钥"""
# Root admin router. Routes registered on it live under the "/admin" prefix
# (the original comment gives the externally visible path as /api/admin).
admin_router = APIRouter(prefix="/admin", tags=["admin"])

# Attach the feature-specific admin sub-routers. Registration order is kept
# exactly as before: group, user, file, policy, share, task, vas.
for _sub_router in (
    admin_group_router,
    admin_user_router,
    admin_file_router,
    admin_policy_router,
    admin_share_router,
    admin_task_router,
    admin_vas_router,
):
    admin_router.include_router(_sub_router)
del _sub_router  # leave the package namespace exactly as it was

# Offline-download (Aria2) admin router. It carries the full "/admin/aria2"
# prefix itself and is not attached to admin_router in this module.
# NOTE(review): presumably registered with the application elsewhere — confirm.
admin_aria2_router = APIRouter(
    prefix='/admin/aria2',
    tags=['admin', 'admin_aria2'],
)
@admin_router.get(
    path='/',
    summary='自己是否为管理员',
    dependencies=[Depends(admin_required)],
    status_code=status.HTTP_204_NO_CONTENT,
)
async def is_admin() -> None:
    """
    Check whether the current user has administrator privileges.

    The handler body does nothing itself: the ``admin_required`` dependency
    declared on the route runs first, and a request that gets past it is
    answered with 204 No Content. Per the original documentation, a
    non-admin caller receives 403 Forbidden instead (that rejection comes
    from ``admin_required``, which is defined outside this file).

    Returns:
        None: the response has no body.
    """
@admin_router.get(
    path='/summary',
    summary='获取站点概况',
    description='Get site summary information',
    dependencies=[Depends(admin_required)],
)
async def router_admin_get_summary(session: SessionDep) -> AdminSummaryResponse:
    """
    Build the site overview shown to administrators.

    The response contains:
    - metrics_summary: per-day counts of newly created files, users and
      shares for the last 14 days (oldest day first), plus overall totals.
    - site_urls: the value of the BASIC/"siteURL" setting, if it is set.
    - license: license information. The values are hard-coded placeholders
      built here (signed now, expiring in 365 days, empty domain lists);
      nothing is read from settings.
    - version: backend version and Pro flag from ``appmeta``; the commit
      field is the fixed string "dev".

    Args:
        session: database session used for all count/lookup queries.

    Returns:
        AdminSummaryResponse: the assembled site summary.
    """
    # Size of the reporting window, in days (today included).
    window_days = 14

    # Naive local time; day boundaries are local midnights.
    # NOTE(review): confirm this matches how creation timestamps are stored.
    generated_at = datetime.now()
    midnight_today = generated_at.replace(hour=0, minute=0, second=0, microsecond=0)

    # Start of each day in the window, from (window_days - 1) days ago up to
    # today, i.e. 13 days ago .. today for a 14-day window.
    day_starts: list[datetime] = [
        midnight_today - timedelta(days=offset)
        for offset in reversed(range(window_days))
    ]

    daily_files: list[int] = []
    daily_users: list[int] = []
    daily_shares: list[int] = []
    for window_start in day_starts:
        # Creation-time bounds for one calendar day, shared by all three counts.
        window = {
            "created_after_datetime": window_start,
            "created_before_datetime": window_start + timedelta(days=1),
        }
        daily_files.append(
            await Object.count(session, Object.type == ObjectType.FILE, **window)
        )
        daily_users.append(await User.count(session, **window))
        daily_shares.append(await Share.count(session, **window))

    # Overall totals; entities_total counts every Object regardless of type.
    metrics_summary = MetricsSummary(
        dates=day_starts,
        files=daily_files,
        users=daily_users,
        shares=daily_shares,
        file_total=await Object.count(session, Object.type == ObjectType.FILE),
        user_total=await User.count(session),
        share_total=await Share.count(session),
        entities_total=await Object.count(session),
        generated_at=generated_at,
    )

    # Site URL comes from the settings table; an unset or empty value yields [].
    site_url_setting = await Setting.get(
        session,
        (Setting.type == SettingsType.BASIC) & (Setting.name == "siteURL"),
    )
    site_urls: list[str] = (
        [site_url_setting.value]
        if site_url_setting and site_url_setting.value
        else []
    )

    return AdminSummaryResponse(
        metrics_summary=metrics_summary,
        site_urls=site_urls,
        # Placeholder license data: always "signed now, valid for one year".
        license=LicenseInfo(
            expired_at=generated_at + timedelta(days=365),
            signed_at=generated_at,
            root_domains=[],
            domains=[],
            vol_domains=[],
        ),
        version=VersionInfo(
            version=appmeta.BackendVersion,
            pro=appmeta.IsPro,
            commit="dev",
        ),
    )
@admin_router.get(
    path='/news',
    summary='获取社区新闻',
    description='Get community news',
    dependencies=[Depends(admin_required)],
)
def router_admin_get_news() -> ResponseBase:
    """
    Return community news (latest updates and announcements) for admins.

    Not implemented yet: the body only delegates to
    ``http_exceptions.raise_not_implemented()``. Judging by its name, that
    helper raises a "not implemented" HTTP error — confirm in
    ``utils.http_exceptions``. Unlike the other handlers in this file, this
    one is a plain ``def`` rather than ``async def``.

    Returns:
        ResponseBase: the intended response model for the news payload; the
        current body never returns a value itself.
    """
    http_exceptions.raise_not_implemented()
@admin_router.patch(
    path='/settings',
    summary='更新设置',
    description='Update settings',
    dependencies=[Depends(admin_required)],
)
async def router_admin_update_settings(
    session: SessionDep,
    request: SettingsUpdateRequest,
) -> SettingsUpdateResponse:
    """
    Update site settings in bulk (upsert by type + name).

    Each item in ``request.settings`` is looked up by its (type, name) pair.
    If a matching setting exists its value is overwritten and saved;
    otherwise a new setting is created and saved. Items are processed one
    at a time, in request order.

    :param session: database session
    :param request: update request carrying the list of setting items
    :return: how many settings were updated and how many were created
    """
    updated_count = 0
    created_count = 0

    for entry in request.settings:
        current = await Setting.get(
            session,
            (Setting.type == entry.type) & (Setting.name == entry.name),
        )

        if not current:
            # No such setting yet: create it.
            await Setting(type=entry.type, name=entry.name, value=entry.value).save(session)
            created_count += 1
            continue

        # Setting already exists: overwrite its value.
        current.value = entry.value
        await current.save(session)
        updated_count += 1

    l.info(f"管理员更新了 {updated_count} 个设置项,新建了 {created_count} 个设置项")
    return SettingsUpdateResponse(updated=updated_count, created=created_count)
@admin_router.get(
    path='/settings',
    summary='获取设置',
    description='Get settings',
    dependencies=[Depends(admin_required)],
)
async def router_admin_get_settings(
    session: SessionDep,
    type: str | None = None,
    name: str | None = None,
) -> SettingsListResponse:
    """
    List site settings, optionally filtered by type and/or name.

    A filter is applied only when its argument is truthy, so ``None`` and
    the empty string both mean "no filter". When both are given they are
    combined with AND. With no filter, ``None`` is passed as the condition
    to ``Setting.get``.

    :param session: database session
    :param type: setting type to filter by (optional)
    :param name: setting name to filter by (optional)
    :return: the matching setting items and their count
    """
    # Build the filter incrementally: type first, then AND-ed with name.
    condition = None
    if type:
        condition = Setting.type == type
    if name:
        name_clause = Setting.name == name
        condition = name_clause if condition is None else condition & name_clause

    rows = await Setting.get(session, condition, fetch_mode="all")

    # Convert the ORM rows into response DTOs.
    setting_items = [
        SettingItem(type=row.type, name=row.name, value=row.value)
        for row in rows
    ]
    return SettingsListResponse(settings=setting_items, total=len(setting_items))
@admin_aria2_router.post(
    path='/test',
    summary='测试 Aria2 连接',
    description='Test Aria2 RPC connection',
    dependencies=[Depends(admin_required)]
)
async def router_admin_aira2_test(
    request: Aria2TestRequest,
) -> ResponseBase:
    """
    Test connectivity to an Aria2 JSON-RPC endpoint.

    Sends an ``aria2.getVersion`` call to ``request.rpc_url`` (with a
    ``token:<secret>`` parameter when a secret is supplied) using a
    10-second total timeout. Failures are never raised to the caller; they
    are reported as a ``ResponseBase`` with ``code=400`` and a message.

    The function name keeps its historical spelling ("aira2") so existing
    references are not broken.

    :param request: test request carrying the RPC URL and optional secret
    :return: on success, a response whose data is
        ``{"connected": True, "version": <version or "unknown">}``;
        otherwise a ``code=400`` response describing the failure
    """
    # Imported lazily so aiohttp is only needed when this endpoint is used.
    import aiohttp

    payload = {
        "jsonrpc": "2.0",
        "id": "test",
        "method": "aria2.getVersion",
        "params": [f"token:{request.secret}"] if request.secret else [],
    }

    # Only the network round-trip sits inside the try block, so that
    # transport/decoding problems are the only thing reported as
    # "connection failed".
    try:
        async with aiohttp.ClientSession() as client:
            async with client.post(
                request.rpc_url,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status != 200:
                    return ResponseBase(
                        code=400,
                        msg=f"连接失败HTTP {resp.status}"
                    )
                # aria2 replies with Content-Type "application/json-rpc".
                # aiohttp's default content-type check can reject that, so
                # the check is disabled and the body is decoded as JSON
                # regardless of the declared type.
                result = await resp.json(content_type=None)
    except Exception as e:
        # Best-effort probe: any transport, timeout or decode error is
        # reported to the admin rather than turned into a server error.
        return ResponseBase(code=400, msg=f"连接失败: {str(e)}")

    # A JSON-RPC response must be an object; anything else (null, list,
    # scalar) is not a usable answer.
    if not isinstance(result, dict):
        return ResponseBase(code=400, msg="连接失败: 无效的 JSON-RPC 响应")

    error = result.get("error")
    if error is not None:
        # Prefer the standard "message" member, but fall back to the raw
        # error value if the server sent a non-standard shape.
        detail = error.get("message", error) if isinstance(error, dict) else error
        return ResponseBase(
            code=400,
            msg=f"Aria2 错误: {detail}"
        )

    info = result.get("result")
    version = info.get("version", "unknown") if isinstance(info, dict) else "unknown"
    return ResponseBase(data={
        "connected": True,
        "version": version,
    })