"""Tests for the PVE/PBS collectors and their normalization helpers.

The two fake clients below return canned payloads so the collectors can be
exercised without any network access.
"""

from pve_backup_report.collectors import (
    collect_pbs_datastore_usages,
    collect_pbs_access_users,
    collect_pbs_gc_statuses,
    collect_pbs_retention_policies,
    collect_pbs_snapshot_summaries,
    collect_guests,
    extract_finished_backup_from_log_line,
    extract_task_vmid,
    guest_notes,
    is_missing_pbs_snapshot_namespace,
    normalize_backup_job,
    normalize_guest,
    normalize_last_backup_result,
    normalize_pbs_storage,
    normalize_pbs_retention_policy,
    normalize_pbs_snapshot,
    normalize_pbs_datastore_usage,
    normalize_pbs_gc_status,
    normalize_pbs_namespace,
    normalize_pbs_access_user,
    normalize_pool,
    pbs_auth_user_id,
    pbs_datastore_acl_path,
    pbs_client_storages,
    pbs_snapshot_scopes,
    parse_vzdump_task_log,
    merge_pbs_snapshot_summary,
    task_duration_seconds,
)
from pve_backup_report.models import PbsStorage
from pve_backup_report.pbs_client import PbsHttpError
from pve_backup_report.pve_client import PveApiError


class FakePveClient:
    """In-memory stand-in for the PVE client passed to ``collect_guests``.

    Guest configs and config errors are keyed by ``(guest_type, node, vmid)``.
    """

    def __init__(
        self,
        raw_resources: list[dict] | object,
        configs: dict[tuple[str, str, int], dict] | None = None,
        config_errors: dict[tuple[str, str, int], PveApiError] | None = None,
    ) -> None:
        self._raw_resources = raw_resources
        self._configs = configs or {}
        self._config_errors = config_errors or {}

    def get_cluster_resources(self) -> list[dict] | object:
        """Return the canned cluster resources unchanged."""
        return self._raw_resources

    def get_qemu_config(self, node: str, vmid: int) -> dict:
        """Return (or raise) the canned config registered for a QEMU guest."""
        return self._guest_config("qemu", node, vmid)

    def get_lxc_config(self, node: str, vmid: int) -> dict:
        """Return (or raise) the canned config registered for an LXC guest."""
        return self._guest_config("lxc", node, vmid)

    def _guest_config(self, guest_type: str, node: str, vmid: int) -> dict:
        """Raise the registered error for the guest, else return its config.

        A guest with neither an error nor a config yields an empty dict.
        """
        key = (guest_type, node, vmid)
        error = self._config_errors.get(key)
        if error is not None:
            raise error
        return self._configs.get(key, {})


class FakePbsClient:
    """In-memory stand-in for a PBS client, serving canned API payloads.

    Snapshots and snapshot errors are keyed by ``(datastore, namespace)``,
    with the root namespace stored under ``"/"``. Permissions are keyed by
    ``(auth_id, path)``.
    """

    def __init__(
        self,
        server_name: str,
        api_url: str | None = "https://backup.example.invalid:8007",
        pbs_hostnames: dict[str, str] | None = None,
        raw_policies: list[dict] | None = None,
        raw_datastores: list[dict] | None = None,
        raw_namespaces: dict[str, list[dict]] | None = None,
        raw_snapshots: dict[tuple[str, str], list[dict]] | None = None,
        snapshot_errors: dict[tuple[str, str], Exception] | None = None,
        raw_statuses: dict[str, dict] | None = None,
        raw_gc_statuses: dict[str, dict] | None = None,
        raw_users: list[dict] | None = None,
        raw_permissions: dict[tuple[str, str], dict] | None = None,
    ) -> None:
        self.server_name = server_name
        self.api_url = api_url
        # Minimal config object exposing only the ``pbs_hostnames`` attribute.
        self.config = type("FakeConfig", (), {"pbs_hostnames": pbs_hostnames or {}})()
        self._raw_policies = raw_policies or []
        self._raw_datastores = raw_datastores or []
        self._raw_namespaces = raw_namespaces or {}
        self._raw_snapshots = raw_snapshots or {}
        self._snapshot_errors = snapshot_errors or {}
        self._raw_statuses = raw_statuses or {}
        self._raw_gc_statuses = raw_gc_statuses or {}
        self._raw_users = raw_users or []
        self._raw_permissions = raw_permissions or {}

    def get_prune_jobs(self) -> list[dict]:
        """Return the canned prune job list."""
        return self._raw_policies

    def get_datastores(self) -> list[dict]:
        """Return the canned datastore list."""
        return self._raw_datastores

    def get_datastore_namespaces(self, datastore: str) -> list[dict]:
        """Return the canned namespaces of a datastore, or an empty list."""
        return self._raw_namespaces.get(datastore, [])

    def get_datastore_snapshots(self, datastore: str, namespace: str | None) -> list[dict]:
        """Raise the registered error for the scope, else return its snapshots.

        A falsy namespace is looked up under the root key ``"/"``.
        """
        key = (datastore, namespace or "/")
        error = self._snapshot_errors.get(key)
        if error is not None:
            raise error
        return self._raw_snapshots.get(key, [])

    def get_datastore_status(self, datastore: str) -> dict:
        """Return the canned status; raises ``KeyError`` for unknown datastores."""
        return self._raw_statuses[datastore]

    def get_datastore_gc_status(self, datastore: str) -> dict:
        """Return the canned GC status; raises ``KeyError`` for unknown datastores."""
        return self._raw_gc_statuses[datastore]

    def get_access_users(self) -> list[dict]:
        """Return the canned user list."""
        return self._raw_users

    def get_access_permissions(self, auth_id: str, path: str) -> dict:
        """Return the canned permissions for ``(auth_id, path)``, or an empty dict."""
        return self._raw_permissions.get((auth_id, path), {})


def test_normalize_pbs_storage() -> None:
    """A complete PBS storage entry is mapped field by field."""
    storage = normalize_pbs_storage(
        {
            "storage": "backup-storage",
            "type": "pbs",
            "username": "backup@pbs",
            "server": "backup.example.invalid",
            "datastore": "prod",
            "namespace": "pve",
            "disable": 0,
        }
    )

    assert storage.storage_id == "backup-storage"
    assert storage.username == "backup@pbs"
    assert storage.server == "backup.example.invalid"
    assert storage.datastore == "prod"
    assert storage.namespace == "pve"
    assert storage.enabled is True


def test_normalize_pbs_storage_defaults_to_enabled_when_disable_is_absent() -> None:
    """A storage entry without a ``disable`` key is reported as enabled."""
    storage = normalize_pbs_storage(
        {
            "storage": "backup-storage",
            "type": "pbs",
            "server": "backup.example.invalid",
            "datastore": "prod",
        }
    )

    assert storage.enabled is True


def test_normalize_pbs_storage_marks_disabled() -> None:
    """``disable: 1`` yields a storage that is not enabled."""
    storage = normalize_pbs_storage(
        {
            "storage": "backup-storage",
            "type": "pbs",
            "server": "backup.example.invalid",
            "datastore": "prod",
            "disable": 1,
        }
    )

    assert storage.enabled is False


def test_normalize_pbs_access_user() -> None:
    """User payload, storage and permissions are combined into one record."""
    user = normalize_pbs_access_user(
        "PBS01",
        PbsStorage(
            storage_id="backup-storage",
            username="backup@pbs!pve",
            datastore="RAID5",
            namespace="prod",
        ),
        {
            "userid": "backup@pbs",
            "enable": True,
            "expire": "0",
            "email": "admin@example.invalid",
            "comment": "Compte PVE",
        },
        {"Datastore.Backup": True, "Datastore.Modify": False},
    )

    assert user.server_name == "PBS01"
    assert user.auth_id == "backup@pbs!pve"
    assert user.user_id == "backup@pbs"
    assert user.storage_id == "backup-storage"
    assert user.enabled is True
    # The payload carries "expire" as the string "0"; it must come back as an int.
    assert user.expire == 0
    assert user.email == "admin@example.invalid"
    assert user.comment == "Compte PVE"
    assert user.permissions == {"Datastore.Backup": True, "Datastore.Modify": False}


def test_normalize_pbs_access_user_defaults_to_enabled_when_enable_is_absent() -> None:
    """A known user without an ``enable`` key is reported as enabled."""
    user = normalize_pbs_access_user(
        "PBS01",
        PbsStorage(storage_id="backup-storage", username="backup@pbs"),
        {"userid": "backup@pbs", "comment": "Compte PVE"},
    )

    assert user.enabled is True


def test_normalize_pbs_access_user_keeps_unknown_enabled_when_user_is_absent() -> None:
    """An empty user payload leaves ``enabled`` and ``expire`` as ``None``."""
    user = normalize_pbs_access_user(
        "PBS01",
        PbsStorage(storage_id="backup-storage", username="missing@pbs"),
        {},
    )

    assert user.enabled is None
    assert user.expire is None


def test_collect_pbs_access_users_matches_pve_storages_and_permissions() -> None:
    """The collector joins PBS users, PVE storages and ACL permissions."""
    users = collect_pbs_access_users(
        [
            FakePbsClient(
                "PBS01",
                raw_users=[
                    {
                        "userid": "backup@pbs",
                        "enable": True,
                        "email": "admin@example.invalid",
                    }
                ],
                raw_permissions={
                    (
                        "backup@pbs!pve",
                        "/datastore/RAID5/prod",
                    ): {
                        "/datastore/RAID5/prod": {
                            "Datastore.Backup": True,
                            "Datastore.Modify": False,
                        }
                    }
                },
            )
        ],
        [
            PbsStorage(
                storage_id="backup-storage",
                username="backup@pbs!pve",
                server="PBS01",
                datastore="RAID5",
                namespace="prod",
            )
        ],
    )

    assert len(users) == 1
    assert users[0].server_name == "PBS01"
    assert users[0].auth_id == "backup@pbs!pve"
    assert users[0].user_id == "backup@pbs"
    assert users[0].email == "admin@example.invalid"
    assert users[0].permissions == {
        "Datastore.Backup": True,
        "Datastore.Modify": False,
    }


def test_pbs_auth_user_id_removes_token_suffix() -> None:
    """The ``!token`` suffix is dropped; a plain user id is returned as is."""
    assert pbs_auth_user_id("backup@pbs!pve") == "backup@pbs"
    assert pbs_auth_user_id("backup@pbs") == "backup@pbs"


def test_pbs_datastore_acl_path() -> None:
    """Root namespaces map to the datastore path; others append the namespace."""
    assert pbs_datastore_acl_path("RAID5", None) == "/datastore/RAID5"
    assert pbs_datastore_acl_path("RAID5", "/") == "/datastore/RAID5"
    assert pbs_datastore_acl_path("RAID5", "prod") == "/datastore/RAID5/prod"


def test_normalize_pbs_retention_policy() -> None:
    """A prune job payload is mapped, with string counts converted to ints."""
    policy = normalize_pbs_retention_policy(
        "PBS01",
        {
            "id": "prune-prod",
            "store": "RAID5",
            "ns": "serveurs-internes",
            "schedule": "daily",
            "keep-daily": "14",
            "keep-weekly": 8,
            "max-depth": 0,
            "disable": 0,
        },
    )

    assert policy.policy_id == "prune-prod"
    assert policy.server_name == "PBS01"
    assert policy.datastore == "RAID5"
    assert policy.namespace == "serveurs-internes"
    assert policy.keep_daily == 14
    assert policy.keep_weekly == 8
    assert policy.max_depth == 0
    assert policy.enabled is True


def test_collect_pbs_retention_policies_supports_multiple_servers() -> None:
    """Policies from several PBS clients are returned in client order."""
    policies = collect_pbs_retention_policies(
        [
            FakePbsClient(
                "PBS02",
                raw_policies=[
                    {
                        "id": "prune-pbs02",
                        "store": "PBS2RAID5",
                        "ns": "serveurs-internes",
                        "schedule": "daily",
                        "keep-daily": 7,
                    }
                ],
            ),
            FakePbsClient(
                "PBS03",
                raw_policies=[
                    {
                        "id": "prune-pbs03",
                        "store": "BACKUPSTORAGE",
                        "ns": "serveurs-internes",
                        "schedule": "weekly",
                        "keep-weekly": 4,
                    }
                ],
            ),
        ]
    )

    assert [policy.server_name for policy in policies] == ["PBS02", "PBS03"]
    assert [policy.policy_id for policy in policies] == ["prune-pbs02", "prune-pbs03"]


def test_normalize_pbs_datastore_usage() -> None:
    """Datastore status values are exposed as integer byte counts."""
    usage = normalize_pbs_datastore_usage(
        "PBS01",
        "RAID5",
        {"total": "1000", "used": 400, "avail": 600},
    )

    assert usage.server_name == "PBS01"
    assert usage.datastore == "RAID5"
    assert usage.total_bytes == 1000
    assert usage.used_bytes == 400
    assert usage.available_bytes == 600


def test_collect_pbs_datastore_usages() -> None:
    """Usage is collected for each datastore listed by the PBS client."""
    usages = collect_pbs_datastore_usages(
        [
            FakePbsClient(
                "PBS01",
                raw_datastores=[{"name": "RAID5"}],
                raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
            )
        ],
        [],
    )

    assert len(usages) == 1
    assert usages[0].server_name == "PBS01"
    assert usages[0].datastore == "RAID5"
    assert usages[0].total_bytes == 1000
    assert usages[0].used_bytes == 400
    assert usages[0].available_bytes == 600


def test_collect_pbs_datastore_usages_uses_pve_storages_as_fallback() -> None:
    """With no datastore listed by PBS, the PVE storage's datastore is used."""
    usages = collect_pbs_datastore_usages(
        [
            FakePbsClient(
                "PBS01",
                raw_datastores=[],
                raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
            )
        ],
        [
            PbsStorage(
                storage_id="BACKUP-PRODR5",
                server="PBS01",
                datastore="RAID5",
            )
        ],
    )

    assert len(usages) == 1
    assert usages[0].datastore == "RAID5"


def test_pbs_client_storages_matches_hostname_mapping() -> None:
    """A storage whose server is mapped via ``pbs_hostnames`` matches the client."""
    storages = pbs_client_storages(
        FakePbsClient(
            "PBS01",
            api_url="https://backup.example.invalid:8007",
            pbs_hostnames={"192.0.2.10": "backup.example.invalid"},
        ),
        [
            PbsStorage(
                storage_id="BACKUP-PRODR5",
                server="192.0.2.10",
                datastore="RAID5",
            )
        ],
    )

    assert [storage.storage_id for storage in storages] == ["BACKUP-PRODR5"]


def test_collect_pbs_datastore_usages_warns_when_no_datastore() -> None:
    """No datastore at all yields no usage and records an issue."""
    issues: list = []

    usages = collect_pbs_datastore_usages(
        [FakePbsClient("PBS01", raw_datastores=[])],
        [],
        issues,
    )

    assert usages == []
    assert issues
    assert issues[0].component == "pbs_storage_usage"


def test_normalize_pbs_gc_status_detects_running() -> None:
    """A GC payload carrying a ``upid`` is reported with status ``en_cours``."""
    status = normalize_pbs_gc_status(
        "PBS02",
        "PBS2RAID5",
        {"upid": "UPID:pbs02:gc", "schedule": "*:0/30", "next-run": 1778319000},
    )

    assert status.server_name == "PBS02"
    assert status.datastore == "PBS2RAID5"
    assert status.status == "en_cours"
    assert status.schedule == "*:0/30"
    assert status.next_run is not None


def test_collect_pbs_gc_statuses() -> None:
    """GC status is collected for each datastore listed by the PBS client."""
    statuses = collect_pbs_gc_statuses(
        [
            FakePbsClient(
                "PBS02",
                raw_datastores=[{"name": "PBS2RAID5"}],
                raw_gc_statuses={"PBS2RAID5": {"upid": "UPID:pbs02:gc"}},
            )
        ],
        [],
    )

    assert len(statuses) == 1
    assert statuses[0].status == "en_cours"


def test_pbs_snapshot_scopes_adds_all_pve_namespaces_to_api_datastores() -> None:
    """Scopes combine API namespaces with every PVE storage namespace."""
    scopes = pbs_snapshot_scopes(
        FakePbsClient(
            "PBS02",
            raw_datastores=[{"name": "PBS2RAID5"}],
            raw_namespaces={"PBS2RAID5": [{"ns": "sync-only"}]},
        ),
        [
            PbsStorage(storage_id="prod", namespace="serveurs-internes"),
            PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
        ],
    )

    assert scopes == [
        ("PBS2RAID5", "Serveurs-PVELAB"),
        ("PBS2RAID5", "serveurs-internes"),
        ("PBS2RAID5", "sync-only"),
    ]


def test_collect_pbs_snapshot_summaries_reads_pve_namespaces_on_indirect_pbs() -> None:
    """Snapshots are read from PVE namespaces the PBS API did not list."""
    summaries = collect_pbs_snapshot_summaries(
        [
            FakePbsClient(
                "PBS02",
                raw_datastores=[{"name": "PBS2RAID5"}],
                raw_namespaces={"PBS2RAID5": []},
                raw_snapshots={
                    ("PBS2RAID5", "serveurs-internes"): [
                        {
                            "backup-type": "vm",
                            "backup-id": "100",
                            "backup-time": 1775849405,
                        }
                    ],
                    ("PBS2RAID5", "Serveurs-PVELAB"): [
                        {
                            "backup-type": "ct",
                            "backup-id": "200",
                            "backup-time": 1775849405,
                        }
                    ],
                },
            )
        ],
        [
            PbsStorage(storage_id="prod", namespace="serveurs-internes"),
            PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
        ],
    )

    assert set(summaries) == {
        ("PBS02", "PBS2RAID5", "serveurs-internes", "qemu", 100),
        ("PBS02", "PBS2RAID5", "Serveurs-PVELAB", "lxc", 200),
    }


def test_collect_pbs_snapshot_summaries_ignores_missing_non_root_namespace() -> None:
    """An HTTP 400 on a non-root namespace is skipped without any issue."""
    issues: list = []

    summaries = collect_pbs_snapshot_summaries(
        [
            FakePbsClient(
                "PBS02",
                raw_datastores=[{"name": "PBS2RAID5"}],
                raw_namespaces={"PBS2RAID5": []},
                snapshot_errors={
                    ("PBS2RAID5", "absente"): PbsHttpError(
                        "/admin/datastore/PBS2RAID5/snapshots",
                        400,
                        "Bad Request",
                    )
                },
            )
        ],
        [PbsStorage(storage_id="absente", namespace="absente")],
        issues,
    )

    assert summaries == {}
    assert issues == []


def test_collect_pbs_snapshot_summaries_keeps_root_bad_request_as_warning() -> None:
    """An HTTP 400 on the root namespace is recorded as an issue."""
    issues: list = []

    summaries = collect_pbs_snapshot_summaries(
        [
            FakePbsClient(
                "PBS02",
                raw_datastores=[{"name": "PBS2RAID5"}],
                raw_namespaces={"PBS2RAID5": []},
                snapshot_errors={
                    ("PBS2RAID5", "/"): PbsHttpError(
                        "/admin/datastore/PBS2RAID5/snapshots",
                        400,
                        "Bad Request",
                    )
                },
            )
        ],
        [PbsStorage(storage_id="root", namespace=None)],
        issues,
    )

    assert summaries == {}
    assert len(issues) == 1
    assert issues[0].component == "pbs_snapshots"


def test_is_missing_pbs_snapshot_namespace_only_matches_non_root_400() -> None:
    """Only an HTTP 400 on a non-root namespace counts as a missing namespace."""
    assert is_missing_pbs_snapshot_namespace(
        PbsHttpError("/admin/datastore/store/snapshots", 400, "Bad Request"),
        "absente",
    )
    assert not is_missing_pbs_snapshot_namespace(
        PbsHttpError("/admin/datastore/store/snapshots", 400, "Bad Request"),
        "/",
    )
    assert not is_missing_pbs_snapshot_namespace(
        PbsHttpError("/admin/datastore/store/snapshots", 500, "Internal Server Error"),
        "absente",
    )


def test_normalize_pbs_snapshot() -> None:
    """A ``vm`` snapshot becomes a ``qemu`` summary with count and size."""
    summary = normalize_pbs_snapshot(
        "PBS01",
        "RAID5",
        "serveurs-internes",
        {
            "backup-type": "vm",
            "backup-id": "1110001",
            "backup-time": 1775849405,
            "size": 123456789,
        },
    )

    assert summary is not None
    assert summary.server_name == "PBS01"
    assert summary.vmid == 1110001
    assert summary.guest_type == "qemu"
    assert summary.snapshot_count == 1
    assert summary.newest_backup_size_bytes == 123456789


def test_merge_pbs_snapshot_summary_keeps_newest_size() -> None:
    """Merging adds snapshot counts and keeps the newest snapshot's size."""
    older = normalize_pbs_snapshot(
        "PBS01",
        "RAID5",
        "serveurs-internes",
        {
            "backup-type": "vm",
            "backup-id": "1110001",
            "backup-time": 1775849405,
            "size": 100,
        },
    )
    newer = normalize_pbs_snapshot(
        "PBS01",
        "RAID5",
        "serveurs-internes",
        {
            "backup-type": "vm",
            "backup-id": "1110001",
            "backup-time": 1775935805,
            "size": 200,
        },
    )

    assert older is not None
    assert newer is not None

    merged = merge_pbs_snapshot_summary(older, newer)

    assert merged.snapshot_count == 2
    assert merged.newest_backup_size_bytes == 200


def test_normalize_pbs_namespace_root() -> None:
    """Empty and ``None`` namespaces normalize to ``"/"``; others are kept."""
    assert normalize_pbs_namespace("") == "/"
    assert normalize_pbs_namespace(None) == "/"
    assert normalize_pbs_namespace("progiciels") == "progiciels"


def test_normalize_backup_job() -> None:
    """A backup job payload is mapped, including selection and exclusions."""
    job = normalize_backup_job(
        {
            "id": "backup-prod",
            "storage": "backup-storage",
            "schedule": "daily 02:00",
            "disable": 1,
            "mode": "snapshot",
            "vmid": "100,101",
            "exclude": "101",
        }
    )

    assert job.job_id == "backup-prod"
    assert job.storage == "backup-storage"
    assert job.schedule == "daily 02:00"
    assert job.enabled is False
    assert job.mode == "snapshot"
    assert job.selection == "vmid=100,101"
    assert job.excluded == "101"


def test_normalize_guest() -> None:
    """A cluster resource is mapped to a guest, with the vmid as an int."""
    guest = normalize_guest(
        {
            "id": "qemu/100",
            "vmid": "100",
            "type": "qemu",
            "name": "srv-app",
            "node": "pve01",
            "status": "running",
        }
    )

    assert guest is not None
    assert guest.vmid == 100
    assert guest.name == "srv-app"
    assert guest.guest_type == "qemu"
    assert guest.node == "pve01"
    assert guest.status == "running"


def test_guest_notes_reads_description_and_strips_empty() -> None:
    """The description is stripped; a whitespace-only one yields ``None``."""
    assert guest_notes({"description": " notes applicatives "}) == "notes applicatives"
    assert guest_notes({"description": " "}) is None


def test_collect_guests_enriches_notes_from_vm_and_ct_config() -> None:
    """Notes are taken from the QEMU and LXC guest configs."""
    guests = collect_guests(
        FakePveClient(
            [
                {"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"},
                {"id": "lxc/200", "vmid": 200, "type": "lxc", "name": "ct", "node": "pve02"},
            ],
            configs={
                ("qemu", "pve01", 100): {"description": "note VM"},
                ("lxc", "pve02", 200): {"description": "note CT"},
            },
        )
    )

    assert [guest.notes for guest in guests] == ["note VM", "note CT"]


def test_collect_guests_keeps_guest_when_notes_are_unavailable() -> None:
    """A config error keeps the guest without notes and records one issue."""
    issues: list = []

    guests = collect_guests(
        FakePveClient(
            [{"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"}],
            config_errors={
                ("qemu", "pve01", 100): PveApiError("config indisponible"),
            },
        ),
        issues,
    )

    assert len(guests) == 1
    assert guests[0].notes is None
    assert len(issues) == 1
    assert issues[0].message == "notes VM/CT indisponibles"


def test_normalize_pool_members() -> None:
    """Only QEMU/LXC members contribute vmids; storage members are ignored."""
    pool = normalize_pool(
        "prod",
        {
            "members": [
                {"type": "qemu", "vmid": 100},
                {"type": "lxc", "id": "lxc/101"},
                {"type": "storage", "id": "storage/local"},
            ]
        },
    )

    assert pool.pool_id == "prod"
    assert pool.vmids == {100, 101}


def test_normalize_last_backup_result_success() -> None:
    """An ``OK`` vzdump task maps to ``succes`` with the task duration."""
    result = normalize_last_backup_result(
        {
            "type": "vzdump",
            "id": "100",
            "status": "OK",
            "starttime": 1778119140,
            "endtime": 1778119200,
            "node": "pve01",
        }
    )

    assert result is not None
    assert result.vmid == 100
    assert result.status == "succes"
    assert result.duration_seconds == 60
    assert result.node == "pve01"


def test_normalize_last_backup_result_failure() -> None:
    """An ``interrupted`` vzdump task maps to ``echec``."""
    result = normalize_last_backup_result(
        {
            "type": "vzdump",
            "id": "100",
            "status": "interrupted",
            "endtime": 1778119200,
        }
    )

    assert result is not None
    assert result.status == "echec"


def test_extract_task_vmid_from_guest_id() -> None:
    """The vmid is read from a ``type/vmid`` style task id."""
    assert extract_task_vmid({"id": "qemu/100"}) == 100
    assert extract_task_vmid({"id": "lxc/101"}) == 101


def test_extract_task_vmid_from_upid() -> None:
    """The vmid is read from the task UPID when no id is present."""
    assert extract_task_vmid({"upid": "UPID:pve01:123:456:789:vzdump:100:root@pam:"}) == 100


def test_parse_vzdump_task_log_extracts_per_guest_results() -> None:
    """Each finished guest in the log gets its own result and duration."""
    results = parse_vzdump_task_log(
        {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve01"},
        [
            "INFO: Starting Backup of VM 100 (qemu)",
            "INFO: Finished Backup of VM 100 (00:00:10)",
            "INFO: Starting Backup of CT 101 (lxc)",
            "INFO: Finished Backup of CT 101 (00:00:08)",
        ],
    )

    assert {result.vmid for result in results} == {100, 101}
    assert {result.status for result in results} == {"succes"}
    assert {result.vmid: result.duration_seconds for result in results} == {
        100: 10,
        101: 8,
    }


def test_parse_vzdump_task_log_extracts_failure() -> None:
    """A failed guest backup line yields an ``echec`` result for that guest."""
    results = parse_vzdump_task_log(
        {"type": "vzdump", "status": "interrupted", "endtime": 1778119200, "node": "pve01"},
        [
            "INFO: Starting Backup of VM 100 (qemu)",
            "ERROR: Backup of VM 100 failed - command failed",
        ],
    )

    assert len(results) == 1
    assert results[0].vmid == 100
    assert results[0].status == "echec"


def test_parse_vzdump_task_log_extracts_job_vmid_list_and_skips_external() -> None:
    """Vmids come from the job command line, minus the skipped external VMs."""
    results = parse_vzdump_task_log(
        {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve02"},
        [
            "INFO: starting new backup job: vzdump 100 101 102 --mode snapshot --storage PBS",
            "INFO: skip external VMs: 101",
        ],
    )

    assert {result.vmid for result in results} == {100, 102}
    assert {result.status for result in results} == {"succes"}


def test_parse_vzdump_task_log_uses_task_duration_for_started_guest_without_finished_line() -> None:
    """Without a ``Finished`` line, the guest takes the task's own duration."""
    results = parse_vzdump_task_log(
        {
            "type": "vzdump",
            "status": "OK",
            "starttime": 1778119140,
            "endtime": 1778119200,
            "node": "pve01",
        },
        [
            "INFO: starting new backup job: vzdump 100 101 --mode snapshot --storage PBS",
            "INFO: skip external VMs: 101",
            "INFO: Starting Backup of VM 100 (qemu)",
            "INFO: creating Proxmox Backup Server archive 'vm/100/2026-05-07T00:00:00Z'",
        ],
    )

    assert len(results) == 1
    assert results[0].vmid == 100
    assert results[0].duration_seconds == 60


def test_extract_finished_backup_from_log_line_extracts_duration() -> None:
    """The ``HH:MM:SS`` duration of a finished line is returned in seconds."""
    assert extract_finished_backup_from_log_line(
        "INFO: Finished Backup of VM 100 (01:02:03)"
    ) == (100, 3723)


def test_task_duration_seconds_from_task_times() -> None:
    """The duration is the difference between ``endtime`` and ``starttime``."""
    assert task_duration_seconds({"starttime": 1778119140, "endtime": 1778119200}) == 60