Files
stupid-simple-network-inven…/backend/tests/test_validation.py
T
olivier e8ca10f1b7 fix: cap /api/discovery/ping at 4096 IPs and fix test suite
- Add MAX_PING_IPS=4096 constant and validate list size in PingRequest
  before spawning futures, returning 422 on overflow
- Add test_ping_too_many_ips_rejected to cover the new cap
- Pin httpx<0.28 in requirements-test.txt (0.28 broke TestClient API)
- Fix reset_db fixture to set a known admin password regardless of
  INITIAL_ADMIN_PASSWORD env var (was causing 401 on all auth tests)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 18:16:08 +02:00

261 lines
10 KiB
Python

"""
Tests de validation — Phase 3
Couvre :
- SEC-FIX-006 : validation des entrées discovery (dns_server, ips, cap global)
- SEC-FIX-007 : validators Pydantic sur VlanCreate et DeviceCreate
- SEC-FIX-013 : PRAGMA foreign_keys=ON (test indirect via FK constraint)
- SEC-FIX-017 : suppression code orphelin Links (endpoint /api/links inexistant)
"""
import pytest
from fastapi.testclient import TestClient
from passlib.context import CryptContext
from sqlalchemy import text
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database import engine, Base
from main import app, _migrate_users_must_change_password, _migrate_users_token_version, _migrate_users
_TEST_PASSWORD = "admin-test"
_pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_db():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
_migrate_users_must_change_password()
_migrate_users_token_version()
_migrate_users()
# Force a known password regardless of INITIAL_ADMIN_PASSWORD env var
with engine.connect() as conn:
hashed = _pwd_ctx.hash(_TEST_PASSWORD)
conn.execute(text(f"UPDATE users SET hashed_password=:h WHERE username='admin'"), {"h": hashed})
conn.commit()
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture(autouse=True)
def reset_rate_limits():
from routers.auth import _ip_attempts, _login_attempts, _rate_lock
with _rate_lock:
_ip_attempts.clear()
_login_attempts.clear()
yield
with _rate_lock:
_ip_attempts.clear()
_login_attempts.clear()
@pytest.fixture
def client():
return TestClient(app, raise_server_exceptions=True)
def _get_token(client):
"""Retourne un token admin valide avec must_change_password=0."""
with engine.connect() as conn:
conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'"))
conn.commit()
r = client.post("/api/auth/login", data={"username": "admin", "password": _TEST_PASSWORD})
return r.json()["access_token"]
def _auth(token):
return {"Authorization": f"Bearer {token}"}
# ---------------------------------------------------------------------------
# SEC-FIX-007 — Validation VlanCreate
# ---------------------------------------------------------------------------
class TestVlanValidation:
def test_vlan_id_out_of_range_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 0}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_id_max_boundary_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 4095}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_id_valid_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 100, "color": "#AABBCC"}, headers=_auth(token))
assert r.status_code == 200
def test_vlan_empty_name_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": " "}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_invalid_cidr_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "cidr": "not-a-cidr"}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_valid_cidr_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "cidr": "192.168.1.0/24", "color": "#AABBCC"}, headers=_auth(token))
assert r.status_code == 200
def test_vlan_invalid_color_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "color": "blue"}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_valid_color_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "color": "#1a2B3c"}, headers=_auth(token))
assert r.status_code == 200
# ---------------------------------------------------------------------------
# SEC-FIX-007 — Validation DeviceCreate
# ---------------------------------------------------------------------------
class TestDeviceValidation:
def test_invalid_type_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "type": "supercomputer"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_type_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "type": "server"}, headers=_auth(token))
assert r.status_code == 200
def test_invalid_virt_type_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "virt_type": "docker"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_virt_type_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "virt_type": "lxc"}, headers=_auth(token))
assert r.status_code == 200
def test_invalid_url_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "url": "ftp://bad"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_url_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "url": "https://192.168.1.1:8006"}, headers=_auth(token))
assert r.status_code == 200
def test_empty_name_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": ""}, headers=_auth(token))
assert r.status_code == 422
def test_invalid_interface_ip_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={
"name": "srv",
"interfaces": [{"name": "eth0", "ip_address": "not-an-ip"}]
}, headers=_auth(token))
assert r.status_code == 422
def test_valid_interface_ip_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={
"name": "srv",
"interfaces": [{"name": "eth0", "ip_address": "10.0.0.1"}]
}, headers=_auth(token))
assert r.status_code == 200
# ---------------------------------------------------------------------------
# SEC-FIX-006 — Validation discovery
# ---------------------------------------------------------------------------
class TestDiscoveryValidation:
def test_invalid_dns_server_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "not-an-ip",
"targets": [{"vlan_id": 1, "cidr": "192.168.1.0/24"}]
}, headers=_auth(token))
assert r.status_code == 422
def test_invalid_ping_ip_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/ping", json={"ips": ["1.2.3.4", "bad-ip"]}, headers=_auth(token))
assert r.status_code == 422
def test_valid_ping_ips_accepted(self, client):
token = _get_token(client)
r = client.post("/api/discovery/ping", json={"ips": []}, headers=_auth(token))
assert r.status_code == 200
def test_ping_too_many_ips_rejected(self, client):
token = _get_token(client)
ips = [f"10.0.{i // 256}.{i % 256}" for i in range(4097)]
r = client.post("/api/discovery/ping", json={"ips": ips}, headers=_auth(token))
assert r.status_code == 422
def test_scan_oversized_cidr_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "8.8.8.8",
"targets": [{"vlan_id": 1, "cidr": "10.0.0.0/8"}]
}, headers=_auth(token))
assert r.status_code == 400
def test_scan_global_cap_rejected(self, client):
"""Plusieurs targets dont le total dépasse MAX_HOSTS_TOTAL."""
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "8.8.8.8",
"targets": [
{"vlan_id": 1, "cidr": "10.0.0.0/22"}, # 1022 hôtes
{"vlan_id": 2, "cidr": "10.1.0.0/22"}, # 1022 hôtes
{"vlan_id": 3, "cidr": "10.2.0.0/22"}, # 1022 hôtes
{"vlan_id": 4, "cidr": "10.3.0.0/22"}, # 1022 hôtes
{"vlan_id": 5, "cidr": "10.4.0.0/22"}, # 1022 hôtes → total > 4096
]
}, headers=_auth(token))
assert r.status_code == 400
assert "total" in r.json()["detail"].lower()
# ---------------------------------------------------------------------------
# SEC-FIX-017 — Endpoint /api/links absent
# ---------------------------------------------------------------------------
class TestLinksEndpointRemoved:
def test_links_list_returns_404(self, client):
"""Le routeur /api/links a été supprimé en phase 3."""
token = _get_token(client)
r = client.get("/api/links/", headers=_auth(token))
assert r.status_code == 404
def test_links_create_returns_404(self, client):
token = _get_token(client)
r = client.post("/api/links/", json={
"source_device_id": 1, "target_device_id": 2, "link_type": "trunk"
}, headers=_auth(token))
assert r.status_code == 404
# ---------------------------------------------------------------------------
# SEC-FIX-013 — PRAGMA foreign_keys=ON
# ---------------------------------------------------------------------------
class TestForeignKeys:
def test_foreign_keys_pragma_is_on(self):
"""Vérifie que PRAGMA foreign_keys=ON est actif sur chaque connexion."""
with engine.connect() as conn:
result = conn.execute(text("PRAGMA foreign_keys")).fetchone()
assert result[0] == 1