Initial commit

2026-05-13 16:04:17 +02:00
commit b66612d672
43 changed files with 10515 additions and 0 deletions
+1
@@ -0,0 +1 @@
+200
@@ -0,0 +1,200 @@
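# Tests for pve_backup_report.cli: parser flags, --check-config, redaction of
# sensitive raw fields in --dump-report-data output, report output directory
# validation (including the /reports fallback), and building one PBS client per
# configured server.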
from pathlib import Path
from pve_backup_report.models import PbsAccessUser, PbsBackupSnapshotSummary, ReportData
from pve_backup_report.cli import (
build_parser,
configured_pbs_clients,
datastore_name_from_raw,
ensure_report_output_dir_writable,
ensure_writable_directory,
run,
)
from pve_backup_report.config import AppConfig, PbsServerConfig
def test_cli_check_config(tmp_path, monkeypatch) -> None:
env_file = tmp_path / ".env"
env_file.write_text(
"\n".join(
[
"PVE_API_URL=https://pve.example.invalid:8006",
"PVE_API_TOKEN_ID=backup-report@pve!report",
"PVE_API_TOKEN_SECRET=secret",
]
),
encoding="utf-8",
)
monkeypatch.chdir(tmp_path)
assert run(["--check-config"]) == 0
def test_cli_has_dump_pbs_storage_usages() -> None:
args = build_parser().parse_args(["--dump-pbs-storage-usages"])
assert args.dump_pbs_storage_usages is True
def test_cli_has_dump_pbs_users() -> None:
args = build_parser().parse_args(["--dump-pbs-users"])
assert args.dump_pbs_users is True
def test_dump_report_data_does_not_export_sensitive_raw_fields(
monkeypatch,
capsys,
) -> None:
config = AppConfig(
pve_api_url="https://pve.example.invalid:8006",
pve_api_token_id="backup-report@pve!report",
pve_api_token_secret="secret",
report_output_dir=Path("reports"),
report_timezone="Europe/Paris",
pve_verify_tls=True,
pve_ca_bundle=None,
pve_timeout_seconds=30,
pve_backup_jobs_endpoint="/cluster/backup",
pve_task_history_limit=500,
pve_task_log_limit=5000,
pbs_hostnames={},
pbs_servers=(),
log_level="INFO",
report_filename_prefix="rapport-sauvegardes-pve",
)
report_data = ReportData(
pbs_access_users=[
PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs",
user_id="backup@pbs",
storage_id="BACKUP-PROD",
raw={
"Authorization": "PBSAPIToken=abc:secret",
"password": "secret-password",
},
)
],
pbs_snapshot_summaries={
("PBS01", "RAID5", "serveurs", "qemu", 100): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs",
snapshot_count=1,
raw={
"fingerprint": "aa:bb:cc",
"files": [{"filename": "index.json.blob"}],
"raw": {"secret": "secret-value"},
},
)
},
)
monkeypatch.setattr("pve_backup_report.cli.load_config", lambda: config)
monkeypatch.setattr(
"pve_backup_report.cli.collect_data_or_log_error",
lambda loaded_config, label: report_data,
)
assert run(["--dump-report-data"]) == 0
output = capsys.readouterr().out
assert "fingerprint" not in output
assert '"raw"' not in output
assert '"files"' not in output
assert "PBSAPIToken=abc:secret" not in output
assert "secret-password" not in output
assert "secret-value" not in output
assert '"pbs_access_users"' in output
assert '"auth_id": "backup@pbs"' in output
assert '"pbs_snapshot_summaries"' in output
assert '"snapshot_count": 1' in output
def test_datastore_name_from_raw() -> None:
assert datastore_name_from_raw({"name": "RAID5"}) == "RAID5"
assert datastore_name_from_raw({"store": "PBS2RAID5"}) == "PBS2RAID5"
assert datastore_name_from_raw({"datastore": "BACKUPSTORAGE"}) == "BACKUPSTORAGE"
assert datastore_name_from_raw({}) is None
def test_report_output_dir_must_be_directory(tmp_path) -> None:
output_file = tmp_path / "reports"
output_file.write_text("not a directory", encoding="utf-8")
try:
ensure_report_output_dir_writable(output_file)
except OSError as exc:
assert "REPORT_OUTPUT_DIR doit pointer vers un repertoire" in str(exc)
else:
raise AssertionError("OSError attendu")
def test_docker_report_output_dir_falls_back_to_local_reports(
tmp_path,
monkeypatch,
) -> None:
def fake_ensure_writable_directory(path: Path) -> None:
if path == Path("/reports"):
raise OSError("permission denied")
ensure_writable_directory(path)
monkeypatch.chdir(tmp_path)
monkeypatch.setattr(
"pve_backup_report.cli.ensure_writable_directory",
fake_ensure_writable_directory,
)
assert ensure_report_output_dir_writable(Path("/reports")) == Path("reports")
assert (tmp_path / "reports").is_dir()
def test_configured_pbs_clients_uses_every_configured_server() -> None:
config = AppConfig(
pve_api_url="https://pve.example.invalid:8006",
pve_api_token_id="backup-report@pve!report",
pve_api_token_secret="secret",
report_output_dir=Path("reports"),
report_timezone="Europe/Paris",
pve_verify_tls=True,
pve_ca_bundle=None,
pve_timeout_seconds=30,
pve_backup_jobs_endpoint="/cluster/backup",
pve_task_history_limit=500,
pve_task_log_limit=5000,
pbs_hostnames={},
pbs_servers=(
PbsServerConfig(
prefix="PBS01",
name="PBS01",
api_url="https://backup-a.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret",
verify_tls=True,
ca_bundle=None,
timeout_seconds=30,
),
PbsServerConfig(
prefix="PBS04",
name="PBS04",
api_url="https://backup-d.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret4",
verify_tls=True,
ca_bundle=None,
timeout_seconds=30,
),
),
log_level="INFO",
report_filename_prefix="rapport-sauvegardes-pve",
)
clients = configured_pbs_clients(config)
try:
assert [client.server_name for client in clients] == ["PBS01", "PBS04"]
finally:
for client in clients:
client.close()
+857
@@ -0,0 +1,857 @@
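# Tests for pve_backup_report.collectors: normalization helpers (PBS storages,
# access users, retention policies, datastore usage, GC status, snapshots,
# namespaces, backup jobs, guests, pools, last backup results) and the
# collection functions that merge PVE and PBS data, including vzdump task-log
# parsing.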
from pve_backup_report.collectors import (
collect_pbs_datastore_usages,
collect_pbs_access_users,
collect_pbs_gc_statuses,
collect_pbs_retention_policies,
collect_pbs_snapshot_summaries,
collect_guests,
extract_finished_backup_from_log_line,
extract_task_vmid,
guest_notes,
is_missing_pbs_snapshot_namespace,
normalize_backup_job,
normalize_guest,
normalize_last_backup_result,
normalize_pbs_storage,
normalize_pbs_retention_policy,
normalize_pbs_snapshot,
normalize_pbs_datastore_usage,
normalize_pbs_gc_status,
normalize_pbs_namespace,
normalize_pbs_access_user,
normalize_pool,
pbs_auth_user_id,
pbs_datastore_acl_path,
pbs_client_storages,
pbs_snapshot_scopes,
parse_vzdump_task_log,
merge_pbs_snapshot_summary,
task_duration_seconds,
)
from pve_backup_report.models import PbsStorage
from pve_backup_report.pbs_client import PbsHttpError
from pve_backup_report.pve_client import PveApiError
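# FakePveClient and FakePbsClient are in-memory test doubles: they return
# canned API payloads (or raise configured errors) so the collectors can be
# exercised without contacting real PVE/PBS servers.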
class FakePveClient:
def __init__(
self,
raw_resources: list[dict] | object,
configs: dict[tuple[str, str, int], dict] | None = None,
config_errors: dict[tuple[str, str, int], PveApiError] | None = None,
) -> None:
self._raw_resources = raw_resources
self._configs = configs or {}
self._config_errors = config_errors or {}
def get_cluster_resources(self) -> list[dict] | object:
return self._raw_resources
def get_qemu_config(self, node: str, vmid: int) -> dict:
return self._guest_config("qemu", node, vmid)
def get_lxc_config(self, node: str, vmid: int) -> dict:
return self._guest_config("lxc", node, vmid)
def _guest_config(self, guest_type: str, node: str, vmid: int) -> dict:
key = (guest_type, node, vmid)
error = self._config_errors.get(key)
if error is not None:
raise error
return self._configs.get(key, {})
class FakePbsClient:
def __init__(
self,
server_name: str,
api_url: str | None = "https://backup.example.invalid:8007",
pbs_hostnames: dict[str, str] | None = None,
raw_policies: list[dict] | None = None,
raw_datastores: list[dict] | None = None,
raw_namespaces: dict[str, list[dict]] | None = None,
raw_snapshots: dict[tuple[str, str], list[dict]] | None = None,
snapshot_errors: dict[tuple[str, str], Exception] | None = None,
raw_statuses: dict[str, dict] | None = None,
raw_gc_statuses: dict[str, dict] | None = None,
raw_users: list[dict] | None = None,
raw_permissions: dict[tuple[str, str], dict] | None = None,
) -> None:
self.server_name = server_name
self.api_url = api_url
self.config = type("FakeConfig", (), {"pbs_hostnames": pbs_hostnames or {}})()
self._raw_policies = raw_policies or []
self._raw_datastores = raw_datastores or []
self._raw_namespaces = raw_namespaces or {}
self._raw_snapshots = raw_snapshots or {}
self._snapshot_errors = snapshot_errors or {}
self._raw_statuses = raw_statuses or {}
self._raw_gc_statuses = raw_gc_statuses or {}
self._raw_users = raw_users or []
self._raw_permissions = raw_permissions or {}
def get_prune_jobs(self) -> list[dict]:
return self._raw_policies
def get_datastores(self) -> list[dict]:
return self._raw_datastores
def get_datastore_namespaces(self, datastore: str) -> list[dict]:
return self._raw_namespaces.get(datastore, [])
def get_datastore_snapshots(self, datastore: str, namespace: str | None) -> list[dict]:
error = self._snapshot_errors.get((datastore, namespace or "/"))
if error is not None:
raise error
return self._raw_snapshots.get((datastore, namespace or "/"), [])
def get_datastore_status(self, datastore: str) -> dict:
return self._raw_statuses[datastore]
def get_datastore_gc_status(self, datastore: str) -> dict:
return self._raw_gc_statuses[datastore]
def get_access_users(self) -> list[dict]:
return self._raw_users
def get_access_permissions(self, auth_id: str, path: str) -> dict:
return self._raw_permissions.get((auth_id, path), {})
def test_normalize_pbs_storage() -> None:
storage = normalize_pbs_storage(
{
"storage": "backup-storage",
"type": "pbs",
"username": "backup@pbs",
"server": "backup.example.invalid",
"datastore": "prod",
"namespace": "pve",
"disable": 0,
}
)
assert storage.storage_id == "backup-storage"
assert storage.username == "backup@pbs"
assert storage.server == "backup.example.invalid"
assert storage.datastore == "prod"
assert storage.namespace == "pve"
assert storage.enabled is True
def test_normalize_pbs_storage_defaults_to_enabled_when_disable_is_absent() -> None:
storage = normalize_pbs_storage(
{
"storage": "backup-storage",
"type": "pbs",
"server": "backup.example.invalid",
"datastore": "prod",
}
)
assert storage.enabled is True
def test_normalize_pbs_storage_marks_disabled() -> None:
storage = normalize_pbs_storage(
{
"storage": "backup-storage",
"type": "pbs",
"server": "backup.example.invalid",
"datastore": "prod",
"disable": 1,
}
)
assert storage.enabled is False
def test_normalize_pbs_access_user() -> None:
user = normalize_pbs_access_user(
"PBS01",
PbsStorage(
storage_id="backup-storage",
username="backup@pbs!pve",
datastore="RAID5",
namespace="prod",
),
{
"userid": "backup@pbs",
"enable": True,
"expire": "0",
"email": "admin@example.invalid",
"comment": "Compte PVE",
},
{"Datastore.Backup": True, "Datastore.Modify": False},
)
assert user.server_name == "PBS01"
assert user.auth_id == "backup@pbs!pve"
assert user.user_id == "backup@pbs"
assert user.storage_id == "backup-storage"
assert user.enabled is True
    assert user.expire == 0
assert user.email == "admin@example.invalid"
assert user.comment == "Compte PVE"
assert user.permissions == {"Datastore.Backup": True, "Datastore.Modify": False}
def test_normalize_pbs_access_user_defaults_to_enabled_when_enable_is_absent() -> None:
user = normalize_pbs_access_user(
"PBS01",
PbsStorage(storage_id="backup-storage", username="backup@pbs"),
{"userid": "backup@pbs", "comment": "Compte PVE"},
)
assert user.enabled is True
def test_normalize_pbs_access_user_keeps_unknown_enabled_when_user_is_absent() -> None:
user = normalize_pbs_access_user(
"PBS01",
PbsStorage(storage_id="backup-storage", username="missing@pbs"),
{},
)
assert user.enabled is None
assert user.expire is None
def test_collect_pbs_access_users_matches_pve_storages_and_permissions() -> None:
users = collect_pbs_access_users(
[
FakePbsClient(
"PBS01",
raw_users=[
{
"userid": "backup@pbs",
"enable": True,
"email": "admin@example.invalid",
}
],
raw_permissions={
(
"backup@pbs!pve",
"/datastore/RAID5/prod",
): {
"/datastore/RAID5/prod": {
"Datastore.Backup": True,
"Datastore.Modify": False,
}
}
},
)
],
[
PbsStorage(
storage_id="backup-storage",
username="backup@pbs!pve",
server="PBS01",
datastore="RAID5",
namespace="prod",
)
],
)
assert len(users) == 1
assert users[0].server_name == "PBS01"
assert users[0].auth_id == "backup@pbs!pve"
assert users[0].user_id == "backup@pbs"
assert users[0].email == "admin@example.invalid"
assert users[0].permissions == {
"Datastore.Backup": True,
"Datastore.Modify": False,
}
def test_pbs_auth_user_id_removes_token_suffix() -> None:
assert pbs_auth_user_id("backup@pbs!pve") == "backup@pbs"
assert pbs_auth_user_id("backup@pbs") == "backup@pbs"
def test_pbs_datastore_acl_path() -> None:
assert pbs_datastore_acl_path("RAID5", None) == "/datastore/RAID5"
assert pbs_datastore_acl_path("RAID5", "/") == "/datastore/RAID5"
assert pbs_datastore_acl_path("RAID5", "prod") == "/datastore/RAID5/prod"
def test_normalize_pbs_retention_policy() -> None:
policy = normalize_pbs_retention_policy(
"PBS01",
{
"id": "prune-prod",
"store": "RAID5",
"ns": "serveurs-internes",
"schedule": "daily",
"keep-daily": "14",
"keep-weekly": 8,
"max-depth": 0,
"disable": 0,
},
)
assert policy.policy_id == "prune-prod"
assert policy.server_name == "PBS01"
assert policy.datastore == "RAID5"
assert policy.namespace == "serveurs-internes"
assert policy.keep_daily == 14
assert policy.keep_weekly == 8
assert policy.max_depth == 0
assert policy.enabled is True
def test_collect_pbs_retention_policies_supports_multiple_servers() -> None:
policies = collect_pbs_retention_policies(
[
FakePbsClient(
"PBS02",
raw_policies=[
{
"id": "prune-pbs02",
"store": "PBS2RAID5",
"ns": "serveurs-internes",
"schedule": "daily",
"keep-daily": 7,
}
],
),
FakePbsClient(
"PBS03",
raw_policies=[
{
"id": "prune-pbs03",
"store": "BACKUPSTORAGE",
"ns": "serveurs-internes",
"schedule": "weekly",
"keep-weekly": 4,
}
],
),
]
)
assert [policy.server_name for policy in policies] == ["PBS02", "PBS03"]
assert [policy.policy_id for policy in policies] == ["prune-pbs02", "prune-pbs03"]
def test_normalize_pbs_datastore_usage() -> None:
usage = normalize_pbs_datastore_usage(
"PBS01",
"RAID5",
{"total": "1000", "used": 400, "avail": 600},
)
assert usage.server_name == "PBS01"
assert usage.datastore == "RAID5"
assert usage.total_bytes == 1000
assert usage.used_bytes == 400
assert usage.available_bytes == 600
def test_collect_pbs_datastore_usages() -> None:
usages = collect_pbs_datastore_usages(
[
FakePbsClient(
"PBS01",
raw_datastores=[{"name": "RAID5"}],
raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
)
],
[],
)
assert len(usages) == 1
assert usages[0].server_name == "PBS01"
assert usages[0].datastore == "RAID5"
assert usages[0].total_bytes == 1000
assert usages[0].used_bytes == 400
assert usages[0].available_bytes == 600
def test_collect_pbs_datastore_usages_uses_pve_storages_as_fallback() -> None:
usages = collect_pbs_datastore_usages(
[
FakePbsClient(
"PBS01",
raw_datastores=[],
raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
)
],
[
PbsStorage(
storage_id="BACKUP-PRODR5",
server="PBS01",
datastore="RAID5",
)
],
)
assert len(usages) == 1
assert usages[0].datastore == "RAID5"
def test_pbs_client_storages_matches_hostname_mapping() -> None:
storages = pbs_client_storages(
FakePbsClient(
"PBS01",
api_url="https://backup.example.invalid:8007",
pbs_hostnames={"192.0.2.10": "backup.example.invalid"},
),
[
PbsStorage(
storage_id="BACKUP-PRODR5",
server="192.0.2.10",
datastore="RAID5",
)
],
)
assert [storage.storage_id for storage in storages] == ["BACKUP-PRODR5"]
def test_collect_pbs_datastore_usages_warns_when_no_datastore() -> None:
issues = []
usages = collect_pbs_datastore_usages(
[FakePbsClient("PBS01", raw_datastores=[])],
[],
issues,
)
assert usages == []
assert issues
assert issues[0].component == "pbs_storage_usage"
def test_normalize_pbs_gc_status_detects_running() -> None:
status = normalize_pbs_gc_status(
"PBS02",
"PBS2RAID5",
{"upid": "UPID:pbs02:gc", "schedule": "*:0/30", "next-run": 1778319000},
)
assert status.server_name == "PBS02"
assert status.datastore == "PBS2RAID5"
assert status.status == "en_cours"
assert status.schedule == "*:0/30"
assert status.next_run is not None
def test_collect_pbs_gc_statuses() -> None:
statuses = collect_pbs_gc_statuses(
[
FakePbsClient(
"PBS02",
raw_datastores=[{"name": "PBS2RAID5"}],
raw_gc_statuses={"PBS2RAID5": {"upid": "UPID:pbs02:gc"}},
)
],
[],
)
assert len(statuses) == 1
assert statuses[0].status == "en_cours"
def test_pbs_snapshot_scopes_adds_all_pve_namespaces_to_api_datastores() -> None:
scopes = pbs_snapshot_scopes(
FakePbsClient(
"PBS02",
raw_datastores=[{"name": "PBS2RAID5"}],
raw_namespaces={"PBS2RAID5": [{"ns": "sync-only"}]},
),
[
PbsStorage(storage_id="prod", namespace="serveurs-internes"),
PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
],
)
assert scopes == [
("PBS2RAID5", "Serveurs-PVELAB"),
("PBS2RAID5", "serveurs-internes"),
("PBS2RAID5", "sync-only"),
]
def test_collect_pbs_snapshot_summaries_reads_pve_namespaces_on_indirect_pbs() -> None:
summaries = collect_pbs_snapshot_summaries(
[
FakePbsClient(
"PBS02",
raw_datastores=[{"name": "PBS2RAID5"}],
raw_namespaces={"PBS2RAID5": []},
raw_snapshots={
("PBS2RAID5", "serveurs-internes"): [
{
"backup-type": "vm",
"backup-id": "100",
"backup-time": 1775849405,
}
],
("PBS2RAID5", "Serveurs-PVELAB"): [
{
"backup-type": "ct",
"backup-id": "200",
"backup-time": 1775849405,
}
],
},
)
],
[
PbsStorage(storage_id="prod", namespace="serveurs-internes"),
PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
],
)
assert set(summaries) == {
("PBS02", "PBS2RAID5", "serveurs-internes", "qemu", 100),
("PBS02", "PBS2RAID5", "Serveurs-PVELAB", "lxc", 200),
}
def test_collect_pbs_snapshot_summaries_ignores_missing_non_root_namespace() -> None:
issues = []
summaries = collect_pbs_snapshot_summaries(
[
FakePbsClient(
"PBS02",
raw_datastores=[{"name": "PBS2RAID5"}],
raw_namespaces={"PBS2RAID5": []},
snapshot_errors={
("PBS2RAID5", "absente"): PbsHttpError(
"/admin/datastore/PBS2RAID5/snapshots",
400,
"Bad Request",
)
},
)
],
[PbsStorage(storage_id="absente", namespace="absente")],
issues,
)
assert summaries == {}
assert issues == []
def test_collect_pbs_snapshot_summaries_keeps_root_bad_request_as_warning() -> None:
issues = []
summaries = collect_pbs_snapshot_summaries(
[
FakePbsClient(
"PBS02",
raw_datastores=[{"name": "PBS2RAID5"}],
raw_namespaces={"PBS2RAID5": []},
snapshot_errors={
("PBS2RAID5", "/"): PbsHttpError(
"/admin/datastore/PBS2RAID5/snapshots",
400,
"Bad Request",
)
},
)
],
[PbsStorage(storage_id="root", namespace=None)],
issues,
)
assert summaries == {}
assert len(issues) == 1
assert issues[0].component == "pbs_snapshots"
def test_is_missing_pbs_snapshot_namespace_only_matches_non_root_400() -> None:
assert is_missing_pbs_snapshot_namespace(
PbsHttpError("/admin/datastore/store/snapshots", 400, "Bad Request"),
"absente",
)
assert not is_missing_pbs_snapshot_namespace(
PbsHttpError("/admin/datastore/store/snapshots", 400, "Bad Request"),
"/",
)
assert not is_missing_pbs_snapshot_namespace(
PbsHttpError("/admin/datastore/store/snapshots", 500, "Internal Server Error"),
"absente",
)
def test_normalize_pbs_snapshot() -> None:
summary = normalize_pbs_snapshot(
"PBS01",
"RAID5",
"serveurs-internes",
{
"backup-type": "vm",
"backup-id": "1110001",
"backup-time": 1775849405,
"size": 123456789,
},
)
assert summary is not None
assert summary.server_name == "PBS01"
assert summary.vmid == 1110001
assert summary.guest_type == "qemu"
assert summary.snapshot_count == 1
assert summary.newest_backup_size_bytes == 123456789
def test_merge_pbs_snapshot_summary_keeps_newest_size() -> None:
older = normalize_pbs_snapshot(
"PBS01",
"RAID5",
"serveurs-internes",
{
"backup-type": "vm",
"backup-id": "1110001",
"backup-time": 1775849405,
"size": 100,
},
)
newer = normalize_pbs_snapshot(
"PBS01",
"RAID5",
"serveurs-internes",
{
"backup-type": "vm",
"backup-id": "1110001",
"backup-time": 1775935805,
"size": 200,
},
)
assert older is not None
assert newer is not None
merged = merge_pbs_snapshot_summary(older, newer)
assert merged.snapshot_count == 2
assert merged.newest_backup_size_bytes == 200
def test_normalize_pbs_namespace_root() -> None:
assert normalize_pbs_namespace("") == "/"
assert normalize_pbs_namespace(None) == "/"
assert normalize_pbs_namespace("progiciels") == "progiciels"
def test_normalize_backup_job() -> None:
job = normalize_backup_job(
{
"id": "backup-prod",
"storage": "backup-storage",
"schedule": "daily 02:00",
"disable": 1,
"mode": "snapshot",
"vmid": "100,101",
"exclude": "101",
}
)
assert job.job_id == "backup-prod"
assert job.storage == "backup-storage"
assert job.schedule == "daily 02:00"
assert job.enabled is False
assert job.mode == "snapshot"
assert job.selection == "vmid=100,101"
assert job.excluded == "101"
def test_normalize_guest() -> None:
guest = normalize_guest(
{
"id": "qemu/100",
"vmid": "100",
"type": "qemu",
"name": "srv-app",
"node": "pve01",
"status": "running",
}
)
assert guest is not None
assert guest.vmid == 100
assert guest.name == "srv-app"
assert guest.guest_type == "qemu"
assert guest.node == "pve01"
assert guest.status == "running"
def test_guest_notes_reads_description_and_strips_empty() -> None:
assert guest_notes({"description": " notes applicatives "}) == "notes applicatives"
assert guest_notes({"description": " "}) is None
def test_collect_guests_enriches_notes_from_vm_and_ct_config() -> None:
guests = collect_guests(
FakePveClient(
[
{"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"},
{"id": "lxc/200", "vmid": 200, "type": "lxc", "name": "ct", "node": "pve02"},
],
configs={
("qemu", "pve01", 100): {"description": "note VM"},
("lxc", "pve02", 200): {"description": "note CT"},
},
)
)
assert [guest.notes for guest in guests] == ["note VM", "note CT"]
def test_collect_guests_keeps_guest_when_notes_are_unavailable() -> None:
issues = []
guests = collect_guests(
FakePveClient(
[{"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"}],
config_errors={
("qemu", "pve01", 100): PveApiError("config indisponible"),
},
),
issues,
)
assert len(guests) == 1
assert guests[0].notes is None
assert len(issues) == 1
assert issues[0].message == "notes VM/CT indisponibles"
def test_normalize_pool_members() -> None:
pool = normalize_pool(
"prod",
{
"members": [
{"type": "qemu", "vmid": 100},
{"type": "lxc", "id": "lxc/101"},
{"type": "storage", "id": "storage/local"},
]
},
)
assert pool.pool_id == "prod"
assert pool.vmids == {100, 101}
def test_normalize_last_backup_result_success() -> None:
result = normalize_last_backup_result(
{
"type": "vzdump",
"id": "100",
"status": "OK",
"starttime": 1778119140,
"endtime": 1778119200,
"node": "pve01",
}
)
assert result is not None
assert result.vmid == 100
assert result.status == "succes"
assert result.duration_seconds == 60
assert result.node == "pve01"
def test_normalize_last_backup_result_failure() -> None:
result = normalize_last_backup_result(
{
"type": "vzdump",
"id": "100",
"status": "interrupted",
"endtime": 1778119200,
}
)
assert result is not None
assert result.status == "echec"
def test_extract_task_vmid_from_guest_id() -> None:
assert extract_task_vmid({"id": "qemu/100"}) == 100
assert extract_task_vmid({"id": "lxc/101"}) == 101
def test_extract_task_vmid_from_upid() -> None:
assert extract_task_vmid({"upid": "UPID:pve01:123:456:789:vzdump:100:root@pam:"}) == 100
def test_parse_vzdump_task_log_extracts_per_guest_results() -> None:
results = parse_vzdump_task_log(
{"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve01"},
[
"INFO: Starting Backup of VM 100 (qemu)",
"INFO: Finished Backup of VM 100 (00:00:10)",
"INFO: Starting Backup of CT 101 (lxc)",
"INFO: Finished Backup of CT 101 (00:00:08)",
],
)
assert {result.vmid for result in results} == {100, 101}
assert {result.status for result in results} == {"succes"}
assert {result.vmid: result.duration_seconds for result in results} == {
100: 10,
101: 8,
}
def test_parse_vzdump_task_log_extracts_failure() -> None:
results = parse_vzdump_task_log(
{"type": "vzdump", "status": "interrupted", "endtime": 1778119200, "node": "pve01"},
[
"INFO: Starting Backup of VM 100 (qemu)",
"ERROR: Backup of VM 100 failed - command failed",
],
)
assert len(results) == 1
assert results[0].vmid == 100
assert results[0].status == "echec"
def test_parse_vzdump_task_log_extracts_job_vmid_list_and_skips_external() -> None:
results = parse_vzdump_task_log(
{"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve02"},
[
"INFO: starting new backup job: vzdump 100 101 102 --mode snapshot --storage PBS",
"INFO: skip external VMs: 101",
],
)
assert {result.vmid for result in results} == {100, 102}
assert {result.status for result in results} == {"succes"}
def test_parse_vzdump_task_log_uses_task_duration_for_started_guest_without_finished_line() -> None:
results = parse_vzdump_task_log(
{
"type": "vzdump",
"status": "OK",
"starttime": 1778119140,
"endtime": 1778119200,
"node": "pve01",
},
[
"INFO: starting new backup job: vzdump 100 101 --mode snapshot --storage PBS",
"INFO: skip external VMs: 101",
"INFO: Starting Backup of VM 100 (qemu)",
"INFO: creating Proxmox Backup Server archive 'vm/100/2026-05-07T00:00:00Z'",
],
)
assert len(results) == 1
assert results[0].vmid == 100
assert results[0].duration_seconds == 60
def test_extract_finished_backup_from_log_line_extracts_duration() -> None:
assert extract_finished_backup_from_log_line(
"INFO: Finished Backup of VM 100 (01:02:03)"
) == (100, 3723)
def test_task_duration_seconds_from_task_times() -> None:
assert task_duration_seconds({"starttime": 1778119140, "endtime": 1778119200}) == 60
+78
@@ -0,0 +1,78 @@
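# Tests for pve_backup_report.config: loading settings from a .env file,
# discovering numbered PBSnn_* server blocks, and rejecting incomplete PBS
# configuration blocks.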
import os
import pytest
from pve_backup_report.config import ConfigError, load_config, parse_pbs_servers
def test_load_config_from_env_file(tmp_path, monkeypatch) -> None:
env_file = tmp_path / ".env"
env_file.write_text(
"\n".join(
[
"PVE_API_URL=https://pve.example.invalid:8006",
"PVE_API_TOKEN_ID=backup-report@pve!report",
"PVE_API_TOKEN_SECRET=secret",
"PVE_VERIFY_TLS=false",
"PVE_TIMEOUT_SECONDS=10",
"PBS_HOSTNAMES=192.0.2.10=backup-a,192.0.2.11=backup-b",
]
),
encoding="utf-8",
)
monkeypatch.delenv("PVE_API_URL", raising=False)
monkeypatch.delenv("PVE_API_TOKEN_ID", raising=False)
monkeypatch.delenv("PVE_API_TOKEN_SECRET", raising=False)
for key in list(os.environ):
if key.startswith("PBS") and key != "PBS_HOSTNAMES":
monkeypatch.delenv(key, raising=False)
config = load_config(env_file)
assert config.pve_api_url == "https://pve.example.invalid:8006"
assert config.pve_verify_tls is False
assert config.pve_timeout_seconds == 10
assert config.pve_backup_jobs_endpoint == "/cluster/backup"
assert config.pve_task_history_limit == 500
assert config.pve_task_log_limit == 5000
assert config.configured_pbs_servers == ()
assert config.pbs_hostnames == {
"192.0.2.10": "backup-a",
"192.0.2.11": "backup-b",
}
def test_parse_pbs_servers_detects_unbounded_numeric_prefixes() -> None:
servers = parse_pbs_servers(
{
"PBS10_NAME": "PBS10",
"PBS10_API_URL": "https://backup-j.example.invalid:8007",
"PBS10_API_TOKEN_ID": "backup-report@pbs!report",
"PBS10_API_TOKEN_SECRET": "secret10",
"PBS02_NAME": "PBS02",
"PBS02_API_URL": "https://backup-b.example.invalid:8007",
"PBS02_API_TOKEN_ID": "backup-report@pbs!report",
"PBS02_API_TOKEN_SECRET": "secret2",
},
pve_verify_tls=True,
pve_timeout_seconds=30,
)
assert [server.prefix for server in servers] == ["PBS02", "PBS10"]
assert [server.name for server in servers] == ["PBS02", "PBS10"]
assert [server.api_url for server in servers] == [
"https://backup-b.example.invalid:8007",
"https://backup-j.example.invalid:8007",
]
def test_parse_pbs_servers_rejects_incomplete_api_block() -> None:
with pytest.raises(ConfigError, match="configuration PBS04 incomplete"):
parse_pbs_servers(
{
"PBS04_API_URL": "https://backup-d.example.invalid:8007",
"PBS04_API_TOKEN_ID": "backup-report@pbs!report",
},
pve_verify_tls=True,
pve_timeout_seconds=30,
)
+159
@@ -0,0 +1,159 @@
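# Tests for pve_backup_report.coverage: mapping backup jobs (explicit vmid
# lists, all=true, pool selections, exclusions) to per-guest coverage statuses
# and preserving PBS datastore usages and GC statuses through the analysis.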
from pve_backup_report.coverage import (
STATUS_DISABLED_PBS,
STATUS_INDETERMINATE,
STATUS_MISSING,
STATUS_NON_PBS_PLANNED,
STATUS_PBS_PLANNED,
analyze_backup_coverage,
calculate_backup_coverage,
calculate_explicit_vmid_coverage,
parse_job_vmids,
)
from pve_backup_report.models import (
BackupJob,
Guest,
PbsDatastoreUsage,
PbsGarbageCollectionStatus,
PbsStorage,
Pool,
ReportData,
)
def test_parse_job_vmids_with_exclusions() -> None:
job = BackupJob(
job_id="backup-prod",
enabled=True,
selection="vmid=100,101,102",
excluded="101",
)
assert parse_job_vmids(job) == {100, 102}
def test_calculate_explicit_vmid_coverage() -> None:
guests = [
Guest(vmid=100, name="srv-a", guest_type="qemu"),
Guest(vmid=101, name="srv-b", guest_type="lxc"),
]
jobs = [
BackupJob(
job_id="backup-prod",
enabled=True,
selection="vmid=100",
)
]
coverage = calculate_explicit_vmid_coverage(guests, jobs)
assert coverage[0].status == STATUS_INDETERMINATE
assert coverage[0].jobs[0].job_id == "backup-prod"
assert coverage[1].status == STATUS_MISSING
def test_all_job_covers_all_except_excluded() -> None:
guests = [
Guest(vmid=100, name="srv-a", guest_type="qemu"),
Guest(vmid=101, name="srv-b", guest_type="lxc"),
]
jobs = [
BackupJob(
job_id="backup-all",
storage="backup-storage",
enabled=True,
selection="all=true",
excluded="101",
)
]
storages = [PbsStorage(storage_id="backup-storage", enabled=True)]
coverage = calculate_backup_coverage(guests, jobs, storages)
assert coverage[0].status == STATUS_PBS_PLANNED
assert coverage[1].status == STATUS_MISSING
def test_non_pbs_storage_is_distinguished() -> None:
guests = [Guest(vmid=100, name="srv-a", guest_type="qemu")]
jobs = [
BackupJob(
job_id="backup-local",
storage="local",
enabled=True,
selection="vmid=100",
)
]
coverage = calculate_backup_coverage(guests, jobs, pbs_storages=[])
assert coverage[0].status == STATUS_NON_PBS_PLANNED
def test_disabled_pbs_storage_is_reported() -> None:
guests = [Guest(vmid=100, name="srv-a", guest_type="qemu")]
jobs = [
BackupJob(
job_id="backup-disabled",
storage="pbs-disabled",
enabled=True,
selection="vmid=100",
)
]
storages = [PbsStorage(storage_id="pbs-disabled", enabled=False)]
coverage = calculate_backup_coverage(guests, jobs, storages)
assert coverage[0].status == STATUS_DISABLED_PBS
def test_pool_job_covers_pool_members_except_excluded() -> None:
guests = [
Guest(vmid=100, name="srv-a", guest_type="qemu"),
Guest(vmid=101, name="srv-b", guest_type="lxc"),
Guest(vmid=102, name="srv-c", guest_type="qemu"),
]
jobs = [
BackupJob(
job_id="backup-pool",
storage="backup-storage",
enabled=True,
selection="pool=prod",
excluded="101",
)
]
storages = [PbsStorage(storage_id="backup-storage", enabled=True)]
pools = [Pool(pool_id="prod", vmids={100, 101})]
coverage = calculate_backup_coverage(guests, jobs, storages, pools)
assert coverage[0].status == STATUS_PBS_PLANNED
assert coverage[1].status == STATUS_MISSING
assert coverage[2].status == STATUS_MISSING
def test_analyze_backup_coverage_preserves_pbs_datastore_usages() -> None:
usage = PbsDatastoreUsage(
server_name="PBS01",
datastore="RAID5",
total_bytes=100,
used_bytes=40,
available_bytes=60,
)
report_data = ReportData(pbs_datastore_usages=[usage])
analyzed = analyze_backup_coverage(report_data)
assert analyzed.pbs_datastore_usages == [usage]
def test_analyze_backup_coverage_preserves_pbs_gc_statuses() -> None:
status = PbsGarbageCollectionStatus(
server_name="PBS02",
datastore="PBS2RAID5",
status="en_cours",
)
report_data = ReportData(pbs_gc_statuses=[status])
analyzed = analyze_backup_coverage(report_data)
assert analyzed.pbs_gc_statuses == [status]
+13
@@ -0,0 +1,13 @@
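# Smoke test: every module of the package imports and a version string is set.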
def test_package_imports() -> None:
import pve_backup_report
import pve_backup_report.cli
import pve_backup_report.collectors
import pve_backup_report.config
import pve_backup_report.coverage
import pve_backup_report.logging_config
import pve_backup_report.models
import pve_backup_report.pbs_client
import pve_backup_report.pve_client
import pve_backup_report.report_pdf
assert pve_backup_report.__version__
+248
@@ -0,0 +1,248 @@
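# Tests for pve_backup_report.pbs_client: endpoint URLs, PBS API token
# authentication, per-server settings, and sanitization of secrets in error
# messages, using a fake HTTP session that records each call (URL, params,
# timeout, TLS verification).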
from __future__ import annotations
from pathlib import Path
from pve_backup_report.config import AppConfig, PbsServerConfig
from pve_backup_report.pbs_client import PbsClient, PbsConnectionError, PbsHttpError
class FakeResponse:
def __init__(self, status_code: int, payload: dict, reason: str = "OK") -> None:
self.status_code = status_code
self._payload = payload
self.reason = reason
def json(self) -> dict:
return self._payload
class FakeSession:
def __init__(self, response: FakeResponse) -> None:
self.headers: dict[str, str] = {}
self.response = response
self.calls: list[tuple[str, dict[str, object] | None, int, bool | str]] = []
def get(
self,
url: str,
params: dict[str, object] | None,
timeout: int,
verify: bool | str,
) -> FakeResponse:
self.calls.append((url, params, timeout, verify))
return self.response
def close(self) -> None:
pass
def make_config() -> AppConfig:
return AppConfig(
pve_api_url="https://pve.example.invalid:8006",
pve_api_token_id="backup-report@pve!report",
pve_api_token_secret="secret",
report_output_dir=Path("reports"),
report_timezone="Europe/Paris",
pve_verify_tls=False,
pve_ca_bundle=None,
pve_timeout_seconds=30,
pve_backup_jobs_endpoint="/cluster/backup",
pve_task_history_limit=500,
pve_task_log_limit=5000,
pbs_hostnames={},
pbs_servers=(
PbsServerConfig(
prefix="PBS01",
name="PBS01",
api_url="https://backup-a.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret",
verify_tls=False,
ca_bundle=None,
timeout_seconds=30,
),
PbsServerConfig(
prefix="PBS02",
name="PBS02",
api_url="https://backup-b.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret2",
verify_tls=False,
ca_bundle=None,
timeout_seconds=30,
),
PbsServerConfig(
prefix="PBS10",
name="PBS10",
api_url="https://backup-j.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret10",
verify_tls=False,
ca_bundle=None,
timeout_seconds=120,
),
),
log_level="INFO",
report_filename_prefix="rapport-sauvegardes-pve",
)
def test_get_prune_jobs_uses_pbs_token_auth() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_prune_jobs() == []
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/config/prune", None, 30, False)
]
assert session.headers["Authorization"].startswith(
"PBSAPIToken=backup-report@pbs!report:"
)
assert "secret" in session.headers["Authorization"]
def test_get_datastore_snapshots_uses_namespace_param() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_datastore_snapshots("RAID5", "serveurs-internes") == []
assert session.calls == [
(
"https://backup-a.example.invalid:8007/api2/json/admin/datastore/RAID5/snapshots",
{"ns": "serveurs-internes"},
30,
False,
)
]
def test_get_datastores() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_datastores() == []
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/config/datastore", None, 30, False)
]
def test_get_datastore_status() -> None:
session = FakeSession(FakeResponse(200, {"data": {"total": 100, "used": 40, "avail": 60}}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_datastore_status("RAID5") == {"total": 100, "used": 40, "avail": 60}
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/admin/datastore/RAID5/status", None, 30, False)
]
def test_get_datastore_gc_status() -> None:
session = FakeSession(FakeResponse(200, {"data": {"upid": "UPID:pbs:gc"}}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_datastore_gc_status("RAID5") == {"upid": "UPID:pbs:gc"}
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/admin/datastore/RAID5/gc", None, 30, False)
]
def test_get_datastore_namespaces() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_datastore_namespaces("RAID5") == []
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/admin/datastore/RAID5/namespace", None, 30, False)
]
def test_get_access_users() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_access_users() == []
assert session.calls == [
("https://backup-a.example.invalid:8007/api2/json/access/users", None, 30, False)
]
def test_get_access_permissions() -> None:
session = FakeSession(FakeResponse(200, {"data": {"/datastore/RAID5": {}}}))
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_access_permissions("backup@pbs", "/datastore/RAID5") == {
"/datastore/RAID5": {}
}
assert session.calls == [
(
"https://backup-a.example.invalid:8007/api2/json/access/permissions",
{"auth-id": "backup@pbs", "path": "/datastore/RAID5"},
30,
False,
)
]
def test_pbs02_client_uses_pbs02_settings() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
config = make_config()
client = PbsClient(config, server=config.pbs_servers[1], session=session) # type: ignore[arg-type]
assert client.get_prune_jobs() == []
assert session.calls == [
("https://backup-b.example.invalid:8007/api2/json/config/prune", None, 30, False)
]
assert session.headers["Authorization"].startswith(
"PBSAPIToken=backup-report@pbs!report:"
)
assert "secret2" in session.headers["Authorization"]
def test_pbs10_client_uses_dynamically_discovered_settings() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
config = make_config()
client = PbsClient(config, server=config.pbs_servers[2], session=session) # type: ignore[arg-type]
assert client.get_prune_jobs() == []
assert session.calls == [
("https://backup-j.example.invalid:8007/api2/json/config/prune", None, 120, False)
]
assert session.headers["Authorization"].startswith(
"PBSAPIToken=backup-report@pbs!report:"
)
assert "secret10" in session.headers["Authorization"]
def test_pbs_sanitize_exception_masks_sensitive_values() -> None:
message = PbsClient._sanitize_exception(
PbsConnectionError(
"PBSAPIToken=backup@pbs!report:secret password=secret2 secret=secret3"
)
)
assert "report:secret" not in message
assert "secret2" not in message
assert "secret3" not in message
assert "PBSAPIToken=***" in message
assert "password=***" in message
assert "secret=***" in message
def test_pbs_http_error_message_is_sanitized() -> None:
session = FakeSession(
FakeResponse(
500,
{"data": "PBSAPIToken=backup@pbs!report:secret password=secret2"},
"Server Error",
)
)
client = PbsClient(make_config(), session=session) # type: ignore[arg-type]
try:
client.get_prune_jobs()
except PbsHttpError as exc:
assert "report:secret" not in exc.message
assert "secret2" not in exc.message
assert exc.message == "PBSAPIToken=*** password=***"
else:
raise AssertionError("PbsHttpError attendu")
+237
@@ -0,0 +1,237 @@
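# Tests for pve_backup_report.pve_client: PVE API token authentication,
# endpoint URLs and parameters, error sanitization, CA bundle handling,
# required-endpoint checks, and task, log, and guest config helpers, again
# using a fake HTTP session that records each call.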
from __future__ import annotations
from pathlib import Path
import pytest
from pve_backup_report.config import AppConfig
from pve_backup_report.pve_client import PveClient, PveConnectionError, PveHttpError
class FakeResponse:
def __init__(self, status_code: int, payload: dict, reason: str = "OK") -> None:
self.status_code = status_code
self._payload = payload
self.reason = reason
def json(self) -> dict:
return self._payload
class FakeSession:
def __init__(self, response: FakeResponse) -> None:
self.headers: dict[str, str] = {}
self.response = response
self.calls: list[tuple[str, dict[str, object] | None, int, bool | str]] = []
def get(
self,
url: str,
params: dict[str, object] | None,
timeout: int,
verify: bool | str,
) -> FakeResponse:
self.calls.append((url, params, timeout, verify))
return self.response
def close(self) -> None:
pass
def make_config() -> AppConfig:
return AppConfig(
pve_api_url="https://pve.example.invalid:8006",
pve_api_token_id="backup-report@pve!report",
pve_api_token_secret="secret",
report_output_dir=Path("reports"),
report_timezone="Europe/Paris",
pve_verify_tls=True,
pve_ca_bundle=None,
pve_timeout_seconds=30,
pve_backup_jobs_endpoint="/cluster/backup",
pve_task_history_limit=500,
pve_task_log_limit=5000,
pbs_hostnames={},
pbs_servers=(),
log_level="INFO",
report_filename_prefix="rapport-sauvegardes-pve",
)
def test_get_uses_token_auth_and_returns_data() -> None:
session = FakeSession(FakeResponse(200, {"data": [{"node": "pve1"}]}))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
data = client.get_nodes()
assert data == [{"node": "pve1"}]
assert session.calls == [
("https://pve.example.invalid:8006/api2/json/nodes", None, 30, True)
]
assert session.headers["Authorization"].startswith(
"PVEAPIToken=backup-report@pve!report="
)
assert "secret" in session.headers["Authorization"]
def test_http_error_keeps_endpoint_and_status() -> None:
session = FakeSession(FakeResponse(401, {"message": "permission denied"}, "Unauthorized"))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
with pytest.raises(PveHttpError) as exc_info:
client.get_storages()
assert exc_info.value.endpoint == "/storage"
assert exc_info.value.status_code == 401
def test_http_error_message_is_sanitized() -> None:
session = FakeSession(
FakeResponse(
500,
{"message": "PVEAPIToken=backup@pve!report=secret secret=secret2"},
"Server Error",
)
)
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
with pytest.raises(PveHttpError) as exc_info:
client.get_nodes()
assert "report=secret" not in exc_info.value.message
assert "secret2" not in exc_info.value.message
assert exc_info.value.message == "PVEAPIToken=*** secret=***"
def test_ca_bundle_overrides_boolean_verify_tls(tmp_path: Path) -> None:
ca_bundle = tmp_path / "internal-ca.pem"
ca_bundle.write_text("test ca", encoding="utf-8")
config = make_config()
config = AppConfig(
**{
**config.__dict__,
"pve_ca_bundle": ca_bundle,
}
)
session = FakeSession(FakeResponse(200, {"data": []}))
client = PveClient(config, session=session) # type: ignore[arg-type]
client.get_backup_jobs()
assert session.calls[0][3] == str(ca_bundle)
def test_check_required_endpoints_keeps_testing_after_error() -> None:
class MultiResponseSession(FakeSession):
def __init__(self) -> None:
super().__init__(FakeResponse(200, {"data": []}))
self.responses = [
FakeResponse(200, {"data": [{"node": "pve1"}]}),
FakeResponse(200, {"data": [{"storage": "pbs"}]}),
FakeResponse(200, {"data": [{"subdir": "jobs"}]}),
FakeResponse(404, {"message": "No such endpoint"}),
]
def get(
self,
url: str,
params: dict[str, object] | None,
timeout: int,
verify: bool | str,
) -> FakeResponse:
self.calls.append((url, params, timeout, verify))
return self.responses.pop(0)
client = PveClient(make_config(), session=MultiResponseSession()) # type: ignore[arg-type]
results = client.check_required_endpoints()
assert [result.endpoint for result in results] == [
"/nodes",
"/storage",
"/cluster",
"/cluster/backup",
]
assert [result.ok for result in results] == [True, True, True, False]
assert results[2].detail == "sous-endpoints: jobs"
def test_get_cluster_tasks_without_params() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_cluster_tasks() == []
assert session.calls == [
(
"https://pve.example.invalid:8006/api2/json/cluster/tasks",
None,
30,
True,
)
]
def test_get_node_tasks_without_params() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_node_tasks("pve01") == []
assert session.calls == [
(
"https://pve.example.invalid:8006/api2/json/nodes/pve01/tasks",
None,
30,
True,
)
]
def test_get_task_log_encodes_upid() -> None:
session = FakeSession(FakeResponse(200, {"data": []}))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
upid = "UPID:pve01:123:456:789:vzdump:100:root@pam:"
assert client.get_task_log("pve01", upid) == []
assert session.calls == [
(
"https://pve.example.invalid:8006/api2/json/nodes/pve01/tasks/UPID%3Apve01%3A123%3A456%3A789%3Avzdump%3A100%3Aroot%40pam%3A/log",
{"start": 0, "limit": 5000},
30,
True,
)
]
def test_get_guest_config_endpoints() -> None:
session = FakeSession(FakeResponse(200, {"data": {"description": "note"}}))
client = PveClient(make_config(), session=session) # type: ignore[arg-type]
assert client.get_qemu_config("pve01", 100) == {"description": "note"}
assert client.get_lxc_config("pve02", 200) == {"description": "note"}
assert session.calls == [
(
"https://pve.example.invalid:8006/api2/json/nodes/pve01/qemu/100/config",
None,
30,
True,
),
(
"https://pve.example.invalid:8006/api2/json/nodes/pve02/lxc/200/config",
None,
30,
True,
),
]
def test_pve_sanitize_exception_masks_sensitive_values() -> None:
message = PveClient._sanitize_exception(
PveConnectionError(
"PVEAPIToken=backup@pve!report=secret PVE_API_TOKEN_SECRET=secret2"
)
)
assert "report=secret" not in message
assert "secret2" not in message
assert "PVEAPIToken=***" in message
assert "PVE_API_TOKEN_SECRET=***" in message
+216
@@ -0,0 +1,216 @@
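# Tests for pve_backup_report.report_data: summary counters, the dictionary
# export used by the PDF report (sensitive raw fields redacted), and filtering
# of PBS server names down to the configured servers.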
from dataclasses import replace
from pathlib import Path
from pve_backup_report.config import AppConfig, PbsServerConfig
from pve_backup_report.coverage import STATUS_MISSING, STATUS_PBS_PLANNED
from pve_backup_report.models import (
BackupCoverage,
BackupJob,
Guest,
LastBackupResult,
PbsAccessUser,
PbsBackupSnapshotSummary,
PbsRetentionPolicy,
PbsStorage,
ReportData,
)
from pve_backup_report.report_data import (
    build_report_summary,
    prepare_report_data,
    report_data_to_dict,
)
def make_config() -> AppConfig:
return AppConfig(
pve_api_url="https://pve.example.invalid:8006",
pve_api_token_id="backup-report@pve!report",
pve_api_token_secret="secret",
report_output_dir=Path("reports"),
report_timezone="Europe/Paris",
pve_verify_tls=False,
pve_ca_bundle=None,
pve_timeout_seconds=30,
pve_backup_jobs_endpoint="/cluster/backup",
pve_task_history_limit=500,
pve_task_log_limit=5000,
pbs_hostnames={},
pbs_servers=(),
log_level="INFO",
report_filename_prefix="rapport-sauvegardes-pve",
)
def test_build_report_summary() -> None:
guest_vm = Guest(vmid=100, name="srv-a", guest_type="qemu")
guest_ct = Guest(vmid=101, name="ct-a", guest_type="lxc")
active_job = BackupJob(job_id="backup-a", enabled=True)
inactive_job = BackupJob(job_id="backup-b", enabled=False)
report_data = ReportData(
pbs_storages=[PbsStorage(storage_id="backup-storage")],
guests=[guest_vm, guest_ct],
backup_jobs=[active_job, inactive_job],
coverage=[
BackupCoverage(guest=guest_vm, status=STATUS_PBS_PLANNED),
BackupCoverage(guest=guest_ct, status=STATUS_MISSING),
],
)
summary = build_report_summary(report_data, make_config())
assert summary.total_vm == 1
assert summary.total_ct == 1
assert summary.total_guests == 2
assert summary.pbs_storage_count == 1
assert summary.backup_job_count == 2
assert summary.active_backup_job_count == 1
assert summary.inactive_backup_job_count == 1
assert summary.pbs_planned_count == 1
assert summary.missing_count == 1
def test_report_data_to_dict_keeps_pdf_inputs() -> None:
guest = Guest(vmid=100, name="srv-a", guest_type="qemu", node="pve01")
job = BackupJob(job_id="backup-a", storage="backup-storage", schedule="23:00")
report_data = ReportData(
pbs_storages=[PbsStorage(storage_id="backup-storage", server="backup.example.invalid")],
pbs_access_users=[
PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs",
user_id="backup@pbs",
storage_id="backup-storage",
permissions={"Datastore.Backup": True},
raw={
"Authorization": "PBSAPIToken=backup@pbs!report:secret",
"password": "secret",
"token": "secret",
},
)
],
pbs_retention_policies=[
PbsRetentionPolicy(
policy_id="prune-prod",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs-internes",
keep_daily=14,
)
],
pbs_snapshot_summaries={
("PBS01", "RAID5", "serveurs-internes", "qemu", 100): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
snapshot_count=3,
raw={
"fingerprint": "aa:bb:cc",
"files": [{"filename": "index.json.blob"}],
"owner": "backup@pbs",
},
)
},
guests=[guest],
backup_jobs=[job],
coverage=[
BackupCoverage(
guest=guest,
status=STATUS_PBS_PLANNED,
jobs=[job],
storages=["backup-storage"],
)
],
last_backup_results={100: LastBackupResult(vmid=100, status="succes")},
)
data = report_data_to_dict(report_data)
assert data["pbs_storages"][0]["id"] == "backup-storage"
assert data["pbs_access_users"][0]["auth_id"] == "backup@pbs"
assert data["pbs_access_users"][0]["server"] == "PBS01"
assert data["pbs_access_users"][0]["user_id"] == "backup@pbs"
assert data["pbs_access_users"][0]["permissions"] == {"Datastore.Backup": True}
assert "raw" not in data["pbs_access_users"][0]
assert data["pbs_server_names"] == []
assert data["pbs_retention_policies"][0]["id"] == "prune-prod"
assert data["pbs_retention_policies"][0]["keep_daily"] == 14
assert data["pbs_snapshot_summaries"][0]["snapshot_count"] == 3
assert data["pbs_snapshot_summaries"][0]["server"] == "PBS01"
assert data["pbs_snapshot_summaries"][0]["type"] == "qemu"
assert "raw" not in data["pbs_snapshot_summaries"][0]
assert data["backup_jobs"][0]["id"] == "backup-a"
assert data["coverage"][0]["vmid"] == 100
assert data["coverage"][0]["jobs"] == ["backup-a"]
assert data["coverage"][0]["last_backup"]["status"] == "succes"
def test_report_data_to_dict_redacts_sensitive_nested_fields() -> None:
report_data = ReportData(
pbs_snapshot_summaries={
("PBS01", "RAID5", "serveurs", "qemu", 100): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs",
snapshot_count=1,
raw={
"nested": {
"api_token_secret": "secret",
"safe": "visible",
},
"ticket": "secret-ticket",
},
)
}
)
data = report_data_to_dict(report_data)
text = str(data)
assert "secret" not in text
assert "ticket" not in text
assert "api_token_secret" not in text
def test_prepare_report_data_keeps_only_configured_pbs_servers() -> None:
config = replace(
make_config(),
pbs_servers=(
PbsServerConfig(
prefix="PBS01",
name="PBS01",
api_url="https://backup.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret",
verify_tls=True,
ca_bundle=None,
timeout_seconds=30,
),
PbsServerConfig(
prefix="PBS04",
name="PBS04",
api_url="https://backup-extra.example.invalid:8007",
api_token_id="backup-report@pbs!report",
api_token_secret="secret4",
verify_tls=True,
ca_bundle=None,
timeout_seconds=30,
),
),
)
access_user = PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs",
user_id="backup@pbs",
storage_id="backup-storage",
)
report_data = prepare_report_data(
ReportData(
pbs_server_names=["PBS01", "PBS02", "PBS03", "PBS04"],
pbs_access_users=[access_user],
),
config,
)
assert report_data.pbs_server_names == ["PBS01", "PBS04"]
assert report_data.pbs_access_users == [access_user]
+489
@@ -0,0 +1,489 @@
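# Tests for pve_backup_report.report_pdf: report filename generation,
# formatting helpers, table row builders, snapshot summary lookup, coverage
# sorting, and backup retention rows.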
from datetime import datetime
from pve_backup_report.models import (
BackupCoverage,
BackupJob,
Guest,
LastBackupResult,
PbsAccessUser,
PbsBackupSnapshotSummary,
PbsDatastoreUsage,
PbsRetentionPolicy,
PbsStorage,
ReportData,
)
from pve_backup_report.report_pdf import (
    add_table_of_contents,
    backup_retention_row,
    build_backup_retention_rows,
    build_styles,
    build_table,
    coverage_row,
    coverage_sort_key,
    display_retention_delta,
    expected_retention_versions,
    find_snapshot_summary,
    format_duration,
    format_last_backup,
    format_pbs_server,
    format_size,
    pbs_access_user_row,
    pbs_datastore_usage_row,
    retention_policy_row,
    unique_report_path,
)
def test_unique_report_path_uses_timestamp(tmp_path) -> None:
generated_at = datetime(2026, 5, 7, 2, 0, 0)
path = unique_report_path(tmp_path, "rapport", generated_at)
assert path.name == "rapport-2026-05-07-020000.pdf"
def test_unique_report_path_does_not_overwrite(tmp_path) -> None:
generated_at = datetime(2026, 5, 7, 2, 0, 0)
existing = tmp_path / "rapport-2026-05-07-020000.pdf"
existing.write_text("existing", encoding="utf-8")
path = unique_report_path(tmp_path, "rapport", generated_at)
assert path.name == "rapport-2026-05-07-020000-1.pdf"
def test_format_pbs_server_with_hostname_mapping() -> None:
assert (
format_pbs_server("192.0.2.10", {"192.0.2.10": "backup-display"})
== "192.0.2.10 (backup-display)"
)
def test_format_pbs_server_without_mapping() -> None:
assert format_pbs_server("192.0.2.10", {}) == "192.0.2.10"
def test_format_duration() -> None:
assert format_duration(3723) == "01:02:03"
def test_format_size() -> None:
assert format_size(1536) == "1.5 Kio"
assert format_size(2 * 1024 * 1024 * 1024) == "2.0 Gio"
def test_pbs_datastore_usage_row() -> None:
usage = PbsDatastoreUsage(
server_name="PBS01",
datastore="RAID5",
total_bytes=10 * 1024 * 1024 * 1024,
used_bytes=4 * 1024 * 1024 * 1024,
available_bytes=6 * 1024 * 1024 * 1024,
)
assert pbs_datastore_usage_row(usage) == [
"PBS01",
"RAID5",
"10.0 Gio",
"4.0 Gio",
"6.0 Gio",
]
def test_pbs_access_user_row_displays_permissions() -> None:
user = PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs!pve",
user_id="backup@pbs",
storage_id="BACKUP-PROD",
datastore="RAID5",
namespace="serveurs",
enabled=True,
expire=0,
email="admin@example.invalid",
permissions={"Datastore.Backup": True, "Datastore.Modify": False},
comment="Compte PVE",
)
assert pbs_access_user_row(user) == [
"PBS01",
"backup@pbs!pve",
"BACKUP-PROD",
"RAID5",
"serveurs",
"oui",
"aucune",
"admin@example.invalid",
"Datastore.Backup",
"Compte PVE",
]
def test_build_table_centers_table() -> None:
table = build_table([["Colonne"], ["Valeur"]], [4])
assert table.hAlign == "CENTER"
def test_add_table_of_contents_adds_page_break() -> None:
story: list[object] = []
add_table_of_contents(story, build_styles())
assert len(story) == 3
def test_format_last_backup_includes_duration() -> None:
result = LastBackupResult(
vmid=100,
status="succes",
finished_at=datetime(2026, 5, 7, 2, 14),
duration_seconds=222,
)
assert format_last_backup(result) == "Succes - 2026-05-07 02:14 - duree 00:03:42"
def test_retention_policy_row_splits_columns() -> None:
policy = PbsRetentionPolicy(
policy_id="prune-prod",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs-internes",
schedule="daily",
enabled=True,
keep_last=1,
keep_hourly=2,
keep_daily=14,
keep_weekly=8,
keep_monthly=3,
keep_yearly=1,
max_depth=0,
)
assert retention_policy_row(policy) == [
"PBS01",
"RAID5",
"serveurs-internes",
"daily",
"oui",
1,
2,
14,
8,
3,
1,
0,
]
def test_find_snapshot_summary_matches_storage_namespace_and_guest() -> None:
guest = Guest(vmid=100, name="srv", guest_type="qemu")
item = BackupCoverage(
guest=guest,
status="sauvegarde_pbs_planifiee",
storages=["BACKUP-PRODR5"],
)
summary = PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
snapshot_count=13,
)
assert (
find_snapshot_summary(
item,
{"BACKUP-PRODR5": "RAID5"},
{"BACKUP-PRODR5": "serveurs-internes"},
{("PBS01", "RAID5", "serveurs-internes", "qemu", 100): summary},
"PBS01",
)
== summary
)
def test_find_snapshot_summary_matches_pbs02_without_pve_storage_datastore() -> None:
guest = Guest(vmid=100, name="srv", guest_type="qemu")
item = BackupCoverage(
guest=guest,
status="sauvegarde_pbs_planifiee",
storages=["BACKUP-PRODR5"],
)
summary = PbsBackupSnapshotSummary(
server_name="PBS02",
vmid=100,
guest_type="qemu",
datastore="PBS2RAID5",
namespace="serveurs-internes",
snapshot_count=13,
)
assert (
find_snapshot_summary(
item,
{"BACKUP-PRODR5": "RAID5"},
{"BACKUP-PRODR5": "serveurs-internes"},
{("PBS02", "PBS2RAID5", "serveurs-internes", "qemu", 100): summary},
"PBS02",
)
== summary
)
def test_coverage_sort_key_uses_namespace_then_schedule_then_vmid() -> None:
first = BackupCoverage(
guest=Guest(vmid=200, name="b", guest_type="qemu"),
status="sauvegarde_pbs_planifiee",
jobs=[BackupJob(job_id="backup-b", schedule="22:00")],
storages=["STORAGE-B"],
)
second = BackupCoverage(
guest=Guest(vmid=100, name="a", guest_type="qemu"),
status="sauvegarde_pbs_planifiee",
jobs=[BackupJob(job_id="backup-a", schedule="21:00")],
storages=["STORAGE-A"],
)
third = BackupCoverage(
guest=Guest(vmid=50, name="c", guest_type="qemu"),
status="non_sauvegardee",
)
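# Expected order: namespace first, then schedule, then vmid; the guest without a backup job sorts last.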
sorted_items = sorted(
[first, second, third],
key=lambda item: coverage_sort_key(
item,
{
"STORAGE-A": "serveurs-internes",
"STORAGE-B": "Serveurs-PVELAB",
},
),
)
assert [item.guest.vmid for item in sorted_items] == [200, 100, 50]
def test_coverage_row_includes_notes_after_name() -> None:
row = coverage_row(
BackupCoverage(
guest=Guest(vmid=100, name="srv", guest_type="qemu", notes="note applicative"),
status="sauvegarde_pbs_planifiee",
),
include_storage=True,
)
assert row[:3] == [100, "srv", "note applicative"]
def test_coverage_row_uses_french_missing_notes_label() -> None:
row = coverage_row(
BackupCoverage(
guest=Guest(vmid=100, name="srv", guest_type="qemu"),
status="sauvegarde_pbs_planifiee",
),
include_storage=True,
)
assert row[2] == "non renseigné"
def test_backup_retention_row_uses_snapshot_summary() -> None:
summary = PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
snapshot_count=13,
oldest_backup_at=datetime(2026, 3, 27, 21, 30),
newest_backup_at=datetime(2026, 5, 7, 21, 30),
newest_backup_size_bytes=2 * 1024 * 1024 * 1024,
)
assert backup_retention_row(
summary,
Guest(vmid=100, name="srv", guest_type="qemu"),
) == [
100,
"srv",
"serveurs-internes",
"RAID5",
"Active sur PVE",
"13",
"non renseigne",
"non renseigne",
"2026-03-27 21:30",
"2026-05-07 21:30",
"2.0 Gio",
]
def test_display_retention_delta_formats_sign() -> None:
assert display_retention_delta(13, 12) == "+1"
assert display_retention_delta(11, 12) == "-1"
assert display_retention_delta(12, 12) == "0"
assert display_retention_delta(12, None) == "non renseigne"
def test_expected_retention_versions_uses_active_policy_for_snapshot_namespace() -> None:
summary = PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
)
report_data = ReportData(
pbs_retention_policies=[
PbsRetentionPolicy(
policy_id="disabled",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs-internes",
enabled=False,
keep_daily=99,
),
PbsRetentionPolicy(
policy_id="active",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs-internes",
keep_last=1,
keep_daily=7,
keep_weekly=4,
),
]
)
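# The disabled policy is ignored: keep_last 1 + keep_daily 7 + keep_weekly 4 = 12 expected versions.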
assert expected_retention_versions(report_data, summary) == 12
def test_build_backup_retention_rows_includes_inactive_guest() -> None:
report_data = ReportData(
guests=[
Guest(vmid=100, name="srv", guest_type="qemu"),
],
pbs_storages=[
PbsStorage(storage_id="backup-storage", namespace="serveurs-internes"),
PbsStorage(storage_id="pbs-root", namespace=None),
],
pbs_retention_policies=[
PbsRetentionPolicy(
policy_id="prune-root",
server_name="PBS01",
datastore="RAID5",
namespace="/",
keep_last=5,
),
PbsRetentionPolicy(
policy_id="prune-serveurs",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs-internes",
keep_last=1,
keep_daily=7,
keep_weekly=4,
),
],
pbs_snapshot_summaries={
("PBS01", "RAID5", "serveurs-internes", "qemu", 100): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
snapshot_count=13,
oldest_backup_at=datetime(2026, 3, 27, 21, 30),
newest_backup_at=datetime(2026, 5, 7, 21, 30),
newest_backup_size_bytes=10 * 1024 * 1024,
),
("PBS01", "RAID5", "serveurs-internes", "qemu", 200): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=200,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs-internes",
snapshot_count=4,
oldest_backup_at=datetime(2026, 4, 1, 21, 30),
newest_backup_at=datetime(2026, 5, 8, 21, 30),
newest_backup_size_bytes=3 * 1024 * 1024 * 1024,
),
("PBS01", "RAID5", "hors-pve", "qemu", 300): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=300,
guest_type="qemu",
datastore="RAID5",
namespace="hors-pve",
snapshot_count=2,
oldest_backup_at=datetime(2026, 4, 2, 21, 30),
newest_backup_at=datetime(2026, 5, 8, 21, 45),
),
("PBS01", "RAID5", "/", "qemu", 400): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=400,
guest_type="qemu",
datastore="RAID5",
namespace="/",
snapshot_count=5,
oldest_backup_at=datetime(2026, 4, 3, 21, 30),
newest_backup_at=datetime(2026, 5, 8, 22, 0),
newest_backup_size_bytes=512,
raw={"name": "srv-racine"},
),
},
)
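# Fixture mixes an active PVE guest (100) with PBS-only data: an inactive guest (200),
# a namespace unknown to the declared storages (300) and a root-namespace snapshot (400).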
rows = build_backup_retention_rows(report_data, "PBS01")
assert rows[0] == [
"VMID",
"Nom VM/CT",
"Namespace",
"Datastore",
"Etat PVE",
"Nombre de versions",
"Nombre attendu de versions",
"Delta",
"Plus ancienne",
"Plus recente",
"Taille",
]
assert rows[1] == [
400,
"srv-racine",
"/",
"RAID5",
"Non-active sur PVE",
"5",
"5",
"0",
"2026-04-03 21:30",
"2026-05-08 22:00",
"512 o",
]
assert rows[2] == [
100,
"srv",
"serveurs-internes",
"RAID5",
"Active sur PVE",
"13",
"12",
"+1",
"2026-03-27 21:30",
"2026-05-07 21:30",
"10.0 Mio",
]
assert rows[3] == [
200,
"non renseigne",
"serveurs-internes",
"RAID5",
"Non-active sur PVE",
"4",
"12",
"-8",
"2026-04-01 21:30",
"2026-05-08 21:30",
"3.0 Gio",
]
+285
@@ -0,0 +1,285 @@
from datetime import datetime
from pve_backup_report.models import (
BackupCoverage,
BackupJob,
Guest,
PbsAccessUser,
PbsBackupSnapshotSummary,
PbsDatastoreUsage,
PbsGarbageCollectionStatus,
PbsRetentionPolicy,
PbsStorage,
ReportData,
ReportSummary,
)
from pve_backup_report.report_weasy_pdf import build_template_context, render_html
def test_build_template_context_contains_sections() -> None:
report_data = ReportData(
pbs_storages=[
PbsStorage(
storage_id="BACKUP-PROD",
username="backup@pbs",
server="192.0.2.10",
datastore="RAID5",
namespace="serveurs",
enabled=True,
),
PbsStorage(
storage_id="BACKUP-LAB",
username="backup@pbs",
server="192.0.2.10",
datastore="RAID5",
namespace="lab",
enabled=True,
)
],
pbs_datastore_usages=[
PbsDatastoreUsage(
server_name="PBS01",
datastore="RAID5",
total_bytes=100,
used_bytes=60,
available_bytes=40,
)
],
pbs_access_users=[
PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs",
user_id="backup@pbs",
storage_id="BACKUP-PROD",
datastore="RAID5",
namespace="serveurs",
enabled=True,
expire=0,
email="admin@example.invalid",
permissions={"Datastore.Backup": True, "Datastore.Modify": False},
comment="Compte PVE",
)
],
pbs_gc_statuses=[
PbsGarbageCollectionStatus(
server_name="PBS01",
datastore="RAID5",
status="en_cours",
)
],
pbs_retention_policies=[
PbsRetentionPolicy(
policy_id="prune-serveurs",
server_name="PBS01",
datastore="RAID5",
namespace="serveurs",
keep_daily=3,
),
PbsRetentionPolicy(
policy_id="prune-lab",
server_name="PBS01",
datastore="RAID5",
namespace="lab",
keep_daily=3,
),
],
guests=[
Guest(vmid=100, name="srv", guest_type="qemu", notes="note production"),
Guest(vmid=101, name="lab", guest_type="qemu"),
],
coverage=[
BackupCoverage(
guest=Guest(vmid=100, name="srv", guest_type="qemu", notes="note production"),
status="sauvegarde_pbs_planifiee",
jobs=[BackupJob(job_id="backup-prod", schedule="22:00")],
storages=["BACKUP-PROD"],
),
BackupCoverage(
guest=Guest(vmid=101, name="lab", guest_type="qemu"),
status="sauvegarde_pbs_planifiee",
jobs=[BackupJob(job_id="backup-lab", schedule="21:00")],
storages=["BACKUP-LAB"],
)
],
pbs_snapshot_summaries={
("PBS01", "RAID5", "serveurs", "qemu", 100): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=100,
guest_type="qemu",
datastore="RAID5",
namespace="serveurs",
snapshot_count=3,
oldest_backup_at=datetime(2026, 5, 7, 22, 0),
newest_backup_at=datetime(2026, 5, 9, 22, 0),
),
("PBS01", "RAID5", "lab", "qemu", 101): PbsBackupSnapshotSummary(
server_name="PBS01",
vmid=101,
guest_type="qemu",
datastore="RAID5",
namespace="lab",
snapshot_count=2,
oldest_backup_at=datetime(2026, 5, 8, 21, 0),
newest_backup_at=datetime(2026, 5, 9, 21, 0),
),
},
summary=ReportSummary(generated_at=datetime(2026, 5, 9, 2, 0)),
)
context = build_template_context(
report_data,
{"192.0.2.10": "backup-display"},
)
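# The second argument maps the PBS server address to the display name expected in the coverage rows.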
assert context["generated_at"] == "2026-05-09 02:00"
assert [section.title for section in context["sections"]][:4] == [
"Resume",
"Stockages PBS déclarés sur PVE",
"Utilisateurs PBS - Audit des accès",
"Espaces de stockage PBS",
]
access_section = next(
section for section in context["sections"] if section.title == "Utilisateurs PBS - Audit des accès"
)
assert access_section.headers == [
"Serveur PBS",
"Auth-id",
"Storage PVE",
"Datastore",
"Namespace",
"Actif",
"Expiration",
"Email",
"Permissions",
"Commentaire",
]
assert access_section.rows[0] == [
"PBS01",
"backup@pbs",
"BACKUP-PROD",
"RAID5",
"serveurs",
"oui",
"aucune",
"admin@example.invalid",
"Datastore.Backup",
"Compte PVE",
]
missing_section = next(
section for section in context["sections"] if section.title == "VM/CT non sauvegardees"
)
assert missing_section.headers == ["VMID", "Nom", "Notes", "Type", "Noeud", "Etat", "Detail"]
coverage_group = next(
section for section in context["sections"] if section.title == "Sauvegarde des VM/CT"
)
assert coverage_group.level == 1
coverage_sections = [
section
for section in context["sections"]
if section.title.startswith("Sauvegarde des VM/CT -")
]
assert [section.title for section in coverage_sections] == [
"Sauvegarde des VM/CT - lab",
"Sauvegarde des VM/CT - serveurs",
]
coverage_section = next(
section for section in coverage_sections if section.title == "Sauvegarde des VM/CT - serveurs"
)
assert "Namespace" not in coverage_section.headers
assert coverage_section.headers[2] == "Notes"
assert coverage_section.rows[0][2] == "note production"
assert coverage_section.rows[0][7] == "192.0.2.10 (backup-display)"
assert coverage_section.rows[0][9] == "non renseigne"
retention_group = next(
section for section in context["sections"] if section.title == "Retention des sauvegardes VM/CT"
)
assert retention_group.level == 1
assert retention_group.warning is None
retention_sections = [
section
for section in context["sections"]
if section.title.startswith("Retention des sauvegardes VM/CT PBS01")
]
assert [section.title for section in retention_sections] == [
"Retention des sauvegardes VM/CT PBS01 - lab",
"Retention des sauvegardes VM/CT PBS01 - serveurs",
]
assert not any(
section.title.startswith("Retention des sauvegardes VM/CT PBS02")
for section in context["sections"]
)
assert not any(
section.title.startswith("Retention des sauvegardes VM/CT PBS03")
for section in context["sections"]
)
assert all("Namespace" not in section.headers for section in retention_sections)
assert all("Datastore" in section.headers for section in retention_sections)
assert all("Nombre attendu de versions" in section.headers for section in retention_sections)
assert all("Delta" in section.headers for section in retention_sections)
assert all("garbage collector" in (section.warning or "") for section in retention_sections)
assert retention_sections[0].rows[0][0] == 101
assert retention_sections[0].rows[0][2] == "RAID5"
assert retention_sections[0].rows[0][3] == "Active sur PVE"
assert retention_sections[0].rows[0][5] == "3"
assert retention_sections[0].rows[0][6] == "-1"
def test_render_html_keeps_css_unescaped() -> None:
html = render_html(ReportData())
assert 'content: "Rapport des sauvegardes Proxmox VE"' in html
assert """ not in html
assert '<h1 class="section-group-title">' in html
assert "Sauvegarde des VM/CT" in html
assert '<h1 class="section-group-title">Retention des sauvegardes VM/CT</h1>' not in html
def test_pdf_pbs_access_users_table_keeps_expected_fields_without_raw_secrets() -> None:
report_data = ReportData(
pbs_access_users=[
PbsAccessUser(
server_name="PBS01",
auth_id="backup@pbs",
user_id="backup@pbs",
storage_id="BACKUP-PROD",
datastore="RAID5",
namespace="serveurs",
enabled=True,
expire=0,
email="admin@example.invalid",
permissions={"Datastore.Backup": True},
comment="Compte PVE",
raw={
"Authorization": "PBSAPIToken=abc:secret",
"api_token_secret": "secret-value",
"password": "secret-password",
},
)
],
)
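# The rendered audit table must surface the access fields while none of the raw credential values leak into the HTML.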
html = render_html(report_data)
assert "Utilisateurs PBS - Audit des accès" in html
assert "backup@pbs" in html
assert "BACKUP-PROD" in html
assert "RAID5" in html
assert "serveurs" in html
assert "Datastore.Backup" in html
assert "Compte PVE" in html
assert "PBSAPIToken=abc:secret" not in html
assert "secret-value" not in html
assert "secret-password" not in html
assert "api_token_secret" not in html
def test_build_template_context_omits_unconfigured_retention_servers() -> None:
report_data = ReportData(pbs_server_names=["PBS01"])
context = build_template_context(report_data)
titles = [section.title for section in context["sections"]]
assert "Retention des sauvegardes VM/CT PBS01 - non renseigne" in titles
assert "Retention des sauvegardes VM/CT PBS02 - non renseigne" not in titles
assert "Retention des sauvegardes VM/CT PBS03 - non renseigne" not in titles
+32
@@ -0,0 +1,32 @@
from pve_backup_report.sanitization import sanitize_message
def test_sanitize_message_masks_api_tokens_and_secrets() -> None:
message = (
"PVEAPIToken=user@pve!report=pve-secret "
"PBSAPIToken=user@pbs!report:pbs-secret "
"PBS01_API_TOKEN_SECRET=secret1 "
"password=secret2 "
"secret=secret3"
)
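# Each secret value above must disappear while the masked key names remain visible.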
sanitized = sanitize_message(message)
assert "pve-secret" not in sanitized
assert "pbs-secret" not in sanitized
assert "secret1" not in sanitized
assert "secret2" not in sanitized
assert "secret3" not in sanitized
assert "PVEAPIToken=***" in sanitized
assert "PBSAPIToken=***" in sanitized
assert "PBS01_API_TOKEN_SECRET=***" in sanitized
assert "password=***" in sanitized
assert "secret=***" in sanitized
def test_sanitize_message_flattens_newlines() -> None:
assert sanitize_message("line1\nsecret=value") == "line1 secret=***"
def test_sanitize_message_masks_exact_pbs_api_token_shape() -> None:
assert sanitize_message("error PBSAPIToken=abc:secret") == "error PBSAPIToken=***"