# 858 lines, 26 KiB, Python
from pve_backup_report.collectors import (
|
|
collect_pbs_datastore_usages,
|
|
collect_pbs_access_users,
|
|
collect_pbs_gc_statuses,
|
|
collect_pbs_retention_policies,
|
|
collect_pbs_snapshot_summaries,
|
|
collect_guests,
|
|
extract_finished_backup_from_log_line,
|
|
extract_task_vmid,
|
|
guest_notes,
|
|
is_missing_pbs_snapshot_namespace,
|
|
normalize_backup_job,
|
|
normalize_guest,
|
|
normalize_last_backup_result,
|
|
normalize_pbs_storage,
|
|
normalize_pbs_retention_policy,
|
|
normalize_pbs_snapshot,
|
|
normalize_pbs_datastore_usage,
|
|
normalize_pbs_gc_status,
|
|
normalize_pbs_namespace,
|
|
normalize_pbs_access_user,
|
|
normalize_pool,
|
|
pbs_auth_user_id,
|
|
pbs_datastore_acl_path,
|
|
pbs_client_storages,
|
|
pbs_snapshot_scopes,
|
|
parse_vzdump_task_log,
|
|
merge_pbs_snapshot_summary,
|
|
task_duration_seconds,
|
|
)
|
|
from pve_backup_report.models import PbsStorage
|
|
from pve_backup_report.pbs_client import PbsHttpError
|
|
from pve_backup_report.pve_client import PveApiError
|
|
|
|
|
|
class FakePveClient:
    """In-memory stand-in for the PVE API client used by the collector tests.

    The constructor receives the cluster resources payload to replay, an
    optional mapping of guest configs and an optional mapping of errors, both
    keyed by ``(guest_type, node, vmid)``.
    """

    def __init__(
        self,
        raw_resources: list[dict] | object,
        configs: dict[tuple[str, str, int], dict] | None = None,
        config_errors: dict[tuple[str, str, int], PveApiError] | None = None,
    ) -> None:
        self._raw_resources = raw_resources
        self._configs = configs if configs else {}
        self._config_errors = config_errors if config_errors else {}

    def get_cluster_resources(self) -> list[dict] | object:
        """Return the resources payload exactly as it was supplied."""
        return self._raw_resources

    def get_qemu_config(self, node: str, vmid: int) -> dict:
        """Return the canned config registered for a ``qemu`` guest."""
        return self._guest_config("qemu", node, vmid)

    def get_lxc_config(self, node: str, vmid: int) -> dict:
        """Return the canned config registered for an ``lxc`` guest."""
        return self._guest_config("lxc", node, vmid)

    def _guest_config(self, guest_type: str, node: str, vmid: int) -> dict:
        """Raise the registered error for the guest, else return its config.

        A guest without a registered config yields an empty dict.
        """
        lookup = (guest_type, node, vmid)
        failure = self._config_errors.get(lookup)
        if failure is None:
            return self._configs.get(lookup, {})
        raise failure
|
|
|
|
|
|
class FakePbsClient:
|
|
def __init__(
|
|
self,
|
|
server_name: str,
|
|
api_url: str | None = "https://backup.example.invalid:8007",
|
|
pbs_hostnames: dict[str, str] | None = None,
|
|
raw_policies: list[dict] | None = None,
|
|
raw_datastores: list[dict] | None = None,
|
|
raw_namespaces: dict[str, list[dict]] | None = None,
|
|
raw_snapshots: dict[tuple[str, str], list[dict]] | None = None,
|
|
snapshot_errors: dict[tuple[str, str], Exception] | None = None,
|
|
raw_statuses: dict[str, dict] | None = None,
|
|
raw_gc_statuses: dict[str, dict] | None = None,
|
|
raw_users: list[dict] | None = None,
|
|
raw_permissions: dict[tuple[str, str], dict] | None = None,
|
|
) -> None:
|
|
self.server_name = server_name
|
|
self.api_url = api_url
|
|
self.config = type("FakeConfig", (), {"pbs_hostnames": pbs_hostnames or {}})()
|
|
self._raw_policies = raw_policies or []
|
|
self._raw_datastores = raw_datastores or []
|
|
self._raw_namespaces = raw_namespaces or {}
|
|
self._raw_snapshots = raw_snapshots or {}
|
|
self._snapshot_errors = snapshot_errors or {}
|
|
self._raw_statuses = raw_statuses or {}
|
|
self._raw_gc_statuses = raw_gc_statuses or {}
|
|
self._raw_users = raw_users or []
|
|
self._raw_permissions = raw_permissions or {}
|
|
|
|
def get_prune_jobs(self) -> list[dict]:
|
|
return self._raw_policies
|
|
|
|
def get_datastores(self) -> list[dict]:
|
|
return self._raw_datastores
|
|
|
|
def get_datastore_namespaces(self, datastore: str) -> list[dict]:
|
|
return self._raw_namespaces.get(datastore, [])
|
|
|
|
def get_datastore_snapshots(self, datastore: str, namespace: str | None) -> list[dict]:
|
|
error = self._snapshot_errors.get((datastore, namespace or "/"))
|
|
if error is not None:
|
|
raise error
|
|
return self._raw_snapshots.get((datastore, namespace or "/"), [])
|
|
|
|
def get_datastore_status(self, datastore: str) -> dict:
|
|
return self._raw_statuses[datastore]
|
|
|
|
def get_datastore_gc_status(self, datastore: str) -> dict:
|
|
return self._raw_gc_statuses[datastore]
|
|
|
|
def get_access_users(self) -> list[dict]:
|
|
return self._raw_users
|
|
|
|
def get_access_permissions(self, auth_id: str, path: str) -> dict:
|
|
return self._raw_permissions.get((auth_id, path), {})
|
|
|
|
|
|
def test_normalize_pbs_storage() -> None:
    """A fully populated storage entry is mapped field by field."""
    raw_storage = {
        "storage": "backup-storage",
        "type": "pbs",
        "username": "backup@pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
        "namespace": "pve",
        "disable": 0,
    }

    normalized = normalize_pbs_storage(raw_storage)

    assert normalized.storage_id == "backup-storage"
    assert normalized.username == "backup@pbs"
    assert normalized.server == "backup.example.invalid"
    assert normalized.datastore == "prod"
    assert normalized.namespace == "pve"
    assert normalized.enabled is True
|
|
|
|
|
|
def test_normalize_pbs_storage_defaults_to_enabled_when_disable_is_absent() -> None:
    """A storage entry without a ``disable`` key is reported as enabled."""
    raw_storage = {
        "storage": "backup-storage",
        "type": "pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
    }

    normalized = normalize_pbs_storage(raw_storage)

    assert normalized.enabled is True
|
|
|
|
|
|
def test_normalize_pbs_storage_marks_disabled() -> None:
    """A storage entry with ``disable`` set to 1 is reported as not enabled."""
    raw_storage = {
        "storage": "backup-storage",
        "type": "pbs",
        "server": "backup.example.invalid",
        "datastore": "prod",
        "disable": 1,
    }

    normalized = normalize_pbs_storage(raw_storage)

    assert normalized.enabled is False
|
|
|
|
|
|
def test_normalize_pbs_access_user() -> None:
    """Check the fields produced for a token auth id with a matching user record.

    The storage authenticates as ``backup@pbs!pve`` while the user payload is
    keyed by ``backup@pbs``; the string ``"0"`` expiry is expected to come back
    as the integer ``0``.
    """
    user = normalize_pbs_access_user(
        "PBS01",
        PbsStorage(
            storage_id="backup-storage",
            username="backup@pbs!pve",
            datastore="RAID5",
            namespace="prod",
        ),
        {
            "userid": "backup@pbs",
            "enable": True,
            "expire": "0",
            "email": "admin@example.invalid",
            "comment": "Compte PVE",
        },
        {"Datastore.Backup": True, "Datastore.Modify": False},
    )

    assert user.server_name == "PBS01"
    assert user.auth_id == "backup@pbs!pve"
    assert user.user_id == "backup@pbs"
    assert user.storage_id == "backup-storage"
    assert user.enabled is True
    # The original asserted the expiry twice in a row; once is enough.
    assert user.expire == 0
    assert user.email == "admin@example.invalid"
    assert user.comment == "Compte PVE"
    assert user.permissions == {"Datastore.Backup": True, "Datastore.Modify": False}
|
|
|
|
|
|
def test_normalize_pbs_access_user_defaults_to_enabled_when_enable_is_absent() -> None:
    """A user record lacking the ``enable`` key is reported as enabled."""
    pve_storage = PbsStorage(storage_id="backup-storage", username="backup@pbs")
    raw_user = {"userid": "backup@pbs", "comment": "Compte PVE"}

    access_user = normalize_pbs_access_user("PBS01", pve_storage, raw_user)

    assert access_user.enabled is True
|
|
|
|
|
|
def test_normalize_pbs_access_user_keeps_unknown_enabled_when_user_is_absent() -> None:
    """An empty user record leaves ``enabled`` and ``expire`` as ``None``."""
    pve_storage = PbsStorage(storage_id="backup-storage", username="missing@pbs")
    raw_user = {}

    access_user = normalize_pbs_access_user("PBS01", pve_storage, raw_user)

    assert access_user.enabled is None
    assert access_user.expire is None
|
|
|
|
|
|
def test_collect_pbs_access_users_matches_pve_storages_and_permissions() -> None:
    """One PVE storage on one PBS server yields one user with its ACL privileges."""
    acl_path = "/datastore/RAID5/prod"
    pbs_client = FakePbsClient(
        "PBS01",
        raw_users=[
            {
                "userid": "backup@pbs",
                "enable": True,
                "email": "admin@example.invalid",
            }
        ],
        raw_permissions={
            ("backup@pbs!pve", acl_path): {
                acl_path: {
                    "Datastore.Backup": True,
                    "Datastore.Modify": False,
                }
            }
        },
    )
    pve_storage = PbsStorage(
        storage_id="backup-storage",
        username="backup@pbs!pve",
        server="PBS01",
        datastore="RAID5",
        namespace="prod",
    )

    collected = collect_pbs_access_users([pbs_client], [pve_storage])

    assert len(collected) == 1
    assert collected[0].server_name == "PBS01"
    assert collected[0].auth_id == "backup@pbs!pve"
    assert collected[0].user_id == "backup@pbs"
    assert collected[0].email == "admin@example.invalid"
    assert collected[0].permissions == {
        "Datastore.Backup": True,
        "Datastore.Modify": False,
    }
|
|
|
|
|
|
def test_pbs_auth_user_id_removes_token_suffix() -> None:
    """A ``!token`` suffix is dropped; a plain user id is returned unchanged."""
    from_token = pbs_auth_user_id("backup@pbs!pve")
    assert from_token == "backup@pbs"

    from_plain_user = pbs_auth_user_id("backup@pbs")
    assert from_plain_user == "backup@pbs"
|
|
|
|
|
|
def test_pbs_datastore_acl_path() -> None:
    """``None`` and ``"/"`` give the datastore path; a named namespace is appended."""
    without_namespace = pbs_datastore_acl_path("RAID5", None)
    assert without_namespace == "/datastore/RAID5"

    root_namespace = pbs_datastore_acl_path("RAID5", "/")
    assert root_namespace == "/datastore/RAID5"

    named_namespace = pbs_datastore_acl_path("RAID5", "prod")
    assert named_namespace == "/datastore/RAID5/prod"
|
|
|
|
|
|
def test_normalize_pbs_retention_policy() -> None:
    """A prune job payload is mapped onto the policy model.

    ``keep-daily`` is given as the string ``"14"`` and is expected back as the
    integer ``14``.
    """
    raw_prune_job = {
        "id": "prune-prod",
        "store": "RAID5",
        "ns": "serveurs-internes",
        "schedule": "daily",
        "keep-daily": "14",
        "keep-weekly": 8,
        "max-depth": 0,
        "disable": 0,
    }

    retention = normalize_pbs_retention_policy("PBS01", raw_prune_job)

    assert retention.policy_id == "prune-prod"
    assert retention.server_name == "PBS01"
    assert retention.datastore == "RAID5"
    assert retention.namespace == "serveurs-internes"
    assert retention.keep_daily == 14
    assert retention.keep_weekly == 8
    assert retention.max_depth == 0
    assert retention.enabled is True
|
|
|
|
|
|
def test_collect_pbs_retention_policies_supports_multiple_servers() -> None:
    """Policies from two PBS clients are returned in client order."""
    pbs02 = FakePbsClient(
        "PBS02",
        raw_policies=[
            {
                "id": "prune-pbs02",
                "store": "PBS2RAID5",
                "ns": "serveurs-internes",
                "schedule": "daily",
                "keep-daily": 7,
            }
        ],
    )
    pbs03 = FakePbsClient(
        "PBS03",
        raw_policies=[
            {
                "id": "prune-pbs03",
                "store": "BACKUPSTORAGE",
                "ns": "serveurs-internes",
                "schedule": "weekly",
                "keep-weekly": 4,
            }
        ],
    )

    collected = collect_pbs_retention_policies([pbs02, pbs03])

    server_names = [entry.server_name for entry in collected]
    assert server_names == ["PBS02", "PBS03"]
    policy_ids = [entry.policy_id for entry in collected]
    assert policy_ids == ["prune-pbs02", "prune-pbs03"]
|
|
|
|
|
|
def test_normalize_pbs_datastore_usage() -> None:
    """A status payload is mapped to byte counters; ``"1000"`` comes back as 1000."""
    raw_status = {"total": "1000", "used": 400, "avail": 600}

    datastore_usage = normalize_pbs_datastore_usage("PBS01", "RAID5", raw_status)

    assert datastore_usage.server_name == "PBS01"
    assert datastore_usage.datastore == "RAID5"
    assert datastore_usage.total_bytes == 1000
    assert datastore_usage.used_bytes == 400
    assert datastore_usage.available_bytes == 600
|
|
|
|
|
|
def test_collect_pbs_datastore_usages() -> None:
    """A datastore listed by the PBS API yields one usage record."""
    pbs_client = FakePbsClient(
        "PBS01",
        raw_datastores=[{"name": "RAID5"}],
        raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
    )
    no_pve_storages = []

    collected = collect_pbs_datastore_usages([pbs_client], no_pve_storages)

    assert len(collected) == 1
    only_usage = collected[0]
    assert only_usage.server_name == "PBS01"
    assert only_usage.datastore == "RAID5"
    assert only_usage.total_bytes == 1000
    assert only_usage.used_bytes == 400
    assert only_usage.available_bytes == 600
|
|
|
|
|
|
def test_collect_pbs_datastore_usages_uses_pve_storages_as_fallback() -> None:
    """With no datastore from the PBS API, the PVE storage's datastore is used."""
    pbs_client = FakePbsClient(
        "PBS01",
        raw_datastores=[],
        raw_statuses={"RAID5": {"total": 1000, "used": 400, "avail": 600}},
    )
    pve_storage = PbsStorage(
        storage_id="BACKUP-PRODR5",
        server="PBS01",
        datastore="RAID5",
    )

    collected = collect_pbs_datastore_usages([pbs_client], [pve_storage])

    assert len(collected) == 1
    assert collected[0].datastore == "RAID5"
|
|
|
|
|
|
def test_pbs_client_storages_matches_hostname_mapping() -> None:
    """A storage addressed by IP is matched through the ``pbs_hostnames`` mapping."""
    pbs_client = FakePbsClient(
        "PBS01",
        api_url="https://backup.example.invalid:8007",
        pbs_hostnames={"192.0.2.10": "backup.example.invalid"},
    )
    pve_storage = PbsStorage(
        storage_id="BACKUP-PRODR5",
        server="192.0.2.10",
        datastore="RAID5",
    )

    matched = pbs_client_storages(pbs_client, [pve_storage])

    matched_ids = [entry.storage_id for entry in matched]
    assert matched_ids == ["BACKUP-PRODR5"]
|
|
|
|
|
|
def test_collect_pbs_datastore_usages_warns_when_no_datastore() -> None:
    """No datastore anywhere: nothing is collected and an issue is recorded."""
    recorded_issues = []
    pbs_client = FakePbsClient("PBS01", raw_datastores=[])

    collected = collect_pbs_datastore_usages([pbs_client], [], recorded_issues)

    assert collected == []
    assert recorded_issues
    assert recorded_issues[0].component == "pbs_storage_usage"
|
|
|
|
|
|
def test_normalize_pbs_gc_status_detects_running() -> None:
    """A GC payload with ``upid``, ``schedule`` and ``next-run`` gives ``en_cours``."""
    raw_gc = {"upid": "UPID:pbs02:gc", "schedule": "*:0/30", "next-run": 1778319000}

    gc_status = normalize_pbs_gc_status("PBS02", "PBS2RAID5", raw_gc)

    assert gc_status.server_name == "PBS02"
    assert gc_status.datastore == "PBS2RAID5"
    assert gc_status.status == "en_cours"
    assert gc_status.schedule == "*:0/30"
    assert gc_status.next_run is not None
|
|
|
|
|
|
def test_collect_pbs_gc_statuses() -> None:
    """One datastore with a GC payload yields one ``en_cours`` status."""
    pbs_client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_gc_statuses={"PBS2RAID5": {"upid": "UPID:pbs02:gc"}},
    )
    no_pve_storages = []

    collected = collect_pbs_gc_statuses([pbs_client], no_pve_storages)

    assert len(collected) == 1
    assert collected[0].status == "en_cours"
|
|
|
|
|
|
def test_pbs_snapshot_scopes_adds_all_pve_namespaces_to_api_datastores() -> None:
    """PVE storage namespaces are combined with the API namespace of the datastore."""
    pbs_client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": [{"ns": "sync-only"}]},
    )
    pve_storages = [
        PbsStorage(storage_id="prod", namespace="serveurs-internes"),
        PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
    ]
    expected_scopes = [
        ("PBS2RAID5", "Serveurs-PVELAB"),
        ("PBS2RAID5", "serveurs-internes"),
        ("PBS2RAID5", "sync-only"),
    ]

    computed_scopes = pbs_snapshot_scopes(pbs_client, pve_storages)

    assert computed_scopes == expected_scopes
|
|
|
|
|
|
def test_collect_pbs_snapshot_summaries_reads_pve_namespaces_on_indirect_pbs() -> None:
    """Snapshots in namespaces known only from PVE storages are still summarized."""
    snapshots_by_scope = {
        ("PBS2RAID5", "serveurs-internes"): [
            {
                "backup-type": "vm",
                "backup-id": "100",
                "backup-time": 1775849405,
            }
        ],
        ("PBS2RAID5", "Serveurs-PVELAB"): [
            {
                "backup-type": "ct",
                "backup-id": "200",
                "backup-time": 1775849405,
            }
        ],
    }
    pbs_client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        raw_snapshots=snapshots_by_scope,
    )
    pve_storages = [
        PbsStorage(storage_id="prod", namespace="serveurs-internes"),
        PbsStorage(storage_id="lab", namespace="Serveurs-PVELAB"),
    ]

    collected = collect_pbs_snapshot_summaries([pbs_client], pve_storages)

    expected_keys = {
        ("PBS02", "PBS2RAID5", "serveurs-internes", "qemu", 100),
        ("PBS02", "PBS2RAID5", "Serveurs-PVELAB", "lxc", 200),
    }
    assert set(collected) == expected_keys
|
|
|
|
|
|
def test_collect_pbs_snapshot_summaries_ignores_missing_non_root_namespace() -> None:
    """An HTTP 400 on a non-root namespace yields no summary and no issue."""
    recorded_issues = []
    bad_request = PbsHttpError(
        "/admin/datastore/PBS2RAID5/snapshots",
        400,
        "Bad Request",
    )
    pbs_client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        snapshot_errors={("PBS2RAID5", "absente"): bad_request},
    )
    pve_storages = [PbsStorage(storage_id="absente", namespace="absente")]

    collected = collect_pbs_snapshot_summaries([pbs_client], pve_storages, recorded_issues)

    assert collected == {}
    assert recorded_issues == []
|
|
|
|
|
|
def test_collect_pbs_snapshot_summaries_keeps_root_bad_request_as_warning() -> None:
    """An HTTP 400 on the root namespace is recorded as a ``pbs_snapshots`` issue."""
    recorded_issues = []
    bad_request = PbsHttpError(
        "/admin/datastore/PBS2RAID5/snapshots",
        400,
        "Bad Request",
    )
    pbs_client = FakePbsClient(
        "PBS02",
        raw_datastores=[{"name": "PBS2RAID5"}],
        raw_namespaces={"PBS2RAID5": []},
        snapshot_errors={("PBS2RAID5", "/"): bad_request},
    )
    pve_storages = [PbsStorage(storage_id="root", namespace=None)]

    collected = collect_pbs_snapshot_summaries([pbs_client], pve_storages, recorded_issues)

    assert collected == {}
    assert len(recorded_issues) == 1
    assert recorded_issues[0].component == "pbs_snapshots"
|
|
|
|
|
|
def test_is_missing_pbs_snapshot_namespace_only_matches_non_root_400() -> None:
    """Only a 400 on a non-root namespace counts as a missing namespace."""
    endpoint = "/admin/datastore/store/snapshots"

    non_root_400 = PbsHttpError(endpoint, 400, "Bad Request")
    assert is_missing_pbs_snapshot_namespace(non_root_400, "absente")

    root_400 = PbsHttpError(endpoint, 400, "Bad Request")
    assert not is_missing_pbs_snapshot_namespace(root_400, "/")

    non_root_500 = PbsHttpError(endpoint, 500, "Internal Server Error")
    assert not is_missing_pbs_snapshot_namespace(non_root_500, "absente")
|
|
|
|
|
|
def test_normalize_pbs_snapshot() -> None:
    """A ``vm`` snapshot payload becomes a one-snapshot ``qemu`` summary."""
    raw_snapshot = {
        "backup-type": "vm",
        "backup-id": "1110001",
        "backup-time": 1775849405,
        "size": 123456789,
    }

    snapshot_summary = normalize_pbs_snapshot(
        "PBS01",
        "RAID5",
        "serveurs-internes",
        raw_snapshot,
    )

    assert snapshot_summary is not None
    assert snapshot_summary.server_name == "PBS01"
    assert snapshot_summary.vmid == 1110001
    assert snapshot_summary.guest_type == "qemu"
    assert snapshot_summary.snapshot_count == 1
    assert snapshot_summary.newest_backup_size_bytes == 123456789
|
|
|
|
|
|
def test_merge_pbs_snapshot_summary_keeps_newest_size() -> None:
    """Merging two summaries of one guest counts both and keeps the newer size."""
    older_payload = {
        "backup-type": "vm",
        "backup-id": "1110001",
        "backup-time": 1775849405,
        "size": 100,
    }
    newer_payload = {
        "backup-type": "vm",
        "backup-id": "1110001",
        "backup-time": 1775935805,
        "size": 200,
    }
    older = normalize_pbs_snapshot("PBS01", "RAID5", "serveurs-internes", older_payload)
    newer = normalize_pbs_snapshot("PBS01", "RAID5", "serveurs-internes", newer_payload)

    assert older is not None
    assert newer is not None
    combined = merge_pbs_snapshot_summary(older, newer)

    assert combined.snapshot_count == 2
    assert combined.newest_backup_size_bytes == 200
|
|
|
|
|
|
def test_normalize_pbs_namespace_root() -> None:
    """Empty string and ``None`` map to ``"/"``; a named namespace is kept."""
    from_empty = normalize_pbs_namespace("")
    assert from_empty == "/"

    from_none = normalize_pbs_namespace(None)
    assert from_none == "/"

    from_named = normalize_pbs_namespace("progiciels")
    assert from_named == "progiciels"
|
|
|
|
|
|
def test_normalize_backup_job() -> None:
    """A disabled vmid-based backup job payload is mapped onto the job model."""
    raw_job = {
        "id": "backup-prod",
        "storage": "backup-storage",
        "schedule": "daily 02:00",
        "disable": 1,
        "mode": "snapshot",
        "vmid": "100,101",
        "exclude": "101",
    }

    backup_job = normalize_backup_job(raw_job)

    assert backup_job.job_id == "backup-prod"
    assert backup_job.storage == "backup-storage"
    assert backup_job.schedule == "daily 02:00"
    assert backup_job.enabled is False
    assert backup_job.mode == "snapshot"
    assert backup_job.selection == "vmid=100,101"
    assert backup_job.excluded == "101"
|
|
|
|
|
|
def test_normalize_guest() -> None:
    """A ``qemu`` resource is mapped to a guest; the string vmid becomes an int."""
    raw_resource = {
        "id": "qemu/100",
        "vmid": "100",
        "type": "qemu",
        "name": "srv-app",
        "node": "pve01",
        "status": "running",
    }

    normalized = normalize_guest(raw_resource)

    assert normalized is not None
    assert normalized.vmid == 100
    assert normalized.name == "srv-app"
    assert normalized.guest_type == "qemu"
    assert normalized.node == "pve01"
    assert normalized.status == "running"
|
|
|
|
|
|
def test_guest_notes_reads_description_and_strips_empty() -> None:
    """The description is stripped; a whitespace-only description gives ``None``."""
    padded_notes = guest_notes({"description": " notes applicatives "})
    assert padded_notes == "notes applicatives"

    blank_notes = guest_notes({"description": " "})
    assert blank_notes is None
|
|
|
|
|
|
def test_collect_guests_enriches_notes_from_vm_and_ct_config() -> None:
    """Notes come from the qemu config for the VM and the lxc config for the CT."""
    cluster_resources = [
        {"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"},
        {"id": "lxc/200", "vmid": 200, "type": "lxc", "name": "ct", "node": "pve02"},
    ]
    guest_configs = {
        ("qemu", "pve01", 100): {"description": "note VM"},
        ("lxc", "pve02", 200): {"description": "note CT"},
    }
    pve_client = FakePveClient(cluster_resources, configs=guest_configs)

    collected = collect_guests(pve_client)

    collected_notes = [entry.notes for entry in collected]
    assert collected_notes == ["note VM", "note CT"]
|
|
|
|
|
|
def test_collect_guests_keeps_guest_when_notes_are_unavailable() -> None:
    """A failing config lookup keeps the guest, without notes, and records an issue."""
    recorded_issues = []
    cluster_resources = [
        {"id": "qemu/100", "vmid": 100, "type": "qemu", "name": "vm", "node": "pve01"}
    ]
    pve_client = FakePveClient(
        cluster_resources,
        config_errors={
            ("qemu", "pve01", 100): PveApiError("config indisponible"),
        },
    )

    collected = collect_guests(pve_client, recorded_issues)

    assert len(collected) == 1
    assert collected[0].notes is None
    assert len(recorded_issues) == 1
    assert recorded_issues[0].message == "notes VM/CT indisponibles"
|
|
|
|
|
|
def test_normalize_pool_members() -> None:
    """Guest members given by ``vmid`` or ``id`` are kept; the storage member is not."""
    raw_pool = {
        "members": [
            {"type": "qemu", "vmid": 100},
            {"type": "lxc", "id": "lxc/101"},
            {"type": "storage", "id": "storage/local"},
        ]
    }

    normalized = normalize_pool("prod", raw_pool)

    assert normalized.pool_id == "prod"
    assert normalized.vmids == {100, 101}
|
|
|
|
|
|
def test_normalize_last_backup_result_success() -> None:
    """An ``OK`` vzdump task gives ``succes`` with a 60 second duration."""
    raw_task = {
        "type": "vzdump",
        "id": "100",
        "status": "OK",
        "starttime": 1778119140,
        "endtime": 1778119200,
        "node": "pve01",
    }

    backup_result = normalize_last_backup_result(raw_task)

    assert backup_result is not None
    assert backup_result.vmid == 100
    assert backup_result.status == "succes"
    assert backup_result.duration_seconds == 60
    assert backup_result.node == "pve01"
|
|
|
|
|
|
def test_normalize_last_backup_result_failure() -> None:
    """An ``interrupted`` vzdump task is reported as ``echec``."""
    raw_task = {
        "type": "vzdump",
        "id": "100",
        "status": "interrupted",
        "endtime": 1778119200,
    }

    backup_result = normalize_last_backup_result(raw_task)

    assert backup_result is not None
    assert backup_result.status == "echec"
|
|
|
|
|
|
def test_extract_task_vmid_from_guest_id() -> None:
    """The vmid is read from ``qemu/<vmid>`` and ``lxc/<vmid>`` task ids."""
    qemu_vmid = extract_task_vmid({"id": "qemu/100"})
    assert qemu_vmid == 100

    lxc_vmid = extract_task_vmid({"id": "lxc/101"})
    assert lxc_vmid == 101
|
|
|
|
|
|
def test_extract_task_vmid_from_upid() -> None:
    """The vmid is read from a vzdump UPID when the task carries only ``upid``."""
    raw_task = {"upid": "UPID:pve01:123:456:789:vzdump:100:root@pam:"}

    parsed_vmid = extract_task_vmid(raw_task)

    assert parsed_vmid == 100
|
|
|
|
|
|
def test_parse_vzdump_task_log_extracts_per_guest_results() -> None:
    """Each guest with a finished line gets a ``succes`` result and its duration."""
    raw_task = {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve01"}
    log_lines = [
        "INFO: Starting Backup of VM 100 (qemu)",
        "INFO: Finished Backup of VM 100 (00:00:10)",
        "INFO: Starting Backup of CT 101 (lxc)",
        "INFO: Finished Backup of CT 101 (00:00:08)",
    ]

    parsed = parse_vzdump_task_log(raw_task, log_lines)

    seen_vmids = {entry.vmid for entry in parsed}
    assert seen_vmids == {100, 101}
    seen_statuses = {entry.status for entry in parsed}
    assert seen_statuses == {"succes"}
    durations_by_vmid = {entry.vmid: entry.duration_seconds for entry in parsed}
    assert durations_by_vmid == {
        100: 10,
        101: 8,
    }
|
|
|
|
|
|
def test_parse_vzdump_task_log_extracts_failure() -> None:
    """An ``ERROR: Backup of VM ... failed`` line gives an ``echec`` result."""
    raw_task = {
        "type": "vzdump",
        "status": "interrupted",
        "endtime": 1778119200,
        "node": "pve01",
    }
    log_lines = [
        "INFO: Starting Backup of VM 100 (qemu)",
        "ERROR: Backup of VM 100 failed - command failed",
    ]

    parsed = parse_vzdump_task_log(raw_task, log_lines)

    assert len(parsed) == 1
    assert parsed[0].vmid == 100
    assert parsed[0].status == "echec"
|
|
|
|
|
|
def test_parse_vzdump_task_log_extracts_job_vmid_list_and_skips_external() -> None:
    """Vmids come from the job command line, minus those skipped as external."""
    raw_task = {"type": "vzdump", "status": "OK", "endtime": 1778119200, "node": "pve02"}
    log_lines = [
        "INFO: starting new backup job: vzdump 100 101 102 --mode snapshot --storage PBS",
        "INFO: skip external VMs: 101",
    ]

    parsed = parse_vzdump_task_log(raw_task, log_lines)

    seen_vmids = {entry.vmid for entry in parsed}
    assert seen_vmids == {100, 102}
    seen_statuses = {entry.status for entry in parsed}
    assert seen_statuses == {"succes"}
|
|
|
|
|
|
def test_parse_vzdump_task_log_uses_task_duration_for_started_guest_without_finished_line() -> None:
    """A started guest with no finished line gets the task's own 60 second duration."""
    raw_task = {
        "type": "vzdump",
        "status": "OK",
        "starttime": 1778119140,
        "endtime": 1778119200,
        "node": "pve01",
    }
    log_lines = [
        "INFO: starting new backup job: vzdump 100 101 --mode snapshot --storage PBS",
        "INFO: skip external VMs: 101",
        "INFO: Starting Backup of VM 100 (qemu)",
        "INFO: creating Proxmox Backup Server archive 'vm/100/2026-05-07T00:00:00Z'",
    ]

    parsed = parse_vzdump_task_log(raw_task, log_lines)

    assert len(parsed) == 1
    assert parsed[0].vmid == 100
    assert parsed[0].duration_seconds == 60
|
|
|
|
|
|
def test_extract_finished_backup_from_log_line_extracts_duration() -> None:
    """A finished line gives ``(vmid, seconds)``; ``01:02:03`` is 3723 seconds."""
    log_line = "INFO: Finished Backup of VM 100 (01:02:03)"

    parsed = extract_finished_backup_from_log_line(log_line)

    assert parsed == (100, 3723)
|
|
|
|
|
|
def test_task_duration_seconds_from_task_times() -> None:
    """The duration is ``endtime`` minus ``starttime`` for the given task."""
    raw_task = {"starttime": 1778119140, "endtime": 1778119200}

    elapsed = task_duration_seconds(raw_task)

    assert elapsed == 60
|