############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import pytest
import atf
import os

# Ensure that job ids are not truncated
os.environ["SLURM_BITSTR_LEN"] = "0"


def resolve_array_job(job_id, task_id):
    return int(
        atf.run_command_output(
            f"squeue -j {job_id}_{task_id} --noheader --state=all --format=%A"
        ).strip()
    )


@pytest.fixture(scope="module", autouse=True)
def setup():
    # Test needs to run a het job with 3 components and 9 parallel tasks of an array
    # We need 9 nodes to be able to run with select_linear
    atf.require_nodes(9)
    atf.require_slurm_running()


@pytest.fixture(scope="function", autouse=True)
def cancel_jobs():
    yield
    atf.cancel_all_jobs()


def test_single_job():
    output = atf.run_command_output(
        "squeue --noheader --only-job-state --format='%i=%T'"
    ).strip()
    assert output == ""

    job_id = atf.submit_job_sbatch("--wrap='srun sleep infinity'")
    atf.wait_for_step(job_id, 0, fatal=True, timeout=60)

    output = atf.run_command_output(
        "squeue --noheader --only-job-state --format='%i=%T'"
    ).strip()
    assert output == f"{job_id}=RUNNING"

    output = atf.run_command_output(
        f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
    ).strip()
    assert output == f"{job_id}=RUNNING"

    output = atf.run_command_output(
        f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id - 1}"
    ).strip()
    assert output == ""

    atf.cancel_jobs([job_id])
    atf.wait_for_job_state(job_id, "DONE", fatal=True, timeout=60)

    output = atf.run_command_output(
        f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
    ).strip()
    assert output == ""

    output = atf.run_command_output(
        "squeue --noheader --only-job-state --format='%i=%T'"
    ).strip()
    assert output == ""

    output = atf.run_command_output(
        f"squeue --noheader --state=all --only-job-state --format='%i=%T' -j {job_id}"
    ).strip()
    assert output == f"{job_id}=CANCELLED"


def test_het_job():
    job_id = atf.submit_job_sbatch("-n1 : -n1 : -n1 --wrap='srun sleep infinity'")
    atf.wait_for_step(job_id, 0, fatal=True, timeout=60)

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id}+0=RUNNING" in output
    assert f"{job_id}+1=RUNNING" in output
    assert f"{job_id}+2=RUNNING" in output
    assert len(output) == 3

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}+0=RUNNING" in output
    assert f"{job_id}+1=RUNNING" in output
    assert f"{job_id}+2=RUNNING" in output
    assert len(output) == 3

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id - 1}"
        )
        .strip()
        .splitlines()
    )
    assert len(output) == 0

    atf.cancel_jobs([job_id])
    atf.wait_for_job_state(job_id, "DONE", fatal=True, timeout=60)

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    assert len(output) == 0

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert len(output) == 0

    output = (
        atf.run_command_output(
            f"squeue --state=all --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}+0=CANCELLED" in output
    assert f"{job_id}+1=CANCELLED" in output
    assert f"{job_id}+2=CANCELLED" in output
    assert len(output) == 3


def test_array_job():
    job_id = atf.submit_job_sbatch("--array=1-10 --hold --wrap='srun sleep infinity'")
    atf.wait_for_job_state(job_id, "PENDING", fatal=True, timeout=60)

    output = atf.run_command_output(
        "squeue --noheader --only-job-state --format='%i=%T'"
    ).strip()
    assert output == f"{job_id}_[1-10]=PENDING"

    atf.run_command(f"scontrol release {job_id}_5", fatal=True)

    j5_id = resolve_array_job(job_id, 5)

    atf.wait_for_job_state(j5_id, "RUNNING", fatal=True, timeout=60)

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id}_[1-4,6-10]=PENDING" in output
    assert f"{job_id}_5=RUNNING" in output
    assert len(output) == 2

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_[1-4,6-10]=PENDING" in output
    assert f"{job_id}_5=RUNNING" in output
    assert len(output) == 2

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {j5_id}"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_5=RUNNING" in output
    assert len(output) == 1

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}_5"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_5=RUNNING" in output
    assert len(output) == 1

    atf.cancel_jobs([j5_id])
    atf.wait_for_job_state(j5_id, "DONE", fatal=True, timeout=60)

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_[1-4,6-10]=PENDING" in output
    assert len(output) == 1

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id} --state=all"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_[1-4,6-10]=PENDING" in output
    assert f"{job_id}_5=CANCELLED" in output
    assert len(output) == 2

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {j5_id}"
        )
        .strip()
        .splitlines()
    )
    assert len(output) == 0

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {j5_id} --state=all"
        )
        .strip()
        .splitlines()
    )
    assert f"{job_id}_5=CANCELLED" in output
    assert len(output) == 1

    tasks = [1, 2, 3, 4, 6, 7, 8, 9, 10]
    atf.run_command(f"scontrol release {job_id}_[1-4,6-10]", fatal=True)
    for i in tasks:
        atf.wait_for_job_state(
            resolve_array_job(job_id, i), "RUNNING", fatal=True, timeout=60
        )

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    for i in tasks:
        assert f"{job_id}_{i}=RUNNING" in output
    assert len(output) == len(tasks)

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    for i in tasks:
        assert f"{job_id}_{i}=RUNNING" in output
    assert len(output) == len(tasks)

    for i in tasks:
        atf.wait_for_job_state(
            resolve_array_job(job_id, i), "RUNNING", fatal=True, timeout=60
        )

        # now check all array tasks
    tasks.append(5)

    atf.cancel_jobs([job_id])

    for i in tasks:
        atf.wait_for_job_state(
            resolve_array_job(job_id, i), "CANCELLED", fatal=True, timeout=60
        )

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' --state=all -j {job_id}"
        )
        .strip()
        .splitlines()
    )
    for i in tasks:
        assert f"{job_id}_{i}=CANCELLED" in output
    assert len(output) == len(tasks)

    output = (
        atf.run_command_output(
            f"squeue --noheader --only-job-state --format='%i=%T' -j {job_id} --state=all"
        )
        .strip()
        .splitlines()
    )
    for i in tasks:
        assert f"{job_id}_{i}=CANCELLED" in output
    assert len(output) == len(tasks)


def test_all_jobs():
    job_id1 = atf.submit_job_sbatch(
        "--overcommit --oversubscribe --mem=0 --array=1-100 --hold --wrap='srun sleep infinity'"
    )
    job_id2 = atf.submit_job_sbatch(
        "--overcommit --oversubscribe --mem=0 --array=1-100:4 --hold --wrap='srun sleep infinity'"
    )
    job_id3 = atf.submit_job_sbatch(
        "--overcommit --oversubscribe --mem=0 -n1 : -n1 : -n1 : -n1 --hold --wrap='srun sleep infinity'"
    )
    job_id4 = atf.submit_job_sbatch(
        "--overcommit --oversubscribe --mem=0 -n6 --hold --wrap='srun sleep infinity'"
    )

    atf.wait_for_job_state(job_id1, "PENDING", fatal=True, timeout=60)
    atf.wait_for_job_state(job_id2, "PENDING", fatal=True, timeout=60)
    atf.wait_for_job_state(job_id3, "PENDING", fatal=True, timeout=60)
    atf.wait_for_job_state(job_id4, "PENDING", fatal=True, timeout=60)

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id1}_[1-100]=PENDING" in output
    assert (
        f"{job_id2}_[1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97]=PENDING"
        in output
    )
    assert f"{job_id3}+0=PENDING" in output
    assert f"{job_id3}+1=PENDING" in output
    assert f"{job_id3}+2=PENDING" in output
    assert f"{job_id3}+3=PENDING" in output
    assert f"{job_id4}=PENDING" in output
    assert len(output) == 7

    job1_tasks = [2, 55, 74]
    atf.run_command(f"scontrol release {job_id1}_[2,55,74]", fatal=True)
    for i in job1_tasks:
        atf.wait_for_job_state(
            resolve_array_job(job_id1, i), "RUNNING", fatal=True, timeout=60
        )

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id1}_[1,3-54,56-73,75-100]=PENDING" in output
    assert f"{job_id1}_2=RUNNING" in output
    assert f"{job_id1}_55=RUNNING" in output
    assert f"{job_id1}_74=RUNNING" in output
    assert (
        f"{job_id2}_[1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97]=PENDING"
        in output
    )
    assert f"{job_id3}+0=PENDING" in output
    assert f"{job_id3}+1=PENDING" in output
    assert f"{job_id3}+2=PENDING" in output
    assert f"{job_id3}+3=PENDING" in output
    assert f"{job_id4}=PENDING" in output
    assert len(output) == 10

    for i in job1_tasks:
        atf.cancel_jobs([resolve_array_job(job_id1, i)])

    job2_tasks = [13, 49, 65, 81]
    atf.run_command(f"scontrol release {job_id2}_[13,49,65,81]", fatal=True)
    for i in job2_tasks:
        atf.wait_for_job_state(
            resolve_array_job(job_id2, i), "RUNNING", fatal=True, timeout=60
        )

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id1}_[1,3-54,56-73,75-100]=PENDING" in output
    assert (
        f"{job_id2}_[1,5,9,17,21,25,29,33,37,41,45,53,57,61,69,73,77,85,89,93,97]=PENDING"
        in output
    )
    assert f"{job_id2}_13=RUNNING" in output
    assert f"{job_id2}_49=RUNNING" in output
    assert f"{job_id2}_65=RUNNING" in output
    assert f"{job_id2}_81=RUNNING" in output
    assert f"{job_id3}+0=PENDING" in output
    assert f"{job_id3}+1=PENDING" in output
    assert f"{job_id3}+2=PENDING" in output
    assert f"{job_id3}+3=PENDING" in output
    assert f"{job_id4}=PENDING" in output
    assert len(output) == 11

    for i in job2_tasks:
        atf.cancel_jobs([resolve_array_job(job_id2, i)])

    atf.run_command(f"scontrol release {job_id3}", fatal=True)
    atf.wait_for_job_state(job_id3, "RUNNING", fatal=True, timeout=60)

    atf.run_command(f"scontrol release {job_id4}", fatal=True)
    atf.wait_for_job_state(job_id4, "RUNNING", fatal=True, timeout=60)

    output = (
        atf.run_command_output("squeue --noheader --only-job-state --format='%i=%T'")
        .strip()
        .splitlines()
    )
    assert f"{job_id1}_[1,3-54,56-73,75-100]=PENDING" in output
    assert (
        f"{job_id2}_[1,5,9,17,21,25,29,33,37,41,45,53,57,61,69,73,77,85,89,93,97]=PENDING"
        in output
    )
    assert f"{job_id3}+0=RUNNING" in output
    assert f"{job_id3}+1=RUNNING" in output
    assert f"{job_id3}+2=RUNNING" in output
    assert f"{job_id3}+3=RUNNING" in output
    assert f"{job_id4}=RUNNING" in output
    assert len(output) == 7
