# blob: 9e9c9a4dccd3b1b837deee71064a3d2b9b198a10 [file] [log] [blame] [edit]
############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import atf
import pytest
import logging
import re
# Definition of the hierarchical "power" resource; the setup fixture installs
# this text as resources.yaml.
#
# Layout: two named variables, then seven layers -- four 8-node layers
# (node[1-8], node[9-16], node[17-24], node[25-32]), two 16-node layers
# (node[1-16], node[17-32]) and one layer spanning node[1-32]. Each layer has a
# "count" and, optionally, a list of named "base" values.
#
# NOTE(review): the nesting below was reconstructed from the key order
# (variables/layers under the resource entry, count/base under each layer);
# confirm against the Slurm resources.yaml documentation.
resources_yaml = """
- resource: power
  mode: MODE_3
  variables:
    - name: full_node
      value: 2000
    - name: full_gpu_node
      value: 5000
  layers:
    - nodes:
        - "node[1-8]"
      count: 40000
      base:
        - name: storage
          value: 5000
    - nodes:
        - "node[9-16]"
      count: 40000
    - nodes:
        - "node[17-24]"
      count: 40000
    - nodes:
        - "node[25-32]"
      count: 60000
    - nodes:
        - "node[1-16]"
      count: 60000
      base:
        - name: network1
          value: 3000
    - nodes:
        - "node[17-32]"
      count: 80000
      base:
        - name: network2
          value: 2000
    - nodes:
        - "node[1-32]"
      count: 130000
      base:
        - name: acUnit1
          value: 10000
        - name: acUnit2
          value: 8000
"""
# Setup
@pytest.fixture(scope="module", autouse=True)
def setup():
    """Module-wide fixture that declares the cluster prerequisites for these tests.

    In order, it asks the test framework for: sacctmgr version 25.11, 32
    nodes, the ``select/cons_tres`` select plugin with ``CR_CPU``, one-second
    backfill/scheduler intervals, the ``resources.yaml`` file defined in this
    module, and finally a running Slurm.
    """
    atf.require_version((25, 11), component="bin/sacctmgr")
    atf.require_nodes(32)

    # Parameters that must hold exactly these values.
    required_parameters = (
        ("SelectType", "select/cons_tres"),
        ("SelectTypeParameters", "CR_CPU"),
    )
    for parameter, value in required_parameters:
        atf.require_config_parameter(parameter, value)

    # Added to (rather than required as) the SchedulerParameters value.
    scheduler_options = "bf_interval=1,sched_interval=1"
    atf.add_config_parameter_value("SchedulerParameters", scheduler_options)

    atf.require_config_file("resources.yaml", resources_yaml)
    atf.require_slurm_running()
def test_const1():
    """Check resource variables: a defined name is accepted, an unknown one is rejected.

    ``full_gpu_node`` is one of the variables declared in ``resources_yaml``;
    ``foo`` is not.
    """
    # A job naming a declared variable must be accepted and finish.
    valid_job = atf.submit_job_sbatch(
        '-N 4 --exclusive --resources=power:full_gpu_node --mem=1 --wrap="hostname"'
    )
    assert valid_job != 0, "Job should be accepted with a valid var"
    finished = atf.wait_for_job_state(valid_job, "DONE")
    assert finished, "Job should run with a valid var"

    # A job naming an undeclared variable must be refused at submission.
    rejected_job = atf.submit_job_sbatch(
        '-N 4 --exclusive --resources=power:foo --mem=1 --wrap="hostname"',
        xfail=True,
    )
    assert rejected_job == 0, "Job should fail -- invalid var "
def test_enforce1():
    """Check enforcement of the power limit on a single first-level layer.

    The same 4-node, ``power:10000`` request is submitted twice with different
    pinned nodes: on node[9-12] it is expected to run, on node[1-4] it is
    expected to be rejected at submission.
    """
    # Pinned to node[9-12]: expected to be accepted and complete.
    accepted_job = atf.submit_job_sbatch(
        '-N 4 --exclusive --resources=power:10000 -w node[9-12] --mem=1 --wrap="hostname"'
    )
    assert accepted_job != 0, "Job should be accepted with power value at one level"
    finished = atf.wait_for_job_state(accepted_job, "DONE")
    assert finished, "Job should run with power value at one level"

    # Pinned to node[1-4]: submission is expected to fail.
    rejected_job = atf.submit_job_sbatch(
        '-N 4 --exclusive --resources=power:10000 -w node[1-4] --mem=1 --wrap="sleep 20"',
        xfail=True,
    )
    assert (
        rejected_job == 0
    ), "Job should fail when not enough power on node[1-8] layer"
def test_enforce2():
    """Check enforcement of the power limit across several layers at once.

    With ``power:10000`` and no pinned nodes, a 10-node job is expected to
    run while an 11-node job is expected to be rejected at submission.
    """
    # 10 nodes: expected to be accepted and complete.
    accepted_job = atf.submit_job_sbatch(
        '-N 10 --exclusive --resources=power:10000 --mem=1 --wrap="hostname"'
    )
    assert accepted_job != 0, "Job should be accepted with power value at two levels"
    finished = atf.wait_for_job_state(accepted_job, "DONE")
    assert finished, "Job should run with power value at two levels"

    # 11 nodes: submission is expected to fail.
    rejected_job = atf.submit_job_sbatch(
        '-N 11 --exclusive --resources=power:10000 --mem=1 --wrap="sleep 20"',
        xfail=True,
    )
    assert (
        rejected_job == 0
    ), "Job should fail when not enough power on node[1-32] layer"
def test_sched1():
    """Check that the nodes picked for a job stay within every power layer's total.

    A 10-node job requesting ``job_power_per_node`` is run without pinning
    nodes. The layers reported by ``scontrol -o show license power`` are then
    parsed, and for each layer that shares nodes with the job, the job's usage
    on that layer (shared node count times ``job_power_per_node``) must not
    exceed the layer's ``Total``.
    """
    job_power_per_node = 10000

    job_id = atf.submit_job_sbatch(
        f'-N 10 --exclusive --resources=power:{job_power_per_node} --mem=1 --wrap="hostname"',
        fatal=True,
    )
    atf.wait_for_job_state(job_id, "DONE", fatal=True)

    job_nodelist = set(
        atf.node_range_to_list(atf.get_job_parameter(job_id, "NodeList"))
    )
    logging.info(f"NodeList:{job_nodelist}")

    # Each match pairs a layer's Total= value with the Nodes= expression that
    # follows it on the same line ('.' does not cross newlines).
    output = atf.run_command_output("scontrol -o show license power", fatal=True)
    matches = re.findall(r"Total=(\d+).*?Nodes=([\w\[\]-]+)", output)
    power_layers = [
        (set(atf.node_range_to_list(nodeset)), int(total)) for total, nodeset in matches
    ]
    logging.info(f"Power:{power_layers}")

    # Guard against a vacuous pass: if nothing was parsed, the loop below would
    # run zero times and the test would succeed without verifying anything.
    assert (
        power_layers
    ), f"No power layers could be parsed from scontrol output: {output!r}"

    for layer_nodes, layer_total in power_layers:
        overlapping_nodes = job_nodelist.intersection(layer_nodes)
        if not overlapping_nodes:
            # The job does not touch this layer; nothing to verify.
            continue
        job_usage_on_layer = len(overlapping_nodes) * job_power_per_node
        assert (
            job_usage_on_layer <= layer_total
        ), f"Job usage ({job_usage_on_layer}) on layer {layer_nodes} should not be bigger than total on layer ({layer_total})"