| ############################################################################ |
| # Copyright (C) SchedMD LLC. |
| ############################################################################ |
| """ |
| Expedited requeue (--requeue=expedite) — expected behavior per SOW. |
| |
| Requirements (SOW): |
| - Job success (exit 0): job completes; no requeue or epilog check. |
| - Job failure (non-zero exit): allocated resources remain reserved until |
| Epilog has completed on all nodes (SOW 2a). |
| - Epilog indicates one or more nodes have a hardware failure (epilog |
| non-zero or node down): job presumed failed due to hardware and gets |
| expedited requeue scheduling (SOW 2b). |
| - All Epilogs indicate healthy nodes: job presumed failed due to |
| job-specific problems; no expedited requeue; all resources released |
| (SOW 2c). |
| - Expedited requeue jobs: effective infinite priority; time limit |
| recalculated (original minus prior run plus slack); not blocked by |
| accounting limits (SOW 3). |
| - No cred_expire delay before relaunch; SLUID tracks invocations (SOW 1). |
| |
| Tested in this file: |
| - Job success (exit 0): job completes, no requeue. |
| - Job failure, all epilogs succeed: job requeued in REQUEUE_HOLD, resources released. |
| - Job failure, at least one epilog fails: expedited requeue, node drained. |
| - Job success, epilog fails: node drained; job completes (no requeue criteria set, so no requeue). |
| - Node failure during job: expedited requeue, allocation cleared. |
| |
| Untested (not covered by this file): |
| - Resources kept idle until epilog completes (SOW 2a); no assertion. |
| - Expedited requeue disabled (enable_expedited_requeue off); submit rejected. |
| - SOW item 3: infinite priority, time limit recalculation, accounting bypass. |
| - Multi-node job with partial epilog failure (some nodes fail, some succeed). |
| - Node down or timeout while waiting for epilog complete. |
| - Requeue delay removed (no cred_expire wait before relaunch). |
| """ |
| import atf |
| import pytest |
| |
| pytestmark = pytest.mark.slow |
| |
| |
| # Setup |
| @pytest.fixture(scope="module", autouse=True) |
| def setup(): |
| atf.require_version( |
| (25, 11), |
| component="bin/sbatch", |
| reason="The --requeue=expedite option was added in 25.11", |
| ) |
| atf.require_auto_config("wants to set and unset Epilog") |
| atf.require_config_parameter_includes( |
| "SlurmctldParameters", "enable_expedited_requeue" |
| ) |
| atf.require_slurm_running() |
| |
| |
| @pytest.fixture(scope="function") |
| def node(setup): |
| yield next(iter(atf.nodes)) |
| |
| |
| @pytest.fixture(scope="function", autouse=True) |
| def resume_node(setup, node): |
| yield |
| atf.run_command( |
| f"scontrol update nodename={node} state=RESUME", |
| user=atf.properties["slurm-user"], |
| quiet=True, |
| ) |
| atf.wait_for_node_state(node, "IDLE", fatal=True) |
| |
| |
| @pytest.mark.xfail( |
| atf.get_version() < (25, 11, 3), |
| reason="Ticket 24564: Expedite requeue was transitioning jobs to REQUEUE_HOLD on success", |
| ) |
| def test_expedited_requeue_success(): |
| """Test that --requeue=expedite does NOT requeue on successful completion.""" |
| |
| # Submit a job that will succeed |
| job_id = atf.submit_job_sbatch( |
| '--requeue=expedite --wrap "true"', |
| fatal=True, |
| ) |
| |
| # Wait for job to complete |
| atf.wait_for_job_state(job_id, "DONE", fatal=True) |
| |
| # Verify job completed (not requeued) |
| job_state = atf.get_job_parameter(job_id, "JobState") |
| assert job_state == "COMPLETED", f"Job should have COMPLETED state, got {job_state}" |
| |
| |
| def test_expedited_requeue_failure(): |
| """Test that --requeue=expedite requeues with hold when job fails and all epilogs succeed (job-specific failure).""" |
| |
| # Submit a job that will fail (epilogs succeed by default) |
| job_id = atf.submit_job_sbatch( |
| '--requeue=expedite --wrap "false"', |
| fatal=True, |
| ) |
| |
| # Wait for the job to reach REQUEUE_HOLD (job failed, epilogs OK -> not expedited) |
| atf.repeat_until( |
| lambda: atf.get_job_parameter(job_id, "JobState"), |
| lambda state: state == "REQUEUE_HOLD", |
| ) |
| job_state = atf.get_job_parameter(job_id, "JobState") |
| assert ( |
| job_state == "REQUEUE_HOLD" |
| ), f"Job should be REQUEUE_HOLD when job fails and all epilogs succeed (no expedited requeue), got {job_state}" |
| |
| # Verify ExpeditedRequeue flag is properly set |
| expedited_requeue = atf.get_job_parameter(job_id, "ExpeditedRequeue") |
| assert ( |
| expedited_requeue == "Yes" |
| ), f"ExpeditedRequeue flag should be set for job {job_id}, got {expedited_requeue}" |
| |
| # Verify job was requeued (Restarts > 0 and ExitCode shows failure) |
| restart_cnt = atf.get_job_parameter(job_id, "Restarts") |
| assert ( |
| int(restart_cnt) > 0 |
| ), f"Job should have Restarts > 0 after requeue, got {restart_cnt}" |
| |
| exit_code = atf.get_job_parameter(job_id, "ExitCode") |
| assert exit_code.startswith( |
| "1:" |
| ), f"Job should have failed with exit code 1, got {exit_code}" |
| |
| |
| @pytest.fixture(scope="function") |
| def epilog_failure(tmp_path): |
| """ |
| Set Epilog to a failing script |
| """ |
| prev_epilog = atf.get_config_parameter("Epilog") |
| |
| epilog = str(tmp_path / "epilog.sh") |
| atf.make_bash_script( |
| epilog, |
| """#!/bin/bash |
| # Epilog that always fails to simulate failure detection |
| exit 1 |
| """, |
| ) |
| atf.set_config_parameter("Epilog", epilog) |
| |
| yield |
| |
| atf.set_config_parameter("Epilog", prev_epilog) |
| atf.run_command(f"rm -f {epilog}", fatal=True) |
| |
| |
| @pytest.mark.xfail( |
| atf.get_version() < (25, 11, 3), |
| reason="Ticket 24564: Expedite requeue was transitioning jobs to REQUEUE_HOLD on success", |
| ) |
| def test_expedited_requeue_epilog_failure(epilog_failure, node): |
| """When epilog fails with job exit 0, node is drained; job completes (no requeue criteria, so no requeue).""" |
| |
| # Submit a job that succeeds but epilog will fail (run long enough to see RUNNING) |
| job_id = atf.submit_job_sbatch( |
| f'--requeue=expedite -w {node} --wrap "true"', |
| fatal=True, |
| ) |
| |
| # Wait for job to complete (epilog fails, node drains; job completes) |
| assert atf.wait_for_job_state( |
| job_id, "COMPLETED" |
| ), "Job should be COMPLETED even if epilog failed" |
| |
| # Epilog failure must drain the node |
| assert atf.wait_for_node_state( |
| node, "DRAIN" |
| ), f"Node {node} should be drained when epilog failed" |
| |
| |
| def test_expedited_requeue_job_and_epilog_failure(epilog_failure, node): |
| """Test that --requeue=expedite does expedited requeue when both job and epilog fail.""" |
| |
| # Submit a job that fails (non-zero exit) AND epilog will fail (run long enough to see RUNNING) |
| job_id = atf.submit_job_sbatch( |
| f'--requeue=expedite -w {node} --wrap "false"', |
| fatal=True, |
| ) |
| |
| # Expedited requeue: job must NOT be in REQUEUE_HOLD (that would mean epilogs all succeeded) |
| assert atf.wait_for_job_state( |
| job_id, "EXPEDITING" |
| ), "Job should not be REQUEUE_HOLD when epilog failed (expedited requeue expected)" |
| |
| # Verify the node is unavailable (epilog failure drains the node). |
| # Assert on node state rather than job Reason: Reason only shows "DRAINED" |
| # after the scheduler runs; with SchedulerParameters=defer_batch or |
| # SlurmctldParameters=enable_rpc_queue the scheduler may not have run yet. |
| assert atf.wait_for_node_state( |
| node, "DRAIN" |
| ), f"Node {node} should be drained when epilog failed" |
| |
| # Verify ExpeditedRequeue flag is set |
| expedited_requeue = atf.get_job_parameter(job_id, "ExpeditedRequeue") |
| assert ( |
| expedited_requeue == "Yes" |
| ), f"ExpeditedRequeue flag should be set for job {job_id}, got {expedited_requeue}" |
| |
| |
| def test_expedited_requeue_node_failure(node): |
| """Test that --requeue=expedite does expedited requeue when node fails during job.""" |
| |
| # Submit a long-running job with expedited requeue on the specific node |
| job_id = atf.submit_job_sbatch( |
| f'--requeue=expedite -N1 -w {node} --wrap "sleep 60"', |
| fatal=True, |
| ) |
| |
| # Wait for job to start running |
| atf.wait_for_job_state(job_id, "RUNNING", fatal=True) |
| |
| # Simulate node failure by setting it DOWN (this test brings the node back up in finally) |
| atf.run_command( |
| f"scontrol update nodename={node} state=DOWN reason=test_node_failure", |
| user=atf.properties["slurm-user"], |
| fatal=True, |
| ) |
| |
| # Wait for job to be requeued (EXPEDITING) after node DOWN |
| assert atf.wait_for_job_state( |
| job_id, "EXPEDITING" |
| ), "Job should be EXPEDITING when node goes down" |
| |
| # Requeued job should lose its node that now is down |
| atf.repeat_until( |
| lambda: atf.get_job_parameter(job_id, "NodeList"), |
| lambda new_node: new_node != node, |
| ) |
| new_node = atf.get_job_parameter(job_id, "NodeList") |
| assert new_node != node, f"{node} should be removed from job's NodeList" |
| |
| # Verify ExpeditedRequeue flag is set |
| expedited_requeue = atf.get_job_parameter(job_id, "ExpeditedRequeue") |
| assert ( |
| expedited_requeue == "Yes" |
| ), f"ExpeditedRequeue flag should be set for job {job_id}, got {expedited_requeue}" |
| |
| # Verify job was requeued (Restarts > 0) |
| restart_cnt = atf.get_job_parameter(job_id, "Restarts") |
| assert ( |
| int(restart_cnt) > 0 |
| ), f"Job should have Restarts > 0 after node failure, got {restart_cnt}" |