blob: d9481af7548dbe1c165fa00ebe34335a95658970 [file] [edit]
############################################################################
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
############################################################################
import atf
import pytest
import re
@pytest.fixture(scope="module", autouse=True)
def setup():
atf.require_version(
(26, 5), "bin/sprio", reason="Ticket 22180: SLUID availability added in 26.05+"
)
atf.require_accounting()
atf.require_config_parameter("PriorityType", "priority/multifactor")
atf.require_slurm_running()
@pytest.fixture(scope="module")
def default_partition():
return atf.default_partition()
@pytest.fixture(scope="function")
def queued_job(default_partition):
nodes_dict = atf.get_nodes()
total_cpus = 0
for node_name in nodes_dict:
node_dict = nodes_dict[node_name]
node_partitions = node_dict["partitions"]
if default_partition in node_partitions and "IDLE" in node_dict["state"]:
total_cpus += node_dict["cpus"]
atf.submit_job_sbatch(
f'--output=/dev/null --error=/dev/null -n {total_cpus} --exclusive --wrap="sleep infinity"',
fatal=True,
)
queued_job_id = atf.submit_job_sbatch(
f'--output=/dev/null --error=/dev/null -n {total_cpus} --exclusive --wrap="sleep infinity"',
fatal=True,
)
return queued_job_id
def test_sprio_sluid(queued_job, default_partition):
"""Verify sprio can query by SLUID, filter, and print with %s option."""
sluid = atf.get_job_parameter(queued_job, "SLUID")
assert sluid is not None, f"Job {queued_job} has no SLUID"
fmt = "--noheader -o '%.6i %.14s'"
# sprio -j <job_id> prints the SLUID with %s
output = atf.run_command_output(f"sprio -j {queued_job} {fmt}", fatal=True)
assert re.search(
rf"{queued_job}\s+{re.escape(sluid)}", output
), f"Expected job_id and SLUID in output: {output}"
# sprio -j <SLUID> filters by SLUID
output = atf.run_command_output(f"sprio -j {sluid} {fmt}", fatal=True)
assert re.search(
rf"{queued_job}\s+{re.escape(sluid)}", output
), f"Expected job found by SLUID in output: {output}"
# sprio -j <job_id>,<SLUID> (both refer to the same job)
output = atf.run_command_output(f"sprio -j {queued_job},{sluid} {fmt}", fatal=True)
assert re.search(
rf"{queued_job}\s+{re.escape(sluid)}", output
), f"Expected job found by mixed ids in output: {output}"