Files
findreve/services/session.py

61 lines
1.6 KiB
Python

"""
Session service: handles login and access-token generation.
"""
from datetime import datetime, timedelta, timezone
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any
import jwt
from model import Setting, User
from model.response import TokenResponse
from pkg import Password, utils
import JWT
async def create_access_token(
    session: AsyncSession,
    data: dict[str, Any],
) -> str:
    """
    Build a signed JWT access token from the given claims.

    The claims in ``data`` are shallow-copied (the caller's dict is not
    mutated), an ``exp`` claim is added, and the result is signed with HS256
    using the key returned by ``JWT.get_secret_key()``.

    Args:
        session: Database session used to read the ``jwt_token_exp`` setting.
        data: Claims to embed in the token.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    claims = dict(data)

    lifetime_setting = await Setting.get(session, Setting.name == "jwt_token_exp")
    now_utc = datetime.now(timezone.utc)
    # The stored value is passed as ``days`` (the first positional parameter
    # of ``timedelta``).
    # NOTE(review): confirm the ``jwt_token_exp`` setting is meant to be days.
    lifetime = timedelta(days=int(lifetime_setting.value))
    claims["exp"] = now_utc + lifetime

    signing_key = await JWT.get_secret_key()
    return jwt.encode(claims, key=signing_key, algorithm="HS256")
async def authenticate_user(
    session: AsyncSession,
    username: str,
    password: str,
) -> User:
    """
    Verify a username/password pair and return the matching user.

    The user is looked up by email. Authentication fails when no user is
    found, when the stored email differs from ``username``, or when
    ``Password.verify`` rejects the password. Every failure calls
    ``utils.raise_unauthorized`` with the same generic message, so the
    response does not reveal which check failed.

    Args:
        session: Database session used for the user lookup.
        username: Login name; compared against the user's email.
        password: Password supplied by the caller.

    Returns:
        The user whose credentials were verified.
    """
    user = await User.get(session, User.email == username)

    # Checks run left to right and short-circuit, so ``user`` is only
    # dereferenced once it is known to be truthy.
    credentials_ok = (
        bool(user)
        and user.email == username
        and Password.verify(user.password, password)
    )
    if not credentials_ok:
        # Presumably raises an HTTP 401; the helper is defined outside this file.
        utils.raise_unauthorized("Account or password is incorrect")

    return user
async def login_for_access_token(
    session: AsyncSession,
    username: str,
    password: str,
) -> TokenResponse:
    """
    Authenticate the given credentials and issue an access token.

    Args:
        session: Database session passed through to authentication and
            token creation.
        username: Login name forwarded to ``authenticate_user``.
        password: Password forwarded to ``authenticate_user``.

    Returns:
        A ``TokenResponse`` carrying the newly created access token, whose
        ``sub`` claim is the authenticated user's email.
    """
    authenticated = await authenticate_user(
        session=session,
        username=username,
        password=password,
    )
    claims = {"sub": authenticated.email}
    token = await create_access_token(session=session, data=claims)
    return TokenResponse(access_token=token)