Files
disknext/tests/unit/service/test_login.py
于小丘 209cb24ab4 feat: add models for physical files, policies, and user management
- Implement PhysicalFile model to manage physical file references and reference counting.
- Create Policy model with associated options and group links for storage policies.
- Introduce Redeem and Report models for handling redeem codes and reports.
- Add Settings model for site configuration and user settings management.
- Develop Share model for sharing objects with unique codes and associated metadata.
- Implement SourceLink model for managing download links associated with objects.
- Create StoragePack model for managing user storage packages.
- Add Tag model for user-defined tags with manual and automatic types.
- Implement Task model for managing background tasks with status tracking.
- Develop User model with comprehensive user management features including authentication.
- Introduce UserAuthn model for managing WebAuthn credentials.
- Create WebDAV model for managing WebDAV accounts associated with users.
2026-02-10 19:07:48 +08:00

234 lines
5.7 KiB
Python

"""
Login 服务的单元测试
"""
import pytest
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodels.user import User, LoginRequest, TokenResponse
from sqlmodels.group import Group
from service.user.login import login
from utils.password.pwd import Password
@pytest.fixture
async def setup_user(db_session: AsyncSession):
"""创建测试用户"""
# 创建用户组
group = Group(name="测试组")
group = await group.save(db_session)
# 创建正常用户
plain_password = "secure_password_123"
user = User(
email="loginuser@test.local",
password=Password.hash(plain_password),
status=True,
group_id=group.id
)
user = await user.save(db_session)
return {
"user": user,
"password": plain_password,
"group_id": group.id
}
@pytest.fixture
async def setup_banned_user(db_session: AsyncSession):
"""创建被封禁的用户"""
group = Group(name="测试组2")
group = await group.save(db_session)
user = User(
email="banneduser@test.local",
password=Password.hash("password"),
status=False, # 封禁状态
group_id=group.id
)
user = await user.save(db_session)
return user
@pytest.fixture
async def setup_2fa_user(db_session: AsyncSession):
"""创建启用了两步验证的用户"""
import pyotp
group = Group(name="测试组3")
group = await group.save(db_session)
secret = pyotp.random_base32()
user = User(
email="2fauser@test.local",
password=Password.hash("password"),
status=True,
two_factor=secret,
group_id=group.id
)
user = await user.save(db_session)
return {
"user": user,
"secret": secret,
"password": "password"
}
@pytest.mark.asyncio
async def test_login_success(db_session: AsyncSession, setup_user):
"""测试正常登录"""
user_data = setup_user
login_request = LoginRequest(
email="loginuser@test.local",
password=user_data["password"]
)
result = await login(db_session, login_request)
assert isinstance(result, TokenResponse)
assert result.access_token is not None
assert result.refresh_token is not None
assert result.access_expires is not None
assert result.refresh_expires is not None
@pytest.mark.asyncio
async def test_login_user_not_found(db_session: AsyncSession):
"""测试用户不存在"""
login_request = LoginRequest(
email="nonexistent@test.local",
password="any_password"
)
result = await login(db_session, login_request)
assert result is None
@pytest.mark.asyncio
async def test_login_wrong_password(db_session: AsyncSession, setup_user):
"""测试密码错误"""
login_request = LoginRequest(
email="loginuser@test.local",
password="wrong_password"
)
result = await login(db_session, login_request)
assert result is None
@pytest.mark.asyncio
async def test_login_user_banned(db_session: AsyncSession, setup_banned_user):
"""测试用户被封禁"""
login_request = LoginRequest(
email="banneduser@test.local",
password="password"
)
result = await login(db_session, login_request)
assert result is False
@pytest.mark.asyncio
async def test_login_2fa_required(db_session: AsyncSession, setup_2fa_user):
"""测试需要 2FA"""
user_data = setup_2fa_user
login_request = LoginRequest(
email="2fauser@test.local",
password=user_data["password"]
# 未提供 two_fa_code
)
result = await login(db_session, login_request)
assert result == "2fa_required"
@pytest.mark.asyncio
async def test_login_2fa_invalid(db_session: AsyncSession, setup_2fa_user):
"""测试 2FA 错误"""
user_data = setup_2fa_user
login_request = LoginRequest(
email="2fauser@test.local",
password=user_data["password"],
two_fa_code="000000" # 错误的验证码
)
result = await login(db_session, login_request)
assert result == "2fa_invalid"
@pytest.mark.asyncio
async def test_login_2fa_success(db_session: AsyncSession, setup_2fa_user):
"""测试 2FA 成功"""
import pyotp
user_data = setup_2fa_user
secret = user_data["secret"]
# 生成当前有效的 TOTP 码
totp = pyotp.TOTP(secret)
valid_code = totp.now()
login_request = LoginRequest(
email="2fauser@test.local",
password=user_data["password"],
two_fa_code=valid_code
)
result = await login(db_session, login_request)
assert isinstance(result, TokenResponse)
assert result.access_token is not None
@pytest.mark.asyncio
async def test_login_returns_valid_tokens(db_session: AsyncSession, setup_user):
"""测试返回的令牌可以被解码"""
import jwt as pyjwt
user_data = setup_user
login_request = LoginRequest(
email="loginuser@test.local",
password=user_data["password"]
)
result = await login(db_session, login_request)
assert isinstance(result, TokenResponse)
# 注意: 实际项目中需要使用正确的 SECRET_KEY
# 这里假设测试环境已经设置了 SECRET_KEY
# decoded = pyjwt.decode(
# result.access_token,
# SECRET_KEY,
# algorithms=["HS256"]
# )
# assert decoded["sub"] == "loginuser"
@pytest.mark.asyncio
async def test_login_case_sensitive_email(db_session: AsyncSession, setup_user):
"""测试邮箱大小写敏感"""
user_data = setup_user
# 使用大写邮箱登录
login_request = LoginRequest(
email="LOGINUSER@TEST.LOCAL",
password=user_data["password"]
)
result = await login(db_session, login_request)
# 应该失败,因为邮箱大小写不匹配
assert result is None