############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import atf
import json
import logging
import pytest
import re
file_in1 = "input1"
file_in2 = "input2"
file_out = "output"
@pytest.fixture(scope="module", autouse=True)
def setup():
"""Setup for the test module"""
atf.require_config_parameter("AccountingStorageType", "accounting_storage/slurmdbd")
atf.require_config_parameter_includes("AccountingStorageTRES", "gres/gpu")
atf.require_config_parameter("SelectType", "select/cons_tres")
atf.require_config_parameter("SelectTypeParameters", "CR_CPU")
atf.require_tty(0)
atf.require_tty(1)
atf.require_config_parameter(
"Name", {"gpu": {"File": "/dev/tty[0-1]"}}, source="gres"
)
atf.require_config_parameter_includes("GresTypes", "gpu")
atf.require_nodes(
2, [("CPUs", 3), ("Sockets", 3), ("Gres", "gpu:2"), ("RealMemory", 1)]
)
atf.require_slurm_running()
def check_accounted_gpus(
job_id, job_gpus, step_gpus, req_gpus, have_gpu_types, batch_gpus
):
"""Validate that the job, batch step and step 0 of a job have the proper GPU counts using JSON.
Args:
job_id: Job ID to check
job_gpus: Expected number of GPUs for the job
step_gpus: Expected number of GPUs per step, None if no step to test
req_gpus: Expected number of GPUs requested per node/task/socket
have_gpu_types: Whether to look for GPU types in the accounting data
batch_gpus: Expected number of GPUs for the batch step
"""
def _get_job_with_sacct(job_id):
output = atf.run_command_output(
f"sacct --job={job_id} --json --start=now-15minutes",
fatal=True,
quiet=True,
)
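        # sacct --json returns {"jobs": [...]}; exactly one entry is expected for this job_id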
data = json.loads(output)
jobs = data.get("jobs", [])
if len(jobs) != 1:
logging.debug("sacct reported wrong number jobs ({len(jobs)})")
return None
return jobs[0]
def _get_job_gpus(job_id, field):
job = _get_job_with_sacct(job_id)
if not job:
logging.debug("no job retrieved")
return -1
gpu = _gpus_from_tres_arrays([job.get("tres", {}).get(field, [])])
logging.debug(f"GPUs in {field} are {gpu}")
return gpu
def _get_step_gpus(job_id, step_id, field):
job = _get_job_with_sacct(job_id)
if not job:
logging.debug("no job retrieved")
return -1
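        # sacct JSON step ids are formatted "<job_id>.<step_id>", e.g. "123.0" or "123.batch"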
step = next(
(
step
for step in job.get("steps", [])
if step.get("step", {}).get("id", "") == f"{job_id}.{step_id}"
),
None,
)
if not step:
logging.debug("sacct did not report step_id {job_id}.{step_id}")
return -1
gpu = _gpus_from_tres_arrays([step.get("tres", {}).get(field, [])])
logging.debug(f"GPUs in {field} of step {step_id} are {gpu}")
return gpu
def _gpus_from_tres_arrays(tres_arrays):
"""Helper to count GPU TRES from sacct JSON tres arrays."""
gpu_count = 0
for tres_array in tres_arrays:
for tres_item in tres_array:
if tres_item.get("type") == "gres" and tres_item.get("name") == "gpu":
gpu_count += tres_item.get("count", 0)
return gpu_count
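    # Accounting records can lag behind job completion, so poll with repeat_until
    # rather than asserting on a single sacct invocation.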
# Check allocated GPUs for the whole job (AllocTRES)
assert atf.repeat_until(
lambda: _get_job_gpus(job_id, "allocated"),
lambda allocated_gpus: allocated_gpus == job_gpus,
timeout=10,
), f"Allocated GPUs reported by sacct in the job should be {job_gpus}"
# Check requested GPUs for the whole job (ReqTRES)
assert atf.repeat_until(
lambda: _get_job_gpus(job_id, "requested"),
lambda requested_gpus: requested_gpus == job_gpus,
timeout=10,
), f"Requested GPUs reported by sacct in the job should be {job_gpus}"
    # Check allocated GPUs for step 0 (AllocTRES), if a step is expected
if step_gpus is not None:
assert atf.repeat_until(
lambda: _get_step_gpus(job_id, 0, "allocated"),
lambda allocated_gpus: allocated_gpus == step_gpus,
timeout=10,
), f"Allocated GPUs reported by sacct in step 0 should be {step_gpus}"
    # Check allocated GPUs for the batch step (AllocTRES)
assert atf.repeat_until(
lambda: _get_step_gpus(job_id, "batch", "allocated"),
lambda allocated_gpus: allocated_gpus == batch_gpus,
timeout=10,
), f"Allocated GPUs reported by sacct in batch step should be {batch_gpus}"
def check_allocated_gpus(job_id, target):
"""Validate the job has the proper GPU counts.
Args:
job_id: Job ID to check
target: Expected GPU count
"""
# Extract GPU count from allocated TRES
tres_string = atf.get_job_parameter(job_id, "AllocTRES", "")
if not tres_string:
gpu_count = 0
else:
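        # AllocTRES looks like "cpu=6,mem=2M,node=2,billing=6,gres/gpu=4"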
gpu_match = re.search(r"gres/gpu[=:](\d+)", tres_string)
gpu_count = int(gpu_match.group(1)) if gpu_match else 0
assert (
gpu_count == target
), f"GPUs accounted should be {target}, but found {gpu_count}"
def get_batch_gpus():
"""Helper function to find batch_gpus from different outputs.
Returns:
Number of batch GPUs
"""
batch_host = "unknown"
output = atf.run_command_output(f"cat {file_out}", fatal=True)
    nodes_lines = re.findall(r" Nodes=.*", output, re.MULTILINE)
if not nodes_lines:
pytest.fail("No Nodes lines found in output file")
node_line = nodes_lines[0]
if len(nodes_lines) > 1:
        # Output type where the nodes are split across two lines:
        # BatchHost=74dc179a_n1
        # ...
        # Nodes=74dc179a_n1 CPU_IDs=0-1 Mem=150 GRES=gpu:2(IDX:0-1)  <- batch host line
        # Nodes=74dc179a_n2 CPU_IDs=0-1 Mem=150 GRES=gpu:1(IDX:0)
batch_host_match = re.search(r"BatchHost=(.*)", output)
if batch_host_match:
batch_host = batch_host_match.group(1)
for line in nodes_lines:
if batch_host in line:
node_line = line
break
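    # Match "gpu:<count>" or "gpu:<type>:<count>" in the GRES field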
batch_gpus_match = re.search(r"gpu:(?:[^:( ]+:)?(\d+)", node_line)
assert batch_gpus_match, "Unable to get batch_gpus"
return int(batch_gpus_match.group(1))
@pytest.fixture(scope="function", autouse=True)
def clear_output_file():
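    """Remove the output file after each test."""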
yield
atf.run_command(f"rm -f {file_out}", quiet=True)
def test_gpus_per_node_job():
"""Test --gpus-per-node option by job."""
atf.make_bash_script(
file_in1,
"""
scontrol -dd show job ${SLURM_JOBID}
exit 0
""",
)
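    # The batch script dumps its own allocation; get_batch_gpus() parses it later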
nb_nodes = 2
req_gpus = 2
target = nb_nodes * req_gpus
job_id = atf.submit_job_sbatch(
f"--gpus-per-node={req_gpus} -N{nb_nodes} -t1 -o {file_out} -J test_gpus_per_node_job {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, None, req_gpus, False, batch_gpus)
def test_gpus_job():
"""Test --gpus option by job."""
atf.make_bash_script(
file_in1,
"""
scontrol -dd show job ${SLURM_JOBID}
exit 0
""",
)
target = 2
job_id = atf.submit_job_sbatch(
f"--gpus={target} -N2 -t1 -o {file_out} -J test_gpus_job {file_in1}", fatal=True
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, None, target, False, batch_gpus)
def test_gpus_per_task_job():
"""Test --gpus-per-task option by job."""
atf.make_bash_script(
file_in1,
"""
scontrol -dd show job ${SLURM_JOBID}
exit 0
""",
)
nb_tasks = 3
req_gpus = 1
job_id = atf.submit_job_sbatch(
f"--gpus-per-task={req_gpus} -N2 -n{nb_tasks} -t1 -o {file_out} -J test_gpus_per_task_job {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, nb_tasks)
check_accounted_gpus(job_id, nb_tasks, None, req_gpus, False, batch_gpus)
# TODO: Remove xfail once ticket 19605 is fixed.
@pytest.mark.xfail(reason="Ticket 19605. ReqTRES should not be > AllocTRES.")
def test_gpus_per_socket_job():
"""Test --gpus-per-socket option by job."""
atf.make_bash_script(
file_in1,
"""
scontrol -dd show job ${SLURM_JOBID}
exit 0
""",
)
nb_nodes = 2
nb_sockets = 2
cpus_per_task = 1
req_gpus = 1
target = nb_nodes
job_id = atf.submit_job_sbatch(
f"--gpus-per-socket={req_gpus} -N{nb_nodes} --ntasks={nb_nodes} --sockets-per-node={nb_sockets} --cpus-per-task={cpus_per_task} -t1 -o {file_out} -J test_gpus_per_socket_job {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, None, req_gpus, False, batch_gpus)
def test_gpus_per_node_step():
"""Test --gpus-per-node option by step."""
atf.make_bash_script(
file_in1,
f"""
srun {file_in2}
exit 0
""",
)
atf.make_bash_script(
file_in2,
"""
if [ $SLURM_PROCID -eq 0 ]; then
scontrol -dd show job ${SLURM_JOBID}
scontrol show step ${SLURM_JOBID}.${SLURM_STEPID}
fi
exit 0
""",
)
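    # Only task 0 prints job/step info, so the output file contains a single copy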
nb_nodes = 2
req_gpus = 2
target = nb_nodes * req_gpus
job_id = atf.submit_job_sbatch(
f"--gpus-per-node={req_gpus} -N{nb_nodes} -t1 -o {file_out} -J test_gpus_per_node_step {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, target, req_gpus, False, batch_gpus)
def test_gpus_step():
"""Test --gpus option by step."""
atf.make_bash_script(
file_in1,
f"""
srun {file_in2}
exit 0
""",
)
atf.make_bash_script(
file_in2,
"""
if [ $SLURM_PROCID -eq 0 ]; then
scontrol -dd show job ${SLURM_JOBID}
scontrol show step ${SLURM_JOBID}.${SLURM_STEPID}
fi
exit 0
""",
)
nb_nodes = 2
target = 2
job_id = atf.submit_job_sbatch(
f"--gpus={target} -N{nb_nodes} -t1 -o {file_out} -J test_gpus_step {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, target, target, False, batch_gpus)
def test_gpus_per_task_step():
"""Test --gpus-per-task option by step."""
atf.make_bash_script(
file_in1,
f"""
srun {file_in2}
exit 0
""",
)
atf.make_bash_script(
file_in2,
"""
if [ $SLURM_PROCID -eq 0 ]; then
scontrol -dd show job ${SLURM_JOBID}
scontrol show step ${SLURM_JOBID}.${SLURM_STEPID}
fi
exit 0
""",
)
nb_nodes = 2
nb_tasks = 3
req_gpus = 1
job_id = atf.submit_job_sbatch(
f"--gpus-per-task={req_gpus} -N{nb_nodes} -n{nb_tasks} -t1 -o {file_out} -J test_gpus_per_task_step {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, nb_tasks)
check_accounted_gpus(job_id, nb_tasks, nb_tasks, req_gpus, False, batch_gpus)
# TODO: Remove xfail once ticket 19605 is fixed.
@pytest.mark.xfail(reason="Ticket 19605. ReqTRES should not be > AllocTRES.")
def test_gpus_per_socket_step():
"""Test --gpus-per-socket option by step."""
atf.make_bash_script(
file_in1,
f"""
srun {file_in2}
exit 0
""",
)
atf.make_bash_script(
file_in2,
"""
if [ $SLURM_PROCID -eq 0 ]; then
scontrol -dd show job ${SLURM_JOBID}
scontrol show step ${SLURM_JOBID}.${SLURM_STEPID}
fi
exit 0
""",
)
nb_nodes = 2
nb_sockets = 2
cpus_per_task = 1
req_gpus = 1
target = nb_nodes
job_id = atf.submit_job_sbatch(
f"--gpus-per-socket={req_gpus} -N{nb_nodes} --ntasks={nb_nodes} --sockets-per-node={nb_sockets} --cpus-per-task={cpus_per_task} -t1 -o {file_out} -J test_gpus_per_socket_step {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, target)
check_accounted_gpus(job_id, target, target, req_gpus, False, batch_gpus)
def test_gpus_per_task_with_explicit_step():
"""Test --gpus-per-task option with explicit step node/tasks."""
nb_nodes = 2
step_nodes = 2
job_tasks = 3
step_tasks = 2
req_gpus = 1
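    # The job allocates one GPU per task (3 total), but the explicit step runs
    # only 2 tasks, so the step should account 2 GPUs.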
atf.make_bash_script(
file_in1,
f"""
srun -N{step_nodes} -n{step_tasks} {file_in2}
exit 0
""",
)
atf.make_bash_script(
file_in2,
"""
if [ $SLURM_PROCID -eq 0 ]; then
scontrol -dd show job ${SLURM_JOBID}
scontrol show step ${SLURM_JOBID}.${SLURM_STEPID}
fi
exit 0
""",
)
job_id = atf.submit_job_sbatch(
f"--gpus-per-task={req_gpus} -N{nb_nodes} -n{job_tasks} -t1 -o {file_out} -J test_gpus_per_task_with_explicit_step {file_in1}",
fatal=True,
)
atf.wait_for_job_state(job_id, "DONE", fatal=True)
batch_gpus = get_batch_gpus()
check_allocated_gpus(job_id, job_tasks)
check_accounted_gpus(job_id, job_tasks, step_tasks, req_gpus, False, batch_gpus)