Refactor code structure for improved readability and maintainability

This commit is contained in:
2025-11-27 20:56:48 +08:00
parent 83276c8b95
commit 1533d9e89c
42 changed files with 5282 additions and 330 deletions

View File

@@ -1,47 +1,58 @@
# my_project/models/user.py
from typing import Optional, TYPE_CHECKING
from datetime import datetime
from sqlmodel import Field, Relationship, UniqueConstraint
from .base import TableBase
from .database import get_session
from sqlmodel import select
# TYPE_CHECKING 用于解决循环导入问题,只在类型检查时导入
if TYPE_CHECKING:
from .group import Group
from .download import Download
from .file import File
from .folder import Folder
from .order import Order
from .share import Share
from .storage_pack import StoragePack
from .tag import Tag
from .task import Task
from .webdav import WebDAV
from .group import Group
from .download import Download
from .file import File
from .folder import Folder
from .order import Order
from .share import Share
from .storage_pack import StoragePack
from .tag import Tag
from .task import Task
from .webdav import WebDAV
class User(TableBase, table=True):
__tablename__ = 'users'
email: str = Field(max_length=100, unique=True, index=True, description="用户邮箱,唯一")
phone: str = Field(default=None, nullable=True, index=True, description="用户手机号,唯一")
email: str = Field(max_length=100, unique=True, index=True)
"""用户邮箱,唯一"""
nick: Optional[str] = Field(default=None, max_length=50, description="用户昵称")
password: str = Field(max_length=255, description="用户密码(加密后)")
status: Optional[bool] = Field(default=None, sa_column_kwargs={"server_default": "0"}, description="用户状态: True=正常, None=未激活, False=封禁")
storage: int = Field(default=0, sa_column_kwargs={"server_default": "0"}, description="已用存储空间(字节)")
two_factor: Optional[str] = Field(default=None, max_length=255, description="两步验证密钥")
avatar: Optional[str] = Field(default=None, max_length=255, description="头像地址")
options: Optional[str] = Field(default=None, description="用户个人设置 (JSON格式)")
authn: Optional[str] = Field(default=None, description="WebAuthn 凭证")
open_id: Optional[str] = Field(default=None, max_length=255, unique=True, index=True, description="第三方登录OpenID")
score: int = Field(default=0, sa_column_kwargs={"server_default": "0"}, description="用户积分")
group_expires: Optional[datetime] = Field(default=None, description="当前用户组过期时间")
phone: Optional[str] = Field(default=None, max_length=255, unique=True, index=True, description="手机号")
phone: str | None = Field(default=None, nullable=True, index=True)
"""用户手机号,唯一"""
nick: str | None = Field(default=None, max_length=50)
"""用户昵称"""
password: str = Field(max_length=255)
"""用户密码(加密后)"""
status: bool | None = Field(default=None, sa_column_kwargs={"server_default": "0"})
"""用户状态: True=正常, None=未激活, False=封禁"""
storage: int = Field(default=0, sa_column_kwargs={"server_default": "0"})
"""已用存储空间(字节)"""
two_factor: str | None = Field(default=None, max_length=255)
"""两步验证密钥"""
avatar: str | None = Field(default=None, max_length=255)
"""头像地址"""
options: str | None = Field(default=None)
"""用户个人设置 (JSON格式)"""
authn: str | None = Field(default=None)
"""WebAuthn 凭证"""
open_id: str | None = Field(default=None, max_length=255, unique=True, index=True)
"""第三方登录OpenID"""
score: int = Field(default=0, sa_column_kwargs={"server_default": "0"})
"""用户积分"""
group_expires: datetime | None = Field(default=None)
"""当前用户组过期时间"""
phone: str | None = Field(default=None, max_length=255, unique=True, index=True)
"""手机号"""
# 外键
group_id: int = Field(foreign_key="groups.id", index=True, description="所属用户组ID")
previous_group_id: Optional[int] = Field(default=None, foreign_key="groups.id", description="之前的用户组ID用于过期后恢复")
group_id: int = Field(foreign_key="groups.id", index=True)
"""所属用户组ID"""
previous_group_id: int | None = Field(default=None, foreign_key="groups.id")
"""之前的用户组ID用于过期后恢复"""
# 关系
group: "Group" = Relationship(
@@ -66,152 +77,4 @@ class User(TableBase, table=True):
tags: list["Tag"] = Relationship(back_populates="user")
tasks: list["Task"] = Relationship(back_populates="user")
webdavs: list["WebDAV"] = Relationship(back_populates="user")
@staticmethod
async def create(
user: Optional["User"] = None,
**kwargs
):
"""
向数据库内添加用户。
:param user: User 实例
:type user: User
"""
if not user:
user = User(**kwargs)
from .database import get_session
async for session in get_session():
try:
session.add(user)
await session.commit()
await session.refresh(user)
except Exception as e:
await session.rollback()
raise e
return user
@staticmethod
async def get(
id: int = None,
email: str = None
) -> Optional["User"]:
"""
获取用户信息。
:param id: 用户ID默认为 None
:type id: int
:param email: 用户邮箱,默认为 None
:type email: str
:return: 用户对象或 None
:rtype: Optional[User]
"""
session = get_session()
if id is None and email is None:
return None
async for session in get_session():
query = select(User)
if id is not None:
query = query.where(User.id == id)
if email is not None:
query = query.where(User.email == email)
result = await session.exec(query)
user = result.one_or_none()
return user
@staticmethod
async def update(
id: int,
email: Optional[str] = None,
nick: Optional[str] = None,
password: Optional[str] = None,
status: Optional[int] = None,
storage: Optional[int] = None,
two_factor: Optional[str] = None,
avatar: Optional[str] = None,
options: Optional[str] = None,
authn: Optional[str] = None,
open_id: Optional[str] = None,
score: Optional[int] = None,
group_id: Optional[int] = None
) -> "User":
"""
更新用户信息。
:return: 更新后的用户对象
:rtype: User
"""
async for session in get_session():
try:
statement = select(User).where(User.id == id)
result = await session.exec(statement)
user = result.first()
if user is None:
raise ValueError(f"User with id {id} not found.")
if email is not None:
user.email = email
if nick is not None:
user.nick = nick
if password is not None:
user.password = password
if status is not None:
user.status = status
if storage is not None:
user.storage = storage
if two_factor is not None:
user.two_factor = two_factor
if avatar is not None:
user.avatar = avatar
if options is not None:
user.options = options
if authn is not None:
user.authn = authn
if open_id is not None:
user.open_id = open_id
if score is not None:
user.score = score
if group_id is not None:
user.group_id = group_id
await session.commit()
await session.refresh(user)
return user
except Exception as e:
await session.rollback()
raise e
@staticmethod
async def delete(
id: int
) -> None:
"""
删除用户。
:param id: 用户ID
:type id: int
"""
if id == 1:
raise ValueError("Cannot delete the default admin user with id 1.")
async for session in get_session():
try:
statement = select(User).where(User.id == id)
result = await session.exec(statement)
user = result.one_or_none()
if user is None:
raise ValueError(f"User with id {id} not found.")
await session.delete(user)
await session.commit()
except Exception as e:
await session.rollback()
raise e