""" Tests de sécurité pour l'authentification. Couvre : - SEC-FIX-001 : bootstrap admin, rattrapage admin existant, blocage CRUD avant changement - SEC-FIX-002 : rate limiting login - SEC-FIX-003 : validation mot de passe et username - SEC-FIX-004 : invalidation de token après changement de mot de passe """ import pytest from fastapi.testclient import TestClient from sqlalchemy import text import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # conftest.py sets DATABASE_URL before this import from database import engine, Base from main import ( app, _migrate_users_must_change_password, _migrate_users_token_version, _migrate_force_admin_password_change, _migrate_users, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def reset_db(): """Fresh schema + seeded admin for each test.""" Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine) _migrate_users_must_change_password() _migrate_users_token_version() _migrate_users() yield Base.metadata.drop_all(bind=engine) @pytest.fixture(autouse=True) def reset_rate_limits(): """Remet à zéro les compteurs de rate limiting entre chaque test.""" from routers.auth import _ip_attempts, _login_attempts, _rate_lock with _rate_lock: _ip_attempts.clear() _login_attempts.clear() yield with _rate_lock: _ip_attempts.clear() _login_attempts.clear() @pytest.fixture def client(): return TestClient(app, raise_server_exceptions=True) def _login(client, username="admin", password="admin"): return client.post( "/api/auth/login", data={"username": username, "password": password}, ) def _auth_headers(token): return {"Authorization": f"Bearer {token}"} # --------------------------------------------------------------------------- # SEC-FIX-001 — Bootstrap et rattrapage # --------------------------------------------------------------------------- class TestBootstrap: def test_fresh_db_admin_must_change_password(self, client): """Nouvelle base : admin créé avec must_change_password=1.""" r = _login(client) assert r.status_code == 200 data = r.json() assert data["must_change_password"] is True assert data["username"] == "admin" def test_crud_blocked_before_password_change(self, client): """CRUD refusé (403) tant que must_change_password est vrai.""" token = _login(client).json()["access_token"] r = client.get("/api/vlans/", headers=_auth_headers(token)) assert r.status_code == 403 assert r.json()["detail"] == "Password change required" def test_crud_allowed_after_password_change(self, client): """CRUD autorisé après changement de mot de passe.""" token = _login(client).json()["access_token"] r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "SecurePass1"}, headers=_auth_headers(token), ) assert r.status_code == 200 new_token = r.json()["access_token"] assert r.json()["must_change_password"] is False r2 = client.get("/api/vlans/", headers=_auth_headers(new_token)) assert r2.status_code == 200 def test_migration_forces_existing_admin_with_default_password(self, client): """Rattrapage : admin existant avec must_change_password=0 et password 'admin' est forcé.""" # Simuler une ancienne base : admin avec must_change_password=0 with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() # La migration de rattrapage doit remettre must_change_password=1 _migrate_force_admin_password_change() r = _login(client) assert r.status_code == 200 assert r.json()["must_change_password"] is True def test_migration_does_not_touch_admin_with_custom_password(self, client): """Rattrapage : admin avec mot de passe personnalisé et must_change_password=0 n'est pas touché.""" from passlib.context import CryptContext pwd = CryptContext(schemes=["bcrypt"], deprecated="auto") with engine.connect() as conn: conn.execute(text( "UPDATE users SET hashed_password=:h, must_change_password=0 WHERE username='admin'" ), {"h": pwd.hash("CustomPass9")}) conn.commit() _migrate_force_admin_password_change() r = client.post("/api/auth/login", data={"username": "admin", "password": "CustomPass9"}) assert r.status_code == 200 assert r.json()["must_change_password"] is False def test_initial_admin_password_env_var(self, monkeypatch): """Avec INITIAL_ADMIN_PASSWORD, must_change_password=0.""" monkeypatch.setenv("INITIAL_ADMIN_PASSWORD", "EnvPass42") Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine) _migrate_users_must_change_password() _migrate_users_token_version() _migrate_users() with TestClient(app) as c: r = c.post("/api/auth/login", data={"username": "admin", "password": "EnvPass42"}) assert r.status_code == 200 assert r.json()["must_change_password"] is False # --------------------------------------------------------------------------- # SEC-FIX-004 — Invalidation de token après changement de mot de passe # --------------------------------------------------------------------------- class TestTokenInvalidation: def test_old_token_rejected_after_password_change(self, client): """L'ancien token est invalide après changement de mot de passe.""" # Forcer must_change_password=0 pour pouvoir tester le CRUD with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() old_token = _login(client).json()["access_token"] # Changer le mot de passe → invalide old_token client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "NewPass99"}, headers=_auth_headers(old_token), ) r = client.get("/api/vlans/", headers=_auth_headers(old_token)) assert r.status_code == 401 def test_new_token_valid_after_password_change(self, client): """Le nouveau token fonctionne après changement de mot de passe.""" with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() old_token = _login(client).json()["access_token"] r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "NewPass99"}, headers=_auth_headers(old_token), ) new_token = r.json()["access_token"] r2 = client.get("/api/vlans/", headers=_auth_headers(new_token)) assert r2.status_code == 200 def test_token_without_version_accepted_for_backward_compat(self, client): """Token sans champ 'ver' (ancien format) est accepté : ver absent → ver=1 par défaut.""" from jose import jwt as jose_jwt from routers.auth import SECRET_KEY, ALGORITHM from datetime import datetime, timedelta, timezone expire = datetime.now(timezone.utc) + timedelta(hours=1) old_format_token = jose_jwt.encode( {"sub": "admin", "exp": expire}, SECRET_KEY, algorithm=ALGORITHM, ) with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() r = client.get("/api/auth/me", headers=_auth_headers(old_format_token)) assert r.status_code == 200 def test_token_with_wrong_version_rejected(self, client): """Token avec version incorrecte est rejeté.""" from jose import jwt as jose_jwt from routers.auth import SECRET_KEY, ALGORITHM from datetime import datetime, timedelta, timezone expire = datetime.now(timezone.utc) + timedelta(hours=1) bad_token = jose_jwt.encode( {"sub": "admin", "ver": 999, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM, ) r = client.get("/api/auth/me", headers=_auth_headers(bad_token)) assert r.status_code == 401 # --------------------------------------------------------------------------- # SEC-FIX-003 — Validation mot de passe et username # --------------------------------------------------------------------------- class TestValidation: def _get_valid_token(self, client): """Retourne un token valide (must_change_password forcé à 0).""" with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() return _login(client).json()["access_token"] def test_password_too_short_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "Short1"}, headers=_auth_headers(token), ) assert r.status_code == 400 assert r.json()["detail"] == "password_too_short" def test_password_no_digit_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "OnlyLetters"}, headers=_auth_headers(token), ) assert r.status_code == 400 assert r.json()["detail"] == "password_too_weak" def test_password_no_letter_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "12345678"}, headers=_auth_headers(token), ) assert r.status_code == 400 assert r.json()["detail"] == "password_too_weak" def test_valid_password_accepted(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_password": "ValidPass1"}, headers=_auth_headers(token), ) assert r.status_code == 200 def test_username_invalid_chars_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_username": "bad user!"}, headers=_auth_headers(token), ) assert r.status_code == 400 assert r.json()["detail"] == "username_invalid" def test_username_too_long_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_username": "a" * 65}, headers=_auth_headers(token), ) assert r.status_code == 400 assert r.json()["detail"] == "username_invalid" def test_valid_username_accepted(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "admin", "new_username": "admin_user.1"}, headers=_auth_headers(token), ) assert r.status_code == 200 def test_wrong_current_password_rejected(self, client): token = self._get_valid_token(client) r = client.put( "/api/auth/account", json={"current_password": "wrong", "new_password": "NewPass1!"}, headers=_auth_headers(token), ) assert r.status_code == 400 # --------------------------------------------------------------------------- # SEC-FIX-002 — Rate limiting # --------------------------------------------------------------------------- class TestRateLimit: def test_ip_rate_limit_triggers_429(self, client): """Après trop de tentatives par IP, le login retourne 429.""" from routers.auth import _ip_attempts, _rate_lock, _IP_MAX with _rate_lock: _ip_attempts["testclient"] = [__import__("time").time()] * _IP_MAX r = _login(client) assert r.status_code == 429 def test_username_rate_limit_triggers_429(self, client): """Après trop de tentatives par username, le login retourne 429.""" from routers.auth import _login_attempts, _rate_lock, _USERNAME_MAX with _rate_lock: _login_attempts["admin"] = [__import__("time").time()] * _USERNAME_MAX r = _login(client) assert r.status_code == 429 def test_successful_login_clears_username_attempts(self, client): """Login réussi remet à zéro le compteur username.""" from routers.auth import _login_attempts, _rate_lock with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() r = _login(client) assert r.status_code == 200 with _rate_lock: assert "admin" not in _login_attempts