diff --git a/scripts/OPNSense/pyfrc2g-ciso_assist.py b/scripts/OPNSense/pyfrc2g-ciso_assist.py index e69de29..42b2ac6 100644 --- a/scripts/OPNSense/pyfrc2g-ciso_assist.py +++ b/scripts/OPNSense/pyfrc2g-ciso_assist.py @@ -0,0 +1,325 @@ +import requests +import json +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +import re +from collections import OrderedDict +from graphviz import Digraph +import os +import logging +import glob +import csv +from config import INTERFACE_MAP, NET_MAP, ADDRESS_MAP, PORT_MAP +import hashlib + +logging.basicConfig(level=logging.INFO) + +# --- CONFIG --- +OPNS_URL = "https:///upload/" + +def md5sum(path): + md5 = hashlib.md5() + with open(path, "rb") as f: + # Lire le fichier par blocs pour éviter de saturer la mémoire + for chunk in iter(lambda: f.read(4096), b""): + md5.update(chunk) + return md5.hexdigest() + +def recup_regles(url, api_secret, api_key, params): + try: + #headers = {"accept": "application/json", "X-API-Key": token} + reponse = requests.get( + url, + params=params, + auth=(api_key, api_secret), # même ordre que -u "SECRET:KEY" + verify=False # équivalent de curl -k + ) + return reponse.json() + except ValueError: + print("Échec de la connexion:", reponse.status_code, reponse.text) + exit() + +def safe_value(value, field=None): + # if value is None: + # print("c'est None") + # return "Any" + if isinstance(value, list): + value = ", ".join(map(str, value)) + if field in ("source", "interface"): + val = str(value).lower() + if val in INTERFACE_MAP: + return INTERFACE_MAP[val] + if str(field) in ("destination_port"): + val = str(value) + if val in PORT_MAP: + return PORT_MAP[val] + if str(field) in ("destination"): + val = str(value).lower() + if val in NET_MAP: + return NET_MAP[val] + if str(field) in ("destination"): + val = str(value).lower() + if val in ADDRESS_MAP: + return ADDRESS_MAP[val] + return value + +def normalize_ports(port_field): + if not port_field: + return "Any" + return re.sub(r'\s+', '', port_field.strip()) or "Any" + +def export_to_ciso(url,token,fichier): + upload_url = url + upload_headers = { + 'Authorization': f'Token {token}', + 'accept': 'application/json', + 'Content-Type': 'document', + 'Content-Disposition': f'attachment; filename={fichier}' + } + file_path = fichier + with open(file_path, 'rb') as file: + response = requests.post(upload_url, headers=upload_headers, data=file, verify=False) + if response.status_code == 200: + return True + else: + return False + +def parse_csv_and_generate(csv_path, output_dir): + os.makedirs(output_dir, exist_ok=True) + flux_par_passerelle = OrderedDict() + next_id = 0 + + def get_node(nodes_local, key, label=None, color=None, force_unique=False): + """Crée ou récupère un nœud factorisé par cluster/source sauf si force_unique.""" + nonlocal next_id + actual_key = f"{key}__{next_id}" if force_unique else key + if actual_key not in nodes_local: + nodes_local[actual_key] = (f"node{next_id}", color, label if label else key) + next_id += 1 + return nodes_local[actual_key][0] + + def get_action_color(action): + return "#a3f7a3" if action == "PASS" else "#f7a3a3" if action == "BLOCK" else None + + def get_destination_color(disabled): + return "#ffcc00" if disabled == "True" else None + + with open(csv_path, newline='', encoding='utf-8') as f: + reader = csv.DictReader(f) + for row in reader: + source = (row.get("SOURCE") or "").strip() + passerelle = (row.get("PASSERELLE") or "").strip() + action = (row.get("ACTION") or "").strip().upper() + protocole = (row.get("PROTOCOLE") or "").strip() or "Any" + ports = normalize_ports(row.get("PORT")) + destination = (row.get("DESTINATION") or "").strip() + descr = (row.get("COMMENTAIRE") or "").strip() + + source_label = f"SOURCE | {source}" if source else "SOURCE | " + passerelle_label = f"PASSERELLE | {passerelle}" if passerelle else "PASSERELLE | " + action_label = f"ACTION | {action}" if action else "ACTION | " + proto_label = f"PROTOCOLE | {protocole}" + port_label = f"PORT | {ports}" + destination_label = f"{destination} | {descr}" if descr else f" VLAN | {destination}" or "" + + # --- Initialisation cluster/source --- + if passerelle not in flux_par_passerelle: + flux_par_passerelle[passerelle] = OrderedDict() + if source not in flux_par_passerelle[passerelle]: + flux_par_passerelle[passerelle][source] = {"nodes": OrderedDict(), "edges": set()} + + cluster = flux_par_passerelle[passerelle][source] + + # --- Création des nœuds --- + n_source = get_node(cluster["nodes"], source_label) + n_pass = get_node(cluster["nodes"], passerelle_label) + n_action = get_node(cluster["nodes"], action_label, color=get_action_color(action)) + proto_key = f"{protocole}|{action}" + n_proto = get_node(cluster["nodes"], proto_key, label=proto_label) + port_key = f"{ports}|{proto_key}" + n_port = get_node(cluster["nodes"], port_key, label=port_label) + if "Regles-flottantes" in passerelle: + n_destination = get_node(cluster["nodes"], destination_label) + else: + n_destination = get_node(cluster["nodes"], destination_label, force_unique=True) # DESTINATION non factorisée + + edges = [ + (n_source, n_pass), + (n_pass, n_action), + (n_action, n_proto), + (n_proto, n_port), + (n_port, n_destination), + ] + + cluster["edges"].update(edges) + + # --- Génération des graphes --- + for passerelle, sources in flux_par_passerelle.items(): + filename = os.path.join(output_dir, f"{passerelle.replace('/', '_')}.gv") + g = Digraph('g', filename=filename, format='png') + g.attr(fontname="Helvetica,Arial,sans-serif") + g.attr("node", fontname="Helvetica,Arial,sans-serif", fontsize="11", shape="record") + g.attr("edge", fontname="Helvetica,Arial,sans-serif") + g.attr(rankdir="LR") + g.attr(label=f"PASSERELLE : {passerelle}", labelloc="t", fontsize="14", color="#8888ff") + + for source, cluster in sources.items(): + with g.subgraph(name=f"cluster_{source.replace(' ', '_')}") as sg: + sg.attr(label=f"SOURCE : {source}", style="dashed", color="#aaaaaa") + for nid, color, label in cluster["nodes"].values(): + sg.node(nid, label=label, shape="record", **({"style":"filled","fillcolor":color} if color else {})) + for src, dst in cluster["edges"]: + sg.edge(src, dst) + + output_path = g.render(view=False) + + # Suppression du fichier .gv après rendu + try: + if os.path.exists(filename): + os.remove(filename) + print(f"🗑️ Fichier temporaire supprimé : {filename}") + except Exception as e: + print(f"⚠️ Impossible de supprimer {filename} : {e}") + + print(f"✅ Graph généré : {filename}.png") + + try: + from reportlab.pdfgen import canvas + from reportlab.lib.pagesizes import A4 + from reportlab.lib.utils import ImageReader + + # Récupération des PNG + png_files = sorted(glob.glob(os.path.join(output_dir, "*.png"))) + if not png_files: + print("⚠️ Aucun fichier PNG trouvé pour le PDF.") + return + + pdf_path = os.path.join(output_dir, PASSERELLE+"_MATRICE_DES_FLUX.pdf") + + # Création PDF + c = canvas.Canvas(pdf_path, pagesize=A4) + width, height = A4 + + c.setTitle(f"Matrice des flux de la passerelle {PASSERELLE} ") + + for i, png in enumerate(png_files): + # --- Titre / chapitre = nom du fichier --- + titre_page = os.path.basename(png).replace(".gv.png", "") + + # Ajout du signet PDF + c.bookmarkPage(titre_page) + c.addOutlineEntry(titre_page, titre_page, level=0) + + # Chargement de l'image + img = ImageReader(png) + img_width, img_height = img.getSize() + + # Mise à l’échelle automatique + scale = min(width / img_width, height / img_height) + new_width = img_width * scale + new_height = img_height * scale + + # Centrage + x = (width - new_width) / 2 + y = (height - new_height) / 2 + + # Dessin + c.drawImage(img, x, y, width=new_width, height=new_height) + + c.showPage() + + c.save() + print(f"📄 PDF avec chapitres généré : {pdf_path}") + + try: + for png in png_files: + if os.path.exists(png): + os.remove(png) + print(f"🗑️ PNG supprimé : {png}") + except Exception as e: + print(f"⚠️ Impossible de supprimer certains PNG : {e}") + + except Exception as e: + print(f"⚠️ Erreur lors de la génération du PDF : {e}") + + if not export_to_ciso(CISO_EVIDENCE, CISO_TOKEN, pdf_path): + logging.error("Échec de l'exportation dans Ciso.") + return + +# --- EXTRACTION DES DONNÉES --- +with open(FICHIER_CSV, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, + fieldnames=["SOURCE", "PASSERELLE", "ACTION", "PROTOCOLE", "PORT", "DESTINATION", "COMMENTAIRE"] + ) + writer.writeheader() # → On écrit l’entête une seule fois + + # Boucle sur les interfaces + for interface in INTERFACES: + params = { + "interface": interface, + "show_all": "1" + } + + data = recup_regles(OPNS_URL, OPNS_SECRET, OPNS_KEY, params) + entries = data.get("rows", []) + + # Écriture des entrées + for entry in entries: + source_val = ( + entry.get('source', {}).get('network') + or entry.get('source', {}).get('address') + or entry.get('source_net') + or entry.get('source', {}).get('any') + ) + destination_val = ( + entry.get('destination', {}).get('network') + or entry.get('destination', {}).get('address') + or entry.get('destination', {}).get('any') + or entry.get("destination_net") + ) + port_dest_val = ( + entry.get('destination', {}).get('port') + or entry.get("destination_port") + ) + writer.writerow({ + "SOURCE": safe_value(source_val, "source"), + "PASSERELLE": PASSERELLE + "/" + safe_value(entry.get("interface"), "interface") + if entry.get("interface") + else PASSERELLE + "/Regles-flottantes", + "ACTION": safe_value(entry.get("action")), + "PROTOCOLE": safe_value(entry.get("protocol")), + "PORT": safe_value(port_dest_val, "destination_port"), + "DESTINATION": safe_value(destination_val, "destination"), + "COMMENTAIRE": safe_value(entry.get("description")) + }) + + +# Récupération de la précédente somme md5sum +with open("md5sum.txt", "r") as f: + prev_md5sum = f.readline().strip() # .strip() enlève les retours à la ligne +# Génération de la somme md5sum du fichier csv généré +actual_md5sum = md5sum(FICHIER_CSV) + +# Comparaison des sommes md5sum. +# Si différentes => génération de la matrice. +# Si identique => arrêt du script. +if prev_md5sum != actual_md5sum: + with open("md5sum.txt", "w") as f: + f.write(actual_md5sum + "\n") + parse_csv_and_generate(FICHIER_CSV,GRAPH_OUTPUT_DIR) +else: + logging.info("Pas de règles crées ou modifiées") + +if os.path.exists(FICHIER_CSV): + os.remove(FICHIER_CSV) \ No newline at end of file