优化用户登录逻辑,简化错误处理,更新密码哈希生成方式为Argon2

This commit is contained in:
2025-11-01 19:41:19 +08:00
parent 0d45a07ba7
commit 83276c8b95
4 changed files with 34 additions and 46 deletions

View File

@@ -5,7 +5,7 @@ from models.database import init_db
from models.migration import migration from models.migration import migration
from pkg.lifespan import lifespan from pkg.lifespan import lifespan
from pkg.JWT import jwt as JWT from pkg.JWT import jwt as JWT
from pkg.log import log, set_log_level from pkg.log import set_log_level
# 添加初始化数据库启动项 # 添加初始化数据库启动项
lifespan.add_startup(init_db) lifespan.add_startup(init_db)
@@ -40,8 +40,9 @@ for router in routers.Router:
401: {"description": "未授权 Unauthorized"}, 401: {"description": "未授权 Unauthorized"},
403: {"description": "禁止访问 Forbidden"}, 403: {"description": "禁止访问 Forbidden"},
404: {"description": "未找到 Not found"}, 404: {"description": "未找到 Not found"},
500: {"description": "内部服务器错误 Internal server error"} 500: {"description": "内部服务器错误 Internal server error"},
},) },
)
# 启动时打印欢迎信息 # 启动时打印欢迎信息
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,4 +1,5 @@
import secrets import secrets
from argon2 import PasswordHasher
class Password: class Password:
@@ -23,16 +24,14 @@ class Password:
password: str, password: str,
) -> str: ) -> str:
""" """
生成密码的加盐哈希值。 生成密码的Argon2哈希值。
:return: 包含盐值和哈希值的字符串 :param password: 要哈希的密码
:return: 使用Argon2算法生成的密码哈希。
:rtype: str :rtype: str
""" """
import os, hashlib, binascii ph = PasswordHasher()
salt = hashlib.sha256(os.urandom(60)).hexdigest().encode('ascii') return ph.hash(password)
pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
pwdhash = binascii.hexlify(pwdhash)
return (salt + pwdhash).decode('ascii')
@staticmethod @staticmethod
def verify( def verify(
@@ -40,20 +39,17 @@ class Password:
provided_password: str, provided_password: str,
) -> bool: ) -> bool:
""" """
验证存储的密码哈希值与用户提供的密码是否匹配。 验证存储的Argon2密码哈希值与用户提供的密码是否匹配。
:param stored_password: 存储的密码哈希值(包含盐值) :param stored_password: 存储的Argon2密码哈希值。
:param provided_password: 用户提供的密码。 :param provided_password: 用户提供的密码。
:param debug: 是否输出调试信息,将会输出原密码和哈希值。
:return: 如果密码匹配返回 `True` ,否则返回 `False` 。
"""
import hashlib, binascii
salt = stored_password[:64]
stored_password = stored_password[64:]
pwdhash = hashlib.pbkdf2_hmac('sha256',
provided_password.encode('utf-8'),
salt.encode('ascii'),
100000)
pwdhash = binascii.hexlify(pwdhash).decode('ascii')
return secrets.compare_digest(pwdhash, stored_password) :return: 如果密码匹配返回 `True` ,否则返回 `False` 。
:rtype: bool
"""
ph = PasswordHasher()
try:
ph.verify(stored_password, provided_password)
return True
except:
return False

View File

@@ -49,19 +49,13 @@ async def router_user_session(
) )
) )
if not is_login:
if detail in ["User not found", "Incorrect password"]:
raise HTTPException(status_code=400, detail="Invalid username or password")
elif detail == "Need to complete registration":
raise HTTPException(status_code=400, detail="User account is not fully registered")
elif detail == "Account is banned":
raise HTTPException(status_code=403, detail="User account is banned")
else:
raise HTTPException(status_code=500, detail="Internal server error during login")
if isinstance(detail, models.response.TokenModel): if isinstance(detail, models.response.TokenModel):
return detail return detail
elif detail is None:
raise HTTPException(status_code=401, detail="Invalid username or password")
elif detail is False:
raise HTTPException(status_code=403, detail="User account is banned or not fully registered")
else: else:
log.error(f"Unexpected return type from login service: {type(detail)}")
raise HTTPException(status_code=500, detail="Internal server error during login") raise HTTPException(status_code=500, detail="Internal server error during login")
@user_router.post( @user_router.post(

View File

@@ -5,7 +5,7 @@ from models.response import TokenModel
from models.user import User from models.user import User
from pkg.log import log from pkg.log import log
async def Login(LoginRequest: LoginRequest) -> tuple[bool, TokenModel | str]: async def Login(LoginRequest: LoginRequest) -> TokenModel | bool | None:
""" """
根据账号密码进行登录。 根据账号密码进行登录。
@@ -32,32 +32,29 @@ async def Login(LoginRequest: LoginRequest) -> tuple[bool, TokenModel | str]:
# [TODO] 验证码校验 # [TODO] 验证码校验
# 验证用户是否存在 # 获取用户信息
user = await User.get(email=LoginRequest.username) user = await User.get(email=LoginRequest.username)
# 验证用户是否存在
if not user: if not user:
log.debug(f"Cannot find user with email: {LoginRequest.username}") log.debug(f"Cannot find user with email: {LoginRequest.username}")
return False, "User not found" return None
# 验证密码是否正确 # 验证密码是否正确
if not Password.verify(user.password, LoginRequest.password): if not Password.verify(user.password, LoginRequest.password):
log.debug(f"Password verification failed for user: {LoginRequest.username}") log.debug(f"Password verification failed for user: {LoginRequest.username}")
return False, "Incorrect password" return None
# 验证用户是否可登录 # 验证用户是否可登录
if user.status == None: if not user.status:
# 未完成注册 # 未完成注册 or 账号已被封禁
return False, "Need to complete registration" return False
elif user.status == False:
# 账号已被封禁
return False, "Account is banned"
# 创建令牌 # 创建令牌
access_token, access_expire = create_access_token(data={'sub': user.email}) access_token, access_expire = create_access_token(data={'sub': user.email})
refresh_token, refresh_expire = create_refresh_token(data={'sub': user.email}) refresh_token, refresh_expire = create_refresh_token(data={'sub': user.email})
return True, TokenModel( return TokenModel(
access_token=access_token, access_token=access_token,
access_expires=access_expire, access_expires=access_expire,
refresh_token=refresh_token, refresh_token=refresh_token,