Merge branch 'main' into Owner-of-the-car-enhance

This commit is contained in:
2025-10-03 00:09:50 +08:00
21 changed files with 514 additions and 141 deletions

View File

@@ -1 +1,3 @@
from . import token
from . import token
from .setting import Setting
from .object import Object

151
model/base.py Normal file
View File

@@ -0,0 +1,151 @@
# model/base.py
from datetime import datetime, timezone
from typing import Optional, Type, TypeVar, Union, Literal, List
from sqlalchemy import DateTime, BinaryExpression, ClauseElement
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import SQLModel, Field, select, Relationship
from sqlalchemy.sql._typing import _OnClauseArgument
B = TypeVar('B', bound='TableBase')
M = TypeVar('M', bound='SQLModel')
utcnow = lambda: datetime.now(tz=timezone.utc)
class TableBase(AsyncAttrs, SQLModel):
__abstract__ = True
created_at: datetime = Field(
default_factory=utcnow,
description="创建时间",
)
updated_at: datetime = Field(
sa_type=DateTime,
description="更新时间",
sa_column_kwargs={"default": utcnow, "onupdate": utcnow},
default_factory=utcnow
)
deleted_at: Optional[datetime] = Field(
default=None,
description="删除时间",
sa_column={"nullable": True}
)
@classmethod
async def add(
cls: Type[B],
session: AsyncSession,
instances: B | List[B],
refresh: bool = True
) -> B | List[B]:
is_list = isinstance(instances, list)
if is_list:
session.add_all(instances)
else:
session.add(instances)
await session.commit()
if refresh:
if is_list:
for i in instances:
await session.refresh(i)
else:
await session.refresh(instances)
return instances
async def save(
self: B,
session: AsyncSession,
load: Union[Relationship, None] = None, # 设默认值,避免必须传
):
session.add(self)
await session.commit()
if load is not None:
cls = type(self)
return await cls.get(session, cls.id == self.id, load=load) # 若该模型没有 id请别用 load 模式
else:
await session.refresh(self)
return self
async def update(
self: B,
session: AsyncSession,
other: M,
extra_data: dict = None,
exclude_unset: bool = True,
) -> B:
self.sqlmodel_update(
other.model_dump(exclude_unset=exclude_unset),
update=extra_data
)
session.add(self)
await session.commit()
await session.refresh(self)
return self
@classmethod
async def delete(
cls: Type[B],
session: AsyncSession,
instance: B | list[B],
) -> None:
if isinstance(instance, list):
for inst in instance:
await session.delete(inst)
else:
await session.delete(instance)
await session.commit()
@classmethod
async def get(
cls: Type[B],
session: AsyncSession,
condition: BinaryExpression | ClauseElement | None,
*,
offset: int | None = None,
limit: int | None = None,
fetch_mode: Literal["one", "first", "all"] = "first",
join: Type[B] | tuple[Type[B], _OnClauseArgument] | None = None,
options: list | None = None,
load: Union[Relationship, None] = None,
order_by: list[ClauseElement] | None = None
) -> B | List[B] | None:
statement = select(cls)
if condition is not None:
statement = statement.where(condition)
if join is not None:
statement = statement.join(*join)
if options:
statement = statement.options(*options)
if load:
statement = statement.options(selectinload(load))
if order_by is not None:
statement = statement.order_by(*order_by)
if offset:
statement = statement.offset(offset)
if limit:
statement = statement.limit(limit)
result = await session.exec(statement)
if fetch_mode == "one":
return result.one()
elif fetch_mode == "first":
return result.first()
elif fetch_mode == "all":
return list(result.all())
else:
raise ValueError(f"无效的 fetch_mode: {fetch_mode}")
@classmethod
async def get_exist_one(cls: Type[B], session: AsyncSession, id: int, load: Union[Relationship, None] = None) -> B:
instance = await cls.get(session, cls.id == id, load=load)
if not instance:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Not found")
return instance
# 需要“自增 id 主键”的模型才混入它Setting 不混入
class IdMixin(SQLModel):
id: Optional[int] = Field(default=None, primary_key=True, description="主键ID")

View File

@@ -1,19 +1,31 @@
'''
Author: 于小丘 海枫
Date: 2024-10-02 15:23:34
LastEditors: Yuerchu admin@yuxiaoqiu.cn
LastEditTime: 2024-11-29 20:05:03
FilePath: /Findreve/model.py
Description: Findreve 数据库组件 model
Copyright (c) 2018-2024 by 于小丘Yuerchu, All Rights Reserved.
'''
from contextlib import asynccontextmanager
import aiosqlite
from datetime import datetime
import tool
import logging
from typing import Literal, Optional
from typing import Optional
from sqlmodel import SQLModel
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.orm import sessionmaker
from typing import AsyncGenerator
import warnings
from .migration import migration
ASYNC_DATABASE_URL = "sqlite+aiosqlite:///data.db"
engine: AsyncEngine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True,
connect_args={
"check_same_thread": False
} if ASYNC_DATABASE_URL.startswith("sqlite") else None,
future=True,
# pool_size=POOL_SIZE,
# max_overflow=64,
)
_async_session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 数据库类
class Database:
@@ -24,104 +36,28 @@ class Database:
db_path: str = "data.db" # db_path 数据库文件路径,默认为 data.db
):
self.db_path = db_path
async def init_db(self):
"""初始化数据库和表"""
logging.info("开始初始化数据库和表")
create_objects_table = """
CREATE TABLE IF NOT EXISTS fr_objects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
key TEXT NOT NULL,
name TEXT NOT NULL,
icon TEXT,
status TEXT,
phone TEXT,
context TEXT,
find_ip TEXT,
create_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
lost_at TIMESTAMP
)
"""
create_settings_table = """
CREATE TABLE IF NOT EXISTS fr_settings (
type TEXT,
name TEXT PRIMARY KEY,
value TEXT
)
"""
async with aiosqlite.connect(self.db_path) as db:
logging.info("连接到数据库")
await db.execute(create_objects_table)
logging.info("创建或验证fr_objects表")
await db.execute(create_settings_table)
logging.info("创建或验证fr_settings表")
# 初始化设置表数据
async with db.execute("SELECT name FROM fr_settings WHERE name = 'version'") as cursor:
if not await cursor.fetchone():
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'version', '1.0.0')
)
logging.info("插入初始版本信息: version 1.0.0")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'ver'") as cursor:
if not await cursor.fetchone():
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('int', 'ver', '1')
)
logging.info("插入初始版本号: ver 1")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'account'") as cursor:
if not await cursor.fetchone():
account = 'admin@yuxiaoqiu.cn'
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'account', account)
)
logging.info(f"插入初始账号信息: {account}")
print(f"账号: {account}")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'password'") as cursor:
if not await cursor.fetchone():
password = tool.generate_password()
hashed_password = tool.hash_password(password)
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'password', hashed_password)
)
logging.info("插入初始密码信息")
print(f"密码(请牢记,后续不再显示): {password}")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'SECRET_KEY'") as cursor:
if not await cursor.fetchone():
secret_key = tool.generate_password(64)
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'SECRET_KEY', secret_key)
)
logging.info("插入初始密钥信息")
await db.commit()
logging.info("数据库初始化完成并提交更改")
async def add_object(
self,
key: str,
type: Literal['normal', 'car'],
name: str,
icon: str = None,
phone: str = None,
):
@staticmethod
@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with _async_session_factory() as session:
yield session
async def init_db(
self,
url: str = ASYNC_DATABASE_URL
):
"""创建数据库结构"""
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async with self.get_session() as session:
await migration(session) # 执行迁移脚本
async def add_object(self, key: str, name: str, icon: str = None, phone: str = None):
"""
添加新对象
:param type: 对象类型
:param key: 序列号
:param name: 名称
:param icon: 图标
@@ -135,15 +71,14 @@ class Database:
now = datetime.now()
now = now.strftime("%Y-%m-%d %H:%M:%S")
await db.execute(
"INSERT INTO fr_objects (key, name, icon, phone, create_at, status, type) VALUES (?, ?, ?, ?, ?, 'ok', ?)",
(key, name, icon, phone, now, type)
"INSERT INTO fr_objects (key, name, icon, phone, create_at, status) VALUES (?, ?, ?, ?, ?, 'ok')",
(key, name, icon, phone, now)
)
await db.commit()
async def update_object(
self,
id: int,
type: Literal['normal', 'car'] = None,
key: str = None,
name: str = None,
icon: str = None,
@@ -151,13 +86,11 @@ class Database:
phone: int = None,
lost_description: Optional[str] = None,
find_ip: Optional[str] = None,
lost_time: Optional[str] = None
):
lost_time: Optional[str] = None):
"""
更新对象信息
:param id: 对象ID
:param type: 对象类型
:param key: 序列号
:param name: 名称
:param icon: 图标
@@ -185,18 +118,13 @@ class Database:
f"phone = COALESCE(?, phone), "
f"context = COALESCE(?, context), "
f"find_ip = COALESCE(?, find_ip), "
f"lost_at = COALESCE(?, lost_at), "
f"type = COALESCE(?, type) "
f"lost_at = COALESCE(?, lost_at) "
f"WHERE id = ?",
(key, name, icon, status, phone, lost_description, find_ip, lost_time, type, id)
(key, name, icon, status, phone, lost_description, find_ip, lost_time, id)
)
await db.commit()
async def get_object(
self,
id: int = None,
key: str = None
):
async def get_object(self, id: int = None, key: str = None):
"""
获取对象

84
model/migration.py Normal file
View File

@@ -0,0 +1,84 @@
from typing import Sequence
from sqlmodel import select
from .setting import Setting
import tool
default_settings: list[Setting] = [
Setting(type='string', name='version', value='1.0.0'),
Setting(type='int', name='ver', value='1'),
Setting(type='string', name='account', value='admin@yuxiaoqiu.cn'),
]
async def migration(session):
# 先准备基础配置
settings: list[Setting] = default_settings.copy()
# 生成初始密码与密钥
admin_password = tool.generate_password()
print(f"密码(请牢记,后续不再显示): {admin_password}")
settings.append(Setting(type='string', name='password', value=tool.hash_password(admin_password)))
settings.append(Setting(type='string', name='SECRET_KEY', value=tool.generate_password(64)))
# 读取库里已存在的 name避免主键冲突
names = [s.name for s in settings]
exist_stmt = select(Setting.name).where(Setting.name.in_(names))
exist_rs = await session.exec(exist_stmt)
existed: set[str] = set(exist_rs.all())
to_insert = [s for s in settings if s.name not in existed]
if to_insert:
# 使用你写好的通用新增方法(是类方法),并传入会话
await Setting.add(session, to_insert, refresh=False)
"""
# 初始化设置表数据
async with db.execute("SELECT name FROM fr_settings WHERE name = 'version'") as cursor:
if not await cursor.fetchone():
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'version', '1.0.0')
)
logging.info("插入初始版本信息: version 1.0.0")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'ver'") as cursor:
if not await cursor.fetchone():
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('int', 'ver', '1')
)
logging.info("插入初始版本号: ver 1")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'account'") as cursor:
if not await cursor.fetchone():
account = 'admin@yuxiaoqiu.cn'
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'account', account)
)
logging.info(f"插入初始账号信息: {account}")
print(f"账号: {account}")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'password'") as cursor:
if not await cursor.fetchone():
password = tool.generate_password()
hashed_password = tool.hash_password(password)
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'password', hashed_password)
)
logging.info("插入初始密码信息")
print(f"密码(请牢记,后续不再显示): {password}")
async with db.execute("SELECT name FROM fr_settings WHERE name = 'SECRET_KEY'") as cursor:
if not await cursor.fetchone():
secret_key = tool.generate_password(64)
await db.execute(
"INSERT INTO fr_settings (type, name, value) VALUES (?, ?, ?)",
('string', 'SECRET_KEY', secret_key)
)
logging.info("插入初始密钥信息")
await db.commit()
logging.info("数据库初始化完成并提交更改")
"""

46
model/object.py Normal file
View File

@@ -0,0 +1,46 @@
# my_project/models/download.py
from typing import Literal, Optional, TYPE_CHECKING
from sqlmodel import Field, Column, SQLModel, String, DateTime
from .base import TableBase, IdMixin
from datetime import datetime
"""
原建表语句:
CREATE TABLE IF NOT EXISTS fr_objects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
name TEXT NOT NULL,
icon TEXT,
status TEXT,
phone TEXT,
context TEXT,
find_ip TEXT,
create_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
lost_at TIMESTAMP
"""
if TYPE_CHECKING:
pass
class Object(IdMixin, TableBase, table=True):
__tablename__ = 'fr_objects'
key: str = Field(index=True, nullable=False, description="物品外部ID")
type: Literal['object', 'box'] = Field(
default='object',
description="物品类型",
sa_column=Column(String, default='object', nullable=False)
)
name: str = Field(nullable=False, description="物品名称")
icon: Optional[str] = Field(default=None, description="物品图标")
status: Optional[str] = Field(default=None, description="物品状态")
phone: Optional[str] = Field(default=None, description="联系电话")
context: Optional[str] = Field(default=None, description="物品描述")
find_ip: Optional[str] = Field(default=None, description="最后一次发现的IP地址")
lost_at: Optional[datetime] = Field(
default=None,
description="物品标记为丢失的时间",
sa_column=Column(DateTime, nullable=True)
)

23
model/setting.py Normal file
View File

@@ -0,0 +1,23 @@
# model/setting.py
from typing import TYPE_CHECKING, Optional
from sqlmodel import Field
from .base import TableBase
"""
原表:
CREATE TABLE IF NOT EXISTS fr_settings (
type TEXT,
name TEXT PRIMARY KEY,
value TEXT
)
"""
if TYPE_CHECKING:
pass
class Setting(TableBase, table=True):
__tablename__ = 'fr_settings'
type: str = Field(index=True, nullable=False, description="设置类型")
name: str = Field(primary_key=True, nullable=False, description="设置名称") # name 为唯一主键
value: Optional[str] = Field(description="设置值")