blob: bbfe48442831726f8edd285ab757e0484f88929d [file] [edit]
############################################################################
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved
############################################################################
import atf
import itertools
import pytest
topology_yaml = """
- topology: topo1
cluster_default: true
torus3d:
toruses:
- name: pod1
dims:
x: 4
y: 4
z: 2
regions:
- anchor: {x: 0, y: 0, z: 0}
dims: {x: 4, y: 4, z: 1}
nodes: node[1-16]
- anchor: {x: 0, y: 0, z: 1}
dims: {x: 4, y: 4, z: 1}
nodes: node[17-32]
placements:
- dims:
x: 2
y: 2
z: 2
- dims:
x: 2
y: 2
z: 1
- dims:
x: 4
y: 4
z: 2
"""
# All valid anchored 2x2x1 sub-cubes in the 4x4x2 torus (anchor_spacing=dims)
VALID_4NODE = [
{"node1", "node2", "node5", "node6"},
{"node3", "node4", "node7", "node8"},
{"node9", "node10", "node13", "node14"},
{"node11", "node12", "node15", "node16"},
{"node17", "node18", "node21", "node22"},
{"node19", "node20", "node23", "node24"},
{"node25", "node26", "node29", "node30"},
{"node27", "node28", "node31", "node32"},
]
# All valid anchored 2x2x2 sub-cubes
VALID_8NODE = [
{"node1", "node2", "node5", "node6", "node17", "node18", "node21", "node22"},
{"node3", "node4", "node7", "node8", "node19", "node20", "node23", "node24"},
{"node9", "node10", "node13", "node14", "node25", "node26", "node29", "node30"},
{"node11", "node12", "node15", "node16", "node27", "node28", "node31", "node32"},
]
# Valid 16-node allocations under --segment=8: union of any two distinct
# 8-node placements (C(4, 2) = 6 pairs)
VALID_16NODE_SEGMENT = [a | b for a, b in itertools.combinations(VALID_8NODE, 2)]
def assert_valid_placement(job_id, valid_sets):
"""Assert that the job's NodeList matches one of the valid placement sets."""
node_list = atf.get_job_parameter(job_id, "NodeList")
nodes = set(atf.node_range_to_list(node_list))
assert any(
nodes == v for v in valid_sets
), f"Job {job_id} should be allocated in a valid placement ({valid_sets}), but was allocated in {node_list}"
@pytest.fixture(scope="module", autouse=True)
def setup():
atf.require_version(
(26, 5),
component="sbin/slurmd",
reason="topology/torus3d was added in 26.05",
)
atf.require_config_parameter_includes("SchedulerParameters", "bf_interval=1")
atf.require_nodes(32)
atf.require_config_parameter("SelectType", "select/cons_tres")
atf.require_config_parameter("SelectTypeParameters", "CR_CPU")
atf.require_config_file("topology.yaml", topology_yaml)
atf.require_slurm_running()
def test_basic_placement():
"""Test basic 4-node placement (2x2x1)"""
job_id = atf.submit_job_sbatch('-N 4 --exclusive --mem=1 --wrap="hostname"')
assert job_id != 0, "Job should be accepted"
atf.wait_for_job_state(job_id, "DONE", fatal=True)
assert_valid_placement(job_id, VALID_4NODE)
def test_8_node_placement():
"""Test 8-node placement (2x2x2)"""
job_id = atf.submit_job_sbatch('-N 8 --exclusive --mem=1 --wrap="hostname"')
assert job_id != 0, "Job should be accepted"
atf.wait_for_job_state(job_id, "DONE", fatal=True)
assert_valid_placement(job_id, VALID_8NODE)
def test_full_torus():
"""Test full torus allocation (4x4x2 = 32 nodes)"""
job_id = atf.submit_job_sbatch('-N 32 --exclusive --mem=1 --wrap="hostname"')
assert job_id != 0, "Job should be accepted"
atf.wait_for_job_state(job_id, "DONE", fatal=True)
def test_unsupported_size():
"""Test that unsupported placement size is rejected"""
assert (
atf.submit_job_sbatch('-N 3 --exclusive --mem=1 --wrap="hostname"', xfail=True)
== 0
), "3-node job should fail -- no 3-node placement exists"
def test_concurrent_placements():
"""Test two concurrent 4-node placements fill without conflict"""
job_id_1 = atf.submit_job_sbatch('-N 4 --exclusive --mem=1 --wrap="sleep infinity"')
job_id_2 = atf.submit_job_sbatch('-N 4 --exclusive --mem=1 --wrap="sleep infinity"')
atf.wait_for_job_state(job_id_1, "RUNNING", fatal=True)
atf.wait_for_job_state(job_id_2, "RUNNING", fatal=True)
nodes_1 = set(atf.node_range_to_list(atf.get_job_parameter(job_id_1, "NodeList")))
nodes_2 = set(atf.node_range_to_list(atf.get_job_parameter(job_id_2, "NodeList")))
assert not nodes_1.intersection(nodes_2), "Concurrent jobs should not share nodes"
def test_frag_aware_placement():
"""Test that frag-aware placement fills a partially used 2x2x2 block.
Pin job1 to z=0 anchor (0,0,0): node[3,4,7,8].
Job2 (also 2x2x1) should land at z=1 same (x,y): node[19,20,23,24],
completing the 2x2x2 block rather than consuming a fresh z=0 placement
which would block another 2x2x2 anchor.
"""
expected_nodes = {"node19", "node20", "node23", "node24"}
job_id_1 = atf.submit_job_sbatch(
'-N 4 -w node[3,4,7,8] --exclusive --mem=1 --wrap="sleep infinity"'
)
atf.wait_for_job_state(job_id_1, "RUNNING", fatal=True)
job_id_2 = atf.submit_job_sbatch('-N 4 --exclusive --mem=1 --wrap="sleep infinity"')
atf.wait_for_job_state(job_id_2, "RUNNING", fatal=True)
nodes_2 = set(atf.node_range_to_list(atf.get_job_parameter(job_id_2, "NodeList")))
assert nodes_2 == expected_nodes, (
f"Frag-aware placement should fill z=1 below job1 (node[19,20,23,24]), "
f"but job2 got {nodes_2}"
)
def test_torus_constrained():
"""Test that allocating all z=0 nodes prevents 8-node (2x2x2) jobs.
Torus 4x4x2 flat mapping (x varies fastest):
z=0: node[1-16], z=1: node[17-32]
Every 2x2x2 placement needs 4 nodes from z=0.
With all z=0 nodes busy, no 2x2x2 placement can be satisfied,
even though 16 z=1 nodes are idle.
"""
z0_placements = [
"node[1,2,5,6]",
"node[3,4,7,8]",
"node[9,10,13,14]",
"node[11,12,15,16]",
]
job_ids = []
for nodes in z0_placements:
job_id = atf.submit_job_sbatch(
f'-N 4 -w {nodes} --exclusive --mem=1 --wrap="sleep infinity"'
)
job_ids.append(job_id)
for job_id in job_ids:
atf.wait_for_job_state(job_id, "RUNNING", fatal=True)
job_id_big = atf.submit_job_sbatch(
'-N 8 --exclusive --mem=1 --wrap="sleep infinity"'
)
assert job_id_big != 0, "Job should be accepted"
atf.wait_for_job_state(
job_id_big,
"PENDING",
desired_reason="No_suitable_topology_unit_found",
fatal=True,
)
def test_segment():
"""Test segment support -- 16 nodes as 2 segments of 8"""
job_id = atf.submit_job_sbatch(
'-N 16 --segment=8 --exclusive --mem=1 --wrap="hostname"'
)
assert job_id != 0, "Segmented job should be accepted"
atf.wait_for_job_state(job_id, "DONE", fatal=True)
assert_valid_placement(job_id, VALID_16NODE_SEGMENT)
def test_segment_bad_divisor():
"""Test that segment size not dividing job size is rejected"""
assert (
atf.submit_job_sbatch(
'-N 8 --segment=3 --exclusive --mem=1 --wrap="hostname"',
xfail=True,
)
== 0
), "Job should fail -- 8 % 3 != 0"