Files
stupid-simple-network-inven…/backend/main.py
T
olivier 88cf6458d0 Initial commit — Stupid Simple Network Inventory
Application web d'inventaire réseau manuel avec FastAPI, Vue 3 et Docker.
Inclut l'authentification JWT, la découverte ICMP, et la topologie en cards CSS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-17 09:19:19 +02:00

202 lines
7.8 KiB
Python

import os
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from database import engine, Base
from routers import vlans, devices, discovery
from routers.auth import router as auth_router, get_current_user, require_password_changed
def _migrate_vlan_nullable():
    """Make ``vlans.vlan_id`` nullable by rebuilding the table.

    SQLite cannot relax a NOT NULL constraint with ALTER TABLE, so the table
    is recreated without the constraint and the rows are copied across.
    Nothing happens when the ``vlans`` table is missing or when ``vlan_id``
    is already nullable.

    Raises:
        Any database error raised by a failing statement. The pending
        transaction is rolled back and foreign-key enforcement is switched
        back on for the connection before the error propagates.
    """
    with engine.connect() as conn:
        table_row = conn.execute(text(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='vlans'"
        )).fetchone()
        if table_row is None:
            return

        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk):
        # index 1 is the column name, index 3 is the NOT NULL flag.
        columns = conn.execute(text("PRAGMA table_info(vlans)")).fetchall()
        if not any(col[1] == 'vlan_id' and col[3] == 1 for col in columns):
            return

        # Foreign-key enforcement is suspended while the table is swapped,
        # following SQLite's documented table-rebuild procedure.
        conn.execute(text("PRAGMA foreign_keys=OFF"))
        try:
            # A run that failed part-way can leave the scratch table behind;
            # without this the CREATE below would fail on every later start.
            # ``vlans`` still holds all rows at this point, so the stale copy
            # is safe to discard.
            conn.execute(text("DROP TABLE IF EXISTS vlans_new"))
            conn.execute(text("""
                CREATE TABLE vlans_new (
                    id INTEGER NOT NULL PRIMARY KEY,
                    vlan_id INTEGER UNIQUE,
                    name VARCHAR NOT NULL,
                    cidr VARCHAR,
                    color VARCHAR
                )
            """))
            conn.execute(text(
                "INSERT INTO vlans_new SELECT id, vlan_id, name, cidr, color FROM vlans"
            ))
            conn.execute(text("DROP TABLE vlans"))
            conn.execute(text("ALTER TABLE vlans_new RENAME TO vlans"))
            conn.commit()
        except Exception:
            # The pragma below is a no-op inside an open transaction, so the
            # failed transaction has to be closed first.
            conn.rollback()
            raise
        finally:
            conn.execute(text("PRAGMA foreign_keys=ON"))
            conn.commit()
def _migrate_device_virt_type():
    """Add the ``virt_type`` column to ``devices`` when it is missing.

    Does nothing if the ``devices`` table does not exist yet or already has
    the column.
    """
    table_lookup = text(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='devices'"
    )
    with engine.connect() as conn:
        if conn.execute(table_lookup).fetchone() is None:
            return

        # Index 1 of each PRAGMA table_info row is the column name.
        table_info = conn.execute(text("PRAGMA table_info(devices)")).fetchall()
        existing_columns = {info[1] for info in table_info}
        if 'virt_type' in existing_columns:
            return

        conn.execute(text("ALTER TABLE devices ADD COLUMN virt_type VARCHAR"))
        conn.commit()
def _migrate_device_url():
    """Add the ``url`` column to ``devices`` when it is missing.

    Does nothing if the ``devices`` table does not exist yet or already has
    the column.
    """
    table_lookup = text(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='devices'"
    )
    with engine.connect() as conn:
        if conn.execute(table_lookup).fetchone() is None:
            return

        # Index 1 of each PRAGMA table_info row is the column name.
        table_info = conn.execute(text("PRAGMA table_info(devices)")).fetchall()
        existing_columns = {info[1] for info in table_info}
        if 'url' in existing_columns:
            return

        conn.execute(text("ALTER TABLE devices ADD COLUMN url VARCHAR"))
        conn.commit()
def _migrate_users_must_change_password():
    """Add the ``must_change_password`` column to ``users`` when it is missing.

    The column is created as ``BOOLEAN NOT NULL DEFAULT 0``. Does nothing if
    the ``users`` table does not exist yet or already has the column.
    """
    table_lookup = text(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
    )
    with engine.connect() as conn:
        if conn.execute(table_lookup).fetchone() is None:
            return

        # Index 1 of each PRAGMA table_info row is the column name.
        table_info = conn.execute(text("PRAGMA table_info(users)")).fetchall()
        existing_columns = {info[1] for info in table_info}
        if 'must_change_password' in existing_columns:
            return

        conn.execute(text(
            "ALTER TABLE users ADD COLUMN must_change_password BOOLEAN NOT NULL DEFAULT 0"
        ))
        conn.commit()
def _migrate_users_token_version():
    """Add the ``token_version`` column to ``users`` when it is missing.

    The column is created as ``INTEGER NOT NULL DEFAULT 1``. Does nothing if
    the ``users`` table does not exist yet or already has the column.
    """
    table_lookup = text(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
    )
    with engine.connect() as conn:
        if conn.execute(table_lookup).fetchone() is None:
            return

        # Index 1 of each PRAGMA table_info row is the column name.
        table_info = conn.execute(text("PRAGMA table_info(users)")).fetchall()
        existing_columns = {info[1] for info in table_info}
        if 'token_version' in existing_columns:
            return

        conn.execute(text(
            "ALTER TABLE users ADD COLUMN token_version INTEGER NOT NULL DEFAULT 1"
        ))
        conn.commit()
def _migrate_force_admin_password_change():
    """Flag ``admin`` for a password change while it still uses "admin".

    Looks up the ``admin`` row whose ``must_change_password`` is 0 and, if its
    stored hash verifies against the bootstrap password "admin", sets
    ``must_change_password`` to 1. Does nothing when the ``users`` table is
    missing, when no such row exists, or when the password has been changed.
    """
    from passlib.context import CryptContext

    hasher = CryptContext(schemes=["bcrypt"], deprecated="auto")
    with engine.connect() as conn:
        users_table = conn.execute(text(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
        )).fetchone()
        if users_table is None:
            return

        admin_row = conn.execute(text(
            "SELECT hashed_password FROM users WHERE username='admin' AND must_change_password=0"
        )).fetchone()
        if admin_row is None:
            return

        stored_hash = admin_row[0]
        if not hasher.verify("admin", stored_hash):
            return

        conn.execute(text(
            "UPDATE users SET must_change_password=1 WHERE username='admin'"
        ))
        conn.commit()
def _migrate_users():
    """Create the ``users`` table and the default ``admin`` account if absent.

    The table is created only when it does not exist. The ``admin`` row is
    inserted only when the table holds no users at all:

    * if the ``INITIAL_ADMIN_PASSWORD`` environment variable is set to a
      non-empty value, that value becomes the password and no password change
      is forced;
    * otherwise the password is "admin" and ``must_change_password`` is 1.
    """
    from passlib.context import CryptContext

    hasher = CryptContext(schemes=["bcrypt"], deprecated="auto")
    initial_password = os.environ.get("INITIAL_ADMIN_PASSWORD", "")
    with engine.connect() as conn:
        users_table = conn.execute(text(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
        )).fetchone()
        if users_table is None:
            conn.execute(text("""
                CREATE TABLE users (
                    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR NOT NULL UNIQUE,
                    hashed_password VARCHAR NOT NULL,
                    must_change_password BOOLEAN NOT NULL DEFAULT 0,
                    token_version INTEGER NOT NULL DEFAULT 1
                )
            """))
            conn.commit()

        (user_count,) = conn.execute(text("SELECT COUNT(*) FROM users")).fetchone()
        if user_count != 0:
            return

        # Pick the bootstrap credentials first so hashing has one call site.
        if initial_password:
            plaintext, force_change = initial_password, 0
        else:
            plaintext, force_change = "admin", 1

        conn.execute(
            text("INSERT INTO users (username, hashed_password, must_change_password) VALUES ('admin', :h, :m)"),
            {"h": hasher.hash(plaintext), "m": force_change},
        )
        conn.commit()
def _migrate_drop_links_table():
    """Drop the ``links`` table (feature removed in phase 3). Idempotent.

    Raises:
        Any database error raised by the DROP statement. Foreign-key
        enforcement is switched back on for the connection before the error
        propagates.
    """
    with engine.connect() as conn:
        # Foreign-key enforcement is suspended for the duration of the drop.
        conn.execute(text("PRAGMA foreign_keys=OFF"))
        try:
            # IF EXISTS makes the statement a no-op when the table is already
            # gone, replacing a separate sqlite_master look-up.
            conn.execute(text("DROP TABLE IF EXISTS links"))
            conn.commit()
        except Exception:
            # The pragma below is a no-op inside an open transaction, so the
            # failed transaction has to be closed first.
            conn.rollback()
            raise
        finally:
            conn.execute(text("PRAGMA foreign_keys=ON"))
            conn.commit()
# Schema migrations run as module-level statements, i.e. when this module is
# imported, and before the ORM creates any missing tables.
#
# Order matters: the two users-column migrations must come before
# _migrate_force_admin_password_change, whose query filters on
# must_change_password. _migrate_users runs last; when it has to create the
# users table it does so with both columns already present.
_migrate_vlan_nullable()
_migrate_device_virt_type()
_migrate_device_url()
_migrate_users_must_change_password()
_migrate_users_token_version()
_migrate_force_admin_password_change()
_migrate_drop_links_table()
_migrate_users()
# Create the tables declared on Base that the database does not have yet.
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Network Topology Manager")

# Cross-origin access is controlled by the ALLOWED_ORIGINS environment
# variable, a comma-separated list of origins:
#   unset or "*"   -> every origin is allowed (kept as the default for
#                     backward compatibility with behind-proxy deployments)
#   ""             -> the CORS middleware is not installed at all
#   "https://a.example,https://b.example" -> only those origins are allowed
_allowed_origins_env = os.environ.get("ALLOWED_ORIGINS", "*")
_origins = (
    ["*"]
    if _allowed_origins_env.strip() == "*"
    # Blank entries are discarded, so an empty or whitespace-only value
    # yields an empty list here.
    else [
        origin.strip()
        for origin in _allowed_origins_env.split(",")
        if origin.strip()
    ]
)

if _origins:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_origins,
        allow_methods=["*"],
        allow_headers=["*"],
    )
# The auth routes are mounted without the password-change guard.
app.include_router(auth_router, prefix="/api/auth", tags=["auth"])

# Every other router depends on require_password_changed.
for _router, _prefix, _tag in (
    (vlans.router, "/api/vlans", "vlans"),
    (devices.router, "/api/devices", "devices"),
    (discovery.router, "/api/discovery", "discovery"),
):
    app.include_router(
        _router,
        prefix=_prefix,
        tags=[_tag],
        dependencies=[Depends(require_password_changed)],
    )
@app.get("/api/health")
def health():
    # Liveness probe registered directly on the app, without the
    # password-change dependency; always reports a fixed "ok" status.
    payload = {"status": "ok"}
    return payload