diff --git a/main.py b/main.py index 61f0403..97fc051 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ from models.database import init_db from models.migration import migration from pkg.lifespan import lifespan from pkg.JWT import jwt as JWT -from pkg.log import log, set_log_level +from pkg.log import set_log_level # 添加初始化数据库启动项 lifespan.add_startup(init_db) @@ -40,8 +40,9 @@ for router in routers.Router: 401: {"description": "未授权 Unauthorized"}, 403: {"description": "禁止访问 Forbidden"}, 404: {"description": "未找到 Not found"}, - 500: {"description": "内部服务器错误 Internal server error"} - },) + 500: {"description": "内部服务器错误 Internal server error"}, + }, + ) # 启动时打印欢迎信息 if __name__ == "__main__": diff --git a/pkg/password/pwd.py b/pkg/password/pwd.py index 1b7cd59..15fcdf2 100644 --- a/pkg/password/pwd.py +++ b/pkg/password/pwd.py @@ -1,4 +1,5 @@ import secrets +from argon2 import PasswordHasher class Password: @@ -23,16 +24,14 @@ class Password: password: str, ) -> str: """ - 生成密码的加盐哈希值。 + 生成密码的Argon2哈希值。 - :return: 包含盐值和哈希值的字符串。 + :param password: 要哈希的密码。 + :return: 使用Argon2算法生成的密码哈希。 :rtype: str """ - import os, hashlib, binascii - salt = hashlib.sha256(os.urandom(60)).hexdigest().encode('ascii') - pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000) - pwdhash = binascii.hexlify(pwdhash) - return (salt + pwdhash).decode('ascii') + ph = PasswordHasher() + return ph.hash(password) @staticmethod def verify( @@ -40,20 +39,17 @@ class Password: provided_password: str, ) -> bool: """ - 验证存储的密码哈希值与用户提供的密码是否匹配。 + 验证存储的Argon2密码哈希值与用户提供的密码是否匹配。 - :param stored_password: 存储的密码哈希值(包含盐值)。 + :param stored_password: 存储的Argon2密码哈希值。 :param provided_password: 用户提供的密码。 - :param debug: 是否输出调试信息,将会输出原密码和哈希值。 - :return: 如果密码匹配返回 `True` ,否则返回 `False` 。 - """ - import hashlib, binascii - salt = stored_password[:64] - stored_password = stored_password[64:] - pwdhash = hashlib.pbkdf2_hmac('sha256', - provided_password.encode('utf-8'), - salt.encode('ascii'), - 100000) - pwdhash = binascii.hexlify(pwdhash).decode('ascii') - return secrets.compare_digest(pwdhash, stored_password) \ No newline at end of file + :return: 如果密码匹配返回 `True` ,否则返回 `False` 。 + :rtype: bool + """ + ph = PasswordHasher() + try: + ph.verify(stored_password, provided_password) + return True + except: + return False diff --git a/routers/controllers/user.py b/routers/controllers/user.py index 26983b3..94f98fe 100644 --- a/routers/controllers/user.py +++ b/routers/controllers/user.py @@ -49,19 +49,13 @@ async def router_user_session( ) ) - if not is_login: - if detail in ["User not found", "Incorrect password"]: - raise HTTPException(status_code=400, detail="Invalid username or password") - elif detail == "Need to complete registration": - raise HTTPException(status_code=400, detail="User account is not fully registered") - elif detail == "Account is banned": - raise HTTPException(status_code=403, detail="User account is banned") - else: - raise HTTPException(status_code=500, detail="Internal server error during login") if isinstance(detail, models.response.TokenModel): return detail + elif detail is None: + raise HTTPException(status_code=401, detail="Invalid username or password") + elif detail is False: + raise HTTPException(status_code=403, detail="User account is banned or not fully registered") else: - log.error(f"Unexpected return type from login service: {type(detail)}") raise HTTPException(status_code=500, detail="Internal server error during login") @user_router.post( diff --git a/service/user/login.py b/service/user/login.py index 8eb0ff4..6f27095 100644 --- a/service/user/login.py +++ b/service/user/login.py @@ -5,7 +5,7 @@ from models.response import TokenModel from models.user import User from pkg.log import log -async def Login(LoginRequest: LoginRequest) -> tuple[bool, TokenModel | str]: +async def Login(LoginRequest: LoginRequest) -> TokenModel | bool | None: """ 根据账号密码进行登录。 @@ -32,32 +32,29 @@ async def Login(LoginRequest: LoginRequest) -> tuple[bool, TokenModel | str]: # [TODO] 验证码校验 - # 验证用户是否存在 + # 获取用户信息 user = await User.get(email=LoginRequest.username) + # 验证用户是否存在 if not user: log.debug(f"Cannot find user with email: {LoginRequest.username}") - return False, "User not found" + return None # 验证密码是否正确 if not Password.verify(user.password, LoginRequest.password): log.debug(f"Password verification failed for user: {LoginRequest.username}") - return False, "Incorrect password" + return None # 验证用户是否可登录 - if user.status == None: - # 未完成注册 - return False, "Need to complete registration" - elif user.status == False: - # 账号已被封禁 - return False, "Account is banned" + if not user.status: + # 未完成注册 or 账号已被封禁 + return False # 创建令牌 - access_token, access_expire = create_access_token(data={'sub': user.email}) refresh_token, refresh_expire = create_refresh_token(data={'sub': user.email}) - return True, TokenModel( + return TokenModel( access_token=access_token, access_expires=access_expire, refresh_token=refresh_token,