Actualiser scripts/pfSense/pyfrc2g.py

This commit is contained in:
2025-11-22 13:21:57 +01:00
parent fc60e89c14
commit 948668bdb7

View File

@@ -6,17 +6,30 @@ import re
from collections import OrderedDict from collections import OrderedDict
from graphviz import Digraph from graphviz import Digraph
import os import os
import glob
import csv import csv
import logging
import shutil
from config import INTERFACE_MAP, NET_MAP, ADDRESS_MAP, PORT_MAP from config import INTERFACE_MAP, NET_MAP, ADDRESS_MAP, PORT_MAP
import hashlib
logging.basicConfig(level=logging.INFO)
# --- CONFIG --- # --- CONFIG ---
PFS_URL = "https://VOTRE_PASSERELLE/api/v2/firewall/rules" # pfSense
PFS_TOKEN = "VOTRE_CLE" PFS_URL = "https://PFS_ADDRESS/api/v2/firewall/rules"
FICHIER_CSV = "output_pfs01.csv" # fichier de sortie CSV PFS_TOKEN = "<API_TOKEN>"
PASSERELLE = "<GW_NAME>"
FICHIER_CSV = "output_"+PASSERELLE+".csv"
GRAPH_OUTPUT_DIR = "tmp/graphs_"+PASSERELLE
# ===================================== def md5sum(path):
# FONCTIONS md5 = hashlib.md5()
# ===================================== with open(path, "rb") as f:
# Lire le fichier par blocs pour éviter de saturer la mémoire
for chunk in iter(lambda: f.read(4096), b""):
md5.update(chunk)
return md5.hexdigest()
def recup_regles(url, token): def recup_regles(url, token):
try: try:
@@ -55,10 +68,23 @@ def normalize_ports(port_field):
return "Any" return "Any"
return re.sub(r'\s+', '', port_field.strip()) or "Any" return re.sub(r'\s+', '', port_field.strip()) or "Any"
# ===================================== def export_to_ciso(url,token,fichier):
# 🎨 FONCTION : GÉNÉRATION DES GRAPHES upload_url = url
# ===================================== upload_headers = {
def parse_csv_and_generate(csv_path, output_dir="graphs"): 'Authorization': f'Token {token}',
'accept': 'application/json',
'Content-Type': 'document',
'Content-Disposition': f'attachment; filename={fichier}'
}
file_path = fichier
with open(file_path, 'rb') as file:
response = requests.post(upload_url, headers=upload_headers, data=file, verify=False)
if response.status_code == 200:
return True
else:
return False
def parse_csv_and_generate(csv_path, output_dir):
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
flux_par_passerelle = OrderedDict() flux_par_passerelle = OrderedDict()
next_id = 0 next_id = 0
@@ -81,8 +107,12 @@ def parse_csv_and_generate(csv_path, output_dir="graphs"):
with open(csv_path, newline='', encoding='utf-8') as f: with open(csv_path, newline='', encoding='utf-8') as f:
reader = csv.DictReader(f) reader = csv.DictReader(f)
for row in reader: for row in reader:
floating = (row.get("FLOTTANT") or "").strip()
source = (row.get("SOURCE") or "").strip() source = (row.get("SOURCE") or "").strip()
if floating == "False":
passerelle = (row.get("PASSERELLE") or "").strip() passerelle = (row.get("PASSERELLE") or "").strip()
else:
passerelle = "Règles flottantes"
action = (row.get("ACTION") or "").strip().upper() action = (row.get("ACTION") or "").strip().upper()
protocole = (row.get("PROTOCOLE") or "").strip() or "Any" protocole = (row.get("PROTOCOLE") or "").strip() or "Any"
ports = normalize_ports(row.get("PORT")) ports = normalize_ports(row.get("PORT"))
@@ -136,7 +166,7 @@ def parse_csv_and_generate(csv_path, output_dir="graphs"):
g.attr("node", fontname="Helvetica,Arial,sans-serif", fontsize="11", shape="record") g.attr("node", fontname="Helvetica,Arial,sans-serif", fontsize="11", shape="record")
g.attr("edge", fontname="Helvetica,Arial,sans-serif") g.attr("edge", fontname="Helvetica,Arial,sans-serif")
g.attr(rankdir="LR") g.attr(rankdir="LR")
g.attr(label=f"PASSERELLE : {passerelle} INTERFACE", labelloc="t", fontsize="14", color="#8888ff") g.attr(label=f"<<b>PASSERELLE : {passerelle} INTERFACE</b>>", labelloc="t", fontsize="14", color="#8888ff")
for source, cluster in sources.items(): for source, cluster in sources.items():
with g.subgraph(name=f"cluster_{source.replace(' ', '_')}") as sg: with g.subgraph(name=f"cluster_{source.replace(' ', '_')}") as sg:
@@ -146,37 +176,119 @@ def parse_csv_and_generate(csv_path, output_dir="graphs"):
for src, dst in cluster["edges"]: for src, dst in cluster["edges"]:
sg.edge(src, dst) sg.edge(src, dst)
g.render(view=False) output_path = g.render(view=False)
print(f"✅ Graph généré : {filename}.png") # Suppression du fichier .gv après rendu
total_nodes = sum(len(c["nodes"]) for c in sources.values()) try:
total_edges = sum(len(c["edges"]) for c in sources.values()) if os.path.exists(filename):
print(f" - {total_nodes} nœuds") os.remove(filename)
print(f" - {total_edges} arêtes") print(f"🗑️ Fichier temporaire supprimé : {filename}")
except Exception as e:
print(f"⚠️ Impossible de supprimer {filename} : {e}")
print(f"✅ Graph généré : {filename}.png")
try:
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
from reportlab.lib.utils import ImageReader
# Récupération des PNG
png_files = sorted(glob.glob(os.path.join(output_dir, "*.png")))
if not png_files:
print("⚠️ Aucun fichier PNG trouvé pour le PDF.")
return
pdf_path = os.path.join(output_dir, PASSERELLE+"_MATRICE_DES_FLUX.pdf")
# Création PDF
c = canvas.Canvas(pdf_path, pagesize=A4)
width, height = A4
c.setTitle(f"Matrice des flux de la passerelle {PASSERELLE} ")
for i, png in enumerate(png_files):
# --- Titre / chapitre = nom du fichier ---
titre_page = os.path.basename(png).replace(".gv.png", "")
# Ajout du signet PDF
c.bookmarkPage(titre_page)
c.addOutlineEntry(titre_page, titre_page, level=0)
# Chargement de l'image
img = ImageReader(png)
img_width, img_height = img.getSize()
# Mise à léchelle automatique
scale = min(width / img_width, height / img_height)
new_width = img_width * scale
new_height = img_height * scale
# Centrage
x = (width - new_width) / 2
y = (height - new_height) / 2
# Dessin
c.drawImage(img, x, y, width=new_width, height=new_height)
c.showPage()
c.save()
print(f"📄 PDF avec chapitres généré : {pdf_path}")
# Suppression des fichiers PNG
try:
for png in png_files:
if os.path.exists(png):
os.remove(png)
print(f"🗑️ PNG supprimé : {png}")
except Exception as e:
print(f"⚠️ Impossible de supprimer certains PNG : {e}")
except Exception as e:
print(f"⚠️ Erreur lors de la génération du PDF : {e}")
# --- EXTRACTION DES DONNÉES --- # --- EXTRACTION DES DONNÉES ---
data = recup_regles(PFS_URL, PFS_TOKEN) data = recup_regles(PFS_URL, PFS_TOKEN)
entries = data.get("data", []) entries = data.get("data", [])
# --- CRÉATION DU CSV --- if entries:
with open(FICHIER_CSV, "w", newline="", encoding="utf-8") as f: # --- CRÉATION DU CSV ---
with open(FICHIER_CSV, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter( writer = csv.DictWriter(
f, f,
fieldnames=["SOURCE", "PASSERELLE", "ACTION", "PROTOCOLE", "PORT", "DESTINATION", "COMMENTAIRE","DESACTIVE"] fieldnames=["SOURCE", "PASSERELLE", "ACTION", "PROTOCOLE", "PORT", "DESTINATION", "COMMENTAIRE","DESACTIVE","FLOTTANT"]
) )
writer.writeheader() writer.writeheader()
for entry in entries: for entry in entries:
writer.writerow({ writer.writerow({
"SOURCE": safe_value(entry.get("source"), "source"), "SOURCE": safe_value(entry.get("source"), "source"),
"PASSERELLE": "VOTRE_PASSERELLE/"+safe_value(entry.get("interface"), "interface"), "PASSERELLE": "PFS01/"+safe_value(entry.get("interface"), "interface"),
"ACTION": safe_value(entry.get("type")), "ACTION": safe_value(entry.get("type")),
"PROTOCOLE": safe_value(entry.get("protocol")), "PROTOCOLE": safe_value(entry.get("protocol")),
"PORT": safe_value(entry.get("destination_port"), "destination_port"), "PORT": safe_value(entry.get("destination_port"), "destination_port"),
"DESTINATION": safe_value(entry.get("destination"), "destination"), "DESTINATION": safe_value(entry.get("destination"), "destination"),
"COMMENTAIRE": safe_value(entry.get("descr")), "COMMENTAIRE": safe_value(entry.get("descr")),
"DESACTIVE": safe_value(entry.get("disabled")) "DESACTIVE": safe_value(entry.get("disabled")),
"FLOTTANT": safe_value(entry.get("floating"))
}) })
parse_csv_and_generate(FICHIER_CSV) print(f"✅ Fichier CSV généré : {FICHIER_CSV}")
print(f"✅ Fichier CSV généré : {FICHIER_CSV}") # Récupération de la précédente somme md5sum
with open("md5sum.txt", "r") as f:
prev_md5sum = f.readline().strip() # .strip() enlève les retours à la ligne
# Génération de la somme md5sum du fichier csv généré
actual_md5sum = md5sum(FICHIER_CSV)
# Comparaison des sommes md5sum.
# Si différentes => génération de la matrice.
# Si identique => arrêt du script.
if prev_md5sum != actual_md5sum:
with open("md5sum.txt", "w") as f:
f.write(actual_md5sum + "\n")
parse_csv_and_generate(FICHIER_CSV,GRAPH_OUTPUT_DIR)
else:
logging.info("Pas de règles crées ou modifiées")
else:
logging.info("Le script n'a récupéré aucune règle firewall")