############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import atf
import pytest
import os
import pwd

max_mem_cpu = 2
max_mem_node = 1
min_num_nodes = 2
max_num_nodes = 1
max_time = 1
current_policy = ""
previous_limit = ""
p1_node_str = ""
# Limits in development that are not included in this test:
# AllocNodes, AllowGroups, QOS usage threshold
limits_dict = {
"MaxMemPerCPU": {
"flag": "--mem-per-cpu=",
"fail": max_mem_cpu + 1,
"pass": max_mem_cpu,
"_set": max_mem_cpu,
},
"MaxMemPerNode": {
"flag": "--mem=",
"fail": max_mem_node + 1,
"pass": max_mem_node,
"_set": max_mem_node,
},
"MinNodes": {
"flag": "-N",
"fail": min_num_nodes - 1,
"pass": min_num_nodes,
"_set": min_num_nodes,
},
"MaxNodes": {
"flag": "-N",
"fail": max_num_nodes + 1,
"pass": max_num_nodes,
"_set": max_num_nodes,
},
"MaxTime": {"flag": "-t", "fail": max_time + 1, "pass": max_time, "_set": max_time},
"AllowAccounts": {
"flag": "",
"fail": "--account=bad_account",
"pass": "--account=good_account",
"_set": "good_account",
},
"AllowQos": {
"flag": "",
"fail": "--qos=bad_qos",
"pass": "--qos=good_qos",
"_set": "good_qos",
},
}
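
# Each entry above expands into the sbatch line used by the tests below.
# For example, the MaxTime cases become (illustrative only):
#   sbatch -p p1 -t2 --wrap "hostname" -o /dev/null   # rejected when p1 MaxTime=1 is enforced
#   sbatch -p p1 -t1 --wrap "hostname" -o /dev/null   # accepted

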
# Setup
@pytest.fixture(scope="module", autouse=True)
def setup():
    """Require accounting with limit enforcement and the two test partitions"""
    global p1_node_str
atf.require_auto_config("wants to change partitions, modify/create nodes")
atf.require_accounting(modify=True)
atf.require_config_parameter("AccountingStorageEnforce", "limits")
    # Ensure at least 4 nodes satisfy the RealMemory requirement
    atf.require_nodes(4, [("RealMemory", max_mem_node + 1)])
p1_node_str = "node1,node2"
atf.require_config_parameter(
"PartitionName",
{
"p1": {"Nodes": p1_node_str, "Default": "NO", "State": "UP"},
"p2": {
"Nodes": "node3,node4",
"State": "UP",
"MaxTime": "INFINITE",
},
},
)
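    # The resulting slurm.conf entries look roughly like (illustrative):
    #   PartitionName=p1 Nodes=node1,node2 Default=NO State=UP
    #   PartitionName=p2 Nodes=node3,node4 State=UP MaxTime=INFINITE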
atf.require_slurm_running()


@pytest.fixture(scope="module")
def setup_account():
    """Create the account and QOS entities used by AllowAccounts/AllowQos"""
    test_user = pwd.getpwuid(os.getuid())[0]
atf.run_command(
"sacctmgr -vi add account good_account",
user=atf.properties["slurm-user"],
fatal=True,
)
atf.run_command(
f"sacctmgr -vi add user {test_user} account=good_account",
user=atf.properties["slurm-user"],
fatal=True,
)
atf.run_command(
"sacctmgr -vi add qos good_qos,bad_qos",
user=atf.properties["slurm-user"],
fatal=True,
)
atf.run_command(
f"sacctmgr -vi modify user {test_user} set qos=good_qos,bad_qos",
user=atf.properties["slurm-user"],
fatal=True,
)


@pytest.fixture(scope="function")
def cancel_jobs():
"""Cancel all jobs after each test"""
yield
atf.cancel_all_jobs(fatal=True)


# Helper funcs:
def set_enforce_part_limits_policy(policy):
    """Set EnforcePartLimits in slurm.conf only when the policy changes"""
    global current_policy
if current_policy != policy:
atf.set_config_parameter("EnforcePartLimits", policy)
current_policy = policy


def set_partition_limit(limit_name, limit_value):
    """Apply a single limit to partition p1, clearing the previously set one"""
    global previous_limit
if previous_limit:
atf.set_partition_parameter("p1", previous_limit, None)
atf.set_partition_parameter("p1", limit_name, limit_value)
previous_limit = limit_name
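

# For example, only one limit is ever active on p1 between parametrized test
# cases (illustrative sequence, values from limits_dict):
#   set_partition_limit("MaxTime", 1)   # p1: MaxTime=1
#   set_partition_limit("MaxNodes", 1)  # p1: MaxTime cleared, MaxNodes=1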


def satisfy_pending_job_limit(job_id, limit_name, limit_value):
    """Wait for job_id to pend, then update the p1 limit to limit_value so the
    job can start"""
    atf.wait_for_job_state(job_id, "PENDING", poll_interval=0.1, fatal=True, quiet=True)
    # Update the partition limit to comply with the job's request
    atf.run_command(
        f"scontrol update partitionname=p1 {limit_name}={limit_value}",
user=atf.properties["slurm-user"],
fatal=True,
quiet=True,
)
    # Wait for the job to start and complete
atf.wait_for_job_state(job_id, "DONE", poll_interval=0.5, fatal=True, quiet=True)
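

# For example (values from limits_dict, illustrative): under
# EnforcePartLimits=NO, a job submitted with --mem=2 while p1 has
# MaxMemPerNode=1 is accepted but pends; updating the partition to
# MaxMemPerNode=2 lets it start and complete.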


# Test functions
def enforce_ALL(limit_name, flag, val_fail, val_pass):
    """Exercise submissions on p1/p2 with EnforcePartLimits=ALL"""
    # Must account for the higher value needed to pass the first assert when
    # using MaxMemPerCPU, as Slurm will allocate more CPUs to satisfy the
    # memory request. If it can't allocate more CPUs the job will fail, so the
    # limit is still effectively imposed on p2 (test 3), because the requested
    # memory would exceed the nodes' RealMemory.
    # See https://slurm.schedmd.com/slurm.conf.html#OPT_MaxMemPerCPU
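    # Illustration (assumed behavior, per the slurm.conf docs): with
    # MaxMemPerCPU=2, a request of --mem-per-cpu=3 for one CPU is not rejected
    # outright; Slurm may grow the allocation to 2 CPUs so the per-CPU memory
    # (3/2 = 1.5) fits the limit. The test nodes are configured with
    # RealMemory=2 (max_mem_node + 1), so a full 3 MB request cannot be
    # scheduled on p2; test 3 therefore backs the request off by 1.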
    custom_val_fail = val_fail
    if limit_name == "MaxMemPerCPU":
        custom_val_fail = val_fail - 1
    elif limit_name == "AllowAccounts":
        custom_val_fail = ""
# 1 Reject p1,p2 with no p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_fail} --wrap "hostname" -o /dev/null'
)
== 0
), f"Job should fail on p1,p2 due to {limit_name} limit not met on the required partition p1 with EnforcePartLimits=ALL"
# 2 Reject p1 no limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_fail} --wrap "hostname" -o /dev/null')
== 0
), f"Job should fail on p1 due to {limit_name} limit not met on the required partition p1 with EnforcePartLimits=ALL"
    # 3 Accept p2 with no limit met (the limit is only set on p1)
    # ** shares the MaxMemPerCPU memory caveat with test 1
    assert (
        atf.submit_job_sbatch(
            f'-p p2 {flag}{custom_val_fail} --wrap "hostname" -o /dev/null'
        )
        != 0
    ), f"Job should pass on p2 because the {limit_name} limit is only set on p1, with EnforcePartLimits=ALL"
# 4 Accept p1 with limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_pass} --wrap "hostname" -o /dev/null')
!= 0
), f"Job should pass on p1 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=ALL"
# 5 Accept p1,p2 with p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_pass} --wrap "hostname" -o /dev/null'
)
!= 0
), f"Job should pass on p1,p2 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=ALL"


def enforce_ANY(limit_name, flag, val_fail, val_pass):
    """Exercise submissions on p1/p2 with EnforcePartLimits=ANY"""
    # Must account for the higher value needed to pass the first assert (as in
    # 'ALL') when using MaxMemPerCPU, as Slurm will allocate more CPUs to
    # satisfy the memory request
    custom_val_fail = val_fail
    if limit_name == "MaxMemPerCPU":
        custom_val_fail = val_fail - 1
    elif limit_name == "AllowAccounts":
        custom_val_fail = ""
# 1 Accept p1,p2 with no p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{custom_val_fail} --wrap "hostname" -o /dev/null'
)
!= 0
), f"Job should pass on p1,p2 despite {limit_name} not met on p1 with EnforcePartLimits=ANY"
# 2 Reject p1 no limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_fail} --wrap "hostname" -o /dev/null')
== 0
), f"Job should fail on p1 due to {limit_name} limit not met on the required partition p1 with EnforcePartLimits=ANY"
    # 3 Accept p2 with no limit met (the limit is only set on p1)
    # ** shares the MaxMemPerCPU memory caveat with test 1
    assert (
        atf.submit_job_sbatch(
            f'-p p2 {flag}{custom_val_fail} --wrap "hostname" -o /dev/null'
        )
        != 0
    ), f"Job should pass on p2 because the {limit_name} limit is only set on p1, with EnforcePartLimits=ANY"
# 4 Accept p1 with limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_pass} --wrap "hostname" -o /dev/null')
!= 0
), f"Job should pass on p1 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=ANY"
# 5 Accept p1,p2 with p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_pass} --wrap "hostname" -o /dev/null'
)
!= 0
), f"Job should pass on p1,p2 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=ANY"


def enforce_NO(limit_name, flag, val_fail, val_pass):
    """Exercise submissions on p1/p2 with EnforcePartLimits=NO (jobs are
    accepted but pend until the limit is satisfied)"""
    # Must account for the higher value needed to pass the first assert (as in
    # 'ALL') when using MaxMemPerCPU, as Slurm will allocate more CPUs to
    # satisfy the memory request
    custom_val_fail = val_fail
    if limit_name == "MaxMemPerCPU":
        custom_val_fail = val_fail - 1
    elif limit_name == "AllowAccounts":
        custom_val_fail = ""
# 1 Submit -> pend on p1,p2 with bad p1 limit set -> complete with p1 limit met
job_id = atf.submit_job_sbatch(
f'-p p1,p2 {flag}{custom_val_fail} --wrap "hostname&" -o /dev/null'
)
satisfy_pending_job_limit(job_id, limit_name, custom_val_fail)
assert (
atf.get_job_parameter(job_id, "JobState", quiet=True) == "COMPLETED"
    ), f"Job should submit, pend, then complete on p1,p2 after updating the {limit_name} limit on partition p1 to a passing value with EnforcePartLimits=NO"
# 2 Submit -> pend on just p1 with bad limit, then complete with good limit
job_id = atf.submit_job_sbatch(
f'-p p1 {flag}{custom_val_fail} --wrap "hostname&" -o /dev/null'
)
satisfy_pending_job_limit(job_id, limit_name, custom_val_fail)
assert (
atf.get_job_parameter(job_id, "JobState", quiet=True) == "COMPLETED"
    ), f"Job should submit, pend, then complete on p1 after updating the {limit_name} limit on partition p1 to a passing value with EnforcePartLimits=NO"
# 3 Submit -> complete on p2 with no limit set
    assert (
        atf.submit_job_sbatch(
            f'-p p2 {flag}{custom_val_fail} --wrap "hostname" -o /dev/null'
        )
        != 0
    ), f"Job should pass on p2 because the {limit_name} limit is only set on p1, with EnforcePartLimits=NO"
# 4 Submit -> complete on p1 with p1 limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_pass} --wrap "hostname" -o /dev/null')
!= 0
), f"Job should pass on p1 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=NO"
# 5 Submit -> complete on p1,p2 with p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_pass} --wrap "hostname" -o /dev/null'
)
!= 0
), f"Job should pass on p1,p2 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=NO"


def enforce_NO_QOS(limit_name, flag, val_fail, val_pass):
    """AllowQos variant of enforce_NO. The scontrol updates need bare QOS
    names rather than the sbatch flags stored in limits_dict, so the passed
    flag/values are overridden here."""
    flag = "--qos="
    val_pass = "good_qos"
    val_fail = "bad_qos"
# 1 Submit -> pend on p1,p2 with bad p1 limit set -> complete with p1 limit met
job_id = atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_fail} --wrap "hostname&" -o /dev/null'
)
satisfy_pending_job_limit(job_id, limit_name, f"{val_pass},{val_fail}")
assert (
atf.get_job_parameter(job_id, "JobState", quiet=True) == "COMPLETED"
    ), f"Job should submit, pend, then complete on p1,p2 after updating the {limit_name} limit on partition p1 to a passing value with EnforcePartLimits=NO"
# Reset partition QOS
atf.run_command(
f"scontrol update partitionname=p1 {limit_name}={val_pass}",
user=atf.properties["slurm-user"],
fatal=True,
quiet=True,
)
# 2 Submit -> pend on just p1 with bad limit, then complete with good limit
job_id = atf.submit_job_sbatch(
f'-p p1 {flag}{val_fail} --wrap "hostname&" -o /dev/null'
)
satisfy_pending_job_limit(job_id, limit_name, f"{val_pass},{val_fail}")
assert (
atf.get_job_parameter(job_id, "JobState", quiet=True) == "COMPLETED"
    ), f"Job should submit, pend, then complete on p1 after updating the {limit_name} limit on partition p1 to a passing value with EnforcePartLimits=NO"
# Reset partition QOS
atf.run_command(
f"scontrol update partitionname=p1 {limit_name}={val_pass}",
user=atf.properties["slurm-user"],
fatal=True,
quiet=True,
)
# 3 Submit -> complete on p2 with no limit set
assert (
        atf.submit_job_sbatch(f'-p p2 {flag}{val_fail} --wrap "hostname" -o /dev/null')
        != 0
    ), f"Job should pass on p2 because the {limit_name} limit is only set on p1, with EnforcePartLimits=NO"
# 4 Submit -> complete on p1 with p1 limit met
assert (
atf.submit_job_sbatch(f'-p p1 {flag}{val_pass} --wrap "hostname" -o /dev/null')
!= 0
), f"Job should pass on p1 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=NO"
# 5 Submit -> complete on p1,p2 with p1 limit met
assert (
atf.submit_job_sbatch(
f'-p p1,p2 {flag}{val_pass} --wrap "hostname" -o /dev/null'
)
!= 0
), f"Job should pass on p1,p2 due to {limit_name} limit met on the required partition p1 with EnforcePartLimits=NO"
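

# A single policy/limit combination can be run with, e.g. (illustrative;
# parametrize ids come from the limits_dict keys):
#   pytest -k "test_ALL[MaxTime]" <this_test_file>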


# Tests:
@pytest.mark.parametrize("limit_name", limits_dict.keys())
def test_ALL(limit_name, setup_account, cancel_jobs):
    """Verify jobs are accepted and rejected with EnforcePartLimits=ALL"""
set_enforce_part_limits_policy("ALL")
value = limits_dict[limit_name]
set_partition_limit(limit_name, value["_set"])
enforce_ALL(limit_name, value["flag"], value["fail"], value["pass"])


@pytest.mark.parametrize("limit_name", limits_dict.keys())
def test_ANY(limit_name, setup_account, cancel_jobs):
    """Verify jobs are accepted and rejected with EnforcePartLimits=ANY"""
set_enforce_part_limits_policy("ANY")
value = limits_dict[limit_name]
set_partition_limit(limit_name, value["_set"])
enforce_ANY(limit_name, value["flag"], value["fail"], value["pass"])


@pytest.mark.parametrize("limit_name", limits_dict.keys())
def test_NO(limit_name, setup_account, cancel_jobs):
    """Verify jobs are accepted and rejected with EnforcePartLimits=NO"""
set_enforce_part_limits_policy("NO")
value = limits_dict[limit_name]
set_partition_limit(limit_name, value["_set"])
if limit_name == "AllowQos":
enforce_NO_QOS(limit_name, value["flag"], value["fail"], value["pass"])
else:
enforce_NO(limit_name, value["flag"], value["fail"], value["pass"])