用户登录

This commit is contained in:
2025-07-17 19:33:48 +08:00
parent 412565cda2
commit e98c46f44a
26 changed files with 187 additions and 385 deletions

View File

@@ -1,5 +1,7 @@
from fastapi.security import OAuth2PasswordBearer
from models.setting import Setting
from datetime import datetime, timedelta, timezone
import jwt
oauth2_scheme = OAuth2PasswordBearer(
scheme_name='获取 JWT Bearer 令牌',
@@ -17,4 +19,26 @@ async def load_secret_key() -> None:
:type key: str
"""
global SECRET_KEY
SECRET_KEY = await Setting.get(type='auth', name='secret_key')
SECRET_KEY = await Setting.get(type='auth', name='secret_key')
# 访问令牌
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, datetime]:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(hours=3)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm='HS256')
return encoded_jwt, expire
# 刷新令牌
def create_refresh_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, datetime]:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(days=30)
to_encode.update({"exp": expire, "token_type": "refresh"})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm='HS256')
return encoded_jwt, expire

View File

@@ -1,7 +1,6 @@
from rich import print
from rich.console import Console
from rich.markdown import Markdown
from configparser import ConfigParser
from typing import Literal, Optional, Dict, Union
from enum import Enum
import time
@@ -17,10 +16,6 @@ class LogLevelEnum(str, Enum):
# 默认日志级别
LogLevel = LogLevelEnum.INFO
# 日志文件路径
LOG_FILE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
# 是否启用文件日志
ENABLE_FILE_LOG = False
def set_log_level(level: Union[str, LogLevelEnum]) -> None:
"""设置日志级别"""
@@ -33,17 +28,6 @@ def set_log_level(level: Union[str, LogLevelEnum]) -> None:
else:
LogLevel = level
def enable_file_log(enable: bool = True) -> None:
"""启用或禁用文件日志"""
global ENABLE_FILE_LOG
ENABLE_FILE_LOG = enable
if enable and not os.path.exists(LOG_FILE_PATH):
try:
os.makedirs(LOG_FILE_PATH)
except Exception as e:
print(f"[bold red]创建日志目录失败: {e}[/bold red]")
ENABLE_FILE_LOG = False
def truncate_path(full_path: str, marker: str = "HeyAuth") -> str:
"""截断路径只保留从marker开始的部分"""
try:
@@ -114,17 +98,6 @@ def log(level: str = 'debug', message: str = ''):
if should_log:
print(log_message)
# 文件日志记录
if ENABLE_FILE_LOG:
try:
# 去除rich格式化标记
clean_message = f"{level_value.upper()}\t{timestamp} From {filename}, line {lineno} {message}"
log_file = os.path.join(LOG_FILE_PATH, f"{time.strftime('%Y%m%d')}.log")
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"{clean_message}\n")
except Exception as e:
print(f"[bold red]写入日志文件失败: {e}[/bold red]")
# 便捷日志函数
debug = lambda message: log('debug', message)
@@ -133,31 +106,6 @@ warning = lambda message: log('warn', message)
error = lambda message: log('error', message)
success = lambda message: log('success', message)
def load_config(config_path: str) -> bool:
"""从配置文件加载日志配置"""
try:
if not os.path.exists(config_path):
return False
config = ConfigParser()
config.read(config_path, encoding='utf-8')
if 'log' in config:
log_config = config['log']
if 'level' in log_config:
set_log_level(log_config['level'])
if 'file_log' in log_config:
enable_file_log(log_config.getboolean('file_log'))
if 'log_path' in log_config:
global LOG_FILE_PATH
custom_path = log_config['log_path']
if os.path.exists(custom_path) or os.makedirs(custom_path, exist_ok=True):
LOG_FILE_PATH = custom_path
return True
except Exception as e:
error(f"加载日志配置失败: {e}")
return False
def title(title: str = '海枫授权系统 HeyAuth', size: Optional[Literal['h1', 'h2', 'h3', 'h4', 'h5']] = 'h1'):
"""
输出标题
@@ -204,8 +152,4 @@ if __name__ == '__main__':
print("\n设置为DEBUG级别测试:")
set_log_level(LogLevelEnum.DEBUG)
debug('这是一个debug日志') # 现在会显示
print("\n启用文件日志测试:")
enable_file_log()
info('此日志将同时记录到文件')
debug('这是一个debug日志') # 现在会显示