Files
PVE-Backup-Report/src/pve_backup_report/collectors.py
T
2026-05-13 16:04:17 +02:00

1451 lines
45 KiB
Python

from __future__ import annotations
from dataclasses import replace
from datetime import datetime
import re
from typing import Any
from urllib.parse import urlparse
from pve_backup_report.models import (
BackupJob,
CollectionIssue,
Guest,
LastBackupResult,
PbsAccessUser,
PbsBackupSnapshotSummary,
PbsDatastoreUsage,
PbsGarbageCollectionStatus,
PbsRetentionPolicy,
PbsStorage,
Pool,
ReportData,
)
from pve_backup_report.pbs_client import PbsApiError, PbsClient, PbsHttpError
from pve_backup_report.pve_client import PveApiError, PveClient
def collect_report_data(
    client: PveClient,
    pbs_clients: list[PbsClient] | None = None,
) -> ReportData:
    """Collect the PBS storage and PVE backup data that make up a report.

    Args:
        client: PVE API client used for storages, jobs, guests, pools and tasks.
        pbs_clients: Optional PBS API clients; ``None`` means no PBS server.

    Returns:
        A ``ReportData`` bundling every collected section together with the
        single ``issues`` list that was handed to each collector.
    """
    issues: list[CollectionIssue] = []
    storages = collect_pbs_storages(client, issues)
    servers = pbs_clients or []

    # PBS-side sections. The call order is kept stable because every collector
    # receives the same ``issues`` list.
    access_users = collect_pbs_access_users(servers, storages, issues)
    retention_policies = collect_pbs_retention_policies(servers, issues)
    datastore_usages = collect_pbs_datastore_usages(servers, storages, issues)
    gc_statuses = collect_pbs_gc_statuses(servers, storages, issues)
    snapshot_summaries = collect_pbs_snapshot_summaries(servers, storages, issues)

    # PVE-side sections.
    jobs = collect_backup_jobs(client, issues)
    guests = collect_guests(client, issues)
    pools = collect_pools(client, issues)
    last_results = collect_last_backup_results(client, guests, issues)

    server_names = [server.server_name for server in servers]
    return ReportData(
        pbs_server_names=server_names,
        pbs_storages=storages,
        pbs_access_users=access_users,
        pbs_retention_policies=retention_policies,
        pbs_snapshot_summaries=snapshot_summaries,
        pbs_datastore_usages=datastore_usages,
        pbs_gc_statuses=gc_statuses,
        guests=guests,
        pools=pools,
        backup_jobs=jobs,
        last_backup_results=last_results,
        issues=issues,
    )
def collect_pbs_datastore_usages(
    clients: list[PbsClient],
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[PbsDatastoreUsage]:
    """Collect the space usage of every known datastore on each PBS server.

    A datastore whose status call raises ``PbsApiError`` or returns a non-dict
    payload is reported as a warning and skipped.

    Returns:
        Usages ordered by ``(server_name, datastore)``.
    """
    collected: list[PbsDatastoreUsage] = []
    for pbs in clients:
        for store in pbs_datastores_for_usage(pbs, pbs_storages, issues):
            try:
                payload = pbs.get_datastore_status(store)
            except PbsApiError as error:
                add_issue(
                    issues,
                    "warning",
                    "pbs_storage_usage",
                    "statut datastore PBS indisponible",
                    details=f"{pbs.server_name}: {store}: {error}",
                )
                continue
            if isinstance(payload, dict):
                collected.append(
                    normalize_pbs_datastore_usage(pbs.server_name, store, payload)
                )
                continue
            add_issue(
                issues,
                "warning",
                "pbs_storage_usage",
                "reponse statut datastore PBS inattendue",
                details=f"{pbs.server_name}: {store}",
            )
    collected.sort(key=lambda entry: (entry.server_name, entry.datastore))
    return collected
def collect_pbs_gc_statuses(
    clients: list[PbsClient],
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[PbsGarbageCollectionStatus]:
    """Collect the garbage-collection status of every known PBS datastore.

    A datastore whose GC status call raises ``PbsApiError`` or returns a
    non-dict payload is reported as a warning and skipped.

    Returns:
        Statuses ordered by ``(server_name, datastore)``.
    """
    collected: list[PbsGarbageCollectionStatus] = []
    for pbs in clients:
        for store in pbs_datastores_for_usage(pbs, pbs_storages, issues):
            try:
                payload = pbs.get_datastore_gc_status(store)
            except PbsApiError as error:
                add_issue(
                    issues,
                    "warning",
                    "pbs_gc",
                    "statut garbage collector PBS indisponible",
                    details=f"{pbs.server_name}: {store}: {error}",
                )
                continue
            if isinstance(payload, dict):
                collected.append(
                    normalize_pbs_gc_status(pbs.server_name, store, payload)
                )
                continue
            add_issue(
                issues,
                "warning",
                "pbs_gc",
                "reponse garbage collector PBS inattendue",
                details=f"{pbs.server_name}: {store}",
            )
    collected.sort(key=lambda entry: (entry.server_name, entry.datastore))
    return collected
def pbs_datastores_for_usage(
    client: PbsClient,
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[str]:
    """Return the sorted datastore names to query on one PBS server.

    Names declared by the PVE storages matching ``client`` are merged with the
    names returned by ``client.get_datastores()``. A warning is reported only
    when no datastore name is known at all.
    """
    names: set[str] = set()
    for storage in pbs_client_storages(client, pbs_storages):
        if storage.datastore:
            names.add(storage.datastore)

    try:
        listing = client.get_datastores()
    except PbsApiError as error:
        # The PVE-declared names are still usable; only warn when empty-handed.
        if not names:
            add_issue(
                issues,
                "warning",
                "pbs_storage_usage",
                "liste datastores PBS indisponible",
                details=f"{client.server_name}: {error}",
            )
        return sorted(names)

    if not isinstance(listing, list):
        if not names:
            add_issue(
                issues,
                "warning",
                "pbs_storage_usage",
                "reponse datastores PBS inattendue",
                details=client.server_name,
            )
        return sorted(names)

    for entry in listing:
        if isinstance(entry, dict):
            name = string_value(entry, "name", "store", "datastore")
            if name:
                names.add(name)

    if not names:
        add_issue(
            issues,
            "warning",
            "pbs_storage_usage",
            "aucun datastore PBS trouve pour le statut d'espace",
            details=client.server_name,
        )
    return sorted(names)
def collect_pbs_snapshot_summaries(
    clients: list[PbsClient],
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> dict[tuple[str, str, str, str, int], PbsBackupSnapshotSummary]:
    """Aggregate PBS snapshots into one summary per summary key.

    Every ``(datastore, namespace)`` scope of each server is listed. Errors
    recognised by ``is_missing_pbs_snapshot_namespace`` are skipped silently;
    other failures and non-list payloads are reported as warnings.

    Returns:
        Summaries keyed by ``pbs_snapshot_summary_key``; empty without clients.
    """
    merged: dict[tuple[str, str, str, str, int], PbsBackupSnapshotSummary] = {}
    for pbs in clients:
        for store, scope_namespace in pbs_snapshot_scopes(pbs, pbs_storages, issues):
            try:
                listing = pbs.get_datastore_snapshots(store, scope_namespace)
            except PbsApiError as error:
                if not is_missing_pbs_snapshot_namespace(error, scope_namespace):
                    add_issue(
                        issues,
                        "warning",
                        "pbs_snapshots",
                        "snapshots PBS indisponibles",
                        details=f"{pbs.server_name}: {store}/{scope_namespace}: {error}",
                    )
                continue
            if not isinstance(listing, list):
                add_issue(
                    issues,
                    "warning",
                    "pbs_snapshots",
                    "reponse snapshots PBS inattendue",
                    details=f"{pbs.server_name}: {store}/{scope_namespace}",
                )
                continue
            for entry in listing:
                if not isinstance(entry, dict):
                    continue
                summary = normalize_pbs_snapshot(
                    pbs.server_name,
                    store,
                    scope_namespace,
                    entry,
                )
                if summary is not None:
                    key = pbs_snapshot_summary_key(summary)
                    merged[key] = merge_pbs_snapshot_summary(merged.get(key), summary)
    return merged
def is_missing_pbs_snapshot_namespace(exc: PbsApiError, namespace: str) -> bool:
    """Tell whether a snapshot listing error is treated as "namespace absent".

    Only a ``PbsHttpError`` with status code 400 raised for a non-empty,
    non-root namespace qualifies.
    """
    if not isinstance(exc, PbsHttpError):
        return False
    if exc.status_code != 400:
        return False
    return bool(namespace) and namespace != "/"
def pbs_snapshot_scopes(
    client: PbsClient,
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[tuple[str, str]]:
    """Return the sorted ``(datastore, namespace)`` scopes to scan on a server.

    Scopes declared by the PVE storages are merged with those discovered via
    the PBS API. API listing failures are only reported when the PVE storages
    yielded no scope.
    """
    declared = pbs_storage_snapshot_scopes(client, pbs_storages, issues)
    pve_namespaces = pve_declared_pbs_namespaces(pbs_storages)
    discovered = collect_pbs_api_snapshot_scopes(
        client,
        issues,
        additional_namespaces=pve_namespaces,
        warn_on_error=not declared,
    )
    combined = set(declared)
    combined.update(discovered)
    return sorted(combined)
def collect_pbs_api_snapshot_scopes(
    client: PbsClient,
    issues: list[CollectionIssue] | None = None,
    additional_namespaces: list[str] | None = None,
    warn_on_error: bool = True,
) -> list[tuple[str, str]]:
    """Build ``(datastore, namespace)`` scopes from the PBS datastore listing.

    Args:
        client: PBS API client to query.
        issues: Optional issue list handed to ``add_issue``.
        additional_namespaces: Extra namespaces paired with every datastore.
        warn_on_error: When false, a failed or malformed datastore listing is
            not reported.

    Returns:
        Sorted scopes; empty when the datastore listing is unusable.
    """
    try:
        listing = client.get_datastores()
    except PbsApiError as error:
        if warn_on_error:
            add_issue(
                issues,
                "warning",
                "pbs_snapshots",
                "liste datastores PBS indisponible",
                details=f"{client.server_name}: {error}",
            )
        return []

    if not isinstance(listing, list):
        if warn_on_error:
            add_issue(
                issues,
                "warning",
                "pbs_snapshots",
                "reponse datastores PBS inattendue",
                details=client.server_name,
            )
        return []

    extra_namespaces = set(additional_namespaces or [])
    found: set[tuple[str, str]] = set()
    for entry in listing:
        if not isinstance(entry, dict):
            continue
        store = string_value(entry, "name", "store", "datastore")
        if not store:
            continue
        store_namespaces = set(collect_pbs_datastore_namespaces(client, store, issues))
        found.update(
            (store, scope_namespace)
            for scope_namespace in store_namespaces | extra_namespaces
        )
    return sorted(found)
def collect_pbs_datastore_namespaces(
    client: PbsClient,
    datastore: str,
    issues: list[CollectionIssue] | None = None,
) -> list[str]:
    """List the namespaces of one datastore, falling back to the root ``"/"``.

    A failed call or a non-list payload is reported as a warning and yields
    ``["/"]``; an empty listing yields ``["/"]`` without any warning.
    """
    try:
        listing = client.get_datastore_namespaces(datastore)
    except PbsApiError as error:
        add_issue(
            issues,
            "warning",
            "pbs_snapshots",
            "liste namespaces PBS indisponible",
            details=f"{client.server_name}: {datastore}: {error}",
        )
        return ["/"]

    if not isinstance(listing, list):
        add_issue(
            issues,
            "warning",
            "pbs_snapshots",
            "reponse namespaces PBS inattendue",
            details=f"{client.server_name}: {datastore}",
        )
        return ["/"]

    found: set[str] = set()
    for entry in listing:
        if isinstance(entry, dict):
            found.add(normalize_pbs_namespace(string_value(entry, "ns")))
    if not found:
        return ["/"]
    return sorted(found)
def pbs_storage_snapshot_scopes(
    client: PbsClient,
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[tuple[str, str]]:
    """Return the sorted scopes declared by the PVE storages of one server.

    A storage without a datastore is reported as a warning and skipped; a
    missing namespace is replaced by the root ``"/"``.
    """
    declared: set[tuple[str, str]] = set()
    for storage in pbs_client_storages(client, pbs_storages):
        if storage.datastore:
            declared.add((storage.datastore, storage.namespace or "/"))
            continue
        add_issue(
            issues,
            "warning",
            "pbs_snapshots",
            "snapshots PBS ignores: datastore absent",
            details=f"{client.server_name}: {storage.storage_id}",
        )
    return sorted(declared)
def pve_declared_pbs_namespaces(pbs_storages: list[PbsStorage]) -> list[str]:
    """Return the sorted, de-duplicated namespaces declared by the storages.

    A storage with no namespace counts as the root namespace ``"/"``.
    """
    # ``namespace or "/"`` is always truthy, so no further filtering is needed.
    declared = {storage.namespace or "/" for storage in pbs_storages}
    return sorted(declared)
def pbs_client_storages(
    client: PbsClient,
    pbs_storages: list[PbsStorage],
) -> list[PbsStorage]:
    """Select the PVE storages whose ``server`` designates this PBS client.

    A storage matches when its server equals the client's ``server_name``, the
    host of its ``api_url``, or any configured hostname/display-name pair that
    contains that API host.
    """
    known_hosts = {client.server_name}
    if client.api_url:
        url_host = api_host(client.api_url)
        known_hosts.add(url_host)
        # Each configured entry maps a hostname to a display name; when the
        # API host is either of them, both are accepted as aliases.
        for configured_host, display_name in client.config.pbs_hostnames.items():
            aliases = {configured_host, display_name}
            if url_host in aliases:
                known_hosts |= aliases

    matching: list[PbsStorage] = []
    for storage in pbs_storages:
        if storage.server is not None and storage.server in known_hosts:
            matching.append(storage)
    return matching
def collect_pbs_retention_policies(
    clients: list[PbsClient],
    issues: list[CollectionIssue] | None = None,
) -> list[PbsRetentionPolicy]:
    """Collect the prune-job retention policies of every PBS server.

    A failing server, a non-list payload or a non-dict entry is reported as a
    warning and skipped. One more warning is reported when nothing at all was
    collected.

    Returns:
        Policies ordered by server, datastore, namespace and policy id.
    """
    collected: list[PbsRetentionPolicy] = []
    for pbs in clients:
        try:
            listing = pbs.get_prune_jobs()
        except PbsApiError as error:
            add_issue(
                issues,
                "warning",
                "pbs_retention",
                f"politique de retention {pbs.server_name} indisponible",
                details=str(error),
            )
            continue
        if not isinstance(listing, list):
            add_issue(
                issues,
                "warning",
                "pbs_retention",
                "reponse /config/prune inattendue",
                details=pbs.server_name,
            )
            continue
        for entry in listing:
            if isinstance(entry, dict):
                collected.append(normalize_pbs_retention_policy(pbs.server_name, entry))
                continue
            add_issue(
                issues,
                "warning",
                "pbs_retention",
                "politique de retention ignoree: format invalide",
                details=pbs.server_name,
            )

    if not collected:
        add_issue(
            issues,
            "warning",
            "pbs_retention",
            "aucune politique de retention PBS collectee",
            details="verifier la presence de prune jobs PBS et les droits des tokens PBS",
        )

    def ordering(policy: PbsRetentionPolicy) -> tuple[str, str, str, str]:
        return (
            policy.server_name,
            policy.datastore or "",
            policy.namespace or "",
            policy.policy_id,
        )

    collected.sort(key=ordering)
    return collected
def collect_pbs_access_users(
    clients: list[PbsClient],
    pbs_storages: list[PbsStorage],
    issues: list[CollectionIssue] | None = None,
) -> list[PbsAccessUser]:
    """Describe the PBS account used by each PVE storage that has a username.

    The user list of a server is only fetched when at least one of its
    storages declares a username. A storage whose user is absent from that
    list is normalised from an empty user record.

    Returns:
        Access users ordered by server, storage, datastore, namespace, auth id.
    """
    collected: list[PbsAccessUser] = []
    for pbs in clients:
        authenticated = [
            storage
            for storage in pbs_client_storages(pbs, pbs_storages)
            if storage.username
        ]
        if not authenticated:
            continue

        users_by_id: dict[str, dict[str, Any]] = {}
        for raw_user in collect_pbs_raw_access_users(pbs, issues):
            user_id = string_value(raw_user, "userid")
            if user_id:
                users_by_id[user_id] = raw_user

        for storage in authenticated:
            account = users_by_id.get(pbs_auth_user_id(storage.username), {})
            permissions = collect_pbs_access_permissions(pbs, storage, issues)
            collected.append(
                normalize_pbs_access_user(pbs.server_name, storage, account, permissions)
            )

    def ordering(user: PbsAccessUser) -> tuple[str, str, str, str, str]:
        return (
            user.server_name,
            user.storage_id,
            user.datastore or "",
            user.namespace or "",
            user.auth_id,
        )

    collected.sort(key=ordering)
    return collected
def collect_pbs_raw_access_users(
    client: PbsClient,
    issues: list[CollectionIssue] | None = None,
) -> list[dict[str, Any]]:
    """Fetch the raw user records of one PBS server, keeping only dict entries.

    A failing call or a non-list payload is reported as a warning and yields
    an empty list.
    """
    try:
        listing = client.get_access_users()
    except PbsApiError as error:
        add_issue(
            issues,
            "warning",
            "pbs_users",
            "liste des utilisateurs PBS indisponible",
            details=f"{client.server_name}: {error}",
        )
        return []

    if isinstance(listing, list):
        return [entry for entry in listing if isinstance(entry, dict)]

    add_issue(
        issues,
        "warning",
        "pbs_users",
        "reponse /access/users inattendue",
        details=client.server_name,
    )
    return []
def collect_pbs_access_permissions(
    client: PbsClient,
    storage: PbsStorage,
    issues: list[CollectionIssue] | None = None,
) -> dict[str, bool]:
    """Fetch the permissions of a storage's user on its datastore ACL path.

    Returns an empty mapping when the storage has no username or datastore,
    when the call fails, or when the payload is not a dict (the last two are
    reported as warnings).
    """
    if not (storage.username and storage.datastore):
        return {}

    acl_path = pbs_datastore_acl_path(storage.datastore, storage.namespace)
    try:
        payload = client.get_access_permissions(storage.username, acl_path)
    except PbsApiError as error:
        add_issue(
            issues,
            "warning",
            "pbs_users",
            "permissions utilisateur PBS indisponibles",
            details=f"{client.server_name}: {storage.username}: {acl_path}: {error}",
        )
        return {}

    if not isinstance(payload, dict):
        add_issue(
            issues,
            "warning",
            "pbs_users",
            "reponse /access/permissions inattendue",
            details=f"{client.server_name}: {storage.username}: {acl_path}",
        )
        return {}

    # Payload nested under the ACL path: take every entry of that sub-dict.
    scoped = payload.get(acl_path)
    if isinstance(scoped, dict):
        return {str(name): bool(flag) for name, flag in scoped.items()}

    # Flat payload: keep only the entries whose value is a real boolean.
    flat: dict[str, bool] = {}
    for name, flag in payload.items():
        if isinstance(flag, bool):
            flat[str(name)] = bool(flag)
    return flat
def collect_pbs_storages(
    client: PveClient,
    issues: list[CollectionIssue] | None = None,
) -> list[PbsStorage]:
    """Return the PVE storages whose type is ``pbs`` (case-insensitive).

    A non-list payload is reported as an error and yields an empty list;
    non-dict entries are reported as warnings and skipped.
    """
    listing = client.get_storages()
    if not isinstance(listing, list):
        add_issue(issues, "error", "storage", "reponse /storage inattendue")
        return []

    pbs_only: list[PbsStorage] = []
    for entry in listing:
        if not isinstance(entry, dict):
            add_issue(issues, "warning", "storage", "storage ignore: format invalide")
            continue
        storage_type = str(entry.get("type", "")).lower()
        if storage_type == "pbs":
            pbs_only.append(normalize_pbs_storage(entry))
    return pbs_only
def collect_backup_jobs(
    client: PveClient,
    issues: list[CollectionIssue] | None = None,
) -> list[BackupJob]:
    """Return the normalised PVE backup jobs in the order the API lists them.

    A non-list payload is reported as an error and yields an empty list;
    non-dict entries are reported as warnings and skipped.
    """
    listing = client.get_backup_jobs()
    if not isinstance(listing, list):
        add_issue(issues, "error", "backup_jobs", "reponse jobs inattendue")
        return []

    normalized: list[BackupJob] = []
    for entry in listing:
        if isinstance(entry, dict):
            normalized.append(normalize_backup_job(entry))
        else:
            add_issue(issues, "warning", "backup_jobs", "job ignore: format invalide")
    return normalized
def collect_guests(
    client: PveClient,
    issues: list[CollectionIssue] | None = None,
) -> list[Guest]:
    """Return the ``qemu``/``lxc`` cluster resources as guests sorted by VMID.

    A non-list payload is reported as an error. Non-dict resources and guests
    without a usable VMID are reported as warnings and skipped. Kept guests go
    through ``enrich_guest_notes`` before sorting.
    """
    resources = client.get_cluster_resources()
    if not isinstance(resources, list):
        add_issue(issues, "error", "guests", "reponse /cluster/resources inattendue")
        return []

    found: list[Guest] = []
    for resource in resources:
        if not isinstance(resource, dict):
            add_issue(issues, "warning", "guests", "ressource ignoree: format invalide")
            continue
        if string_value(resource, "type") not in {"qemu", "lxc"}:
            continue
        guest = normalize_guest(resource)
        if guest is not None:
            found.append(guest)
            continue
        add_issue(
            issues,
            "warning",
            "guests",
            "VM/CT ignoree: vmid invalide ou absent",
            details=str(resource.get("id", "")),
        )

    with_notes = enrich_guest_notes(client, found, issues)
    return sorted(with_notes, key=lambda guest: guest.vmid)
def enrich_guest_notes(
    client: PveClient,
    guests: list[Guest],
    issues: list[CollectionIssue] | None = None,
) -> list[Guest]:
    """Return the guests with notes refreshed from their qemu/lxc configuration.

    Guests without a node or with another guest type are passed through
    unchanged, as are guests whose configuration cannot be fetched or is not a
    dict (both reported as warnings). Notes found in the configuration take
    precedence over the notes already on the guest.
    """
    result: list[Guest] = []
    for guest in guests:
        if not guest.node or guest.guest_type not in {"qemu", "lxc"}:
            result.append(guest)
            continue

        location = f"{guest.node}/{guest.guest_type}/{guest.vmid}"
        try:
            if guest.guest_type == "qemu":
                config = client.get_qemu_config(guest.node, guest.vmid)
            else:
                config = client.get_lxc_config(guest.node, guest.vmid)
        except PveApiError as error:
            add_issue(
                issues,
                "warning",
                "guests",
                "notes VM/CT indisponibles",
                details=f"{location}: {error}",
            )
            result.append(guest)
            continue

        if not isinstance(config, dict):
            add_issue(
                issues,
                "warning",
                "guests",
                "configuration VM/CT ignoree: format invalide",
                details=location,
            )
            result.append(guest)
            continue

        notes = guest_notes(config) or guest.notes
        result.append(replace(guest, notes=notes))
    return result
def collect_pools(
    client: PveClient,
    issues: list[CollectionIssue] | None = None,
) -> list[Pool]:
    """Return the detailed PVE pools sorted by pool id.

    A non-list payload is reported as an error. Non-dict entries and entries
    without an id are reported as warnings and skipped, and pools whose detail
    could not be collected are left out.
    """
    listing = client.get_pools()
    if not isinstance(listing, list):
        add_issue(issues, "error", "pools", "reponse /pools inattendue")
        return []

    detailed: list[Pool] = []
    for entry in listing:
        if not isinstance(entry, dict):
            add_issue(issues, "warning", "pools", "pool ignore: format invalide")
            continue
        pool_id = string_value(entry, "poolid", "pool", "id")
        if not pool_id:
            add_issue(issues, "warning", "pools", "pool ignore: id absent")
            continue
        detail = collect_pool_detail(client, pool_id, issues)
        if detail is not None:
            detailed.append(detail)

    detailed.sort(key=lambda pool: pool.pool_id)
    return detailed
def collect_last_backup_results(
    client: PveClient,
    guests: list[Guest],
    issues: list[CollectionIssue] | None = None,
) -> dict[int, LastBackupResult]:
    """Return the most recent backup result known for each VMID.

    Only the first ``client.config.pve_task_history_limit`` de-duplicated
    vzdump tasks (newest first) are inspected. Each task is read through its
    log; when the log yields nothing, the task record itself is used.
    """
    candidates = collect_backup_task_candidates(client, guests, issues)
    ordered_tasks = recent_unique_vzdump_tasks(candidates)
    history_limit = client.config.pve_task_history_limit

    latest: dict[int, LastBackupResult] = {}
    for task in ordered_tasks[:history_limit]:
        if not isinstance(task, dict):
            add_issue(issues, "warning", "tasks", "tache ignoree: format invalide")
            continue

        outcomes = collect_task_log_backup_results(client, task, issues)
        if not outcomes:
            fallback = normalize_last_backup_result(task)
            outcomes = [] if fallback is None else [fallback]

        for outcome in outcomes:
            known = latest.get(outcome.vmid)
            is_newer = known is None or (
                backup_result_sort_key(outcome) > backup_result_sort_key(known)
            )
            if is_newer:
                latest[outcome.vmid] = outcome
    return latest
def collect_backup_task_candidates(
    client: PveClient,
    guests: list[Guest],
    issues: list[CollectionIssue] | None = None,
) -> list[Any]:
    """Gather raw task records from the cluster and from every guest node.

    Cluster tasks come first, followed by the tasks of each distinct guest
    node in sorted node order. Failures are reported and the affected source
    is skipped; entries are not filtered or de-duplicated here.
    """
    try:
        cluster_tasks = client.get_cluster_tasks()
    except PveApiError as error:
        add_issue(
            issues,
            "warning",
            "tasks",
            "historique des taches de sauvegarde indisponible",
            details=str(error),
        )
        cluster_tasks = []

    gathered: list[Any] = []
    if isinstance(cluster_tasks, list):
        gathered += cluster_tasks
    else:
        add_issue(issues, "error", "tasks", "reponse /cluster/tasks inattendue")

    node_names = sorted({guest.node for guest in guests if guest.node})
    for node_name in node_names:
        try:
            node_tasks = client.get_node_tasks(node_name)
        except PveApiError as error:
            add_issue(
                issues,
                "warning",
                "tasks",
                "historique des taches de sauvegarde du noeud indisponible",
                details=f"{node_name}: {error}",
            )
            continue
        if not isinstance(node_tasks, list):
            add_issue(
                issues,
                "warning",
                "tasks",
                "reponse /nodes/{node}/tasks inattendue",
                details=node_name,
            )
            continue
        gathered += node_tasks
    return gathered
def recent_unique_vzdump_tasks(raw_tasks: list[Any]) -> list[dict[str, Any]]:
tasks_by_key: dict[str, dict[str, Any]] = {}
for raw_task in raw_tasks:
if not isinstance(raw_task, dict):
continue
if string_value(raw_task, "type") != "vzdump":
continue
key = string_value(raw_task, "upid") or ":".join(
[
string_value(raw_task, "node") or "",
string_value(raw_task, "id") or "",
string_value(raw_task, "starttime") or "",
string_value(raw_task, "endtime") or "",
]
)
if key not in tasks_by_key:
tasks_by_key[key] = raw_task
return sorted(
tasks_by_key.values(),
key=lambda task: task_time_sort_key(task),
reverse=True,
)
def collect_task_log_backup_results(
    client: PveClient,
    raw_task: dict[str, Any],
    issues: list[CollectionIssue] | None = None,
) -> list[LastBackupResult]:
    """Derive per-guest backup results from the log of one vzdump task.

    Returns an empty list when the task has no node or UPID, when the log
    cannot be fetched, or when it is not a list (the last two are reported as
    warnings).
    """
    node = string_value(raw_task, "node")
    upid = string_value(raw_task, "upid")
    if not (node and upid):
        return []

    try:
        log_entries = client.get_task_log(node, upid)
    except PveApiError as error:
        add_issue(
            issues,
            "warning",
            "tasks",
            "log de tache vzdump indisponible",
            details=f"{node}: {error}",
        )
        return []

    if not isinstance(log_entries, list):
        add_issue(
            issues,
            "warning",
            "tasks",
            "log de tache vzdump ignore: format invalide",
            details=node,
        )
        return []

    # Entries with no extractable text, or with empty text, are dropped.
    texts = [task_log_line(entry) for entry in log_entries]
    usable_lines = [text for text in texts if text]
    return parse_vzdump_task_log(raw_task, usable_lines)
def collect_pool_detail(
    client: PveClient,
    pool_id: str,
    issues: list[CollectionIssue] | None = None,
) -> Pool | None:
    """Fetch and normalise one pool; return ``None`` when the payload is not a dict.

    A non-dict payload is reported as a warning carrying the pool id.
    """
    detail = client.get_pool(pool_id)
    if isinstance(detail, dict):
        return normalize_pool(pool_id, detail)
    add_issue(
        issues,
        "warning",
        "pools",
        "detail pool ignore: format invalide",
        details=pool_id,
    )
    return None
def normalize_pbs_storage(raw_storage: dict[str, Any]) -> PbsStorage:
    """Build a ``PbsStorage`` from a raw PVE storage record.

    The id is read from ``storage`` then ``id`` and defaults to ``"inconnu"``;
    a copy of the raw record is kept on the result.
    """
    storage_id = string_value(raw_storage, "storage", "id") or "inconnu"
    username = string_value(raw_storage, "username")
    server = string_value(raw_storage, "server")
    datastore = string_value(raw_storage, "datastore")
    namespace = string_value(raw_storage, "namespace")
    return PbsStorage(
        storage_id=storage_id,
        username=username,
        server=server,
        datastore=datastore,
        namespace=namespace,
        enabled=parse_enabled(raw_storage, default=True),
        raw=dict(raw_storage),
    )
def normalize_pbs_access_user(
    server_name: str,
    storage: PbsStorage,
    raw_user: dict[str, Any],
    permissions: dict[str, bool] | None = None,
) -> PbsAccessUser:
    """Combine a PVE storage and its raw PBS user record into a ``PbsAccessUser``.

    Args:
        server_name: Name of the PBS server the user belongs to.
        storage: PVE storage supplying the auth id, datastore and namespace.
        raw_user: Raw PBS user record; may be empty when the user is unknown.
        permissions: Permission flags; ``None`` is stored as an empty dict.
    """
    auth_id = storage.username or "inconnu"
    # An empty record carries no information, so "enabled" has no default then.
    enabled_default = True if raw_user else None
    return PbsAccessUser(
        server_name=server_name,
        auth_id=auth_id,
        user_id=pbs_auth_user_id(auth_id),
        storage_id=storage.storage_id,
        datastore=storage.datastore,
        namespace=storage.namespace,
        enabled=parse_enabled(raw_user, default=enabled_default),
        expire=normalize_pbs_user_expire(raw_user),
        email=string_value(raw_user, "email"),
        comment=string_value(raw_user, "comment"),
        permissions=permissions or {},
        raw=dict(raw_user),
    )
def normalize_pbs_user_expire(raw_user: dict[str, Any]) -> int | None:
    """Return the ``expire`` value of a raw PBS user record.

    An empty record yields ``None``, a record without an ``expire`` key yields
    ``0``, and any other record yields the parsed ``expire`` value.
    """
    if not raw_user:
        return None
    if "expire" in raw_user:
        return parse_optional_int(raw_user.get("expire"))
    return 0
def normalize_pbs_retention_policy(
    server_name: str,
    raw_policy: dict[str, Any],
) -> PbsRetentionPolicy:
    """Build a ``PbsRetentionPolicy`` from a raw PBS prune-job record.

    The id defaults to ``"inconnu"`` and the namespace to the root ``"/"``.
    The policy counts as enabled unless ``parse_enabled`` returns ``False``.
    """

    def optional_int(field: str) -> int | None:
        return parse_optional_int(raw_policy.get(field))

    policy_id = string_value(raw_policy, "id", "job-id", "jobid") or "inconnu"
    datastore = string_value(raw_policy, "store", "datastore")
    namespace = string_value(raw_policy, "ns", "namespace") or "/"
    schedule = string_value(raw_policy, "schedule")
    is_enabled = parse_enabled(raw_policy, default=True) is not False
    return PbsRetentionPolicy(
        policy_id=policy_id,
        server_name=server_name,
        datastore=datastore,
        namespace=namespace,
        schedule=schedule,
        enabled=is_enabled,
        keep_last=optional_int("keep-last"),
        keep_hourly=optional_int("keep-hourly"),
        keep_daily=optional_int("keep-daily"),
        keep_weekly=optional_int("keep-weekly"),
        keep_monthly=optional_int("keep-monthly"),
        keep_yearly=optional_int("keep-yearly"),
        max_depth=optional_int("max-depth"),
        comment=string_value(raw_policy, "comment"),
        raw=dict(raw_policy),
    )
def normalize_pbs_snapshot(
    server_name: str,
    datastore: str,
    namespace: str,
    raw_snapshot: dict[str, Any],
) -> PbsBackupSnapshotSummary | None:
    """Turn one raw PBS snapshot into a single-snapshot summary.

    Returns ``None`` when the backup type is neither ``vm`` nor ``ct``, or when
    the backup id or backup time cannot be parsed.
    """
    backup_type = string_value(raw_snapshot, "backup-type")
    if backup_type not in {"vm", "ct"}:
        return None

    vmid = parse_vmid(raw_snapshot.get("backup-id"))
    if vmid is None:
        return None

    taken_at = parse_timestamp(raw_snapshot.get("backup-time"))
    if taken_at is None:
        return None

    # A lone snapshot is both the oldest and the newest of its summary.
    return PbsBackupSnapshotSummary(
        server_name=server_name,
        vmid=vmid,
        guest_type=pbs_guest_type_to_pve_type(backup_type),
        datastore=datastore,
        namespace=namespace or "/",
        snapshot_count=1,
        oldest_backup_at=taken_at,
        newest_backup_at=taken_at,
        newest_backup_size_bytes=parse_snapshot_size(raw_snapshot),
        raw=dict(raw_snapshot),
    )
def normalize_pbs_datastore_usage(
    server_name: str,
    datastore: str,
    raw_status: dict[str, Any],
) -> PbsDatastoreUsage:
    """Build a ``PbsDatastoreUsage`` from a raw datastore status record.

    Sizes are read from the ``total``, ``used`` and ``avail`` keys.
    """
    total = parse_optional_int(raw_status.get("total"))
    used = parse_optional_int(raw_status.get("used"))
    available = parse_optional_int(raw_status.get("avail"))
    return PbsDatastoreUsage(
        server_name=server_name,
        datastore=datastore,
        total_bytes=total,
        used_bytes=used,
        available_bytes=available,
        raw=dict(raw_status),
    )
def normalize_pbs_gc_status(
    server_name: str,
    datastore: str,
    raw_status: dict[str, Any],
) -> PbsGarbageCollectionStatus:
    """Build a ``PbsGarbageCollectionStatus`` from a raw GC status record.

    The status is ``last-run-state`` when present, ``"en_cours"`` when only a
    ``upid`` is present, and ``"non_renseigne"`` otherwise.
    """
    last_run_state = string_value(raw_status, "last-run-state")
    upid = string_value(raw_status, "upid")
    status = last_run_state or ("en_cours" if upid else "non_renseigne")
    return PbsGarbageCollectionStatus(
        server_name=server_name,
        datastore=datastore,
        status=status,
        schedule=string_value(raw_status, "schedule"),
        last_run_endtime=parse_timestamp(raw_status.get("last-run-endtime")),
        next_run=parse_timestamp(raw_status.get("next-run")),
        raw=dict(raw_status),
    )
def normalize_backup_job(raw_job: dict[str, Any]) -> BackupJob:
    """Build a ``BackupJob`` from a raw PVE backup job record.

    The id is read from ``id``, ``job-id`` then ``jobid`` and defaults to
    ``"inconnu"``. The selection is resolved by ``first_present_string`` over
    ``vmid``, ``pool`` and ``all``.
    """
    identifier = string_value(raw_job, "id", "job-id", "jobid") or "inconnu"
    target_storage = string_value(raw_job, "storage")
    schedule = string_value(raw_job, "schedule")
    is_enabled = parse_enabled(raw_job, default=True)
    return BackupJob(
        job_id=identifier,
        storage=target_storage,
        schedule=schedule,
        enabled=is_enabled,
        mode=string_value(raw_job, "mode"),
        selection=first_present_string(raw_job, "vmid", "pool", "all"),
        excluded=string_value(raw_job, "exclude"),
        raw=dict(raw_job),
    )
def normalize_guest(raw_resource: dict[str, Any]) -> Guest | None:
    """Build a ``Guest`` from a raw cluster resource, or ``None`` without a VMID.

    The VMID comes from the ``vmid`` key or, failing that, from the part after
    the last ``/`` of the ``id`` key. The name defaults to ``<type>-<vmid>``.
    """
    vmid = parse_vmid(raw_resource.get("vmid"))
    if vmid is None:
        # Fall back to the trailing segment of an id that contains a slash.
        resource_id = string_value(raw_resource, "id")
        if resource_id and "/" in resource_id:
            vmid = parse_vmid(resource_id.rsplit("/", 1)[-1])
        if vmid is None:
            return None

    guest_type = string_value(raw_resource, "type") or "inconnu"
    display_name = string_value(raw_resource, "name") or f"{guest_type}-{vmid}"
    return Guest(
        vmid=vmid,
        name=display_name,
        guest_type=guest_type,
        notes=guest_notes(raw_resource),
        node=string_value(raw_resource, "node"),
        status=string_value(raw_resource, "status"),
        raw=dict(raw_resource),
    )
def guest_notes(data: dict[str, Any]) -> str | None:
    """Return the stripped notes of a guest record, or ``None`` when blank.

    The text is looked up under ``description``, ``notes`` then ``comment``.
    """
    text = string_value(data, "description", "notes", "comment")
    if text is None:
        return None
    return text.strip() or None
def normalize_pool(pool_id: str, raw_pool_detail: dict[str, Any]) -> Pool:
    """Build a ``Pool`` holding the VMIDs of its ``qemu``/``lxc`` members.

    Members that are not dicts, have another type, or carry no parsable VMID
    (neither in ``vmid`` nor after the last ``/`` of ``id``) are ignored.
    """
    member_vmids: set[int] = set()
    members = raw_pool_detail.get("members")
    for member in members if isinstance(members, list) else []:
        if not isinstance(member, dict):
            continue
        if string_value(member, "type") not in {"qemu", "lxc"}:
            continue
        vmid = parse_vmid(member.get("vmid"))
        if vmid is None:
            member_id = string_value(member, "id")
            if member_id and "/" in member_id:
                vmid = parse_vmid(member_id.rsplit("/", 1)[-1])
        if vmid is not None:
            member_vmids.add(vmid)
    return Pool(
        pool_id=pool_id,
        vmids=member_vmids,
        raw=dict(raw_pool_detail),
    )
def normalize_last_backup_result(raw_task: dict[str, Any]) -> LastBackupResult | None:
    """Build a backup result straight from a vzdump task record.

    Returns ``None`` when the task is not a vzdump task, when no VMID can be
    extracted, or when the task has no parsable ``endtime``.
    """
    if string_value(raw_task, "type") != "vzdump":
        return None

    vmid = extract_task_vmid(raw_task)
    if vmid is None:
        return None

    ended_at = parse_timestamp(raw_task.get("endtime"))
    if ended_at is None:
        return None

    return LastBackupResult(
        vmid=vmid,
        status=backup_task_status(raw_task.get("status")),
        finished_at=ended_at,
        duration_seconds=task_duration_seconds(raw_task),
        node=string_value(raw_task, "node"),
        raw=dict(raw_task),
    )
def parse_vzdump_task_log(
    raw_task: dict[str, Any],
    lines: list[str],
) -> list[LastBackupResult]:
    """Derive one backup result per guest from the lines of a vzdump task log.

    A "failed" line marks the guest as ``echec`` and overrides any earlier
    result. A "Finished Backup" line marks it as ``succes`` unless a result
    already exists. When the task status is ``OK``, every started guest and
    every announced, non-skipped guest without a result is marked ``succes``.

    Args:
        raw_task: Task record providing end time, duration, node and status.
        lines: Text lines of the task log.
    """
    finished_at = parse_timestamp(raw_task.get("endtime"))
    task_duration = task_duration_seconds(raw_task)
    node = string_value(raw_task, "node")

    def build_result(vmid: int, status: str, duration: int | None) -> LastBackupResult:
        return LastBackupResult(
            vmid=vmid,
            status=status,
            finished_at=finished_at,
            duration_seconds=duration,
            node=node,
            raw=dict(raw_task),
        )

    started: set[int] = set()
    announced: set[int] = set()
    skipped: set[int] = set()
    outcomes: dict[int, LastBackupResult] = {}

    for line in lines:
        announced.update(extract_job_vmids_from_log_line(line))
        skipped.update(extract_skipped_vmids_from_log_line(line))

        started_vmid = extract_vmid_from_log_line(
            line,
            pattern=r"Starting Backup of (?:VM|CT)\s+(\d+)",
        )
        if started_vmid is not None:
            started.add(started_vmid)

        failed_vmid = extract_vmid_from_log_line(
            line,
            pattern=r"Backup of (?:VM|CT)\s+(\d+)\s+failed",
        )
        if failed_vmid is not None:
            # A failure always wins, even over a success recorded earlier.
            outcomes[failed_vmid] = build_result(failed_vmid, "echec", task_duration)

        finished_vmid, finished_duration = extract_finished_backup_from_log_line(line)
        if finished_vmid is not None and finished_vmid not in outcomes:
            outcomes[finished_vmid] = build_result(finished_vmid, "succes", finished_duration)

    if raw_task.get("status") == "OK":
        # Guests with no explicit outcome line are trusted to have succeeded;
        # the task duration is only attributed to guests seen starting.
        for vmid in started | (announced - skipped):
            if vmid not in outcomes:
                duration = task_duration if vmid in started else None
                outcomes[vmid] = build_result(vmid, "succes", duration)

    return list(outcomes.values())
def extract_job_vmids_from_log_line(line: str) -> set[int]:
    """Return the VMIDs listed on a "starting new backup job: vzdump ..." line.

    The VMID list ends at the first ``--`` option or at the end of the line;
    any other line yields an empty set.
    """
    found = re.search(
        r"starting new backup job:\s+vzdump\s+(.+?)(?:\s+--|$)",
        line,
        flags=re.IGNORECASE,
    )
    return parse_vmid_sequence(found.group(1)) if found else set()
def extract_skipped_vmids_from_log_line(line: str) -> set[int]:
    """Return the VMIDs listed after "skip external VMs:" on a log line.

    Any other line yields an empty set.
    """
    found = re.search(
        r"skip external VMs:\s+(.+)$",
        line,
        flags=re.IGNORECASE,
    )
    return parse_vmid_sequence(found.group(1)) if found else set()
def parse_vmid_sequence(value: str) -> set[int]:
    """Parse a list of VMIDs separated by whitespace, commas or semicolons.

    Tokens that ``parse_vmid`` cannot parse are ignored.
    """
    tokens = re.split(r"[\s,;]+", value.strip())
    parsed = (parse_vmid(token) for token in tokens)
    return {vmid for vmid in parsed if vmid is not None}
def extract_vmid_from_log_line(line: str, pattern: str) -> int | None:
    """Return the VMID captured by group 1 of ``pattern`` in ``line``.

    The search is case-insensitive; a line that does not match yields ``None``.
    """
    found = re.search(pattern, line, flags=re.IGNORECASE)
    if found is None:
        return None
    return parse_vmid(found.group(1))
def extract_finished_backup_from_log_line(line: str) -> tuple[int | None, int | None]:
    """Read the VMID and duration from a "Finished Backup of VM/CT" log line.

    Returns ``(vmid, duration_seconds)``, where the duration comes from the
    optional parenthesised text after the VMID; ``(None, None)`` for any other
    line.
    """
    found = re.search(
        r"Finished Backup of (?:VM|CT)\s+(\d+)(?:\s+\(([^)]+)\))?",
        line,
        flags=re.IGNORECASE,
    )
    if found is None:
        return None, None
    vmid_text, duration_text = found.group(1), found.group(2)
    return parse_vmid(vmid_text), parse_duration_seconds(duration_text)
def task_log_line(entry: Any) -> str | None:
    """Extract the text of one task log entry.

    A string entry is returned as is. For a dict entry, the first string found
    under ``t``, ``text`` then ``message`` is returned. Anything else yields
    ``None``.
    """
    if isinstance(entry, str):
        return entry
    if not isinstance(entry, dict):
        return None
    candidates = (entry.get(field) for field in ("t", "text", "message"))
    return next((text for text in candidates if isinstance(text, str)), None)
def backup_task_status(value: Any) -> str:
    """Map a raw task status to ``succes``, ``indetermine`` or ``echec``.

    ``"OK"`` is a success, ``None`` or blank text is undetermined, and any
    other value is a failure.
    """
    if value == "OK":
        return "succes"
    has_text = value is not None and str(value).strip() != ""
    return "echec" if has_text else "indetermine"
def extract_task_vmid(raw_task: dict[str, Any]) -> int | None:
    """Find the VMID a task record refers to.

    The ``vmid``, ``id`` and ``guest`` keys are tried in turn, each first as a
    plain VMID and then as the part after its last ``/``. As a last resort the
    UPID field that follows ``vzdump`` is parsed.
    """
    for field in ("vmid", "id", "guest"):
        direct = parse_vmid(raw_task.get(field))
        if direct is not None:
            return direct
        text = string_value(raw_task, field)
        if text and "/" in text:
            from_tail = parse_vmid(text.rsplit("/", 1)[-1])
            if from_tail is not None:
                return from_tail

    upid = string_value(raw_task, "upid")
    if not upid:
        return None

    # UPID fields are colon-separated; the one after "vzdump" is parsed.
    fields = upid.split(":")
    for current, following in zip(fields, fields[1:]):
        if current == "vzdump":
            return parse_vmid(following)
    return None
def backup_result_sort_key(result: LastBackupResult) -> float:
    """Return the end time of a result as epoch seconds, or ``0`` when unknown."""
    finished = result.finished_at
    return 0 if finished is None else finished.timestamp()
def task_duration_seconds(raw_task: dict[str, Any]) -> int | None:
    """Return the whole seconds between ``starttime`` and ``endtime`` of a task.

    Returns ``None`` when either timestamp is missing or unparsable, or when
    the end precedes the start.
    """
    started_at = parse_timestamp(raw_task.get("starttime"))
    finished_at = parse_timestamp(raw_task.get("endtime"))
    if started_at is None or finished_at is None:
        return None
    elapsed = int(finished_at.timestamp() - started_at.timestamp())
    return elapsed if elapsed >= 0 else None
def task_time_sort_key(raw_task: dict[str, Any]) -> float:
    """Return a task's end time, else its start time, as epoch seconds.

    Tasks with neither timestamp get ``0``.
    """
    moment = parse_timestamp(raw_task.get("endtime"))
    if not moment:
        moment = parse_timestamp(raw_task.get("starttime"))
    return 0 if moment is None else moment.timestamp()
def pbs_storage_key(storage: PbsStorage) -> tuple[str, str]:
    """Return the ``(datastore, namespace)`` key of a storage.

    A missing datastore becomes ``""`` and a missing namespace becomes ``"/"``.
    """
    datastore = storage.datastore or ""
    namespace = storage.namespace or "/"
    return datastore, namespace
def pbs_auth_user_id(auth_id: str) -> str:
    """Return the part of an auth id that precedes its first ``!``.

    An auth id without ``!`` is returned unchanged.
    """
    user_id, _separator, _token_name = auth_id.partition("!")
    return user_id
def pbs_datastore_acl_path(datastore: str, namespace: str | None = None) -> str:
    """Build the ACL path of a datastore, optionally scoped to a namespace.

    Args:
        datastore: Datastore name.
        namespace: Optional namespace; leading and trailing slashes are
            ignored. ``None``, ``""`` and slash-only values mean the root.

    Returns:
        ``/datastore/<datastore>`` for the root namespace, otherwise
        ``/datastore/<datastore>/<namespace>``.
    """
    path = f"/datastore/{datastore}"
    # Strip before testing: a slash-only namespace such as "//" must resolve to
    # the root path instead of producing a trailing "/".
    relative_namespace = namespace.strip("/") if namespace else ""
    if relative_namespace:
        return f"{path}/{relative_namespace}"
    return path
def pbs_snapshot_summary_key(
    summary: PbsBackupSnapshotSummary,
) -> tuple[str, str, str, str, int]:
    """Return the grouping key of a snapshot summary.

    The key is ``(server_name, datastore, namespace, guest_type, vmid)``,
    with a falsy namespace replaced by ``"/"``.
    """
    server = summary.server_name
    datastore = summary.datastore
    namespace = summary.namespace or "/"
    guest_type = summary.guest_type
    vmid = summary.vmid
    return (server, datastore, namespace, guest_type, vmid)
def merge_pbs_snapshot_summary(
    current: PbsBackupSnapshotSummary | None,
    item: PbsBackupSnapshotSummary,
) -> PbsBackupSnapshotSummary:
    """Fold ``item`` into an already accumulated snapshot summary.

    Args:
        current: Summary accumulated so far, or ``None`` when ``item`` is the
            first one; in that case ``item`` itself is returned.
        item: Summary to merge in. Its identifying fields (server, vmid,
            guest type, datastore, namespace) are used for the result.

    Returns:
        A new summary whose snapshot count is the sum of both inputs, whose
        oldest/newest dates span both inputs (ignoring ``None``), and whose
        newest size and ``raw`` payload come from whichever input holds the
        newest backup. ``item`` wins ties.
    """
    if current is None:
        return item

    oldest_known = [
        stamp
        for stamp in (current.oldest_backup_at, item.oldest_backup_at)
        if stamp is not None
    ]
    newest_known = [
        stamp
        for stamp in (current.newest_backup_at, item.newest_backup_at)
        if stamp is not None
    ]

    # The newest-backup details follow the input that actually owns the most
    # recent snapshot; an input without a date can never win over one with.
    item_is_newest = item.newest_backup_at is not None and (
        current.newest_backup_at is None
        or item.newest_backup_at >= current.newest_backup_at
    )
    newest_source = item if item_is_newest else current

    return PbsBackupSnapshotSummary(
        server_name=item.server_name,
        vmid=item.vmid,
        guest_type=item.guest_type,
        datastore=item.datastore,
        namespace=item.namespace,
        snapshot_count=current.snapshot_count + item.snapshot_count,
        oldest_backup_at=min(oldest_known, default=None),
        newest_backup_at=max(newest_known, default=None),
        newest_backup_size_bytes=newest_source.newest_backup_size_bytes,
        # Copy so the merged summary does not share the source's dict.
        raw=dict(newest_source.raw),
    )
def pbs_guest_type_to_pve_type(value: str) -> str:
    """Translate a PBS guest type into its PVE counterpart.

    ``"vm"`` becomes ``"qemu"`` and ``"ct"`` becomes ``"lxc"``; any other
    value is returned unchanged.
    """
    for pbs_name, pve_name in (("vm", "qemu"), ("ct", "lxc")):
        if value == pbs_name:
            return pve_name
    return value
def normalize_pbs_namespace(value: str | None) -> str:
    """Return ``"/"`` for a missing or empty namespace, else the value as is."""
    return "/" if value in (None, "") else value
def api_host(value: str) -> str:
    """Return the hostname component of a URL.

    When ``urlparse`` finds no hostname in ``value`` (for example because it
    carries no ``scheme://`` prefix), the input is returned unchanged.
    """
    hostname = urlparse(value).hostname
    return hostname if hostname else value
def parse_timestamp(value: Any) -> datetime | None:
    """Convert a Unix epoch value into a timezone-aware local datetime.

    Args:
        value: Epoch seconds as an int, a float, or a string consisting only
            of decimal digits (surrounding whitespace is ignored).

    Returns:
        The corresponding datetime in the local timezone, or ``None`` when
        the value has an unsupported type or format, is a boolean, or lies
        outside the range the platform can represent.
    """
    # bool is a subclass of int; True/False are never meaningful epochs.
    if isinstance(value, bool):
        return None
    try:
        if isinstance(value, str):
            text = value.strip()
            # isdecimal() rather than isdigit(): the latter also accepts
            # characters such as "²" that int() refuses to convert.
            if not text.isdecimal():
                return None
            seconds = int(text)
        elif isinstance(value, (int, float)):
            seconds = value
        else:
            return None
        return datetime.fromtimestamp(seconds).astimezone()
    except (OverflowError, OSError, ValueError):
        # Out-of-range, NaN or otherwise unrepresentable epochs must not
        # abort the whole collection; treat them as "no timestamp".
        return None
def parse_duration_seconds(value: Any) -> int | None:
    """Parse a duration into a non-negative number of whole seconds.

    Accepted inputs:
        * a non-negative int (returned as is);
        * a finite, non-negative float (truncated towards zero);
        * text holding plain seconds, ``MM:SS`` or ``HH:MM:SS`` made of
          decimal digits (surrounding whitespace is ignored).

    Returns:
        The duration in seconds, or ``None`` for ``None``, booleans,
        negative or non-finite numbers, and text in any other format.
    """
    # bool is a subclass of int; a boolean is never a duration.
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value if value >= 0 else None
    if isinstance(value, float):
        # NaN fails both comparisons and infinity fails the upper bound.
        return int(value) if 0 <= value < float("inf") else None

    fields = str(value).strip().split(":")
    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as "²" that int() refuses to convert. An empty field (blank input
    # or a stray colon) is not decimal either, so it is rejected here too.
    if len(fields) > 3 or not all(field.isdecimal() for field in fields):
        return None

    total = 0
    for field in fields:
        # Each additional colon-separated field shifts the total by base 60.
        total = total * 60 + int(field)
    return total
def string_value(data: dict[str, Any], *keys: str) -> str | None:
    """Return ``str()`` of the first non-``None`` value found under ``keys``.

    Keys are looked up in the order given; missing keys and ``None`` values
    are skipped. Returns ``None`` when no key yields a value.
    """
    looked_up = (data.get(key) for key in keys)
    first = next((item for item in looked_up if item is not None), None)
    return None if first is None else str(first)
def first_present_string(data: dict[str, Any], *keys: str) -> str | None:
    """Render every present key as ``key=value`` and join them with ``", "``.

    Despite the name, all of ``keys`` holding a non-``None`` value are
    included, in the order given. Booleans are rendered lowercase
    (``true``/``false``). Returns ``None`` when no key is present.
    """
    pairs = ((key, data.get(key)) for key in keys)
    rendered = [
        f"{key}={str(value).lower() if isinstance(value, bool) else value}"
        for key, value in pairs
        if value is not None
    ]
    # Every rendered entry contains "=", so the joined text is empty only
    # when the list itself is empty.
    return ", ".join(rendered) or None
def parse_enabled(data: dict[str, Any], default: bool | None = None) -> bool | None:
    """Derive an enabled flag from a raw configuration dict.

    A ``"disable"`` key takes precedence and is inverted; otherwise
    ``"enabled"`` and then ``"enable"`` are used as is. ``default`` is
    returned when none of the three keys is present.
    """
    if "disable" in data:
        return not parse_truthy(data["disable"])
    for flag in ("enabled", "enable"):
        if flag in data:
            return parse_truthy(data[flag])
    return default
def parse_truthy(value: Any) -> bool:
    """Interpret a loosely typed flag as a boolean.

    Strings are true only when, stripped and lowercased, they are one of
    ``1``, ``true``, ``yes``, ``y`` or ``on``. Every other value (booleans,
    numbers, ``None``, containers, ...) follows Python truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y", "on"}
    return bool(value)
def parse_vmid(value: Any) -> int | None:
    """Parse a guest VMID from an int or a string of decimal digits.

    Args:
        value: Raw value; strings are stripped of surrounding whitespace.

    Returns:
        The VMID as an int, or ``None`` for booleans, non-numeric strings and
        every other type.
    """
    # bool is a subclass of int, but True/False are never valid VMIDs.
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        text = value.strip()
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as "²" that int() refuses to convert.
        if text.isdecimal():
            return int(text)
    return None
def parse_optional_int(value: Any) -> int | None:
    """Return ``value`` as an int when it is one or a string of decimal digits.

    Args:
        value: Raw value; strings are stripped of surrounding whitespace.

    Returns:
        The integer, or ``None`` for non-numeric strings (including signed
        ones such as ``"-5"``) and for every other type.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        text = value.strip()
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as "²" that int() refuses to convert.
        if text.isdecimal():
            return int(text)
    return None
def parse_snapshot_size(raw_snapshot: dict[str, Any]) -> int | None:
    """Return the first usable size found in a raw snapshot record.

    Several candidate keys are probed in a fixed order; the first one that
    parses to a non-negative integer wins. Returns ``None`` when no key
    provides such a value.
    """
    size_fields = (
        "size",
        "backup-size",
        "size-on-disk",
        "backup-size-bytes",
        "size-bytes",
        "bytes",
    )
    for field in size_fields:
        candidate = parse_optional_int(raw_snapshot.get(field))
        if candidate is None or candidate < 0:
            continue
        return candidate
    return None
def add_issue(
    issues: list[CollectionIssue] | None,
    severity: str,
    component: str,
    message: str,
    details: str | None = None,
) -> None:
    """Append a new ``CollectionIssue`` to ``issues``.

    Does nothing when ``issues`` is ``None``, so callers that do not track
    issues can pass ``None`` instead of a list.
    """
    if issues is not None:
        issue = CollectionIssue(
            severity=severity,
            component=component,
            message=message,
            details=details,
        )
        issues.append(issue)