feat: add optional TCP check to scan to filter proxy-ARP false positives

Some gateways (e.g. UniFi) respond to ICMP for every IP in a subnet via
proxy-ARP, spoofing the source IP so the existing ICMP guard cannot help.
A secondary TCP probe (ports 22, 80, 443, 8080, 8443) distinguishes real
hosts (RST/connect on closed ports) from ghost IPs (gateway drops SYN →
timeout). The check is opt-in (disabled by default) to avoid missing
devices whose firewall DROPs all probed ports.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-18 11:17:55 +02:00
parent 14de657deb
commit cc716783ea
3 changed files with 53 additions and 2 deletions
+31 -2
View File
@@ -1,4 +1,6 @@
import errno
import ipaddress
import socket
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -23,6 +25,7 @@ class ScanTarget(BaseModel):
class ScanRequest(BaseModel):
dns_server: str = "8.8.8.8"
targets: list[ScanTarget]
tcp_check: bool = False
@field_validator("dns_server")
@classmethod
@@ -77,9 +80,35 @@ def _ptr_lookup(ip: str, nameserver: str) -> Optional[str]:
return None
def _scan_one(ip: str, dns_server: str, vlan_id: int, cidr: str) -> Optional[DiscoveredHost]:
_TCP_PROBE_PORTS = (22, 80, 443, 8080, 8443)
_TCP_PROBE_TIMEOUT = 0.5 # seconds per port
def _tcp_check(ip: str) -> bool:
# Secondary check after ICMP: some gateways (e.g. UniFi) respond to ICMP
# for every IP in the subnet via proxy-ARP, spoofing the source IP so the
# source-IP guard in _ping() cannot help. A real host will reply to TCP
# (RST = port closed, or accept = port open); a ghost IP gets its SYN
# dropped by the gateway → timeout → False.
for port in _TCP_PROBE_PORTS:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(_TCP_PROBE_TIMEOUT)
try:
err = sock.connect_ex((ip, port))
if err == 0 or err == errno.ECONNREFUSED:
return True
except OSError:
pass
finally:
sock.close()
return False
def _scan_one(ip: str, dns_server: str, vlan_id: int, cidr: str, tcp_check: bool = False) -> Optional[DiscoveredHost]:
if not _ping(ip):
return None
if tcp_check and not _tcp_check(ip):
return None
hostname = _ptr_lookup(ip, dns_server)
return DiscoveredHost(ip=ip, hostname=hostname, vlan_id=vlan_id, cidr=cidr)
@@ -146,7 +175,7 @@ def scan(req: ScanRequest):
results: list[DiscoveredHost] = []
with ThreadPoolExecutor(max_workers=100) as pool:
futures = [pool.submit(_scan_one, *args) for args in tasks]
futures = [pool.submit(_scan_one, *args, tcp_check=req.tcp_check) for args in tasks]
for f in as_completed(futures):
host = f.result()
if host: