# Source listing header (web-UI residue, kept as a comment so the module stays valid Python):
# File: PVE-Backup-Report/tests/test_collectors.py
# Snapshot: 2026-05-13 16:04:17 +02:00 — 858 lines, 26 KiB, Python
from pve_backup_report.collectors import (
collect_pbs_datastore_usages,
collect_pbs_access_users,
collect_pbs_gc_statuses,
collect_pbs_retention_policies,
collect_pbs_snapshot_summaries,
collect_guests,
extract_finished_backup_from_log_line,
extract_task_vmid,
guest_notes,
is_missing_pbs_snapshot_namespace,
normalize_backup_job,
normalize_guest,
normalize_last_backup_result,
normalize_pbs_storage,
normalize_pbs_retention_policy,
normalize_pbs_snapshot,
normalize_pbs_datastore_usage,
normalize_pbs_gc_status,
normalize_pbs_namespace,
normalize_pbs_access_user,
normalize_pool,
pbs_auth_user_id,
pbs_datastore_acl_path,
pbs_client_storages,
pbs_snapshot_scopes,
parse_vzdump_task_log,
merge_pbs_snapshot_summary,
task_duration_seconds,
)
from pve_backup_report.models import PbsStorage
from pve_backup_report.pbs_client import PbsHttpError
from pve_backup_report.pve_client import PveApiError
class FakePveClient:
    """In-memory stand-in for the PVE API client used by the collectors.

    Serves canned cluster resources and per-guest configs; an injected
    error per (type, node, vmid) key lets tests exercise failure paths.
    """

    def __init__(
        self,
        raw_resources: list[dict] | object,
        configs: dict[tuple[str, str, int], dict] | None = None,
        config_errors: dict[tuple[str, str, int], PveApiError] | None = None,
    ) -> None:
        self._resources = raw_resources
        self._config_by_key = configs if configs is not None else {}
        self._error_by_key = config_errors if config_errors is not None else {}

    def get_cluster_resources(self) -> list[dict] | object:
        """Return the canned cluster resource listing."""
        return self._resources

    def get_qemu_config(self, node: str, vmid: int) -> dict:
        """Return the canned config of a QEMU guest."""
        return self._guest_config("qemu", node, vmid)

    def get_lxc_config(self, node: str, vmid: int) -> dict:
        """Return the canned config of an LXC container."""
        return self._guest_config("lxc", node, vmid)

    def _guest_config(self, guest_type: str, node: str, vmid: int) -> dict:
        # Raise the injected error first; otherwise fall back to an empty config.
        key = (guest_type, node, vmid)
        injected = self._error_by_key.get(key)
        if injected is not None:
            raise injected
        return self._config_by_key.get(key, {})
class FakePbsClient:
    """In-memory stand-in for the PBS API client used by the collectors.

    Every endpoint serves canned data handed to the constructor; snapshot
    listings can be made to raise so tests can cover error handling.
    """

    def __init__(
        self,
        server_name: str,
        api_url: str | None = "https://backup.example.invalid:8007",
        pbs_hostnames: dict[str, str] | None = None,
        raw_policies: list[dict] | None = None,
        raw_datastores: list[dict] | None = None,
        raw_namespaces: dict[str, list[dict]] | None = None,
        raw_snapshots: dict[tuple[str, str], list[dict]] | None = None,
        snapshot_errors: dict[tuple[str, str], Exception] | None = None,
        raw_statuses: dict[str, dict] | None = None,
        raw_gc_statuses: dict[str, dict] | None = None,
        raw_users: list[dict] | None = None,
        raw_permissions: dict[tuple[str, str], dict] | None = None,
    ) -> None:
        self.server_name = server_name
        self.api_url = api_url
        # Mimic the real client's config object; only pbs_hostnames is read.
        self.config = type("FakeConfig", (), {"pbs_hostnames": pbs_hostnames or {}})()
        self._policies = raw_policies or []
        self._datastores = raw_datastores or []
        self._namespaces = raw_namespaces or {}
        self._snapshots = raw_snapshots or {}
        self._snapshot_errors = snapshot_errors or {}
        self._statuses = raw_statuses or {}
        self._gc_statuses = raw_gc_statuses or {}
        self._users = raw_users or []
        self._permissions = raw_permissions or {}

    def get_prune_jobs(self) -> list[dict]:
        """Return the canned prune-job (retention policy) list."""
        return self._policies

    def get_datastores(self) -> list[dict]:
        """Return the canned datastore listing."""
        return self._datastores

    def get_datastore_namespaces(self, datastore: str) -> list[dict]:
        """Return the canned namespaces of *datastore* (empty when unknown)."""
        return self._namespaces.get(datastore, [])

    def get_datastore_snapshots(self, datastore: str, namespace: str | None) -> list[dict]:
        """Return canned snapshots for a scope, raising any injected error.

        A missing or None namespace is keyed as the root marker "/".
        """
        scope = (datastore, namespace or "/")
        injected = self._snapshot_errors.get(scope)
        if injected is not None:
            raise injected
        return self._snapshots.get(scope, [])

    def get_datastore_status(self, datastore: str) -> dict:
        """Return the canned usage status (KeyError when not configured)."""
        return self._statuses[datastore]

    def get_datastore_gc_status(self, datastore: str) -> dict:
        """Return the canned GC status (KeyError when not configured)."""
        return self._gc_statuses[datastore]

    def get_access_users(self) -> list[dict]:
        """Return the canned user listing."""
        return self._users

    def get_access_permissions(self, auth_id: str, path: str) -> dict:
        """Return canned permissions for (auth_id, path), default empty."""
        return self._permissions.get((auth_id, path), {})
def test_normalize_pbs_storage() -> None:
    """A fully populated PBS storage entry maps onto every model field."""
    raw = {
        "storage": "backup-storage",
        "type": "pbs",
        "username": "backup@pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
        "namespace": "pve",
        "disable": 0,
    }
    result = normalize_pbs_storage(raw)
    assert (
        result.storage_id,
        result.username,
        result.server,
        result.datastore,
        result.namespace,
    ) == ("backup-storage", "backup@pbs", "backup.example.invalid", "prod", "pve")
    assert result.enabled is True


def test_normalize_pbs_storage_defaults_to_enabled_when_disable_is_absent() -> None:
    """Without a "disable" flag the storage counts as enabled."""
    raw = {
        "storage": "backup-storage",
        "type": "pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
    }
    assert normalize_pbs_storage(raw).enabled is True


def test_normalize_pbs_storage_marks_disabled() -> None:
    """A truthy "disable" flag turns the storage off."""
    raw = {
        "storage": "backup-storage",
        "type": "pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
        "disable": 1,
    }
    assert normalize_pbs_storage(raw).enabled is False
def test_normalize_pbs_access_user() -> None:
    """A matched PBS user record is fully normalized, including permissions.

    Covers identity fields (auth id vs. user id), flag coercion ("0" expire
    becomes the integer 0) and pass-through of the permission mapping.
    """
    user = normalize_pbs_access_user(
        "PBS01",
        PbsStorage(
            storage_id="backup-storage",
            username="backup@pbs!pve",
            datastore="RAID5",
            namespace="prod",
        ),
        {
            "userid": "backup@pbs",
            "enable": True,
            "expire": "0",
            "email": "admin@example.invalid",
            "comment": "Compte PVE",
        },
        {"Datastore.Backup": True, "Datastore.Modify": False},
    )
    assert user.server_name == "PBS01"
    assert user.auth_id == "backup@pbs!pve"
    assert user.user_id == "backup@pbs"
    assert user.storage_id == "backup-storage"
    assert user.enabled is True
    # The "expire" string is coerced to an int (asserted once; the original
    # accidentally duplicated this line).
    assert user.expire == 0
    assert user.email == "admin@example.invalid"
    assert user.comment == "Compte PVE"
    assert user.permissions == {"Datastore.Backup": True, "Datastore.Modify": False}
def test_normalize_pbs_access_user_defaults_to_enabled_when_enable_is_absent() -> None:
    """A user record without an "enable" key counts as enabled."""
    storage = PbsStorage(storage_id="backup-storage", username="backup@pbs")
    user = normalize_pbs_access_user(
        "PBS01",
        storage,
        {"userid": "backup@pbs", "comment": "Compte PVE"},
    )
    assert user.enabled is True


def test_normalize_pbs_access_user_keeps_unknown_enabled_when_user_is_absent() -> None:
    """An empty user record leaves enabled/expire undetermined (None)."""
    storage = PbsStorage(storage_id="backup-storage", username="missing@pbs")
    user = normalize_pbs_access_user("PBS01", storage, {})
    assert user.enabled is None
    assert user.expire is None
def test_collect_pbs_access_users_matches_pve_storages_and_permissions() -> None:
    """Storages pointing at a PBS server are resolved to users with ACLs."""
    client = FakePbsClient(
        "PBS01",
        raw_users=[
            {
                "userid": "backup@pbs",
                "enable": True,
                "email": "admin@example.invalid",
            }
        ],
        raw_permissions={
            ("backup@pbs!pve", "/datastore/RAID5/prod"): {
                "/datastore/RAID5/prod": {
                    "Datastore.Backup": True,
                    "Datastore.Modify": False,
                }
            }
        },
    )
    storage = PbsStorage(
        storage_id="backup-storage",
        username="backup@pbs!pve",
        server="PBS01",
        datastore="RAID5",
        namespace="prod",
    )
    users = collect_pbs_access_users([client], [storage])
    assert len(users) == 1
    matched = users[0]
    assert matched.server_name == "PBS01"
    assert matched.auth_id == "backup@pbs!pve"
    assert matched.user_id == "backup@pbs"
    assert matched.email == "admin@example.invalid"
    assert matched.permissions == {
        "Datastore.Backup": True,
        "Datastore.Modify": False,
    }
def test_pbs_auth_user_id_removes_token_suffix() -> None:
    """The "!token" suffix is stripped; plain user ids pass through."""
    cases = [("backup@pbs!pve", "backup@pbs"), ("backup@pbs", "backup@pbs")]
    for auth_id, expected in cases:
        assert pbs_auth_user_id(auth_id) == expected


def test_pbs_datastore_acl_path() -> None:
    """Root namespaces map to the datastore path; others get a suffix."""
    expectations = {
        None: "/datastore/RAID5",
        "/": "/datastore/RAID5",
        "prod": "/datastore/RAID5/prod",
    }
    for namespace, expected in expectations.items():
        assert pbs_datastore_acl_path("RAID5", namespace) == expected
def test_normalize_pbs_retention_policy() -> None:
    """Prune-job fields are coerced (string counts to int) and mapped."""
    raw = {
        "id": "prune-prod",
        "store": "RAID5",
        "ns": "serveurs-internes",
        "schedule": "daily",
        "keep-daily": "14",
        "keep-weekly": 8,
        "max-depth": 0,
        "disable": 0,
    }
    policy = normalize_pbs_retention_policy("PBS01", raw)
    assert (policy.policy_id, policy.server_name) == ("prune-prod", "PBS01")
    assert (policy.datastore, policy.namespace) == ("RAID5", "serveurs-internes")
    assert (policy.keep_daily, policy.keep_weekly, policy.max_depth) == (14, 8, 0)
    assert policy.enabled is True


def test_collect_pbs_retention_policies_supports_multiple_servers() -> None:
    """Policies from every configured PBS server are collected in order."""
    pbs02 = FakePbsClient(
        "PBS02",
        raw_policies=[
            {
                "id": "prune-pbs02",
                "store": "PBS2RAID5",
                "ns": "serveurs-internes",
                "schedule": "daily",
                "keep-daily": 7,
            }
        ],
    )
    pbs03 = FakePbsClient(
        "PBS03",
        raw_policies=[
            {
                "id": "prune-pbs03",
                "store": "BACKUPSTORAGE",
                "ns": "serveurs-internes",
                "schedule": "weekly",
                "keep-weekly": 4,
            }
        ],
    )
    policies = collect_pbs_retention_policies([pbs02, pbs03])
    assert [policy.server_name for policy in policies] == ["PBS02", "PBS03"]
    assert [policy.policy_id for policy in policies] == ["prune-pbs02", "prune-pbs03"]
def test_normalize_pbs_datastore_usage() -> None:
    """Usage counters are coerced to integers and tagged with the server."""
    usage = normalize_pbs_datastore_usage(
        "PBS01",
        "RAID5",
        {"total": "1000", "used": 400, "avail": 600},
    )
    assert (usage.server_name, usage.datastore) == ("PBS01", "RAID5")
    assert (usage.total_bytes, usage.used_bytes, usage.available_bytes) == (1000, 400, 600)


def test_collect_pbs_datastore_usages() -> None:
    """Each datastore reported by the API yields one usage entry."""
    client = FakePbsClient(
        "PBS01",
        raw_datastores=[{"name": "RAID5"}],
        raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
    )
    usages = collect_pbs_datastore_usages([client], [])
    assert len(usages) == 1
    entry = usages[0]
    assert (entry.server_name, entry.datastore) == ("PBS01", "RAID5")
    assert (entry.total_bytes, entry.used_bytes, entry.available_bytes) == (1000, 400, 600)


def test_collect_pbs_datastore_usages_uses_pve_storages_as_fallback() -> None:
    """When the API lists no datastore, PVE storage config fills the gap."""
    client = FakePbsClient(
        "PBS01",
        raw_datastores=[],
        raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
    )
    fallback = PbsStorage(
        storage_id="BACKUP-PRODR5",
        server="PBS01",
        datastore="RAID5",
    )
    usages = collect_pbs_datastore_usages([client], [fallback])
    assert len(usages) == 1
    assert usages[0].datastore == "RAID5"
def test_pbs_client_storages_matches_hostname_mapping() -> None:
    """A storage addressed by IP is matched through the hostname mapping."""
    client = FakePbsClient(
        "PBS01",
        api_url="https://backup.example.invalid:8007",
        pbs_hostnames={"192.0.2.10": "backup.example.invalid"},
    )
    candidate = PbsStorage(
        storage_id="BACKUP-PRODR5",
        server="192.0.2.10",
        datastore="RAID5",
    )
    matched = pbs_client_storages(client, [candidate])
    assert [storage.storage_id for storage in matched] == ["BACKUP-PRODR5"]


def test_collect_pbs_datastore_usages_warns_when_no_datastore() -> None:
    """With no datastore at all, a warning issue is recorded instead."""
    issues: list = []
    usages = collect_pbs_datastore_usages(
        [FakePbsClient("PBS01", raw_datastores=[])],
        [],
        issues,
    )
    assert usages == []
    assert issues
    assert issues[0].component == "pbs_storage_usage"
def test_normalize_pbs_gc_status_detects_running() -> None:
    """A GC entry carrying a UPID is reported with the running status."""
    status = normalize_pbs_gc_status(
        "PBS02",
        "PBS2RAID5",
        {"upid": "UPID:pbs02:gc", "schedule": "*:0/30", "next-run": 1778319000},
    )
    assert (status.server_name, status.datastore) == ("PBS02", "PBS2RAID5")
    assert (status.status, status.schedule) == ("en_cours", "*:0/30")
    assert status.next_run is not None


def test_collect_pbs_gc_statuses() -> None:
    """Each datastore contributes one GC status entry."""
    client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_gc_statuses={"PBS2RAID5": {"upid": "UPID:pbs02:gc"}},
    )
    statuses = collect_pbs_gc_statuses([client], [])
    assert len(statuses) == 1
    assert statuses[0].status == "en_cours"
def test_pbs_snapshot_scopes_adds_all_pve_namespaces_to_api_datastores() -> None:
    """Scopes union the API namespaces with every namespace used by PVE."""
    client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": [{"ns": "sync-only"}]},
    )
    pve_storages = [
        PbsStorage(storage_id="prod", namespace="serveurs-internes"),
        PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
    ]
    scopes = pbs_snapshot_scopes(client, pve_storages)
    assert scopes == [
        ("PBS2RAID5", "Serveurs-PVELAB"),
        ("PBS2RAID5", "serveurs-internes"),
        ("PBS2RAID5", "sync-only"),
    ]


def test_collect_pbs_snapshot_summaries_reads_pve_namespaces_on_indirect_pbs() -> None:
    """PVE-declared namespaces are read even when the PBS API lists none."""
    snapshots = {
        ("PBS2RAID5", "serveurs-internes"): [
            {
                "backup-type": "vm",
                "backup-id": "100",
                "backup-time": 1775849405,
            }
        ],
        ("PBS2RAID5", "Serveurs-PVELAB"): [
            {
                "backup-type": "ct",
                "backup-id": "200",
                "backup-time": 1775849405,
            }
        ],
    }
    client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        raw_snapshots=snapshots,
    )
    pve_storages = [
        PbsStorage(storage_id="prod", namespace="serveurs-internes"),
        PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
    ]
    summaries = collect_pbs_snapshot_summaries([client], pve_storages)
    assert set(summaries) == {
        ("PBS02", "PBS2RAID5", "serveurs-internes", "qemu", 100),
        ("PBS02", "PBS2RAID5", "Serveurs-PVELAB", "lxc", 200),
    }
def test_collect_pbs_snapshot_summaries_ignores_missing_non_root_namespace() -> None:
    """A 400 on a non-root namespace means it is absent and is skipped silently."""
    issues: list = []
    failing = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        snapshot_errors={
            ("PBS2RAID5", "absente"): PbsHttpError(
                "/admin/datastore/PBS2RAID5/snapshots",
                400,
                "Bad Request",
            )
        },
    )
    summaries = collect_pbs_snapshot_summaries(
        [failing],
        [PbsStorage(storage_id="absente", namespace="absente")],
        issues,
    )
    assert summaries == {}
    assert issues == []


def test_collect_pbs_snapshot_summaries_keeps_root_bad_request_as_warning() -> None:
    """A 400 on the root namespace cannot mean absence, so it stays a warning."""
    issues: list = []
    failing = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        snapshot_errors={
            ("PBS2RAID5", "/"): PbsHttpError(
                "/admin/datastore/PBS2RAID5/snapshots",
                400,
                "Bad Request",
            )
        },
    )
    summaries = collect_pbs_snapshot_summaries(
        [failing],
        [PbsStorage(storage_id="root", namespace=None)],
        issues,
    )
    assert summaries == {}
    assert len(issues) == 1
    assert issues[0].component == "pbs_snapshots"


def test_is_missing_pbs_snapshot_namespace_only_matches_non_root_400() -> None:
    """Only a 400 outside the root namespace counts as a missing namespace."""
    path = "/admin/datastore/store/snapshots"
    assert is_missing_pbs_snapshot_namespace(PbsHttpError(path, 400, "Bad Request"), "absente")
    assert not is_missing_pbs_snapshot_namespace(PbsHttpError(path, 400, "Bad Request"), "/")
    assert not is_missing_pbs_snapshot_namespace(
        PbsHttpError(path, 500, "Internal Server Error"), "absente"
    )
def test_normalize_pbs_snapshot() -> None:
    """A raw snapshot becomes a single-entry summary carrying its size."""
    summary = normalize_pbs_snapshot(
        "PBS01",
        "RAID5",
        "serveurs-internes",
        {
            "backup-type": "vm",
            "backup-id": "1110001",
            "backup-time": 1775849405,
            "size": 123456789,
        },
    )
    assert summary is not None
    assert (summary.server_name, summary.vmid, summary.guest_type) == ("PBS01", 1110001, "qemu")
    assert (summary.snapshot_count, summary.newest_backup_size_bytes) == (1, 123456789)


def test_merge_pbs_snapshot_summary_keeps_newest_size() -> None:
    """Merging sums the snapshot count and keeps the newest snapshot's size."""

    def make_summary(backup_time: int, size: int):
        # Two snapshots of the same guest, differing only in time and size.
        return normalize_pbs_snapshot(
            "PBS01",
            "RAID5",
            "serveurs-internes",
            {
                "backup-type": "vm",
                "backup-id": "1110001",
                "backup-time": backup_time,
                "size": size,
            },
        )

    older = make_summary(1775849405, 100)
    newer = make_summary(1775935805, 200)
    assert older is not None
    assert newer is not None
    merged = merge_pbs_snapshot_summary(older, newer)
    assert (merged.snapshot_count, merged.newest_backup_size_bytes) == (2, 200)


def test_normalize_pbs_namespace_root() -> None:
    """Empty or missing namespaces normalize to the root marker "/"."""
    assert normalize_pbs_namespace("") == "/"
    assert normalize_pbs_namespace(None) == "/"
    assert normalize_pbs_namespace("progiciels") == "progiciels"
def test_normalize_backup_job() -> None:
    """Job fields, including selection and exclusion lists, are mapped."""
    job = normalize_backup_job(
        {
            "id": "backup-prod",
            "storage": "backup-storage",
            "schedule": "daily 02:00",
            "disable": 1,
            "mode": "snapshot",
            "vmid": "100,101",
            "exclude": "101",
        }
    )
    assert (job.job_id, job.storage, job.schedule) == (
        "backup-prod",
        "backup-storage",
        "daily 02:00",
    )
    assert job.enabled is False
    assert (job.mode, job.selection, job.excluded) == ("snapshot", "vmid=100,101", "101")


def test_normalize_guest() -> None:
    """A cluster resource row is turned into a guest with an int vmid."""
    guest = normalize_guest(
        {
            "id": "qemu/100",
            "vmid": "100",
            "type": "qemu",
            "name": "srv-app",
            "node": "pve01",
            "status": "running",
        }
    )
    assert guest is not None
    assert (guest.vmid, guest.name, guest.guest_type) == (100, "srv-app", "qemu")
    assert (guest.node, guest.status) == ("pve01", "running")


def test_guest_notes_reads_description_and_strips_empty() -> None:
    """Descriptions are trimmed; whitespace-only ones collapse to None."""
    assert guest_notes({"description": " notes applicatives "}) == "notes applicatives"
    assert guest_notes({"description": " "}) is None


def test_collect_guests_enriches_notes_from_vm_and_ct_config() -> None:
    """Guest notes come from the QEMU and LXC config endpoints."""
    client = FakePveClient(
        [
            {"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"},
            {"id": "lxc/200", "vmid": 200, "type": "lxc", "name": "ct", "node": "pve02"},
        ],
        configs={
            ("qemu", "pve01", 100): {"description": "note VM"},
            ("lxc", "pve02", 200): {"description": "note CT"},
        },
    )
    guests = collect_guests(client)
    assert [guest.notes for guest in guests] == ["note VM", "note CT"]


def test_collect_guests_keeps_guest_when_notes_are_unavailable() -> None:
    """A failing config call keeps the guest and records one issue."""
    issues: list = []
    client = FakePveClient(
        [{"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"}],
        config_errors={
            ("qemu", "pve01", 100): PveApiError("config indisponible"),
        },
    )
    guests = collect_guests(client, issues)
    assert len(guests) == 1
    assert guests[0].notes is None
    assert len(issues) == 1
    assert issues[0].message == "notes VM/CT indisponibles"
def test_normalize_pool_members() -> None:
    """Only guest members contribute vmids; storage members are ignored."""
    pool = normalize_pool(
        "prod",
        {
            "members": [
                {"type": "qemu", "vmid": 100},
                {"type": "lxc", "id": "lxc/101"},
                {"type": "storage", "id": "storage/local"},
            ]
        },
    )
    assert pool.pool_id == "prod"
    assert pool.vmids == {100, 101}


def test_normalize_last_backup_result_success() -> None:
    """An OK vzdump task maps to a success with its duration."""
    result = normalize_last_backup_result(
        {
            "type": "vzdump",
            "id": "100",
            "status": "OK",
            "starttime": 1778119140,
            "endtime": 1778119200,
            "node": "pve01",
        }
    )
    assert result is not None
    assert (result.vmid, result.status) == (100, "succes")
    assert (result.duration_seconds, result.node) == (60, "pve01")


def test_normalize_last_backup_result_failure() -> None:
    """A non-OK vzdump task maps to a failure."""
    result = normalize_last_backup_result(
        {
            "type": "vzdump",
            "id": "100",
            "status": "interrupted",
            "endtime": 1778119200,
        }
    )
    assert result is not None
    assert result.status == "echec"
def test_extract_task_vmid_from_guest_id() -> None:
    """The vmid is parsed from "type/vmid" guest identifiers."""
    assert extract_task_vmid({"id": "qemu/100"}) == 100
    assert extract_task_vmid({"id": "lxc/101"}) == 101


def test_extract_task_vmid_from_upid() -> None:
    """The vmid is parsed from the UPID when no guest id is present."""
    assert extract_task_vmid({"upid": "UPID:pve01:123:456:789:vzdump:100:root@pam:"}) == 100


def test_parse_vzdump_task_log_extracts_per_guest_results() -> None:
    """Each Starting/Finished pair in the log yields one per-guest result."""
    task = {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve01"}
    log = [
        "INFO: Starting Backup of VM 100 (qemu)",
        "INFO: Finished Backup of VM 100 (00:00:10)",
        "INFO: Starting Backup of CT 101 (lxc)",
        "INFO: Finished Backup of CT 101 (00:00:08)",
    ]
    results = parse_vzdump_task_log(task, log)
    assert {result.vmid for result in results} == {100, 101}
    assert {result.status for result in results} == {"succes"}
    assert {result.vmid: result.duration_seconds for result in results} == {
        100: 10,
        101: 8,
    }


def test_parse_vzdump_task_log_extracts_failure() -> None:
    """An ERROR line marks the guest as failed."""
    task = {"type": "vzdump", "status": "interrupted", "endtime": 1778119200, "node": "pve01"}
    log = [
        "INFO: Starting Backup of VM 100 (qemu)",
        "ERROR: Backup of VM 100 failed - command failed",
    ]
    results = parse_vzdump_task_log(task, log)
    assert len(results) == 1
    assert (results[0].vmid, results[0].status) == (100, "echec")


def test_parse_vzdump_task_log_extracts_job_vmid_list_and_skips_external() -> None:
    """Guests on the job line are counted; externally-skipped ones are not."""
    task = {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve02"}
    log = [
        "INFO: starting new backup job: vzdump 100 101 102 --mode snapshot --storage PBS",
        "INFO: skip external VMs: 101",
    ]
    results = parse_vzdump_task_log(task, log)
    assert {result.vmid for result in results} == {100, 102}
    assert {result.status for result in results} == {"succes"}


def test_parse_vzdump_task_log_uses_task_duration_for_started_guest_without_finished_line() -> None:
    """A started guest with no Finished line falls back to the task duration."""
    task = {
        "type": "vzdump",
        "status": "OK",
        "starttime": 1778119140,
        "endtime": 1778119200,
        "node": "pve01",
    }
    log = [
        "INFO: starting new backup job: vzdump 100 101 --mode snapshot --storage PBS",
        "INFO: skip external VMs: 101",
        "INFO: Starting Backup of VM 100 (qemu)",
        "INFO: creating Proxmox Backup Server archive 'vm/100/2026-05-07T00:00:00Z'",
    ]
    results = parse_vzdump_task_log(task, log)
    assert len(results) == 1
    assert (results[0].vmid, results[0].duration_seconds) == (100, 60)


def test_extract_finished_backup_from_log_line_extracts_duration() -> None:
    """The vmid and hh:mm:ss duration are parsed from a Finished line."""
    assert extract_finished_backup_from_log_line(
        "INFO: Finished Backup of VM 100 (01:02:03)"
    ) == (100, 3723)


def test_task_duration_seconds_from_task_times() -> None:
    """Duration is the difference between endtime and starttime."""
    assert task_duration_seconds({"starttime": 1778119140, "endtime": 1778119200}) == 60