""" Tests de validation — Phase 3 Couvre : - SEC-FIX-006 : validation des entrées discovery (dns_server, ips, cap global) - SEC-FIX-007 : validators Pydantic sur VlanCreate et DeviceCreate - SEC-FIX-013 : PRAGMA foreign_keys=ON (test indirect via FK constraint) - SEC-FIX-017 : suppression code orphelin Links (endpoint /api/links inexistant) """ import pytest from fastapi.testclient import TestClient from sqlalchemy import text import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from database import engine, Base from main import app, _migrate_users_must_change_password, _migrate_users_token_version, _migrate_users # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def reset_db(): Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine) _migrate_users_must_change_password() _migrate_users_token_version() _migrate_users() yield Base.metadata.drop_all(bind=engine) @pytest.fixture(autouse=True) def reset_rate_limits(): from routers.auth import _ip_attempts, _login_attempts, _rate_lock with _rate_lock: _ip_attempts.clear() _login_attempts.clear() yield with _rate_lock: _ip_attempts.clear() _login_attempts.clear() @pytest.fixture def client(): return TestClient(app, raise_server_exceptions=True) def _get_token(client): """Retourne un token admin valide avec must_change_password=0.""" with engine.connect() as conn: conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'")) conn.commit() r = client.post("/api/auth/login", data={"username": "admin", "password": "admin"}) return r.json()["access_token"] def _auth(token): return {"Authorization": f"Bearer {token}"} # --------------------------------------------------------------------------- # SEC-FIX-007 — Validation VlanCreate # 
class TestVlanValidation:
    """Payload validation checks for POST /api/vlans/."""

    ENDPOINT = "/api/vlans/"

    def _create(self, client, payload):
        """POST ``payload`` as JSON to the VLAN endpoint with an admin token."""
        headers = _auth(_get_token(client))
        return client.post(self.ENDPOINT, json=payload, headers=headers)

    def test_vlan_id_out_of_range_rejected(self, client):
        """vlan_id=0 must be answered with 422."""
        response = self._create(client, {"name": "Test", "vlan_id": 0})
        assert response.status_code == 422

    def test_vlan_id_max_boundary_rejected(self, client):
        """vlan_id=4095 must be answered with 422."""
        response = self._create(client, {"name": "Test", "vlan_id": 4095})
        assert response.status_code == 422

    def test_vlan_id_valid_accepted(self, client):
        """vlan_id=100 with a hex color must be answered with 200."""
        payload = {"name": "Test", "vlan_id": 100, "color": "#AABBCC"}
        response = self._create(client, payload)
        assert response.status_code == 200

    def test_vlan_empty_name_rejected(self, client):
        """A whitespace-only name must be answered with 422."""
        response = self._create(client, {"name": " "})
        assert response.status_code == 422

    def test_vlan_invalid_cidr_rejected(self, client):
        """A cidr that is not a network must be answered with 422."""
        response = self._create(client, {"name": "Test", "cidr": "not-a-cidr"})
        assert response.status_code == 422

    def test_vlan_valid_cidr_accepted(self, client):
        """A /24 cidr with a hex color must be answered with 200."""
        payload = {"name": "Test", "cidr": "192.168.1.0/24", "color": "#AABBCC"}
        response = self._create(client, payload)
        assert response.status_code == 200

    def test_vlan_invalid_color_rejected(self, client):
        """A color given as a plain word must be answered with 422."""
        response = self._create(client, {"name": "Test", "color": "blue"})
        assert response.status_code == 422

    def test_vlan_valid_color_accepted(self, client):
        """A mixed-case #RRGGBB color must be answered with 200."""
        response = self._create(client, {"name": "Test", "color": "#1a2B3c"})
        assert response.status_code == 200


# ---------------------------------------------------------------------------
# SEC-FIX-007 — DeviceCreate validation
# ---------------------------------------------------------------------------

class TestDeviceValidation:
    """Payload validation checks for POST /api/devices/."""

    ENDPOINT = "/api/devices/"

    def _create(self, client, payload):
        """POST ``payload`` as JSON to the device endpoint with an admin token."""
        headers = _auth(_get_token(client))
        return client.post(self.ENDPOINT, json=payload, headers=headers)

    def test_invalid_type_rejected(self, client):
        """type="supercomputer" must be answered with 422."""
        response = self._create(client, {"name": "srv", "type": "supercomputer"})
        assert response.status_code == 422

    def test_valid_type_accepted(self, client):
        """type="server" must be answered with 200."""
        response = self._create(client, {"name": "srv", "type": "server"})
        assert response.status_code == 200

    def test_invalid_virt_type_rejected(self, client):
        """virt_type="docker" must be answered with 422."""
        response = self._create(client, {"name": "srv", "virt_type": "docker"})
        assert response.status_code == 422

    def test_valid_virt_type_accepted(self, client):
        """virt_type="lxc" must be answered with 200."""
        response = self._create(client, {"name": "srv", "virt_type": "lxc"})
        assert response.status_code == 200

    def test_invalid_url_rejected(self, client):
        """An ftp:// url must be answered with 422."""
        response = self._create(client, {"name": "srv", "url": "ftp://bad"})
        assert response.status_code == 422

    def test_valid_url_accepted(self, client):
        """An https:// url with host and port must be answered with 200."""
        payload = {"name": "srv", "url": "https://192.168.1.1:8006"}
        response = self._create(client, payload)
        assert response.status_code == 200

    def test_empty_name_rejected(self, client):
        """An empty name must be answered with 422."""
        response = self._create(client, {"name": ""})
        assert response.status_code == 422

    def test_invalid_interface_ip_rejected(self, client):
        """An interface whose ip_address is not an IP must be answered with 422."""
        interface = {"name": "eth0", "ip_address": "not-an-ip"}
        response = self._create(client, {"name": "srv", "interfaces": [interface]})
        assert response.status_code == 422

    def test_valid_interface_ip_accepted(self, client):
        """An interface with an IPv4 ip_address must be answered with 200."""
        interface = {"name": "eth0", "ip_address": "10.0.0.1"}
        response = self._create(client, {"name": "srv", "interfaces": [interface]})
        assert response.status_code == 200


# ---------------------------------------------------------------------------
# SEC-FIX-006 — Discovery validation
# ---------------------------------------------------------------------------
class TestDiscoveryValidation:
    """Input validation checks for the /api/discovery scan and ping endpoints."""

    SCAN = "/api/discovery/scan"
    PING = "/api/discovery/ping"

    def _post(self, client, url, payload):
        """POST ``payload`` as JSON to ``url`` with an admin token."""
        headers = _auth(_get_token(client))
        return client.post(url, json=payload, headers=headers)

    def test_invalid_dns_server_rejected(self, client):
        """A dns_server that is not an IP must be answered with 422."""
        payload = {
            "dns_server": "not-an-ip",
            "targets": [{"vlan_id": 1, "cidr": "192.168.1.0/24"}],
        }
        response = self._post(client, self.SCAN, payload)
        assert response.status_code == 422

    def test_invalid_ping_ip_rejected(self, client):
        """A ping list containing a non-IP entry must be answered with 422."""
        response = self._post(client, self.PING, {"ips": ["1.2.3.4", "bad-ip"]})
        assert response.status_code == 422

    def test_valid_ping_ips_accepted(self, client):
        """An empty ping list must be answered with 200."""
        response = self._post(client, self.PING, {"ips": []})
        assert response.status_code == 200

    def test_scan_oversized_cidr_rejected(self, client):
        """A single /8 scan target must be answered with 400."""
        payload = {
            "dns_server": "8.8.8.8",
            "targets": [{"vlan_id": 1, "cidr": "10.0.0.0/8"}],
        }
        response = self._post(client, self.SCAN, payload)
        assert response.status_code == 400

    def test_scan_global_cap_rejected(self, client):
        """Several targets whose combined host count exceeds MAX_HOSTS_TOTAL."""
        # Five /22 networks (10.0.0.0 ... 10.4.0.0), vlan_id 1..5;
        # 1022 hosts each → total > 4096.
        targets = [
            {"vlan_id": index, "cidr": f"10.{index - 1}.0.0/22"}
            for index in range(1, 6)
        ]
        payload = {"dns_server": "8.8.8.8", "targets": targets}
        response = self._post(client, self.SCAN, payload)
        assert response.status_code == 400
        detail = response.json()["detail"]
        assert "total" in detail.lower()


# ---------------------------------------------------------------------------
# SEC-FIX-017 — /api/links endpoint absent
# ---------------------------------------------------------------------------

class TestLinksEndpointRemoved:
    """Requests to /api/links/ are expected to return 404."""

    def test_links_list_returns_404(self, client):
        """The /api/links router was removed in phase 3."""
        headers = _auth(_get_token(client))
        response = client.get("/api/links/", headers=headers)
        assert response.status_code == 404

    def test_links_create_returns_404(self, client):
        """Creating a link through POST /api/links/ must also return 404."""
        headers = _auth(_get_token(client))
        payload = {
            "source_device_id": 1,
            "target_device_id": 2,
            "link_type": "trunk",
        }
        response = client.post("/api/links/", json=payload, headers=headers)
        assert response.status_code == 404


# ---------------------------------------------------------------------------
# SEC-FIX-013 — PRAGMA foreign_keys=ON
# ---------------------------------------------------------------------------

class TestForeignKeys:
    """Checks the foreign_keys pragma reported by an engine connection."""

    def test_foreign_keys_pragma_is_on(self):
        """Verify that PRAGMA foreign_keys reads back as 1 on a new connection."""
        with engine.connect() as connection:
            row = connection.execute(text("PRAGMA foreign_keys")).fetchone()
            assert row[0] == 1