217 lines
7.8 KiB
Python
217 lines
7.8 KiB
Python
# my_project/models/user.py
|
||
|
||
from typing import Optional, TYPE_CHECKING
|
||
from datetime import datetime
|
||
from sqlmodel import Field, Relationship, UniqueConstraint
|
||
from .base import TableBase
|
||
from .database import get_session
|
||
from sqlmodel import select
|
||
|
||
# TYPE_CHECKING 用于解决循环导入问题,只在类型检查时导入
|
||
if TYPE_CHECKING:
|
||
from .group import Group
|
||
from .download import Download
|
||
from .file import File
|
||
from .folder import Folder
|
||
from .order import Order
|
||
from .share import Share
|
||
from .storage_pack import StoragePack
|
||
from .tag import Tag
|
||
from .task import Task
|
||
from .webdav import WebDAV
|
||
|
||
class User(TableBase, table=True):
|
||
__tablename__ = 'users'
|
||
|
||
email: str = Field(max_length=100, unique=True, index=True, description="用户邮箱,唯一")
|
||
phone: str = Field(default=None, nullable=True, index=True, description="用户手机号,唯一")
|
||
|
||
nick: Optional[str] = Field(default=None, max_length=50, description="用户昵称")
|
||
password: str = Field(max_length=255, description="用户密码(加密后)")
|
||
status: Optional[bool] = Field(default=None, sa_column_kwargs={"server_default": "0"}, description="用户状态: True=正常, None=未激活, False=封禁")
|
||
storage: int = Field(default=0, sa_column_kwargs={"server_default": "0"}, description="已用存储空间(字节)")
|
||
two_factor: Optional[str] = Field(default=None, max_length=255, description="两步验证密钥")
|
||
avatar: Optional[str] = Field(default=None, max_length=255, description="头像地址")
|
||
options: Optional[str] = Field(default=None, description="用户个人设置 (JSON格式)")
|
||
authn: Optional[str] = Field(default=None, description="WebAuthn 凭证")
|
||
open_id: Optional[str] = Field(default=None, max_length=255, unique=True, index=True, description="第三方登录OpenID")
|
||
score: int = Field(default=0, sa_column_kwargs={"server_default": "0"}, description="用户积分")
|
||
group_expires: Optional[datetime] = Field(default=None, description="当前用户组过期时间")
|
||
phone: Optional[str] = Field(default=None, max_length=255, unique=True, index=True, description="手机号")
|
||
|
||
# 外键
|
||
group_id: int = Field(foreign_key="groups.id", index=True, description="所属用户组ID")
|
||
previous_group_id: Optional[int] = Field(default=None, foreign_key="groups.id", description="之前的用户组ID(用于过期后恢复)")
|
||
|
||
# 关系
|
||
group: "Group" = Relationship(
|
||
back_populates="users",
|
||
sa_relationship_kwargs={
|
||
"foreign_keys": "User.group_id"
|
||
}
|
||
)
|
||
previous_group: Optional["Group"] = Relationship(
|
||
back_populates="previous_users",
|
||
sa_relationship_kwargs={
|
||
"foreign_keys": "User.previous_group_id"
|
||
}
|
||
)
|
||
|
||
downloads: list["Download"] = Relationship(back_populates="user")
|
||
files: list["File"] = Relationship(back_populates="user")
|
||
folders: list["Folder"] = Relationship(back_populates="owner")
|
||
orders: list["Order"] = Relationship(back_populates="user")
|
||
shares: list["Share"] = Relationship(back_populates="user")
|
||
storage_packs: list["StoragePack"] = Relationship(back_populates="user")
|
||
tags: list["Tag"] = Relationship(back_populates="user")
|
||
tasks: list["Task"] = Relationship(back_populates="user")
|
||
webdavs: list["WebDAV"] = Relationship(back_populates="user")
|
||
|
||
@staticmethod
|
||
async def create(
|
||
user: Optional["User"] = None,
|
||
**kwargs
|
||
):
|
||
"""
|
||
向数据库内添加用户。
|
||
|
||
:param user: User 实例
|
||
:type user: User
|
||
"""
|
||
if not user:
|
||
user = User(**kwargs)
|
||
|
||
from .database import get_session
|
||
|
||
async for session in get_session():
|
||
try:
|
||
session.add(user)
|
||
await session.commit()
|
||
await session.refresh(user)
|
||
except Exception as e:
|
||
await session.rollback()
|
||
raise e
|
||
return user
|
||
|
||
@staticmethod
|
||
async def get(
|
||
id: int = None,
|
||
email: str = None
|
||
) -> Optional["User"]:
|
||
"""
|
||
获取用户信息。
|
||
|
||
:param id: 用户ID,默认为 None
|
||
:type id: int
|
||
:param email: 用户邮箱,默认为 None
|
||
:type email: str
|
||
:return: 用户对象或 None
|
||
:rtype: Optional[User]
|
||
"""
|
||
session = get_session()
|
||
|
||
if id is None and email is None:
|
||
return None
|
||
|
||
async for session in get_session():
|
||
query = select(User)
|
||
if id is not None:
|
||
query = query.where(User.id == id)
|
||
if email is not None:
|
||
query = query.where(User.email == email)
|
||
|
||
result = await session.exec(query)
|
||
user = result.one_or_none()
|
||
|
||
return user
|
||
|
||
@staticmethod
|
||
async def update(
|
||
id: int,
|
||
email: Optional[str] = None,
|
||
nick: Optional[str] = None,
|
||
password: Optional[str] = None,
|
||
status: Optional[int] = None,
|
||
storage: Optional[int] = None,
|
||
two_factor: Optional[str] = None,
|
||
avatar: Optional[str] = None,
|
||
options: Optional[str] = None,
|
||
authn: Optional[str] = None,
|
||
open_id: Optional[str] = None,
|
||
score: Optional[int] = None,
|
||
group_id: Optional[int] = None
|
||
) -> "User":
|
||
"""
|
||
更新用户信息。
|
||
|
||
:return: 更新后的用户对象
|
||
:rtype: User
|
||
"""
|
||
async for session in get_session():
|
||
try:
|
||
statement = select(User).where(User.id == id)
|
||
result = await session.exec(statement)
|
||
user = result.first()
|
||
|
||
if user is None:
|
||
raise ValueError(f"User with id {id} not found.")
|
||
|
||
if email is not None:
|
||
user.email = email
|
||
if nick is not None:
|
||
user.nick = nick
|
||
if password is not None:
|
||
user.password = password
|
||
if status is not None:
|
||
user.status = status
|
||
if storage is not None:
|
||
user.storage = storage
|
||
if two_factor is not None:
|
||
user.two_factor = two_factor
|
||
if avatar is not None:
|
||
user.avatar = avatar
|
||
if options is not None:
|
||
user.options = options
|
||
if authn is not None:
|
||
user.authn = authn
|
||
if open_id is not None:
|
||
user.open_id = open_id
|
||
if score is not None:
|
||
user.score = score
|
||
if group_id is not None:
|
||
user.group_id = group_id
|
||
|
||
await session.commit()
|
||
await session.refresh(user)
|
||
return user
|
||
except Exception as e:
|
||
await session.rollback()
|
||
raise e
|
||
|
||
@staticmethod
|
||
async def delete(
|
||
id: int
|
||
) -> None:
|
||
"""
|
||
删除用户。
|
||
|
||
:param id: 用户ID
|
||
:type id: int
|
||
"""
|
||
if id == 1:
|
||
raise ValueError("Cannot delete the default admin user with id 1.")
|
||
|
||
async for session in get_session():
|
||
try:
|
||
statement = select(User).where(User.id == id)
|
||
result = await session.exec(statement)
|
||
user = result.one_or_none()
|
||
|
||
if user is None:
|
||
raise ValueError(f"User with id {id} not found.")
|
||
|
||
await session.delete(user)
|
||
await session.commit()
|
||
except Exception as e:
|
||
await session.rollback()
|
||
raise e |