blob: a859c4247f3c3c438e1f67647145e9eb0f91b63a [file] [log] [blame]
############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import atf
import pytest
import re
sleep_time = 90
kill_wait = 30
# Setup
@pytest.fixture(scope="module", autouse=True)
def setup():
atf.require_config_parameter("OverTimeLimit", 0)
atf.require_config_parameter("KillWait", kill_wait)
atf.require_config_parameter("InactiveLimit", sleep_time)
atf.require_slurm_running()
def test_t():
"""Verify srun job time limit function works (-t option)"""
# Execute a couple of three minute jobs; one with a one minute time
# limit and the other with a four minute time limit. Confirm jobs
# are terminated on a timeout as required. Note that Slurm time
# limit enforcement has a resolution of about one minute.
#
# Ideally the job gets a "job exceeded time limit" followed by a
# "Terminated" message, but if the timing is bad only the "Terminated"
# message gets sent. This is due to srun recognizing job termination
# prior to the message from slurmd being processed.
timeout = sleep_time + kill_wait
error = atf.run_command_error(
f"srun -t1 sleep {sleep_time}", timeout=timeout, xfail=True
)
assert re.search(r"time limit|terminated", error, re.IGNORECASE) is not None
error = atf.run_command_error(f"srun -t4 sleep {sleep_time}", timeout=timeout)
assert not (re.search(r"time limit|terminated", error, re.IGNORECASE)) is not None