"""Network discovery endpoints: bulk ICMP ping and CIDR scan with PTR lookup."""

import errno
import ipaddress
import socket
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from typing import Optional, Union

import dns.resolver
import dns.reversename
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, field_validator

router = APIRouter()

MAX_HOSTS_PER_TARGET = 1024  # rejects /21 and larger networks
MAX_HOSTS_TOTAL = 4096  # global cap across all targets of one request

_IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]


class ScanTarget(BaseModel):
    """One network to scan, tagged with the VLAN it belongs to."""

    vlan_id: int
    cidr: str


class ScanRequest(BaseModel):
    """Body of POST /scan."""

    dns_server: str = "8.8.8.8"
    targets: list[ScanTarget]
    tcp_check: bool = False

    @field_validator("dns_server")
    @classmethod
    def _dns_server(cls, v: str) -> str:
        """Reject anything that is not a literal IPv4/IPv6 address."""
        try:
            ipaddress.ip_address(v)
        except ValueError as exc:
            raise ValueError(
                f"dns_server must be a valid IP address, got: {v!r}"
            ) from exc
        return v


class DiscoveredHost(BaseModel):
    """A host that answered the liveness probe(s)."""

    ip: str
    hostname: Optional[str] = None
    vlan_id: int
    cidr: str


class ScanResponse(BaseModel):
    """Body returned by POST /scan."""

    hosts: list[DiscoveredHost]
    total_scanned: int
    duration_s: float


def _ping(ip: str) -> bool:
    """Return True if *ip* answers a single ICMP echo request.

    Runs the system ``ping`` binary (one packet, 1 s reply wait, 3 s hard
    timeout). Any failure to run or complete the command counts as "not
    alive" rather than an error.
    """
    try:
        completed = subprocess.run(
            ["ping", "-c", "1", "-W", "1", ip],
            capture_output=True,
            timeout=3,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Missing binary, permission problem, timeout or malformed argument:
        # the probe is best-effort, so treat all of these as "no answer".
        return False

    if completed.returncode != 0:
        return False

    # Guard against proxy-ARP / gateway false positives: verify the ICMP
    # reply actually came from the target IP and not an intermediate node.
    output = completed.stdout.decode(errors="ignore")
    return f"from {ip}:" in output or f"from {ip} " in output


def _ptr_lookup(ip: str, nameserver: str) -> Optional[str]:
    """Return the PTR name of *ip* as seen by *nameserver*, or None.

    The trailing root dot is stripped. The lookup is best-effort: any
    resolver failure (timeout, NXDOMAIN, malformed answer, ...) yields None.
    """
    try:
        resolver = dns.resolver.Resolver(configure=False)
        resolver.nameservers = [nameserver]
        resolver.timeout = 1
        resolver.lifetime = 2
        reverse_name = dns.reversename.from_address(ip)
        answer = resolver.resolve(reverse_name, "PTR")
        return str(answer[0]).rstrip(".")
    except Exception:
        # Deliberately broad: a missing hostname must never fail the scan.
        return None


_TCP_PROBE_PORTS = (22, 80, 443, 8080, 8443)
_TCP_PROBE_TIMEOUT = 0.5  # seconds per port


def _tcp_check(ip: str) -> bool:
    """Return True if *ip* reacts to a TCP connect on any probe port.

    Secondary check after ICMP: some gateways (e.g. UniFi) respond to ICMP
    for every IP in the subnet via proxy-ARP, spoofing the source IP so the
    source-IP guard in _ping() cannot help. A real host will reply to TCP
    (RST = port closed, or accept = port open); a ghost IP gets its SYN
    dropped by the gateway → timeout → False.
    """
    # Pick the socket family from the address itself; an AF_INET socket
    # cannot connect to an IPv6 literal, so IPv6 hosts would never pass.
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET

    for port in _TCP_PROBE_PORTS:
        try:
            with socket.socket(family, socket.SOCK_STREAM) as sock:
                sock.settimeout(_TCP_PROBE_TIMEOUT)
                err = sock.connect_ex((ip, port))
        except OSError:
            # Socket creation or address resolution failed; try the next port.
            continue
        if err in (0, errno.ECONNREFUSED):
            return True
    return False


def _scan_one(
    ip: str,
    dns_server: str,
    vlan_id: int,
    cidr: str,
    tcp_check: bool = False,
) -> Optional[DiscoveredHost]:
    """Probe one address; return a DiscoveredHost if alive, else None.

    The address must answer ICMP and, when *tcp_check* is set, also react to
    a TCP connect. The PTR lookup is only performed for live hosts.
    """
    if not _ping(ip):
        return None
    if tcp_check and not _tcp_check(ip):
        return None
    hostname = _ptr_lookup(ip, dns_server)
    return DiscoveredHost(ip=ip, hostname=hostname, vlan_id=vlan_id, cidr=cidr)


class PingRequest(BaseModel):
    """Body of POST /ping."""

    ips: list[str]

    @field_validator("ips")
    @classmethod
    def _ips(cls, v: list[str]) -> list[str]:
        """Reject the request if any entry is not a literal IP address."""
        for ip in v:
            try:
                ipaddress.ip_address(ip)
            except ValueError as exc:
                raise ValueError(f"Invalid IP address: {ip!r}") from exc
        return v


class PingResult(BaseModel):
    """Liveness verdict for one address."""

    ip: str
    alive: bool


@router.post("/ping", response_model=list[PingResult])
def ping_many(req: PingRequest):
    """Ping every requested address concurrently.

    Results are returned in the same order as the request's ``ips``.
    """
    if not req.ips:
        return []
    with ThreadPoolExecutor(max_workers=50) as pool:
        # Executor.map preserves input order, so the response is deterministic.
        alive_flags = list(pool.map(_ping, req.ips))
    return [
        PingResult(ip=ip, alive=alive)
        for ip, alive in zip(req.ips, alive_flags)
    ]


def _count_hosts(net: _IPNetwork) -> int:
    """Return how many addresses ``net.hosts()`` yields without listing them.

    Large networks are counted arithmetically so that an oversized CIDR can
    be rejected without allocating one object per address.
    """
    if net.prefixlen >= net.max_prefixlen - 1:
        # Point-to-point / single-host networks hold at most two addresses
        # and follow special rules, so simply enumerate them.
        return sum(1 for _ in net.hosts())
    # IPv4 skips the network and broadcast addresses; IPv6 skips only the
    # network (Subnet-Router anycast) address.
    reserved = 2 if net.version == 4 else 1
    return net.num_addresses - reserved


def _ip_sort_key(ip: str) -> tuple[int, int]:
    """Sort key that orders IPv4 before IPv6, then numerically.

    IPv4Address and IPv6Address objects cannot be compared with each other,
    so a mixed result list must be sorted on plain integers.
    """
    addr = ipaddress.ip_address(ip)
    return (addr.version, int(addr))


@router.post("/scan", response_model=ScanResponse)
def scan(req: ScanRequest):
    """Scan every target network and return the hosts that are alive.

    Responds with HTTP 400 when a CIDR is invalid, when a single target
    exceeds MAX_HOSTS_PER_TARGET, when there is nothing to scan, or when the
    request as a whole exceeds MAX_HOSTS_TOTAL.
    """
    # First pass: validate and size every target before enumerating any
    # address, so an oversized CIDR is rejected cheaply.
    networks: list[tuple[ScanTarget, _IPNetwork]] = []
    total_hosts = 0
    for t in req.targets:
        try:
            net = ipaddress.ip_network(t.cidr, strict=False)
        except ValueError as exc:
            raise HTTPException(400, f"CIDR invalide : {t.cidr}") from exc
        host_count = _count_hosts(net)
        if host_count > MAX_HOSTS_PER_TARGET:
            raise HTTPException(
                400,
                f"Réseau {t.cidr} trop large ({host_count} hôtes). "
                f"Maximum par target : {MAX_HOSTS_PER_TARGET} hôtes (/22 ou plus petit).",
            )
        total_hosts += host_count
        networks.append((t, net))

    if total_hosts == 0:
        raise HTTPException(400, "Aucune cible à scanner.")
    if total_hosts > MAX_HOSTS_TOTAL:
        raise HTTPException(
            400,
            f"Trop d'hôtes au total ({total_hosts}). Maximum global : {MAX_HOSTS_TOTAL}.",
        )

    # Second pass: both caps hold, so enumerating the addresses is bounded.
    tasks: list[tuple[str, str, int, str]] = [
        (str(ip), req.dns_server, t.vlan_id, t.cidr)
        for t, net in networks
        for ip in net.hosts()
    ]

    # Monotonic clock: the reported duration must not be affected by
    # wall-clock adjustments during the scan.
    started = time.monotonic()
    results: list[DiscoveredHost] = []
    with ThreadPoolExecutor(max_workers=100) as pool:
        futures = [
            pool.submit(_scan_one, *args, tcp_check=req.tcp_check)
            for args in tasks
        ]
        for future in as_completed(futures):
            host = future.result()
            if host:
                results.append(host)

    results.sort(key=lambda h: _ip_sort_key(h.ip))
    return ScanResponse(
        hosts=results,
        total_scanned=len(tasks),
        duration_s=round(time.monotonic() - started, 1),
    )