blob: 6bd9b95e5f4d21aa14077a53cfa2b4acf6ddcd99 [file] [edit]
############################################################################
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
############################################################################
import re
import pytest
import atf
# Module-level pytest marker: tags every test collected from this file as "slow".
pytestmark = pytest.mark.slow
def setup_module():
    """Module-level setup: request the required nodes, then require a running Slurm."""
    # Four nodes, each described with a CPUs value of 2.
    node_requirements = [("CPUs", 2)]
    atf.require_nodes(4, node_requirements)

    atf.require_slurm_running()
def submit_running_job():
    """Submit a single-task batch job and block until its first step exists.

    The job wraps ``srun sleep infinity``, so it keeps running until something
    signals or cancels it.

    Returns:
        The job id returned by ``atf.submit_job_sbatch``.
    """
    sbatch_args = "-n1 --wrap 'srun sleep infinity'"
    new_job = atf.submit_job_sbatch(sbatch_args, fatal=True)

    # Wait for the job itself first, then for step 0 launched by the wrapped srun.
    atf.wait_for_job_state(new_job, "RUNNING", fatal=True)
    atf.wait_for_step(new_job, 0, fatal=True)

    return new_job
def submit_multistep_job():
    """Submit a batch job running two parallel, never-ending steps.

    A bash script is written under ``atf.module_tmp_path``; it backgrounds two
    identical ``srun ... sleep infinity`` commands and then waits on them.

    Returns:
        The job id, once the job is RUNNING and steps 0 and 1 have appeared.
    """
    script_path = atf.module_tmp_path / "multistep.sh"

    # Two identical backgrounded steps followed by a wait, one command per line.
    background_step = "srun -n1 --mem=0 --overlap sleep infinity &"
    script_body = "\n".join([background_step, background_step, "wait"])
    atf.make_bash_script(script_path, script_body)

    new_job = atf.submit_job_sbatch(f"-N2 -O {script_path}", fatal=True)
    atf.wait_for_job_state(new_job, "RUNNING", fatal=True)

    # Both steps must be up before the caller starts cancelling one of them.
    for step_index in (0, 1):
        atf.wait_for_step(new_job, step_index, fatal=True)

    return new_job
def get_running_user_steps(job_id):
    """Collect the keys of a job's running steps, skipping batch/extern steps.

    Args:
        job_id: Job whose steps are queried through ``atf.get_steps``.

    Returns:
        A set of step keys (e.g. ``{'<job_id>.0', '<job_id>.1'}``) whose
        ``State`` is ``RUNNING`` and whose key contains neither ``.batch``
        nor ``.extern``.
    """
    excluded_markers = (".batch", ".extern")
    running_keys = set()

    for step_key, step_info in atf.get_steps(job_id, quiet=True).items():
        # A missing State entry is treated as "not running".
        if step_info.get("State", "") != "RUNNING":
            continue
        if any(marker in step_key for marker in excluded_markers):
            continue
        running_keys.add(step_key)

    return running_keys
@pytest.mark.parametrize(
    "id_type,step",
    [
        ("JobId", False),
        ("JobId", True),
        ("SLUID", False),
        ("SLUID", True),
    ],
)
def test_verbose_signal(id_type, step):
    """Verify scancel --verbose shows correct 'Signal N to job/step <id>' message.

    Args:
        id_type: Job parameter used to address the job ("JobId" or "SLUID").
        step: When True, target step 0 of the job ("<id>.0") instead of the
            whole job.

    The SLUID variants are gated on scancel 26.05+ via ``atf.require_version``.
    After SIGHUP is sent, the job is expected to reach the FAILED state.
    """
    if id_type == "SLUID":
        atf.require_version(
            (26, 5),
            "bin/scancel",
            reason="Ticket 22180: SLUID availability added in 26.05+",
        )

    job_id = submit_running_job()
    identifier = str(atf.get_job_parameter(job_id, id_type))
    target = f"{identifier}.0" if step else identifier
    entity = "step" if step else "job"

    result = atf.run_command(f"scancel --signal SIGHUP {target} --verbose")

    expected = f"Signal 1 to {entity} {target}"
    # The trailing negative lookahead stops the ID from matching as a mere
    # prefix: without it "job 12" would also accept "job 123" or "job 12.0".
    # A plain sentence-ending "." after the ID is still accepted.
    assert re.search(
        rf"{re.escape(expected)}(?!\.?\w)", result["stderr"]
    ), f"Expected '{expected}' in stderr: {result['stderr']}"

    atf.wait_for_job_state(job_id, "FAILED", fatal=True)
@pytest.mark.skipif(
    atf.get_version("bin/scancel") < (26, 5),
    reason="Ticket 22180: SLUID availability added in 26.05+",
)
def test_verbose_mixed_ids():
    """Verify scancel --verbose output for a mixed list of targets.

    One scancel invocation receives a job ID, a step ID, a SLUID and a SLUID
    step, equivalent to:

        scancel --signal SIGHUP <job1>,<job2>.0,<sluid3>,<sluid4>.0 --verbose

    Expected output lines (in any order):

        scancel: Signal 1 to job <job1>
        scancel: Signal 1 to step <job2>.0
        scancel: Signal 1 to job <sluid3>
        scancel: Signal 1 to step <sluid4>.0

    All four jobs are then expected to reach the FAILED state.
    """
    job1 = submit_running_job()
    job2 = submit_running_job()
    job3 = submit_running_job()
    job4 = submit_running_job()

    # Coerce to str like the other tests in this module do, so re.escape()
    # never receives a non-string value.
    sluid3 = str(atf.get_job_parameter(job3, "SLUID"))
    sluid4 = str(atf.get_job_parameter(job4, "SLUID"))

    # (entity word printed by scancel, target exactly as passed on the command line)
    expected_targets = [
        ("job", str(job1)),
        ("step", f"{job2}.0"),
        ("job", sluid3),
        ("step", f"{sluid4}.0"),
    ]

    ids = ",".join(target for _, target in expected_targets)
    result = atf.run_command(f"scancel --signal SIGHUP {ids} --verbose")
    stderr = result["stderr"]

    for entity, target in expected_targets:
        message = f"Signal 1 to {entity} {target}"
        # The negative lookahead keeps an ID from matching as a prefix of a
        # longer one (e.g. "job 12" inside "job 123" or "job 12.0").
        assert re.search(
            rf"{re.escape(message)}(?!\.?\w)", stderr
        ), f"Missing '{message}' in: {stderr}"

    # Verify all four jobs are actually terminated
    for jid in (job1, job2, job3, job4):
        atf.wait_for_job_state(jid, "FAILED", fatal=True)
@pytest.mark.parametrize(
    "id_type,step,expected_state",
    [
        ("JobId", False, "CANCELLED"),
        ("JobId", True, "FAILED"),
        ("SLUID", False, "CANCELLED"),
        ("SLUID", True, "FAILED"),
    ],
)
def test_cancel_terminates(id_type, step, expected_state):
    """Check that a plain scancel on a job/step ID or SLUID ends the job.

    Args:
        id_type: Job parameter used to address the job ("JobId" or "SLUID").
        step: When True, cancel step 0 ("<id>.0") rather than the whole job.
        expected_state: Final job state to wait for; the parametrization
            pairs job targets with CANCELLED and step targets with FAILED.
    """
    uses_sluid = id_type == "SLUID"
    if uses_sluid:
        atf.require_version(
            (26, 5),
            "bin/scancel",
            reason="Ticket 22180: SLUID availability added in 26.05+",
        )

    job_id = submit_running_job()

    # Address either the job itself or its first step.
    target = str(atf.get_job_parameter(job_id, id_type))
    if step:
        target = f"{target}.0"

    atf.run_command(f"scancel {target}", fatal=True)
    atf.wait_for_job_state(job_id, expected_state, fatal=True)
@pytest.mark.parametrize("id_type", ["JobId", "SLUID"])
def test_signal_delivers(id_type):
    """Check that scancel --signal delivers SIGSTOP/SIGCONT via job ID or SLUID.

    The job is stopped, resumed and finally cancelled; after each scancel
    call the test waits for the matching job state.

    Args:
        id_type: Job parameter used to address the job ("JobId" or "SLUID").
    """
    if id_type == "SLUID":
        atf.require_version(
            (26, 5),
            "bin/scancel",
            reason="Ticket 22180: SLUID availability added in 26.05+",
        )

    job_id = submit_running_job()
    identifier = str(atf.get_job_parameter(job_id, id_type))

    # Each command is issued in order, followed by a wait for the state it
    # should produce.
    command_and_state = [
        (f"scancel --signal SIGSTOP {identifier}", "STOPPED"),
        (f"scancel --signal SIGCONT {identifier}", "RUNNING"),
        (f"scancel {identifier}", "CANCELLED"),
    ]
    for command, state in command_and_state:
        atf.run_command(command, fatal=True)
        atf.wait_for_job_state(job_id, state, fatal=True)
@pytest.mark.skipif(
    atf.get_version("bin/scancel") < (26, 5),
    reason="Ticket 22180: SLUID availability added in 26.05+",
)
def test_cancel_mixed_terminates():
    """Check that one scancel with mixed target kinds terminates every job.

    Four running jobs are addressed in a single comma-separated list: a job
    ID, a step ID, a SLUID and a SLUID step.
    """
    by_job_id = submit_running_job()
    by_step_id = submit_running_job()
    by_sluid = submit_running_job()
    by_sluid_step = submit_running_job()

    sluid = atf.get_job_parameter(by_sluid, "SLUID")
    sluid_for_step = atf.get_job_parameter(by_sluid_step, "SLUID")

    ids = f"{by_job_id},{by_step_id}.0,{sluid},{sluid_for_step}.0"
    atf.run_command(f"scancel {ids}", fatal=True)

    # Cancelling a whole job yields CANCELLED; cancelling its only step
    # yields FAILED.
    expected_outcomes = [
        (by_job_id, "CANCELLED"),
        (by_step_id, "FAILED"),
        (by_sluid, "CANCELLED"),
        (by_sluid_step, "FAILED"),
    ]
    for job, final_state in expected_outcomes:
        atf.wait_for_job_state(job, final_state, fatal=True)
@pytest.mark.parametrize("id_type", ["JobId", "SLUID"])
def test_cancel_one_step(id_type):
    """Cancel step 0 of a two-step job and confirm step 1 keeps running.

    Args:
        id_type: Job parameter used to address the job ("JobId" or "SLUID").
    """
    if id_type == "SLUID":
        atf.require_version(
            (26, 5),
            "bin/scancel",
            reason="Ticket 22180: SLUID availability added in 26.05+",
        )

    job_id = submit_multistep_job()
    identifier = str(atf.get_job_parameter(job_id, id_type))

    outcome = atf.run_command(f"scancel {identifier}.0 --verbose")
    stderr = outcome["stderr"]
    termination_pattern = rf"Terminating step {re.escape(identifier)}\.0"
    assert re.search(
        termination_pattern, stderr
    ), f"Expected 'Terminating step {identifier}.0' in stderr: {stderr}"

    # Running steps are compared by numeric job id, even when the cancel
    # itself was addressed through the SLUID.
    cancelled_key = f"{job_id}.0"
    surviving_key = f"{job_id}.1"

    def only_survivor_running(running_steps):
        return cancelled_key not in running_steps and surviving_key in running_steps

    # Step 1 should still be running
    assert atf.repeat_until(
        lambda: get_running_user_steps(job_id),
        only_survivor_running,
        poll_interval=1,
    ), "Step 1 should still be running after cancelling step 0"