Files
stupid-simple-network-inven…/backend/tests/test_validation.py
T
olivier 88cf6458d0 Initial commit — Stupid Simple Network Inventory
Application web d'inventaire réseau manuel avec FastAPI, Vue 3 et Docker.
Inclut l'authentification JWT, la découverte ICMP, et la topologie en cards CSS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-17 09:19:19 +02:00

246 lines
9.6 KiB
Python

"""
Tests de validation — Phase 3
Couvre :
- SEC-FIX-006 : validation des entrées discovery (dns_server, ips, cap global)
- SEC-FIX-007 : validators Pydantic sur VlanCreate et DeviceCreate
- SEC-FIX-013 : PRAGMA foreign_keys=ON (test indirect via FK constraint)
- SEC-FIX-017 : suppression code orphelin Links (endpoint /api/links inexistant)
"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import text
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database import engine, Base
from main import app, _migrate_users_must_change_password, _migrate_users_token_version, _migrate_users
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_db():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
_migrate_users_must_change_password()
_migrate_users_token_version()
_migrate_users()
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture(autouse=True)
def reset_rate_limits():
from routers.auth import _ip_attempts, _login_attempts, _rate_lock
with _rate_lock:
_ip_attempts.clear()
_login_attempts.clear()
yield
with _rate_lock:
_ip_attempts.clear()
_login_attempts.clear()
@pytest.fixture
def client():
return TestClient(app, raise_server_exceptions=True)
def _get_token(client):
"""Retourne un token admin valide avec must_change_password=0."""
with engine.connect() as conn:
conn.execute(text("UPDATE users SET must_change_password=0 WHERE username='admin'"))
conn.commit()
r = client.post("/api/auth/login", data={"username": "admin", "password": "admin"})
return r.json()["access_token"]
def _auth(token):
return {"Authorization": f"Bearer {token}"}
# ---------------------------------------------------------------------------
# SEC-FIX-007 — Validation VlanCreate
# ---------------------------------------------------------------------------
class TestVlanValidation:
def test_vlan_id_out_of_range_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 0}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_id_max_boundary_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 4095}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_id_valid_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "vlan_id": 100, "color": "#AABBCC"}, headers=_auth(token))
assert r.status_code == 200
def test_vlan_empty_name_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": " "}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_invalid_cidr_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "cidr": "not-a-cidr"}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_valid_cidr_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "cidr": "192.168.1.0/24", "color": "#AABBCC"}, headers=_auth(token))
assert r.status_code == 200
def test_vlan_invalid_color_rejected(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "color": "blue"}, headers=_auth(token))
assert r.status_code == 422
def test_vlan_valid_color_accepted(self, client):
token = _get_token(client)
r = client.post("/api/vlans/", json={"name": "Test", "color": "#1a2B3c"}, headers=_auth(token))
assert r.status_code == 200
# ---------------------------------------------------------------------------
# SEC-FIX-007 — Validation DeviceCreate
# ---------------------------------------------------------------------------
class TestDeviceValidation:
def test_invalid_type_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "type": "supercomputer"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_type_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "type": "server"}, headers=_auth(token))
assert r.status_code == 200
def test_invalid_virt_type_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "virt_type": "docker"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_virt_type_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "virt_type": "lxc"}, headers=_auth(token))
assert r.status_code == 200
def test_invalid_url_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "url": "ftp://bad"}, headers=_auth(token))
assert r.status_code == 422
def test_valid_url_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": "srv", "url": "https://192.168.1.1:8006"}, headers=_auth(token))
assert r.status_code == 200
def test_empty_name_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={"name": ""}, headers=_auth(token))
assert r.status_code == 422
def test_invalid_interface_ip_rejected(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={
"name": "srv",
"interfaces": [{"name": "eth0", "ip_address": "not-an-ip"}]
}, headers=_auth(token))
assert r.status_code == 422
def test_valid_interface_ip_accepted(self, client):
token = _get_token(client)
r = client.post("/api/devices/", json={
"name": "srv",
"interfaces": [{"name": "eth0", "ip_address": "10.0.0.1"}]
}, headers=_auth(token))
assert r.status_code == 200
# ---------------------------------------------------------------------------
# SEC-FIX-006 — Validation discovery
# ---------------------------------------------------------------------------
class TestDiscoveryValidation:
def test_invalid_dns_server_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "not-an-ip",
"targets": [{"vlan_id": 1, "cidr": "192.168.1.0/24"}]
}, headers=_auth(token))
assert r.status_code == 422
def test_invalid_ping_ip_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/ping", json={"ips": ["1.2.3.4", "bad-ip"]}, headers=_auth(token))
assert r.status_code == 422
def test_valid_ping_ips_accepted(self, client):
token = _get_token(client)
r = client.post("/api/discovery/ping", json={"ips": []}, headers=_auth(token))
assert r.status_code == 200
def test_scan_oversized_cidr_rejected(self, client):
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "8.8.8.8",
"targets": [{"vlan_id": 1, "cidr": "10.0.0.0/8"}]
}, headers=_auth(token))
assert r.status_code == 400
def test_scan_global_cap_rejected(self, client):
"""Plusieurs targets dont le total dépasse MAX_HOSTS_TOTAL."""
token = _get_token(client)
r = client.post("/api/discovery/scan", json={
"dns_server": "8.8.8.8",
"targets": [
{"vlan_id": 1, "cidr": "10.0.0.0/22"}, # 1022 hôtes
{"vlan_id": 2, "cidr": "10.1.0.0/22"}, # 1022 hôtes
{"vlan_id": 3, "cidr": "10.2.0.0/22"}, # 1022 hôtes
{"vlan_id": 4, "cidr": "10.3.0.0/22"}, # 1022 hôtes
{"vlan_id": 5, "cidr": "10.4.0.0/22"}, # 1022 hôtes → total > 4096
]
}, headers=_auth(token))
assert r.status_code == 400
assert "total" in r.json()["detail"].lower()
# ---------------------------------------------------------------------------
# SEC-FIX-017 — Endpoint /api/links absent
# ---------------------------------------------------------------------------
class TestLinksEndpointRemoved:
def test_links_list_returns_404(self, client):
"""Le routeur /api/links a été supprimé en phase 3."""
token = _get_token(client)
r = client.get("/api/links/", headers=_auth(token))
assert r.status_code == 404
def test_links_create_returns_404(self, client):
token = _get_token(client)
r = client.post("/api/links/", json={
"source_device_id": 1, "target_device_id": 2, "link_type": "trunk"
}, headers=_auth(token))
assert r.status_code == 404
# ---------------------------------------------------------------------------
# SEC-FIX-013 — PRAGMA foreign_keys=ON
# ---------------------------------------------------------------------------
class TestForeignKeys:
def test_foreign_keys_pragma_is_on(self):
"""Vérifie que PRAGMA foreign_keys=ON est actif sur chaque connexion."""
with engine.connect() as conn:
result = conn.execute(text("PRAGMA foreign_keys")).fetchone()
assert result[0] == 1