| ############################################################################## |
| # ATF (Automated Testing Framework) python testsuite module |
| # Copyright (C) SchedMD LLC. |
| ############################################################################## |
| import collections |
| import errno |
| |
| # import glob |
| import logging |
| import math |
| import os |
| import pwd |
| import pathlib |
| import pytest |
| import re |
| import shutil |
| import stat |
| import socket |
| import subprocess |
| import sys |
| import time |
| import traceback |
| |
| # slurmrestd |
| import requests |
| import signal |
| |
| import json |
| |
| # This module will be (un)imported in require_openapi_generator() |
| openapi_client = None |
| import importlib |
| |
| ############################################################################## |
| # ATF module functions |
| ############################################################################## |
| |
| default_command_timeout = 60 |
| default_polling_timeout = 45 |
| default_sql_cmd_timeout = 120 |
| |
| PERIODIC_TIMEOUT = 30 |
| |
| |
| def get_open_port(): |
| """Finds an open port. |
| |
| Warning: Race conditions abound, so be ready to retry in the calling function. |
| |
| Example: |
| >>> port = get_open_port() |
| >>> while not some_test(port): |
| ...     port = get_open_port() |
| |
| Shamelessly based on: |
| https://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python |
| """ |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| s.bind(("", 0)) |
| s.listen(1) |
| port = s.getsockname()[1] |
| s.close() |
| return port |
| |
| |
| def node_range_to_list(node_expression): |
| """Converts a node range expression into a list of node names. |
| |
| Example: |
| >>> node_range_to_list('node[1,3-5]') |
| ['node1', 'node3', 'node4', 'node5'] |
| """ |
| |
| node_list = [] |
| output = run_command_output( |
| f"scontrol show hostnames {node_expression}", fatal=True, quiet=True |
| ) |
| for line in output.rstrip().splitlines(): |
| node_list.append(line) |
| return node_list |
| |
| |
| def node_list_to_range(node_list): |
| """Converts a list of node names to a node range expression. |
| |
| Example: |
| >>> node_list_to_range(['node1', 'node3', 'node4', 'node5']) |
| 'node[1,3-5]' |
| """ |
| |
| return run_command_output( |
| f"scontrol show hostlistsorted {','.join(node_list)}", fatal=True, quiet=True |
| ).rstrip() |
| |
| |
| def range_to_list(range_expression): |
| """Converts an integer range expression into a list of integers. |
| |
| Example: |
| >>> range_to_list('1,3-5') |
| [1, 3, 4, 5] |
| """ |
| |
| return list(map(int, node_range_to_list(f"[{range_expression}]"))) |
| |
| |
| def list_to_range(numeric_list): |
| """Converts a list of integers to an integer range expression. |
| |
| Example: |
| >>> list_to_range([1, 3, 4, 5]) |
| '1,3-5' |
| """ |
| |
| node_range_expression = node_list_to_range(map(str, numeric_list)) |
| return re.sub(r"^\[(.*)\]$", r"\1", node_range_expression) |
| |
| |
| def run_command( |
| command, |
| fatal=False, |
| timeout=default_command_timeout, |
| quiet=False, |
| chdir=None, |
| user=None, |
| input=None, |
| xfail=False, |
| env_vars=None, |
| ): |
| """Executes a command and returns a dictionary result. |
| |
| Args: |
| command (string): The command to execute. The command is run within a |
| bash subshell, so pipes, redirection, etc. are performed. |
| fatal (boolean): If True, a non-zero exit code (or zero if combined |
| with xfail) will result in the test failing. |
| timeout (integer): If the command does not exit before timeout number |
| of seconds, this function will return with an exit code of 110. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| chdir (directory): Change to the specified directory before executing |
| the command. |
| user (user name): Run the command as the specified user. This requires |
| the invoking user to have unprompted sudo rights. |
| input (string): The specified input is supplied to the command as stdin. |
| xfail (boolean): If True, the command is expected to fail. |
| env_vars (string): A string of environment variable assignments that is |
| prepended to the command when run. |
| |
| Returns: |
| A dictionary containing the following keys: |
| start_time: epoch start time |
| duration: number of seconds the command ran for |
| exit_code: exit code for the command |
| stdout: command stdout as a string |
| stderr: command stderr as a string |
| |
| Example: |
| >>> run_command('ls -l', fatal=True) |
| {'command': 'ls -l', 'start_time': 1712268971.532, 'duration': 0.007, 'exit_code': 0, 'stdout': 'total 124\n-rw-rw-r-- 1 slurm slurm 118340 Apr 4 22:15 atf.py\n-rw-rw-r-- 1 slurm slurm 498 Apr 4 22:09 test.py\n-rw-rw-r-- 1 slurm slurm 1013 Apr 4 22:09 python_script.py\n', 'stderr': ''} |
| |
| >>> run_command('ls /non/existent/path', xfail=True) |
| {'command': 'ls /non/existent/path', 'start_time': 1712269123.2, 'duration': 0.005, 'exit_code': 2, 'stdout': '', 'stderr': "ls: cannot access '/non/existent/path': No such file or directory\n"} |
| |
| >>> run_command('sleep 5', timeout=2) |
| {'command': 'sleep 5', 'start_time': 1712269157.113, 'duration': 2.0, 'exit_code': 110, 'stdout': '', 'stderr': ''} |
| """ |
| |
| additional_run_kwargs = {} |
| if chdir is not None: |
| additional_run_kwargs["cwd"] = chdir |
| if input is not None: |
| additional_run_kwargs["input"] = input |
| if timeout is not None: |
| additional_run_kwargs["timeout"] = timeout |
| |
| if quiet: |
| log_command_level = logging.TRACE |
| log_details_level = logging.TRACE |
| else: |
| log_command_level = logging.NOTE |
| log_details_level = logging.DEBUG |
| |
| if env_vars is not None: |
| command = env_vars.strip() + " " + command |
| |
| start_time = time.time() |
| invocation_message = "Running command" |
| if user is not None: |
| invocation_message += f" as user {user}" |
| invocation_message += f": {command}" |
| logging.log(log_command_level, invocation_message) |
| try: |
| if user is not None and user != properties["test-user"]: |
| if not properties["sudo-rights"]: |
| pytest.skip( |
| "This test requires the test user to have unprompted sudo rights", |
| allow_module_level=True, |
| ) |
| cp = subprocess.run( |
| [ |
| "sudo", |
| "--preserve-env=PATH", |
| "-u", |
| user, |
| "/bin/bash", |
| "-lc", |
| command, |
| ], |
| capture_output=True, |
| text=True, |
| **additional_run_kwargs, |
| ) |
| else: |
| cp = subprocess.run( |
| command, |
| shell=True, |
| executable="/bin/bash", |
| capture_output=True, |
| text=True, |
| **additional_run_kwargs, |
| ) |
| end_time = time.time() |
| duration = end_time - start_time |
| exit_code = cp.returncode |
| stdout = cp.stdout |
| stderr = cp.stderr |
| except subprocess.TimeoutExpired as e: |
| duration = e.timeout |
| exit_code = errno.ETIMEDOUT |
| # These are byte objects, not strings |
| stdout = e.stdout.decode("utf-8") if e.stdout else "" |
| stderr = e.stderr.decode("utf-8") if e.stderr else "" |
| |
| if input is not None: |
| logging.log(log_details_level, f"Command input: {input}") |
| logging.log(log_details_level, f"Command exit code: {exit_code}") |
| logging.log(log_details_level, f"Command stdout: {stdout}") |
| logging.log(log_details_level, f"Command stderr: {stderr}") |
| logging.log(log_details_level, "Command duration: %.03f seconds", duration) |
| |
| message = "" |
| if exit_code == errno.ETIMEDOUT: |
| message = f'Command "{command}" timed out after {duration} seconds' |
| elif exit_code != 0 and not xfail: |
| message = f'Command "{command}" failed with rc={exit_code}' |
| elif exit_code == 0 and xfail: |
| message = f'Command "{command}" was expected to fail but succeeded' |
| if (exit_code != 0 and not xfail) or (exit_code == 0 and xfail): |
| if stderr != "" or stdout != "": |
| message += ":" |
| if stderr != "": |
| message += f" {stderr}" |
| if stdout != "": |
| message += f" {stdout}" |
| |
| if message != "": |
| message = message.rstrip() |
| if fatal: |
| pytest.fail(message) |
| elif not quiet: |
| logging.warning(message) |
| |
| results = {} |
| results["command"] = command |
| results["start_time"] = float(int(start_time * 1000)) / 1000 |
| results["duration"] = float(int(duration * 1000)) / 1000 |
| results["exit_code"] = exit_code |
| results["stdout"] = stdout |
| results["stderr"] = stderr |
| |
| return results |
| |
| |
| def run_command_error(command, **run_command_kwargs): |
| """Executes a command and returns the standard error. |
| |
| This function accepts the same arguments as run_command. |
| |
| Args: |
| command (string): The command to execute. The command is run within a |
| bash subshell, so pipes, redirection, etc. are performed. |
| |
| Returns: |
| The standard error (stderr) output of the command as a string. |
| |
| Example: |
| >>> run_command_error('ls /non/existent/path') |
| "ls: cannot access '/non/existent/path': No such file or directory\n" |
| |
| >>> run_command_error('grep foo /etc/passwd', quiet=True) |
| '' |
| |
| >>> run_command_error('echo error message >&2', xfail=True) |
| 'error message\n' |
| """ |
| |
| results = run_command(command, **run_command_kwargs) |
| |
| return results["stderr"] |
| |
| |
| def run_command_output(command, **run_command_kwargs): |
| """Executes a command and returns the standard output. |
| |
| This function accepts the same arguments as run_command. |
| |
| Args: |
| command (string): The command to execute. The command is run within a |
| Bash subshell, so pipes, redirection, etc. are performed. |
| |
| Returns: |
| The standard output (stdout) of the command as a string. |
| |
| Example: |
| >>> run_command_output('ls') |
| 'file1.txt\nfile2.txt\nscript.py\n' |
| |
| >>> run_command_output('echo Hello, World!') |
| 'Hello, World!\n' |
| |
| >>> run_command_output('grep foo /etc/passwd', xfail=True) |
| '' |
| """ |
| |
| results = run_command(command, **run_command_kwargs) |
| |
| return results["stdout"] |
| |
| |
| def run_command_exit(command, **run_command_kwargs): |
| """Executes a command and returns the exit code. |
| |
| This function accepts the same arguments as run_command. |
| |
| Args: |
| command (string): The command to execute. The command is run within a |
| Bash subshell, so pipes, redirection, etc. are performed. |
| |
| Returns: |
| The exit code of the command as an integer. |
| |
| Example: |
| >>> run_command_exit('ls') |
| 0 |
| |
| >>> run_command_exit('grep foo /etc/passwd', xfail=True) |
| 1 |
| |
| >>> run_command_exit('sleep 5', timeout=2) |
| 110 |
| """ |
| |
| results = run_command(command, **run_command_kwargs) |
| |
| return results["exit_code"] |
| |
| |
| def repeat_until( |
| callable, |
| condition, |
| timeout=default_polling_timeout, |
| poll_interval=None, |
| xfail=False, |
| fatal=False, |
| ): |
| """Repeats a callable until a condition is met or it times out. |
| |
| The callable returns an object that the condition operates on. |
| |
| Args: |
| callable (callable): Repeatedly called until the condition is met or |
| the timeout is reached. |
| condition (callable): A callable object that returns a boolean. This |
| function will return True when the condition call returns True. |
| timeout (integer): If timeout number of seconds expires before the |
| condition is met, return False. |
| poll_interval (float): Number of seconds to wait between condition |
| polls. This may be a decimal fraction. The default poll interval |
| depends on the timeout used, but varies between .1 and 1 seconds. |
| xfail (boolean): If True, a timeout is expected. |
| fatal (boolean): If True, the test will fail if condition is not met |
| (or if condition is met with xfail). |
| |
| Returns: |
| True if the condition is met by the timeout, False otherwise. |
| |
| Example: |
| >>> repeat_until(lambda : random.randint(1,10), lambda n: n == 5, timeout=30, poll_interval=1) |
| True |
| """ |
| |
| begin_time = time.time() |
| |
| if poll_interval is None: |
| if timeout <= 5: |
| poll_interval = 0.1 |
| elif timeout <= 10: |
| poll_interval = 0.2 |
| else: |
| poll_interval = 1 |
| |
| condition_met = False |
| while time.time() < begin_time + timeout: |
| if condition(callable()): |
| condition_met = True |
| break |
| time.sleep(poll_interval) |
| |
| if not xfail and not condition_met: |
| if fatal: |
| pytest.fail(f"Condition was not met within the {timeout} second timeout") |
| else: |
| logging.warning( |
| f"Condition was not met within the {timeout} second timeout" |
| ) |
| elif xfail and condition_met: |
| if fatal: |
| pytest.fail( |
| f"Condition was met within the {timeout} second timeout and wasn't expected" |
| ) |
| else: |
| logging.warning( |
| f"Condition was met within the {timeout} second timeout and wasn't expected" |
| ) |
| |
| return condition_met |
| |
| |
| def repeat_command_until(command, condition, quiet=True, **repeat_until_kwargs): |
| """Repeats a command until a condition is met or it times out. |
| |
| This function accepts the same arguments as repeat_until. |
| |
| Args: |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| True if the condition is met by the timeout, False otherwise. |
| |
| Example: |
| >>> repeat_command_until("scontrol ping", lambda results: re.search(r'is UP', results['stdout'])) |
| True |
| """ |
| |
| return repeat_until( |
| lambda: run_command(command, quiet=quiet), condition, **repeat_until_kwargs |
| ) |
| |
| |
| def pids_from_exe(executable): |
| """Finds process IDs (PIDs) of running processes with given executable name. |
| |
| Args: |
| executable (string): The name of the executable for which to find running processes. |
| |
| Returns: |
| A list of integer process IDs (PIDs) for running processes that match the given |
| executable name. |
| |
| Example: |
| >>> pids_from_exe('/usr/bin/python3') |
| [12345, 67890] |
| |
| >>> pids_from_exe('/usr/sbin/sshd') |
| [54321] |
| |
| >>> pids_from_exe('/bin/non-existent') |
| [] |
| """ |
| |
| # We have to elevate privileges here, but forking off thousands of sudo |
| # commands is expensive, so we will sudo a dynamic bash script for speed |
| script = f"""cd /proc |
| for pid in `ls -d1 [0-9]*`; |
| do if [[ "$(readlink $pid/exe)" = "{executable}" ]]; |
| then echo $pid; |
| fi |
| done""" |
| pids = [] |
| output = run_command_output(script, user="root", quiet=True) |
| for line in output.rstrip().splitlines(): |
| pids.append(int(line)) |
| return pids |
| |
| |
| def is_slurmrestd_running(): |
| """Checks if slurmrestd is running. |
| Needs to be run after the related properties are set. |
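| |
| Example (illustrative; assumes slurmrestd was started by the testsuite): |
| >>> is_slurmrestd_running() |
| True |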
| """ |
| # TODO: We could check also if the required plugins/parsers in properties |
| # are in the returned specs, but the format still depends on the version. |
| # Once v0.0.39 is removed, we could add the extra check. |
| return repeat_until( |
| lambda: request_slurmrestd("openapi/v3"), lambda r: r.status_code == 200 |
| ) |
| |
| |
| def is_slurmctld_running(quiet=False): |
| """Checks whether slurmctld is running. |
| |
| Args: |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| True if the slurmctld is running, False otherwise. |
| |
| Example: |
| >>> is_slurmctld_running() |
| True |
| |
| >>> is_slurmctld_running(quiet=True) |
| True |
| """ |
| |
| # Check whether slurmctld is running |
| if re.search(r"is UP", run_command_output("scontrol ping", quiet=quiet)): |
| return True |
| |
| return False |
| |
| |
| def gcore(component, pid=None, sbin=True): |
| """Generates a gcore file for all pids running of a given Slurm component. |
| |
| The gcore file will be save at slurm-logs-dir, where all the logs and kernel |
| coredumps should be generated too. |
| |
| Args: |
| component (string): The name of the component. E.g. slurmctld, slurmdbd... |
| pid (integer): The specific PID of the process to gcore. All PIDs of component are gcored by default... |
| sbin: If True search for pids related to slurm-sbin-dir, or slurm-bin-dir otherwise. |
| |
| Returns: |
| None |
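| |
| Example (illustrative; the PID shown is hypothetical): |
| >>> gcore("slurmctld")  # gcore all running slurmctld processes |
| >>> gcore("slurmd", pid=12345)  # gcore a specific (hypothetical) slurmd PID |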
| """ |
| # Ensure that slurm-logs-dir is set. |
| if "slurm-logs-dir" not in properties: |
| properties["slurm-logs-dir"] = os.path.dirname( |
| get_config_parameter("SlurmctldLogFile", live=False, quiet=True) |
| ) |
| |
| if sbin: |
| prefix = properties["slurm-sbin-dir"] |
| else: |
| prefix = properties["slurm-bin-dir"] |
| |
| pids = pids_from_exe(f"{prefix}/{component}") |
| |
| if pid: |
| if pid not in pids: |
| logging.warning( |
| f"Requested PID {pid} is not in the obtained PIDs ({pids}), but using it anyway" |
| ) |
| pids = [pid] |
| |
| if not pids: |
| logging.warning(f"Process {prefix}/{component} not found") |
| logging.debug(f"Getting gcores for PIDs: {pids}") |
| for pid in pids: |
| run_command( |
| f"sudo gcore -o {properties['slurm-logs-dir']}/{component}.core {pid}" |
| ) |
| |
| |
| def start_slurmctld(clean=False, quiet=False, also_slurmds=False): |
| """Starts the Slurm controller daemon (slurmctld). |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| clean (boolean): If True, clears previous slurmctld state. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| also_slurmds (boolean): If True, also start all required slurmds. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> start_slurmctld() # Start slurmctld with default settings |
| >>> start_slurmctld(clean=True, quiet=True) # Start slurmctld with clean state and quiet logging |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to start slurmctld") |
| |
| logging.debug("Starting slurmctld...") |
| |
| if not is_slurmctld_running(quiet=quiet): |
| # Start slurmctld |
| command = f"{properties['slurm-sbin-dir']}/slurmctld" |
| if clean: |
| command += " -c -i" |
| results = run_command(command, user=properties["slurm-user"], quiet=quiet) |
| if results["exit_code"] != 0: |
| pytest.fail( |
| f"Unable to start slurmctld (rc={results['exit_code']}): {results['stderr']}" |
| ) |
| |
| # Verify that slurmctld is running |
| if not repeat_command_until( |
| "scontrol ping", lambda results: re.search(r"is UP", results["stdout"]) |
| ): |
| logging.warning( |
| "scontrol ping is not responding, trying to get slurmctld core file..." |
| ) |
| gcore("slurmctld") |
| pytest.fail("Slurmctld is not running") |
| else: |
| logging.debug("Slurmctld started successfully") |
| else: |
| logging.warning("Slurmctld was already started") |
| |
| if also_slurmds: |
| # Build list of slurmds |
| slurmd_list = [] |
| output = run_command_output( |
| f"perl -nle 'print $1 if /^NodeName=(\\S+)/' {properties['slurm-config-dir']}/slurm.conf", |
| user=properties["slurm-user"], |
| quiet=quiet, |
| ) |
| if not output: |
| pytest.fail("Unable to determine the slurmd node names") |
| for node_name_expression in output.rstrip().split("\n"): |
| if node_name_expression != "DEFAULT": |
| slurmd_list.extend(node_range_to_list(node_name_expression)) |
| |
| # (Multi)Slurmds |
| for slurmd_name in slurmd_list: |
| logging.debug(f"Starting slurmd for {slurmd_name}...") |
| # Check whether slurmd is running |
| slurmd_pgrep = run_command( |
| f"pgrep -f 'slurmd -N {slurmd_name}'", quiet=quiet |
| ) |
| if slurmd_pgrep["exit_code"] != 0: |
| # Start slurmd |
| results = run_command( |
| f"{properties['slurm-sbin-dir']}/slurmd -N {slurmd_name}", |
| user="root", |
| quiet=quiet, |
| ) |
| if results["exit_code"] != 0: |
| pytest.fail( |
| f"Unable to start slurmd -N {slurmd_name} (rc={results['exit_code']}): {results['stderr']}" |
| ) |
| |
| # Verify that the slurmd is running |
| if ( |
| run_command_exit(f"pgrep -f 'slurmd -N {slurmd_name}'", quiet=quiet) |
| != 0 |
| ): |
| pytest.fail(f"Slurmd -N {slurmd_name} is not running") |
| else: |
| logging.warning(f"slurmd for {slurmd_name} already running") |
| logging.warning(f"slurmd_pgrep['stdout']: {slurmd_pgrep['stdout']}") |
| logging.warning(f"slurmd_pgrep['stderr']: {slurmd_pgrep['stderr']}") |
| logging.warning( |
| f"slurmd_pgrep['exit_code']: {slurmd_pgrep['exit_code']}" |
| ) |
| |
| # Verify that the slurmd is registered correctly |
| if not repeat_until( |
| lambda: get_nodes(quiet=True), |
| lambda nodes: all(nodes[name]["state"] == ["IDLE"] for name in slurmd_list), |
| ): |
| nodes = get_nodes(quiet=True) |
| non_idle = [ |
| name for name in slurmd_list if nodes[name]["state"] != ["IDLE"] |
| ] |
| logging.warning( |
| f"Getting the core files of the still not IDLE slurmds ({non_idle})" |
| ) |
| for node in non_idle: |
| pid = run_command_output( |
| f"pgrep -f 'slurmd -N {node}'", quiet=quiet |
| ).strip() |
| gcore("slurmd", pid) |
| pytest.fail(f"Some nodes are not IDLE: {non_idle}") |
| |
| logging.debug(f"All nodes are IDLE: {slurmd_list}") |
| |
| |
| def start_slurmdbd(clean=False, quiet=False): |
| """Starts the Slurm DB daemon (slurmdbd). |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| clean (boolean): If True, clears previous slurmdbd state. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None |
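| |
| Example: |
| >>> start_slurmdbd()  # Start slurmdbd with default settings |
| >>> start_slurmdbd(clean=True, quiet=True)  # Start slurmdbd with clean state and quiet logging |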
| """ |
| if not properties["auto-config"]: |
| require_auto_config("wants to start slurmdbd") |
| |
| logging.debug("Starting slurmdbd...") |
| |
| if ( |
| run_command_exit( |
| "sacctmgr show cluster", user=properties["slurm-user"], quiet=quiet |
| ) |
| != 0 |
| ): |
| # Start slurmdbd |
| results = run_command( |
| f"{properties['slurm-sbin-dir']}/slurmdbd", |
| user=properties["slurm-user"], |
| quiet=quiet, |
| ) |
| if results["exit_code"] != 0: |
| pytest.fail( |
| f"Unable to start slurmdbd (rc={results['exit_code']}): {results['stderr']}" |
| ) |
| |
| # Verify that slurmdbd is running |
| if not repeat_command_until( |
| "sacctmgr show cluster", lambda results: results["exit_code"] == 0 |
| ): |
| logging.warning( |
| "sacctmgr show cluster is not responding, trying to get slurmdbd core file..." |
| ) |
| gcore("slurmdbd") |
| pytest.fail("Slurmdbd is not running") |
| else: |
| logging.debug("Slurmdbd started successfully") |
| |
| |
| def start_slurm(clean=False, quiet=False): |
| """Starts all applicable Slurm daemons. |
| |
| This function examines the Slurm configuration files to determine which daemons |
| need to be started. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| clean (boolean): If True, clears previous slurmctld state. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> start_slurm() # Start all Slurm daemons with default settings |
| >>> start_slurm(clean=True, quiet=True) # Start all Slurm daemons with clean state and quiet logging |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to start slurm") |
| |
| # Determine whether slurmdbd should be included |
| if ( |
| get_config_parameter("AccountingStorageType", live=False, quiet=quiet) |
| == "accounting_storage/slurmdbd" |
| ): |
| start_slurmdbd(clean, quiet) |
| |
| # Remove the unnecessary default node0 from the config to avoid it being used or reserved |
| output = run_command_output( |
| f"cat {properties['slurm-config-dir']}/slurm.conf", |
| user=properties["slurm-user"], |
| quiet=quiet, |
| ) |
| if len(re.findall(r"NodeName=", output)) > 1: |
| run_command( |
| f"sed -i '/NodeName=node0 /d' {properties['slurm-config-dir']}/slurm.conf", |
| user=properties["slurm-user"], |
| quiet=quiet, |
| ) |
| |
| # Start slurmctld |
| start_slurmctld(clean, quiet, also_slurmds=True) |
| |
| # Start slurmrestd if required |
| if properties["slurmrestd-started"]: |
| start_slurmrestd() |
| |
| |
| def stop_slurmctld(quiet=False, also_slurmds=False): |
| """Stops the Slurm controller daemon (slurmctld). |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| also_slurmds (boolean): If True, also stop all slurmds. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None if successful, or a list of failure messages otherwise. |
| |
| Example: |
| >>> stop_slurmctld() # Stop slurmctld with default logging |
| >>> stop_slurmctld(quiet=True) # Stop slurmctld with quiet logging |
| """ |
| |
| rc = None |
| failures = [] |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to stop slurmctld") |
| |
| # Stop slurmctld |
| command = "scontrol shutdown" |
| if not also_slurmds: |
| command += " slurmctld" |
| logging.debug("Stopping slurmctld...") |
| else: |
| logging.debug("Stopping slurmctld and slurmds...") |
| |
| results = run_command(command, user=properties["slurm-user"], quiet=quiet) |
| if results["exit_code"] != 0: |
| failures.append(f"Command {command} failed with rc={results['exit_code']}") |
| |
| # Verify that slurmctld is not running |
| if not repeat_until( |
| lambda: pids_from_exe(f"{properties['slurm-sbin-dir']}/slurmctld"), |
| lambda pids: len(pids) == 0, |
| ): |
| failures.append("Slurmctld is still running") |
| logging.warning("Getting the core files of the still running slurmctld") |
| gcore("slurmctld") |
| else: |
| logging.debug("No slurmctld is running.") |
| |
| if also_slurmds: |
| if get_version("sbin/slurmd") < (24, 11): |
| # Only needed for slurmd versions prior to 24.11 (fixed in t20764). |
| slurmd_list = [] |
| output = run_command_output( |
| f"perl -nle 'print $1 if /^NodeName=(\\S+)/' {properties['slurm-config-dir']}/slurm.conf", |
| quiet=quiet, |
| ) |
| if not output: |
| failures.append("Unable to determine the slurmd node names") |
| else: |
| for node_name_expression in output.rstrip().split("\n"): |
| if node_name_expression != "DEFAULT": |
| slurmd_list.extend(node_range_to_list(node_name_expression)) |
| |
| for slurmd_name in slurmd_list: |
| run_command( |
| f"sudo systemctl stop {slurmd_name}_slurmstepd.scope", quiet=quiet |
| ) |
| |
| # Verify that slurmds are not running |
| if not repeat_until( |
| lambda: pids_from_exe(f"{properties['slurm-sbin-dir']}/slurmd"), |
| lambda pids: len(pids) == 0, |
| ): |
| failures.append("Some slurmds are still running") |
| logging.warning("Getting the core files of the still running slurmds") |
| gcore("slurmd") |
| else: |
| logging.debug("No slurmd is running.") |
| |
| if failures: |
| rc = failures |
| |
| return rc |
| |
| |
| def stop_slurmdbd(quiet=False): |
| """Stops the Slurm DB daemon (slurmdbd). |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None if successful, or a list of failure messages otherwise. |
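| |
| Example: |
| >>> stop_slurmdbd()  # Stop slurmdbd with default logging |
| >>> stop_slurmdbd(quiet=True)  # Stop slurmdbd with quiet logging |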
| """ |
| |
| rc = None |
| failures = [] |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to stop slurmdbd") |
| |
| logging.debug("Stopping slurmdbd...") |
| |
| # Stop slurmdbd |
| results = run_command( |
| "sacctmgr shutdown", user=properties["slurm-user"], quiet=quiet |
| ) |
| if results["exit_code"] != 0: |
| failures.append( |
| f"Command \"sacctmgr shutdown\" failed with rc={results['exit_code']}" |
| ) |
| |
| # Verify that slurmdbd is not running (we might have to wait for rollups to complete) |
| if not repeat_until( |
| lambda: pids_from_exe(f"{properties['slurm-sbin-dir']}/slurmdbd"), |
| lambda pids: len(pids) == 0, |
| timeout=60, |
| ): |
| failures.append("Slurmdbd is still running") |
| logging.warning("Getting the core files of the still running slurmdbd") |
| gcore("slurmdbd") |
| else: |
| logging.debug("No slurmdbd is running.") |
| |
| if failures: |
| rc = failures |
| |
| return rc |
| |
| |
| def stop_slurm(fatal=True, quiet=False): |
| """Stops all applicable Slurm daemons. |
| |
| This function examines the Slurm configuration files to determine which daemons |
| need to be stopped. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| fatal (boolean): If True, a failure to stop all daemons will result in the |
| test failing. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| True if all Slurm daemons were stopped, False otherwise. |
| |
| Example: |
| >>> stop_slurm() # Stop all Slurm daemons with default settings |
| True |
| |
| >>> stop_slurm(fatal=False, quiet=True) # Stop all Slurm daemons with non-fatal failures and quiet logging |
| False |
| """ |
| |
| failures = [] |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to stop slurm") |
| |
| # Determine whether slurmdbd should be included |
| if ( |
| get_config_parameter("AccountingStorageType", live=False, quiet=quiet) |
| == "accounting_storage/slurmdbd" |
| ): |
| err = stop_slurmdbd(quiet) |
| if err: |
| failures.extend(err) |
| |
| # Stop slurmctld and slurmds |
| err = stop_slurmctld(quiet=quiet, also_slurmds=True) |
| if err: |
| failures.extend(err) |
| |
| # Stop slurmrestd if it was started |
| if properties["slurmrestd-started"]: |
| properties["slurmrestd"].send_signal(signal.SIGINT) |
| try: |
| properties["slurmrestd"].wait(timeout=60) |
| except Exception: |
| properties["slurmrestd"].kill() |
| properties["slurmrestd_log"].close() |
| |
| if failures: |
| for fail in failures: |
| logging.warning(fail) |
| if fatal: |
| pytest.fail(failures[0]) |
| |
| return False |
| else: |
| return True |
| |
| |
| def restart_slurmctld(clean=False, quiet=False): |
| """Restarts the Slurm controller daemon (slurmctld). |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| clean (boolean): If True, clears previous slurmctld state. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> restart_slurmctld() # Restart slurmctld with default settings |
| >>> restart_slurmctld(clean=True, quiet=True) # Restart slurmctld with clean state and quiet logging |
| """ |
| |
| stop_slurmctld(quiet=quiet) |
| start_slurmctld(clean=clean, quiet=quiet) |
| |
| |
| def restart_slurm(clean=False, quiet=False): |
| """Restarts all applicable Slurm daemons. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| clean (boolean): If True, clears previous slurmctld state. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> restart_slurm() # Restart all Slurm daemons with default settings |
| >>> restart_slurm(clean=True, quiet=True) # Restart all Slurm daemons with clean state and quiet logging |
| """ |
| |
| stop_slurm(quiet=quiet) |
| start_slurm(clean=clean, quiet=quiet) |
| |
| |
| def require_slurm_running(): |
| """Ensures that the Slurm daemons are running. |
| |
| In local-config mode, the test is skipped if Slurm is not running. |
| In auto-config mode, Slurm is started if necessary. |
| |
| In order to avoid multiple restarts of Slurm (in auto-config), this function |
| should be called at the end of the setup preconditions. |
| |
| Args: |
| None |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_slurm_running() # Ensure Slurm is running or start it in auto-config mode |
| """ |
| |
| global nodes |
| |
| if properties["auto-config"]: |
| if not is_slurmctld_running(quiet=True): |
| |
| # Check and report the mixed components |
| versions = dict() |
| versions["slurmdbd"] = get_version("sbin/slurmdbd") |
| versions["slurmctld"] = get_version("sbin/slurmctld") |
| versions["slurmd"] = get_version("sbin/slurmd") |
| versions["scontrol"] = get_version("bin/scontrol") |
| if len(set(versions.values())) == 1: |
| logging.info( |
| f"Starting Slurm with all components in the same version: {versions['slurmctld']}" |
| ) |
| else: |
| logging.info(f"Starting Slurm in a mixed version setup: {versions}") |
| |
| properties["slurm-started"] = True |
| start_slurm(clean=True, quiet=True) |
| else: |
| if not is_slurmctld_running(quiet=True): |
| pytest.skip( |
| "This test requires slurm to be running", allow_module_level=True |
| ) |
| |
| # As a side effect, build up initial nodes dictionary |
| nodes = get_nodes(quiet=True) |
| |
| |
| def is_upgrade_setup( |
| old_slurm_prefix="/opt/slurm-old", |
| new_slurm_prefix="/opt/slurm-new", |
| old_build_prefix="", |
| new_build_prefix="", |
| old_source_prefix="", |
| new_source_prefix="", |
| force_old=False, |
| ): |
| """ |
| Returns True if two Slurm installations (old and new) are configured on the system. |
| As a side effect, it sets the upgrade-related properties via setup_upgrades(). |
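| |
| Example (illustrative; the prefixes checked default to /opt/slurm-old and /opt/slurm-new): |
| >>> is_upgrade_setup() |
| True |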
| """ |
| |
| if not os.path.exists(old_slurm_prefix): |
| logging.debug(f"Old prefix {old_slurm_prefix} not exists.") |
| return False |
| |
| if not os.path.exists(new_slurm_prefix): |
| logging.debug(f"New prefix {new_slurm_prefix} not exists.") |
| return False |
| |
| # Add the right properties |
| setup_upgrades( |
| old_slurm_prefix, |
| new_slurm_prefix, |
| old_build_prefix, |
| new_build_prefix, |
| old_source_prefix, |
| new_source_prefix, |
| force_old, |
| ) |
| return True |
| |
| |
| def require_upgrades( |
| old_slurm_prefix="/opt/slurm-old", |
| new_slurm_prefix="/opt/slurm-new", |
| old_build_prefix="", |
| new_build_prefix="", |
| old_source_prefix="", |
| new_source_prefix="", |
| force_old=True, |
| ): |
| """Checks if has two different versions installed. |
| |
| If they are not, skip. |
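| |
| Example (illustrative; the default prefixes are /opt/slurm-old and /opt/slurm-new): |
| >>> require_upgrades()  # skips unless an upgrade setup is present |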
| """ |
| if not properties["auto-config"]: |
| require_auto_config("to change/upgrade Slurm setup") |
| |
| if not is_upgrade_setup( |
| old_slurm_prefix, |
| new_slurm_prefix, |
| old_build_prefix, |
| new_build_prefix, |
| old_source_prefix, |
| new_source_prefix, |
| force_old, |
| ): |
| pytest.skip("This test needs an upgrade setup") |
| |
| # Double-check that old_version <= new_version |
| old_version = get_version(slurm_prefix=old_slurm_prefix) |
| new_version = get_version(slurm_prefix=new_slurm_prefix) |
| if old_version > new_version: |
| pytest.skip( |
| f"Old version ({old_version}) has to be older than new version ({new_version})" |
| ) |
| logging.info(f"Required upgrade setup found: {old_version} and {new_version}") |
| |
| |
| def setup_upgrades( |
| old_slurm_prefix="/opt/slurm-old", |
| new_slurm_prefix="/opt/slurm-new", |
| old_build_prefix="", |
| new_build_prefix="", |
| old_source_prefix="", |
| new_source_prefix="", |
| force_old=False, |
| ): |
| """ |
| Adds the necessary atf.properties[] entries with the old/new paths. |
| If force_old is specified, it also updates the links to point to the old |
| paths, and they will be restored in the global teardown. |
| """ |
| # TODO: We should use slurm-new(-build) instead of slurm-git(-build) |
| if old_build_prefix == "": |
| old_build_prefix = properties["slurm-build-dir"] |
| if new_build_prefix == "": |
| new_build_prefix = f"{properties['slurm-build-dir']}/../slurm-git-build" |
| if old_source_prefix == "": |
| old_source_prefix = properties["slurm-source-dir"] |
| if new_source_prefix == "": |
| new_source_prefix = f"{properties['slurm-source-dir']}/../slurm-git" |
| |
| properties["old-slurm-prefix"] = old_slurm_prefix |
| properties["new-slurm-prefix"] = new_slurm_prefix |
| |
| properties["old-build-prefix"] = old_build_prefix |
| properties["new-build-prefix"] = new_build_prefix |
| |
| properties["old-source-prefix"] = old_source_prefix |
| properties["new-source-prefix"] = new_source_prefix |
| |
| properties["forced_upgrade_setup"] = force_old |
| |
| if force_old: |
| logging.debug( |
| "Setting bin/ and sbin/ pointing to old version and saving a backup..." |
| ) |
| run_command( |
| f"sudo mv {properties['slurm-sbin-dir']} {module_tmp_path}/upgrade-sbin", |
| quiet=True, |
| fatal=True, |
| ) |
| run_command( |
| f"sudo mv {properties['slurm-bin-dir']} {module_tmp_path}/upgrade-bin", |
| quiet=True, |
| fatal=True, |
| ) |
| run_command( |
| f"sudo mkdir {properties['slurm-sbin-dir']} {properties['slurm-bin-dir']}", |
| quiet=True, |
| fatal=True, |
| ) |
| run_command( |
| f"sudo ln -s {properties['old-slurm-prefix']}/sbin/* {properties['slurm-sbin-dir']}/", |
| quiet=True, |
| fatal=True, |
| ) |
| run_command( |
| f"sudo ln -s {properties['old-slurm-prefix']}/bin/* {properties['slurm-bin-dir']}/", |
| quiet=True, |
| fatal=True, |
| ) |
| |
| |
| def upgrade_component(component, new_version=True): |
| """Upgrades a component creating the required links, and restarts it if necessary. |
| |
| This function needs require_upgrades() to be already run to work properly. |
| |
| Args: |
| component (string): The bin/ or sbin/ component of Slurm to check. |
| new_version (boolean): Set it false to downgrade to the older version instead. |
| |
| Returns: |
| A tuple representing the version. E.g. (25.05.0). |
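| |
| Example (illustrative): |
| >>> upgrade_component("sbin/slurmctld")  # switch slurmctld to the new version and restart it |
| >>> upgrade_component("sbin/slurmctld", new_version=False)  # switch back to the old version |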
| """ |
| |
| if ( |
| "old-slurm-prefix" not in properties.keys() |
| or "new-slurm-prefix" not in properties.keys() |
| ): |
| pytest.fail("To upgrade_components() first we need to call require_upgrades()") |
| |
| if not os.path.exists(f"{properties['slurm-prefix']}/{component}"): |
| pytest.fail(f"Unknown or not existing {component}") |
| |
| if new_version: |
| upgrade_prefix = properties["new-slurm-prefix"] |
| else: |
| upgrade_prefix = properties["old-slurm-prefix"] |
| |
| # Stop components when necessary |
| if component == "sbin/slurmdbd": |
| stop_slurmdbd() |
| elif component == "sbin/slurmctld": |
| stop_slurmctld() |
| |
| run_command( |
| f"sudo rm -f {properties['slurm-prefix']}/{component}", |
| quiet=True, |
| fatal=True, |
| ) |
| run_command( |
| f"sudo ln -s {upgrade_prefix}/{component} {properties['slurm-prefix']}/{component}", |
| quiet=True, |
| fatal=True, |
| ) |
| |
| # Restart components when necessary |
| if component == "sbin/slurmdbd": |
| start_slurmdbd() |
| elif component == "sbin/slurmctld": |
| start_slurmctld() |
| |
| |
| def get_version(component="sbin/slurmctld", slurm_prefix=""): |
| """Returns the version of the Slurm component as a tuple. |
| |
| It calls the component with -V and converts the output into a tuple. |
| |
| Args: |
| component (string): The bin/ or sbin/ component of Slurm to check. |
| It also supports "config.h" to obtain the VERSION in the header. |
| slurm_prefix (string): The path where the component is. By default, the one defined in testsuite.conf. |
| If component is "config.h", then it's the build dir. |
| |
| Returns: |
| A tuple representing the version. E.g. (25, 5, 0). |
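| |
| Example (illustrative; the value depends on the installed version): |
| >>> get_version() |
| (25, 5, 0) |
| >>> get_version("config.h") |
| (25, 5, 0) |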
| """ |
| if component == "config.h": |
| if slurm_prefix == "": |
| slurm_prefix = properties["slurm-build-dir"] |
| header = pathlib.Path(f"{slurm_prefix}/config.h") |
| if not header.exists(): |
| pytest.fail("Unable to access to config.h to get Slurm version") |
| |
| version_str = re.search( |
| r'#define\s+VERSION\s+"([^"]+)"', header.read_text() |
| ).group(1) |
| |
| else: |
| if slurm_prefix == "": |
| slurm_prefix = f"{properties['slurm-sbin-dir']}/.." |
| |
| version_str = ( |
| run_command_output(f"sudo {slurm_prefix}/{component} -V", quiet=True) |
| .strip() |
| .replace("slurm ", "") |
| ) |
| |
| return tuple(int(part) if part.isdigit() else 0 for part in version_str.split(".")) |
| |
| |
| def require_version(version, component="sbin/slurmctld", slurm_prefix=""): |
| """Checks if the component is at least the required version, or skips. |
| |
| Args: |
| version (tuple): The tuple representing the version. |
| component (string): The bin/ or sbin/ component of Slurm to check. |
| slurm_prefix (string): The path where the component is. By default, the one defined in testsuite.conf. |
| |
| Returns: |
| None |
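| |
| Example (illustrative): |
| >>> require_version((24, 11))  # skips the test unless slurmctld is at least 24.11 |
| >>> require_version((23, 2), component="bin/scontrol") |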
| """ |
| component_version = get_version(component, slurm_prefix) |
| if component_version < version: |
| pytest.skip( |
| f"The version of {component} is {component_version}, required is {version}" |
| ) |
| |
| |
| def request_slurmrestd(request): |
| """Returns the slurmrestd response of a given request. |
| It needs slurmrestd to be running (see require_slurmrestd()). |
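| |
| Example (illustrative; returns a requests.Response object): |
| >>> r = request_slurmrestd("openapi/v3") |
| >>> r.status_code |
| 200 |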
| """ |
| return requests.get( |
| f"{properties['slurmrestd_url']}/{request}", |
| headers=properties["slurmrestd-headers"], |
| ) |
| |
| |
| def require_openapi_generator(version="7.3.0"): |
| """Generates an OpenAPI client using OpenAPI-Generator, or skips if not available (even in auto-config). |
| It needs slurmrestd to be running (see require_slurmrestd()). |
| It also sets the necessary OPENAPI_GENERATOR_VERSION and JAVA_OPTS |
| environment variables. |
| Args: |
| version (string): the required version. |
| |
| Returns: |
| None |
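| |
| Example (illustrative): |
| >>> require_openapi_generator("7.3.0")  # skips unless openapi-generator-cli 7.3.0 is available |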
| """ |
| |
| # Require specific testing version |
| os.environ["OPENAPI_GENERATOR_VERSION"] = version |
| |
| # Work around: https://github.com/OpenAPITools/openapi-generator/issues/13684 |
| os.environ["JAVA_OPTS"] = ( |
| "--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED" |
| ) |
| |
| ogc_version = ( |
| run_command_output("openapi-generator-cli version").strip().split("\n")[-1] |
| ) |
| if ogc_version != version: |
| pytest.skip( |
| f"test requires openapi-generator-cli version {version} (not {ogc_version})", |
| allow_module_level=True, |
| ) |
| |
| # allow pointing to an existing OpenAPI generated client |
| opath = module_tmp_path |
| if "SLURM_TESTSUITE_OPENAPI_CLIENT" in os.environ: |
| opath = os.environ["SLURM_TESTSUITE_OPENAPI_CLIENT"] |
| |
| pyapi_path = f"{opath}/pyapi/" |
| spec_path = f"{opath}/openapi.json" |
| |
| # Always create path if needed |
| os.makedirs(opath, exist_ok=True) |
| |
| if not os.path.exists(spec_path): |
| r = request_slurmrestd("openapi/v3") |
| if r.status_code != 200: |
| pytest.fail(f"Error requesting openapi specs from slurmrestd: {r}") |
| |
| with open(spec_path, "w") as f: |
| f.write(r.text) |
| f.close() |
| |
| if not os.path.exists(pyapi_path): |
| run_command( |
| f"openapi-generator-cli generate -i '{spec_path}' -g python-pydantic-v1 --strict-spec=true -o '{pyapi_path}'", |
| fatal=True, |
| timeout=60, |
| ) |
| |
| sys.path.insert(0, pyapi_path) |
| |
| # Re-import openapi_client |
| # Regular import doesn't work if was already imported by another test. |
| global openapi_client |
| module_name = "openapi_client" |
| module_prefix = module_name + "." |
| for mod in list(sys.modules): |
| if mod == module_name or mod.startswith(module_prefix): |
| del sys.modules[mod] |
| openapi_client = importlib.import_module(module_name) |
| importlib.reload(openapi_client) |
| |
| properties["openapi_config"] = openapi_client.Configuration() |
| properties["openapi_config"].host = properties["slurmrestd_url"] |
| properties["openapi_config"].access_token = properties["slurmrestd-headers"][ |
| "X-SLURM-USER-TOKEN" |
| ] |
| |
| |
| def openapi_slurm(): |
| """ |
| Returns a SlurmApi client from OpenAPI. |
| It needs require_openapi_generator() to be run first. |
| """ |
| return openapi_client.SlurmApi( |
| openapi_client.ApiClient(properties["openapi_config"]) |
| ) |
| |
| |
| def openapi_slurmdb(): |
| """ |
| Returns a SlurmdbApi client from OpenAPI. |
| It needs require_openapi_generator() to be run first. |
| """ |
| return openapi_client.SlurmdbApi( |
| openapi_client.ApiClient(properties["openapi_config"]) |
| ) |
| |
| |
| def backup_config_file(config="slurm"): |
| """Backs up a configuration file. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| config (string): Name of the config file to back up (without the .conf suffix). |
| |
| Returns: |
| None |
| |
| Example: |
| >>> backup_config_file('slurm') |
| >>> backup_config_file('gres') |
| >>> backup_config_file('cgroup') |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config(f"wants to modify the {config} configuration file") |
| |
| properties["configurations-modified"].add(config) |
| |
| config_file = f"{properties['slurm-config-dir']}/{config}.conf" |
| backup_config_file = f"{config_file}.orig-atf" |
| |
| # If a backup already exists, log it and return (honor the existing backup) |
| if os.path.isfile(backup_config_file): |
| logging.trace(f"Backup file already exists ({backup_config_file})") |
| return |
| |
| # If the file to backup does not exist, touch an empty backup file with |
| # the sticky bit set. restore_config_file will remove the file. |
| if not os.path.isfile(config_file): |
| run_command( |
| f"touch {backup_config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| run_command( |
| f"chmod 1000 {backup_config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Otherwise, copy the config file to the backup |
| else: |
| run_command( |
| f"cp {config_file} {backup_config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| |
| def restore_config_file(config="slurm"): |
| """Restores a configuration file. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| config (string): Name of config file to restore (without the .conf suffix). |
| |
| Returns: |
| None |
| |
| Example: |
| >>> restore_config_file('slurm') |
| >>> restore_config_file('gres') |
| >>> restore_config_file('cgroup') |
| """ |
| |
| config_file = f"{properties['slurm-config-dir']}/{config}.conf" |
| backup_config_file = f"{config_file}.orig-atf" |
| |
| properties["configurations-modified"].remove(config) |
| |
| # If backup file doesn't exist, it has probably already been |
| # restored by a previous call to restore_config_file |
| if not os.path.isfile(backup_config_file): |
| logging.trace( |
| f"Backup file does not exist for {config_file}. It has probably already been restored." |
| ) |
| return |
| |
| # If the sticky bit is set and the file is empty, remove both the file and the backup |
| backup_stat = os.stat(backup_config_file) |
| if backup_stat.st_size == 0 and backup_stat.st_mode & stat.S_ISVTX: |
| run_command( |
| f"rm -f {backup_config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| if os.path.isfile(config_file): |
| run_command( |
| f"rm -f {config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Otherwise, copy backup config file to primary config file |
| # and remove the backup (.orig-atf) |
| else: |
| run_command( |
| f"cp {backup_config_file} {config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| run_command( |
| f"rm -f {backup_config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| |
| def get_config(live=True, source="slurm", quiet=False, delimiter="="): |
| """Returns the Slurm configuration as a dictionary. |
| |
| Args: |
| live (boolean): |
| If True, the configuration information is obtained via |
| a query to the relevant Slurm daemon (e.g., scontrol show config). |
| If False, the configuration information is obtained by directly |
| parsing the relevant Slurm configuration file (e.g. slurm.conf). |
| source (string): |
| If live is True, source should be either scontrol or sacctmgr. |
| If live is False, source should be the name of the config file |
| without the .conf suffix (e.g. slurmdbd). |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| delimiter (string): The delimiter between the parameter name and the value. |
| |
| Returns: |
| A dictionary comprised of the parameter names and their values. |
| For parameters that can have multiple lines and subparameters, |
| the dictionary value will be a dictionary of dictionaries. |
| |
| Example: |
| >>> get_config() |
| {'AccountingStorageBackupHost': '(null)', 'AccountingStorageEnforce': 'none', 'AccountingStorageHost': 'localhost', 'AccountingStorageExternalHost': '(null)', 'AccountingStorageParameters': '(null)', 'AccountingStoragePort': '0', 'AccountingStorageTRES': 'cpu,mem,energy,node,billing,fs/disk,vmem,pages', ...} |
| >>> get_config(live=False, source='slurm') |
| {'SlurmctldHost': 'nathan-atf-docstrings', 'SlurmUser': 'slurm', 'SlurmctldLogFile': '/var/slurm/log/slurmctld.log', 'SlurmctldPidFile': '/var/slurm/run/slurmctld.pid', 'SlurmctldDebug': 'debug3', 'SlurmdLogFile': '/var/slurm/log/slurmd.%n.log', 'SlurmdPidFile': '/var/slurm/run/slurmd.%n.pid', ...} |
| >>> get_config(live=True, source='scontrol', quiet=True) |
| {'AccountingStorageBackupHost': '(null)', 'AccountingStorageEnforce': 'none', 'AccountingStorageHost': 'localhost', 'AccountingStorageExternalHost': '(null)', 'AccountingStorageParameters': '(null)', 'AccountingStoragePort': '0', 'AccountingStorageTRES': 'cpu,mem,energy,node,billing,fs/disk,vmem,pages', ...} |
| """ |
| |
| slurm_dict = {} |
| |
| if live: |
| if source == "slurm" or source == "controller" or source == "scontrol": |
| command = "scontrol" |
| elif source == "slurmdbd" or source == "dbd" or source == "sacctmgr": |
| command = "sacctmgr" |
| else: |
| pytest.fail(f"Invalid live source value ({source})") |
| |
| output = run_command_output(f"{command} show config", fatal=True, quiet=quiet) |
| |
| for line in output.splitlines(): |
| if match := re.search(rf"^\s*(\S+)\s*{re.escape(delimiter)}\s*(.*)$", line): |
| slurm_dict[match.group(1)] = match.group(2).rstrip() |
| else: |
| config = source |
| config_file = f"{properties['slurm-config-dir']}/{config}.conf" |
| |
| # We might be looking for parameters in a config file that has not |
| # been created yet. If so, we just want this to return an empty dict |
| output = run_command_output( |
| f"cat {config_file}", user=properties["slurm-user"], quiet=quiet |
| ) |
| for line in output.splitlines(): |
| if match := re.search(rf"^\s*(\S+)\s*{re.escape(delimiter)}\s*(.*)$", line): |
| parameter_name, parameter_value = ( |
| match.group(1), |
| match.group(2).rstrip(), |
| ) |
| if parameter_name.lower() in [ |
| "downnodes", |
| "name", |
| "nodename", |
| "nodeset", |
| "partitionname", |
| "switchname", |
| ]: |
| instance_name, subparameters = parameter_value.split(" ", 1) |
| subparameters_dict = {} |
| for subparameter_name, subparameter_value in re.findall( |
| rf" *([^= ]+) *{re.escape(delimiter)} *([^ ]+)", subparameters |
| ): |
| # Reformat the value if necessary |
| if is_integer(subparameter_value): |
| subparameter_value = int(subparameter_value) |
| elif is_float(subparameter_value): |
| subparameter_value = float(subparameter_value) |
| elif subparameter_value == "(null)": |
| subparameter_value = None |
| subparameters_dict[subparameter_name] = subparameter_value |
| if parameter_name not in slurm_dict: |
| slurm_dict[parameter_name] = {} |
| slurm_dict[parameter_name][instance_name] = subparameters_dict |
| else: |
| # Reformat the value if necessary |
| if is_integer(parameter_value): |
| parameter_value = int(parameter_value) |
| elif is_float(parameter_value): |
| parameter_value = float(parameter_value) |
| elif parameter_value == "(null)": |
| parameter_value = None |
| slurm_dict[parameter_name] = parameter_value |
| |
| return slurm_dict |
| |
| |
| def get_config_parameter(name, default=None, **get_config_kwargs): |
| """Obtains the value for a Slurm configuration parameter. |
| |
| This function accepts the same arguments as get_config. |
| |
| Args: |
| name (string): The parameter name. |
| default (string or None): This value is returned if the parameter |
| is not found. |
| |
| Returns: |
| The value of the specified parameter, or the default if not found. |
| |
| Example: |
| >>> get_config_parameter('JobAcctGatherFrequency') |
| '30' |
| >>> get_config_parameter('MaxJobCount', default='10000') |
| '10000' |
| >>> get_config_parameter('partitionname', default='debug', live=True, source='scontrol') |
| 'debug' |
| """ |
| |
| config_dict = get_config(**get_config_kwargs) |
| |
| # Convert keys to lower case so we can do a case-insensitive search |
| lower_dict = dict( |
| (key.casefold(), str(value).casefold()) for key, value in config_dict.items() |
| ) |
| |
| if name.casefold() in lower_dict: |
| return lower_dict[name.casefold()] |
| else: |
| return default |
| |
| |
| def config_parameter_includes(name, value, **get_config_kwargs): |
| """Checks whether a configuration parameter includes a specific value. |
| |
| When a parameter may contain a comma-separated list of values, this |
| function can be used to determine whether a specific value is within |
| the list. |
| |
| This function accepts the same arguments as get_config. |
| |
| Args: |
| name (string): The parameter name. |
| value (string): The value you are looking for. |
| |
| Returns: |
| True if the specified string value is found within the parameter |
| value list, False otherwise. |
| |
| Example: |
| >>> config_parameter_includes('SlurmdParameters', 'config_overrides') |
| False |
| """ |
| |
| config_dict = get_config(**get_config_kwargs) |
| |
| # Convert keys to lower case so we can do a case-insensitive search |
| lower_dict = dict((key.casefold(), value) for key, value in config_dict.items()) |
| |
| if name.casefold() in lower_dict and value.lower() in map( |
| str.lower, lower_dict[name.casefold()].split(",") |
| ): |
| return True |
| else: |
| return False |
| |
| |
| def set_config_parameter( |
| parameter_name, |
| parameter_value, |
| source="slurm", |
| restart=False, |
| delimiter="=", |
| ): |
| """Sets the value of the specified configuration parameter. |
| |
| This function modifies the specified slurm configuration file and |
| reconfigures slurm (or restarts slurm if restart=True). A backup |
| is automatically created and the original configuration is restored |
| after the test completes. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| parameter_name (string): The parameter name. |
| parameter_value (string): The parameter value. |
| Use a value of None to unset a parameter. |
| source (string): Name of the config file without the .conf suffix. |
| restart (boolean): If True and slurm is running, slurm will be |
| restarted rather than reconfigured. |
| delimiter (string): The delimiter between the parameter name and the value. |
| |
| Note: |
| When setting a complex parameter (one which may be repeated and has |
| its own subparameters, such as with nodes, partitions and gres), |
| the parameter_value should be a dictionary of dictionaries. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> set_config_parameter("ClusterName", "cluster1") |
| >>> set_config_parameter("required", "/tmp/spank_plugin.so", source="plugstack", delimiter=" ") |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to modify parameters") |
| |
| if source == "dbd": |
| config = "slurmdbd" |
| else: |
| config = source |
| |
| config_file = f"{properties['slurm-config-dir']}/{config}.conf" |
| |
| # This has the side-effect of adding config to configurations-modified |
| backup_config_file(config) |
| |
| # Remove all matching parameters and append the new parameter |
| lines = [] |
| output = run_command_output( |
| f"cat {config_file}", user=properties["slurm-user"], quiet=True |
| ) |
| for line in output.splitlines(): |
| if not re.search(rf"(?i)^\s*{parameter_name}\s*{re.escape(delimiter)}", line): |
| lines.append(line) |
| if isinstance(parameter_value, dict): |
| for instance_name in parameter_value: |
| line = f"{parameter_name}{delimiter}{instance_name}" |
| for subparameter_name, subparameter_value in parameter_value[ |
| instance_name |
| ].items(): |
| line += f" {subparameter_name}{delimiter}{subparameter_value}" |
| lines.append(line) |
| elif parameter_value is not None: |
| lines.append(f"{parameter_name}{delimiter}{parameter_value}") |
| input = "\n".join(lines) |
| run_command( |
| f"cat > {config_file}", |
| input=input, |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| slurmctld_running = is_slurmctld_running(quiet=True) |
| |
| # Remove clustername state file if we aim to change the cluster name |
| if parameter_name.lower() == "clustername": |
| state_save_location = get_config_parameter( |
| "StateSaveLocation", live=slurmctld_running, quiet=True |
| ) |
| run_command( |
| f"rm -f {state_save_location}/clustername", |
| user=properties["slurm-user"], |
| quiet=True, |
| ) |
| |
| # Reconfigure (or restart) slurm controller if it is already running |
| if slurmctld_running: |
| if source != "slurm" or parameter_name.lower() in [ |
| "accountingstoragetype", |
| "rebootprogram", |
| ]: |
| restart_slurm(quiet=True) |
| elif restart or parameter_name.lower() in [ |
| "authtype", |
| "controlmach", |
| "plugindir", |
| "statesavelocation", |
| "slurmctldhost", |
| "slurmctldport", |
| "slurmdport", |
| ]: |
| restart_slurmctld(quiet=True) |
| else: |
| run_command( |
| "scontrol reconfigure", user=properties["slurm-user"], quiet=True |
| ) |
| |
| |
| def add_config_parameter_value(name, value, source="slurm"): |
| """Appends a value to configuration parameter list. |
| |
| When a parameter may contain a comma-separated list of values, this |
| function can be used to add a value to the list. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| name (string): The parameter name. |
| value (string): The value to add. |
| source (string): Name of the config file without the .conf suffix. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> add_config_parameter_value('SlurmdParameters', 'config_overrides') |
| """ |
| |
| if config_parameter_includes(name, value, live=False, quiet=True, source=source): |
| return |
| |
| original_value_string = get_config_parameter( |
| name, live=False, quiet=True, source=source |
| ) |
| if original_value_string is None: |
| set_config_parameter(name, value, source=source) |
| else: |
| value_list = original_value_string.split(",") |
| value_list.append(value) |
| set_config_parameter(name, ",".join(value_list), source=source) |
| |
| |
| def remove_config_parameter_value(name, value, source="slurm"): |
| """Removes a value from a configuration parameter list. |
| |
| When a parameter may contain a comma-separated list of values, this |
| function can be used to remove a value from the list. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| name (string): The parameter name. |
| value (string): The value to remove. |
| source (string): Name of the config file without the .conf prefix. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> remove_config_parameter_value('SlurmdParameters', 'config_overrides') |
| """ |
| if not config_parameter_includes( |
| name, value, live=False, quiet=True, source=source |
| ): |
| return |
| |
| value_list = get_config_parameter( |
| name, live=False, quiet=True, source=source |
| ).split(",") |
| value_list.remove(value.casefold()) |
| if value_list: |
| set_config_parameter(name, ",".join(value_list), source=source) |
| else: |
| set_config_parameter(name, None, source=source) |
| |
| |
| def is_tool(tool): |
| """Returns True if the tool is found in PATH. |
| |
| Args: |
| tool (string): The name of the tool to check for in the PATH environment variable. |
| |
| Returns: |
| True if the tool is found in PATH, False otherwise. |
| |
| Example: |
| >>> is_tool('ls') |
| True |
| >>> is_tool('uninstalled-tool-name') |
| False |
| """ |
| |
| from shutil import which |
| |
| return which(tool) is not None |
| |
| |
| def require_tool(tool): |
| """Skips if the supplied tool is not found. |
| |
| Args: |
| tool (string): The name of the tool to check for in the PATH environment variable. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_tool('ls') |
| >>> require_tool('uninstalled-tool-name') |
| """ |
| |
| if not is_tool(tool): |
| msg = f"This test requires '{tool}' and it was not found" |
| pytest.skip(msg, allow_module_level=True) |
| |
| |
| def require_mpi(mpi_option="pmix", mpi_compiler="mpicc"): |
| """Skips if we cannot use the --mpi=mpi_option or the mpi_compiler is not available". |
| |
| Args: |
| mpi_option (string): The value to use with --mpi when submitting jobs. |
| mpi_compiler (string): The required compiler in the system. |
| |
| Returns: |
| None |
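| |
| Example: |
| >>> require_mpi() |
| >>> require_mpi(mpi_option="pmi2") |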
| """ |
| |
| require_tool(mpi_compiler) |
| output = run_command_output("srun --mpi=list", fatal=True) |
| if re.search(rf"plugin versions available: .*{mpi_option}", output) is None: |
| pytest.skip( |
| f"This test needs to be able to use --mpi={mpi_option}", |
| allow_module_level=True, |
| ) |
| |
| |
| def require_influxdb(influx_client="influx", jobacct_gather="jobacct_gather/cgroup"): |
| """Require the influx client available and the right config. |
| With auto-config the default values would work. Without you may need to setup the acct_gather values in the testsuite.conf file. |
| |
| Args: |
| influx_client (string): The name of the influxdb client. Default: influx. |
| jobacct_gather (string): The JobAcctGatherType required for influxdb profiling. Default: jobacct_gather/cgroup. |
| |
| Returns: |
| None |
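| |
| Example: |
| >>> require_influxdb() |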
| """ |
| |
| require_tool(influx_client) |
| |
| require_config_parameter("JobAcctGatherType", jobacct_gather) |
| require_config_parameter("AcctGatherProfileType", "acct_gather_profile/influxdb") |
| |
| require_config_parameter( |
| "ProfileInfluxDBHost", |
| f"{properties['influxdb_host']}:{properties['influxdb_port']}", |
| source="acct_gather", |
| ) |
| require_config_parameter( |
| "ProfileInfluxDBDatabase", properties["influxdb_db"], source="acct_gather" |
| ) |
| require_config_parameter("ProfileInfluxDBDefault", "ALL", source="acct_gather") |
| require_config_parameter("ProfileInfluxDBRTPolicy", "autogen", source="acct_gather") |
| |
| request_influxdb(f"CREATE DATABASE {properties['influxdb_db']}") |
| properties["influxdb-started"] = True |
| |
| |
| def request_influxdb(query): |
| """Send a query using the influx client. Requires to run require_influxdb first. |
| |
| Args: |
| query (string): The query to send. |
| |
| Returns: |
| The stdout returned by the influx client. |
| If the client fails, this function fails the test. |
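| |
| Example: |
| >>> request_influxdb("SHOW DATABASES") |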
| """ |
| return run_command_output( |
| f"influx -host {properties['influxdb_host']} -port {properties['influxdb_port']} -database {properties['influxdb_db']} -format column -execute \"{query}\"", |
| fatal=True, |
| ) |
| |
| |
| def require_whereami(): |
| """Compiles the whereami.c program to be used by tests. |
| |
| This function installs the whereami program. To get the |
| correct output, TaskPlugin is required in the slurm.conf |
| file before slurm starts up. |
| ex: TaskPlugin=task/cgroup,task/affinity |
| |
| The binary will be installed in the testsuite/python/lib/scripts |
| directory, where the whereami.c source file is located. |
| |
| Args: |
| None |
| |
| Returns: |
| None |
| |
| Examples: |
| >>> atf.require_whereami() |
| >>> print('\nwhereami is located at', atf.properties['whereami']) |
| >>> output = atf.run_command(f"srun {atf.properties['whereami']}", |
| >>> user=atf.properties['slurm-user']) |
| """ |
| require_config_parameter("TaskPlugin", "task/cgroup,task/affinity") |
| |
| # If the file already exists, we don't need to recompile |
| dest_file = f"{properties['testsuite_scripts_dir']}/whereami" |
| if os.path.isfile(dest_file): |
| properties["whereami"] = dest_file |
| return |
| |
| source_file = f"{properties['testsuite_scripts_dir']}/whereami.c" |
| if not os.path.isfile(source_file): |
| pytest.skip("Could not find whereami.c!", allow_module_level=True) |
| |
| run_command( |
| f"gcc {source_file} -o {dest_file}", fatal=True, user=properties["slurm-user"] |
| ) |
| properties["whereami"] = dest_file |
| |
| |
| def require_config_parameter( |
| parameter_name, |
| parameter_value, |
| condition=None, |
| source="slurm", |
| skip_message=None, |
| delimiter="=", |
| ): |
| """Ensures that a configuration parameter has the required value. |
| |
| In local-config mode, the test is skipped if the required configuration is not set. |
| In auto-config mode, sets the required configuration value if necessary. |
| |
| Args: |
| parameter_name (string): The parameter name. |
| parameter_value (string): The target parameter value. |
| condition (callable): If there is a range of acceptable values, a |
| condition can be specified to test whether the current parameter |
| value is sufficient. If not, the target parameter_value will be |
| used (or the test will be skipped in the case of local-config mode). |
| source (string): Name of the config file without the .conf prefix. |
| skip_message (string): Message to be displayed if in local-config mode |
| and parameter not present. |
| delimiter (string): The delimiter between the parameter name and the value. |
| |
| Note: |
| When requiring a complex parameter (one which may be repeated and has |
| its own subparameters, such as with nodes, partitions and gres), |
| the parameter_value should be a dictionary of dictionaries. See the |
| fourth example for multi-line parameters. |
| |
| Returns: |
| None |
| |
| Examples: |
| >>> require_config_parameter('SelectType', 'select/cons_tres') |
| >>> require_config_parameter('SlurmdTimeout', 5, lambda v: v <= 5) |
| >>> require_config_parameter('Name', {'gpu': {'File': '/dev/tty0'}, 'mps': {'Count': 100}}, source='gres') |
| >>> require_config_parameter("PartitionName", {"primary": {"Nodes": "ALL"}, "dynamic1": {"Nodes": "ns1"}, "dynamic2": {"Nodes": "ns2"}, "dynamic3": {"Nodes": "ns1,ns2"}}) |
| """ |
| |
| if isinstance(parameter_value, dict): |
| tmp1_dict = dict() |
| for k1, v1 in parameter_value.items(): |
| tmp2_dict = dict() |
| for k2, v2 in v1.items(): |
| if isinstance(v2, str): |
| tmp2_dict[k2.casefold()] = v2.casefold() |
| else: |
| tmp2_dict[k2.casefold()] = v2 |
| tmp1_dict[k1.casefold()] = tmp2_dict |
| |
| parameter_value = tmp1_dict |
| elif isinstance(parameter_value, str): |
| parameter_value = parameter_value.casefold() |
| |
| observed_value = get_config_parameter( |
| parameter_name, live=False, source=source, quiet=True, delimiter=delimiter |
| ) |
| |
| condition_satisfied = False |
| if condition is None: |
| # condition = lambda observed, desired: observed == desired |
| if observed_value == parameter_value: |
| condition_satisfied = True |
| else: |
| if condition(observed_value): |
| condition_satisfied = True |
| |
| if not condition_satisfied: |
| if properties["auto-config"]: |
| set_config_parameter( |
| parameter_name, parameter_value, source=source, delimiter=delimiter |
| ) |
| else: |
| if skip_message is None: |
| skip_message = f"This test requires the {parameter_name} parameter to be {parameter_value} (but it is {observed_value})" |
| pytest.skip(skip_message, allow_module_level=True) |
| |
| |
| def require_config_parameter_includes(name, value, source="slurm"): |
| """Ensures that a configuration parameter list contains the required value. |
| |
| In local-config mode, the test is skipped if the configuration parameter |
| list does not include the required value. |
| In auto-config mode, adds the required value to the configuration parameter |
| list if necessary. |
| |
| Args: |
| name (string): The parameter name. |
| value (string): The value we want to be in the list. |
| source (string): Name of the config file without the .conf prefix. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_config_parameter_includes('SlurmdParameters', 'config_overrides') |
| """ |
| |
| if properties["auto-config"]: |
| add_config_parameter_value(name, value, source=source) |
| else: |
| if not config_parameter_includes( |
| name, value, source=source, live=False, quiet=True |
| ): |
| pytest.skip( |
| f"This test requires the {name} parameter to include {value}", |
| allow_module_level=True, |
| ) |
| |
| |
| def require_config_parameter_excludes(name, value, source="slurm"): |
| """Ensures that a configuration parameter list does not contain a value. |
| |
| In local-config mode, the test is skipped if the configuration parameter |
| includes the specified value. In auto-config mode, removes the specified |
| value from the configuration parameter list if necessary. |
| |
| Args: |
| name (string): The parameter name. |
| value (string): The value we do not want to be in the list. |
| source (string): Name of the config file without the .conf prefix. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_config_parameter_excludes('SlurmdParameters', 'config_overrides') |
| """ |
| if properties["auto-config"]: |
| remove_config_parameter_value(name, value, source=source) |
| else: |
| if config_parameter_includes( |
| name, value, source=source, live=False, quiet=True |
| ): |
| pytest.skip( |
| f"This test requires the {name} parameter to exclude {value}", |
| allow_module_level=True, |
| ) |
| |
| |
| def require_tty(number): |
| """Creates a TTY device file if it does not exist. |
| |
| Args: |
| number (integer): The number of the TTY device. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_tty(1) |
| >>> require_tty(2) |
| """ |
| |
| tty_file = f"/dev/tty{number}" |
| if not os.path.exists(tty_file): |
| run_command( |
| f"mknod -m 666 {tty_file} c 4 {number}", user="root", fatal=True, quiet=True |
| ) |
| |
| |
| ## Use this to create an entry in gres.conf and create an associated tty |
| # def require_gres_device(name): |
| # |
| # gres_value = get_config_parameter('Name', live=False, source='gres', quiet=True) |
| # if gres_value is None or name not in gres_value: |
| # if not properties['auto-config']: |
| # pytest.skip(f"This test requires a '{name}' gres device to be defined in gres.conf", allow_module_level=True) |
| # else: |
| # require_tty(0) |
| # require_config_parameter('Name', {name: {'File': '/dev/tty0'}}, source='gres') |
| |
| |
| def require_auto_config(reason=""): |
| """Ensures that auto-config mode is being used. |
| |
| This function skips the test if auto-config mode is not enabled. |
| |
| Args: |
| reason (string): Augments the skip reason with a context-specific |
| explanation for why the auto-config mode is needed by the test. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_auto_config("wants to set the Epilog") |
| """ |
| |
| if not properties["auto-config"]: |
| message = "This test requires auto-config to be enabled" |
| if reason != "": |
| message += f" ({reason})" |
| pytest.skip(message, allow_module_level=True) |
| |
| |
| def require_accounting(modify=False): |
| """Ensures that Slurm accounting is configured. |
| |
| In local-config mode, the test is skipped if Slurm accounting is not |
| configured. In auto-config mode, configures Slurm accounting if necessary. |
| |
| Args: |
| modify (boolean): If True, this indicates to the ATF that the test |
| will modify the accounting database (e.g. adding accounts, etc). |
| A database backup is automatically created and the original dump |
| is restored after the test completes. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_accounting() |
| >>> require_accounting(modify=True) |
| """ |
| |
| if properties["auto-config"]: |
| if ( |
| get_config_parameter("AccountingStorageType", live=False, quiet=True) |
| != "accounting_storage/slurmdbd" |
| ): |
| set_config_parameter("AccountingStorageType", "accounting_storage/slurmdbd") |
| if modify: |
| backup_accounting_database() |
| else: |
| if modify and not properties["allow-slurmdbd-modify"]: |
| require_auto_config("wants to modify the accounting database") |
| elif ( |
| get_config_parameter("AccountingStorageType", live=False, quiet=True) |
| != "accounting_storage/slurmdbd" |
| ): |
| pytest.skip( |
| "This test requires accounting to be configured", |
| allow_module_level=True, |
| ) |
| |
| |
| def require_slurmrestd(openapi_plugins, data_parsers): |
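| """Ensures that slurmrestd will be available to the test. |
| |
| In auto-config mode, the testsuite starts slurmrestd for the test. |
| In local-config mode, SLURM_TESTSUITE_SLURMRESTD_URL must point to a |
| running slurmrestd, otherwise the test is skipped. |
| |
| Args: |
| openapi_plugins (string): The OpenAPI plugins passed to slurmrestd -s. |
| data_parsers (string): The data_parser plugins passed to slurmrestd -d, or None. |
| |
| Returns: |
| None |
| """ |
| |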
| properties["openapi_plugins"] = openapi_plugins |
| properties["data_parsers"] = data_parsers |
| |
| if properties["auto-config"]: |
| properties["slurmrestd-started"] = True |
| elif "SLURM_TESTSUITE_SLURMRESTD_URL" in os.environ: |
| properties["slurmrestd_url"] = os.environ["SLURM_TESTSUITE_SLURMRESTD_URL"] |
| |
| # Setup auth token |
| setup_slurmrestd_headers() |
| |
| # Check version is the expected one |
| if not is_slurmrestd_running(): |
| pytest.skip( |
| f"This test needs slurmrestd running in SLURM_TESTSUITE_SLURMRESTD_URL but cannot connect with {os.environ['SLURM_TESTSUITE_SLURMRESTD_URL']}", |
| allow_module_level=True, |
| ) |
| else: |
| pytest.skip( |
| "This test requires to start slurmrestd or SLURM_TESTSUITE_SLURMRESTD_URL", |
| allow_module_level=True, |
| ) |
| |
| |
| def start_slurmrestd(): |
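| """Starts slurmrestd on an open local port. |
| |
| Retries with a different port if slurmrestd does not accept connections, |
| and fails the test if slurmrestd could not be started. |
| """ |
| |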
| os.environ["SLURM_JWT"] = "daemon" |
| port = None |
| attempts = 0 |
| |
| if "slurm-logs-dir" not in properties: |
| properties["slurm-logs-dir"] = os.path.dirname( |
| get_config_parameter("SlurmctldLogFile", live=False, quiet=True) |
| ) |
| |
| properties["slurmrestd_log"] = open( |
| f"{properties['slurm-logs-dir']}/slurmrestd.log", "w" |
| ) |
| if not properties["slurmrestd_log"]: |
| pytest.fail( |
| f"Unable to open slurmrestd log: {properties['slurm-logs-dir']}/slurmrestd.log" |
| ) |
| |
| while not port and attempts < 15: |
| port = get_open_port() |
| attempts += 1 |
| args = [ |
| "slurmrestd", |
| "-a", |
| "jwt", |
| "-s", |
| properties["openapi_plugins"], |
| ] |
| if properties["data_parsers"] is not None: |
| args.extend(["-d", properties["data_parsers"]]) |
| |
| args.append(f"localhost:{port}") |
| logging.debug(f"Trying to start slurmrestd: {args}") |
| |
| properties["slurmrestd"] = subprocess.Popen( |
| args, |
| stdin=subprocess.DEVNULL, |
| stdout=properties["slurmrestd_log"], |
| stderr=properties["slurmrestd_log"], |
| ) |
| s = None |
| |
| for i in range(100): |
| if properties["slurmrestd"].poll(): |
| break |
| |
| try: |
| s = socket.create_connection(("localhost", port)) |
| break |
| except Exception as e: |
| logging.debug(f"Unable to connect to port {port}: {e}") |
| time.sleep(1) |
| |
| if s: |
| s.close() |
| break |
| |
| logging.debug(f"slurmrestd accepting on port {port} but is still running") |
| properties["slurmrestd"].kill() |
| properties["slurmrestd"].wait() |
| port = None |
| |
| if not port: |
| pytest.fail(f"Unable start slurmrestd after trying {attempts} different ports") |
| |
| del os.environ["SLURM_JWT"] |
| |
| properties["slurmrestd_url"] = f"http://localhost:{port}/" |
| |
| # Setup auth token |
| setup_slurmrestd_headers() |
| |
| # Check slurmrestd is up |
| if not is_slurmrestd_running(): |
| pytest.fail("Slurmrestd not responding") |
| |
| |
| def setup_slurmrestd_headers(): |
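| """Creates the slurmrestd request headers using a new auth/jwt token.""" |
| |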
| # Create the headers with the token to connect later |
| token = ( |
| run_command_output("scontrol token lifespan=600", fatal=True) |
| .replace("SLURM_JWT=", "") |
| .replace("\n", "") |
| ) |
| if token == "": |
| logging.warning("unable to get auth/jwt token") |
| |
| properties["slurmrestd-headers"] = { |
| "X-SLURM-USER-NAME": get_user_name(), |
| "X-SLURM-USER-TOKEN": token, |
| } |
| |
| |
| def get_user_name(): |
| """Returns the username of the current user. |
| |
| Args: |
| None |
| |
| Returns: |
| The username of the current user. |
| |
| Example: |
| >>> get_user_name() |
| 'john_doe' |
| """ |
| return pwd.getpwuid(os.getuid()).pw_name |
| |
| |
| def cancel_jobs( |
| job_list, |
| timeout=default_polling_timeout, |
| poll_interval=0.1, |
| fatal=False, |
| quiet=False, |
| ): |
| """Cancels a list of jobs and waits for them to complete. |
| |
| Args: |
| job_list (list): A list of job ids to cancel. All 0s will be ignored. |
| timeout (integer): Number of seconds to wait for jobs to be done before |
| timing out. |
| poll_interval (float): Number of seconds to wait between job state |
| polls. |
| fatal (boolean): If True, a timeout will result in the test failing. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| True if all jobs were successfully cancelled and completed within |
| the timeout period, False otherwise. |
| |
| Example: |
| >>> cancel_jobs([1234, 5678], timeout=60, fatal=True) |
| True |
| >>> cancel_jobs([9876, 5432], timeout=30, fatal=False) |
| False |
| """ |
| |
| # Filter list to ignore job_ids being 0 |
| job_list = [i for i in job_list if i != 0] |
| job_list_string = " ".join(str(i) for i in job_list) |
| |
| if job_list_string == "": |
| return True |
| |
| run_command(f"scancel {job_list_string}", fatal=fatal, quiet=quiet) |
| |
| for job_id in job_list: |
| status = wait_for_job_state( |
| job_id, |
| "DONE", |
| timeout=timeout, |
| poll_interval=poll_interval, |
| fatal=fatal, |
| quiet=quiet, |
| ) |
| if not status: |
| if fatal: |
| pytest.fail( |
| f"Job ({job_id}) was not cancelled within the {timeout} second timeout" |
| ) |
| return status |
| |
| return True |
| |
| |
| def cancel_all_jobs( |
| timeout=default_polling_timeout, poll_interval=0.1, fatal=False, quiet=False |
| ): |
| """Cancels all jobs by the test user and waits for them to be cancelled. |
| |
| Args: |
| timeout (integer): Number of seconds to wait for the jobs to be |
| verified as cancelled before timing out. |
| poll_interval (float): Number of seconds to wait between job state polls. |
| fatal (boolean): If True, a timeout will result in the test failing. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| True if all jobs were successfully cancelled and completed within |
| the timeout period, False otherwise. |
| |
| Example: |
| >>> cancel_all_jobs(timeout=60, fatal=True) |
| True |
| >>> cancel_all_jobs(timeout=30, fatal=False) |
| False |
| """ |
| |
| user_name = get_user_name() |
| |
| run_command(f"scancel -u {user_name}", fatal=fatal, quiet=quiet) |
| |
| return repeat_command_until( |
| f"squeue -u {user_name} --noheader", |
| lambda results: results["stdout"] == "", |
| timeout=timeout, |
| poll_interval=poll_interval, |
| fatal=fatal, |
| quiet=quiet, |
| ) |
| |
| |
| def is_integer(value): |
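| """Returns True if value can be converted to an int, False otherwise.""" |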
| try: |
| int(value) |
| except ValueError: |
| return False |
| else: |
| return True |
| |
| |
| def is_float(value): |
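| """Returns True if value can be converted to a float, False otherwise.""" |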
| try: |
| float(value) |
| except ValueError: |
| return False |
| else: |
| return True |
| |
| |
| def get_nodes(live=True, quiet=False, **run_command_kwargs): |
| """Returns the node configuration as a dictionary of dictionaries. |
| |
| If the live argument is not True, the dictionary will contain the literal |
| configuration values and the DEFAULT node will be separately indexed. |
| |
| Args: |
| live (boolean): |
| If True, the node configuration information is obtained via a query |
| to the slurmctld daemon (e.g. scontrol show config). |
| If False, the node configuration information is obtained by |
| directly parsing the Slurm configuration file (slurm.conf). |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| A dictionary of dictionaries where the first level keys are the node |
| names and with their values being a dictionary of configuration |
| parameters for the respective node. |
| |
| Example: |
| >>> get_nodes() |
| {'node1': {'NodeName': 'node1', 'Arch': 'x86_64', 'CoresPerSocket': 1, 'CPUAlloc': 0, 'CPUEfctv': 1, 'CPUTot': 1, 'CPULoad': 91.4, 'AvailableFeatures': None, 'ActiveFeatures': None, 'Gres': None, 'NodeAddr': 'slurm-host', 'NodeHostName': 'slurm-host', 'Port': 6821, 'Version': '24.05.0-0rc1', ...}} |
| >>> get_nodes(live=False, quiet=True) |
| {'node1': {'NodeHostname': 'slurm-host', 'Port': 6821, 'NodeName': 'node1'}} |
| """ |
| |
| nodes_dict = {} |
| |
| if live: |
| # TODO: Remove the extra t22858 debug handling and use fatal=True instead |
| result = run_command( |
| "scontrol show nodes --json -F", |
| fatal=False, |
| quiet=quiet, |
| **run_command_kwargs, |
| ) |
| if result["exit_code"]: |
| logging.debug( |
| "Fatal failure of 'scontrol show nodes', probably due 'Unable to contact slurm controller' (t22858)" |
| ) |
| logging.debug("Getting gcore from slurmctld before fatal.") |
| gcore("slurmctld") |
| pytest.fail( |
| f"Command 'scontrol show nodes -oF' failed with rc={result['exit_code']}: {result['stderr']}" |
| ) |
| output = result["stdout"] |
| node_dict_json = json.loads(output)["nodes"] |
| |
| node_dict = {} |
| for node in node_dict_json: |
| # Add the node dictionary to the nodes dictionary |
| nodes_dict[node["name"]] = node |
| else: |
| # Get the config dictionary |
| config_dict = get_config(live=False, quiet=quiet) |
| |
| # Convert keys to lower case so we can do a case-insensitive search |
| lower_config_dict = dict( |
| (key.lower(), value) for key, value in config_dict.items() |
| ) |
| |
| # DEFAULT will be included separately |
| if "nodename" in lower_config_dict: |
| for node_expression, node_expression_dict in lower_config_dict[ |
| "nodename" |
| ].items(): |
| port_expression = ( |
| node_expression_dict["Port"] |
| if "Port" in node_expression_dict |
| else "" |
| ) |
| |
| # Break up the node expression and port expression into lists |
| node_list = node_range_to_list(node_expression) |
| port_list = range_to_list(port_expression) |
| |
| # Iterate over the nodes in the expression |
| for node_index in range(len(node_list)): |
| node_name = node_list[node_index] |
| # Add the parameters to the temporary node dictionary |
| node_dict = dict(node_expression_dict) |
| node_dict["NodeName"] = node_name |
| if node_index < len(port_list): |
| node_dict["Port"] = int(port_list[node_index]) |
| # Add the node dictionary to the nodes dictionary |
| nodes_dict[node_name] = node_dict |
| |
| return nodes_dict |
| |
| |
| def get_node_parameter(node_name, parameter_name, default=None, live=True): |
| """Obtains the value for a node configuration parameter. |
| |
| Args: |
| node_name (string): The node name. |
| parameter_name (string): The parameter name. |
| default (string or None): This value is returned if the parameter |
| is not found. |
| live (boolean): |
| If True, the node configuration information is obtained via a query |
| to the slurmctld daemon (e.g. scontrol show config). |
| If False, the node configuration information is obtained by |
| directly parsing the Slurm configuration file (slurm.conf). |
| |
| Returns: |
| The value of the specified node parameter, or the default if not found. |
| |
| Example: |
| >>> get_node_parameter('node1', 'State') |
| 'IDLE' |
| >>> get_node_parameter('node2', 'RealMemory', default=4) |
| 1 |
| >>> get_node_parameter('node3', 'Partitions', default='primary', live=False) |
| 'primary' |
| """ |
| |
| nodes_dict = get_nodes(live=live, quiet=True) |
| |
| if node_name in nodes_dict: |
| node_dict = nodes_dict[node_name] |
| else: |
| pytest.fail(f"Node ({node_name}) was not found in the node configuration") |
| |
| if parameter_name in node_dict: |
| return node_dict[parameter_name] |
| else: |
| return default |
| |
| |
| def set_node_parameter(node_name, new_parameter_name, new_parameter_value): |
| """Sets the value of the specified node configuration parameter. |
| |
| This function sets a node property for the specified node and restarts |
| the relevant slurm daemons. A backup is automatically created and the |
| original configuration is restored after the test completes. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| node_name (string): The node name. |
| new_parameter_name (string): The parameter name. |
| new_parameter_value (string): The parameter value. |
| Use a value of None to unset a node parameter. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> set_node_parameter('node1', 'Features', 'f1') |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to modify node parameters") |
| |
| config_file = f"{properties['slurm-config-dir']}/slurm.conf" |
| |
| # Read the original slurm.conf into a list of lines |
| output = run_command_output( |
| f"cat {config_file}", user=properties["slurm-user"], quiet=True |
| ) |
| original_config_lines = output.splitlines() |
| new_config_lines = original_config_lines.copy() |
| |
| # Locate the node among the various NodeName definitions |
| found_node_name = False |
| for line_index in range(len(original_config_lines)): |
| line = original_config_lines[line_index] |
| |
| words = re.split(r" +", line.strip()) |
| if len(words) < 1: |
| continue |
| if words[0][0] == "#": |
| continue |
| parameter_name, parameter_value = words[0].split("=", 1) |
| if parameter_name.lower() != "nodename": |
| continue |
| |
| # We found a NodeName line. Read in the node parameters |
| node_expression = parameter_value |
| port_expression = "" |
| original_node_parameters = collections.OrderedDict() |
| for word in words[1:]: |
| parameter_name, parameter_value = word.split("=", 1) |
| if parameter_name.lower() == "port": |
| port_expression = parameter_value |
| else: |
| original_node_parameters[parameter_name] = parameter_value |
| |
| node_list = node_range_to_list(node_expression) |
| port_list = range_to_list(port_expression) |
| |
| # Determine whether our node is included in this node expression |
| if node_name in node_list: |
| # Yes. We found the node |
| found_node_name = True |
| node_index = node_list.index(node_name) |
| |
| # Delete the original node definition |
| new_config_lines.pop(line_index) |
| |
| # Add the modified definition for the specified node |
| modified_node_parameters = original_node_parameters.copy() |
| if new_parameter_value is None: |
| if new_parameter_name in modified_node_parameters: |
| del modified_node_parameters[new_parameter_name] |
| else: |
| modified_node_parameters[new_parameter_name] = new_parameter_value |
| modified_node_line = f"NodeName={node_name}" |
| if node_index < len(port_list): |
| modified_node_line += f" Port={port_list[node_index]}" |
| for parameter_name, parameter_value in modified_node_parameters.items(): |
| modified_node_line += f" {parameter_name}={parameter_value}" |
| new_config_lines.insert(line_index, modified_node_line) |
| |
| # If the node was part of an aggregate node definition |
| node_list.remove(node_name) |
| if len(node_list): |
| # Write the remainder of the aggregate using the original attributes |
| remainder_node_expression = node_list_to_range(node_list) |
| remainder_node_line = f"NodeName={remainder_node_expression}" |
| if node_index < len(port_list): |
| port_list.pop(node_index) |
| if port_list: |
| remainder_port_expression = list_to_range(port_list) |
| remainder_node_line += f" Port={remainder_port_expression}" |
| for parameter_name, parameter_value in original_node_parameters.items(): |
| remainder_node_line += f" {parameter_name}={parameter_value}" |
| new_config_lines.insert(line_index, remainder_node_line) |
| |
| break |
| |
| if not found_node_name: |
| pytest.fail( |
| f"Invalid node specified in set_node_parameter(). Node ({node_name}) does not exist" |
| ) |
| |
| # Write the config file back out with the modifications |
| backup_config_file("slurm") |
| new_config_string = "\n".join(new_config_lines) |
| run_command( |
| f"echo '{new_config_string}' > {config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Restart slurm controller if it is already running |
| if is_slurmctld_running(quiet=True): |
| restart_slurm(quiet=True) |
| |
| |
| def get_reservations(quiet=False, **run_command_kwargs): |
| """Returns the reservations as a dictionary of dictionaries. |
| |
| Args: |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| A dictionary of dictionaries where the first level keys are the |
| reservation names and with their values being a dictionary of |
| configuration parameters for the respective reservation. |
| |
| Example: |
| >>> get_reservations() |
| {'resv1': {'ReservationName': 'resv1', 'StartTime': '2024-04-06T00:00:17', 'EndTime': '2024-04-06T00:15:17', 'Duration': '00:15:00', 'Nodes': 'node1', 'NodeCnt': 1, 'CoreCnt': 1, 'Features': None, 'PartitionName': 'primary', 'Flags': 'DAILY,REPLACE_DOWN,PURGE_COMP=00:01:00', 'TRES': 'cpu=1', 'Users': 'atf', 'Groups': None, 'Accounts': None, 'Licenses': None, 'State': 'INACTIVE', 'BurstBuffer': None, 'MaxStartDelay': None}} |
| """ |
| |
| resvs_dict = {} |
| resv_dict = {} |
| |
| output = run_command_output( |
| "scontrol show reservations -o", fatal=True, quiet=quiet, **run_command_kwargs |
| ) |
| for line in output.splitlines(): |
| if line == "": |
| continue |
| |
| while match := re.search(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", line): |
| parameter_name, parameter_value = match.group(1), match.group(2) |
| |
| # Remove the consumed parameter from the line |
| line = re.sub(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", "", line) |
| |
| # Reformat the value if necessary |
| if is_integer(parameter_value): |
| parameter_value = int(parameter_value) |
| elif is_float(parameter_value): |
| parameter_value = float(parameter_value) |
| elif parameter_value == "(null)": |
| parameter_value = None |
| |
| # Add it to the temporary resv dictionary |
| resv_dict[parameter_name] = parameter_value |
| |
| # Add the resv dictionary to the resvs dictionary |
| resvs_dict[resv_dict["ReservationName"]] = resv_dict |
| |
| # Clear the resv dictionary for use by the next resv |
| resv_dict = {} |
| |
| return resvs_dict |
| |
| |
| def get_reservation_parameter(resv_name, parameter_name, default=None): |
| """Obtains the value for a reservation configuration parameter. |
| |
| Args: |
| resv_name (string): The reservation name. |
| parameter_name (string): The parameter name. |
| default (string or None): This value is returned if the parameter |
| is not found. |
| |
| Returns: |
| The value of the specified reservation parameter, or the default if not found. |
| |
| Example: |
| >>> get_reservation_parameter('resv1', 'PartitionName') |
| 'primary' |
| >>> get_reservation_parameter('resv2', 'EndTime', default='2024-04-06T00:15:17') |
| '2024-04-06T00:15:17' |
| """ |
| |
| resvs_dict = get_reservations() |
| |
| if resv_name in resvs_dict: |
| resv_dict = resvs_dict[resv_name] |
| else: |
| pytest.fail(f"reservation ({resv_name}) was not found") |
| |
| if parameter_name in resv_dict: |
| return resv_dict[parameter_name] |
| else: |
| return default |
| |
| |
| def is_super_user(): |
| """Checks if the current user is a super user. |
| |
| Args: |
| None |
| |
| Returns: |
| True if the current user is a super user, False otherwise. |
| |
| Example: |
| >>> is_super_user() |
| False |
| """ |
| |
| uid = os.getuid() |
| |
| if uid == 0: |
| return True |
| |
| user = pwd.getpwuid(uid)[0] |
| if get_config_parameter("SlurmUser") == user: |
| return True |
| |
| return False |
| |
| |
| def require_sudo_rights(): |
| """Skips the test if the test user does not have unprompted sudo privileges. |
| |
| Args: |
| None |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_sudo_rights() |
| """ |
| if not properties["sudo-rights"]: |
| pytest.skip( |
| "This test requires the test user to have unprompted sudo privileges", |
| allow_module_level=True, |
| ) |
| |
| |
| def submit_job_sbatch(sbatch_args='--wrap "sleep 60"', **run_command_kwargs): |
| """Submits a job using sbatch and returns the job id. |
| |
| The submitted job will automatically be cancelled when the test ends. |
| |
| Args: |
| sbatch_args (string): The arguments to sbatch. |
| |
| Returns: |
| The job id. |
| |
| Example: |
| >>> submit_job_sbatch('--wrap "echo Hello"') |
| 1234 |
| >>> submit_job_sbatch('-J myjob --wrap "echo World"', fatal=True) |
| 5678 |
| """ |
| |
| output = run_command_output(f"sbatch {sbatch_args}", **run_command_kwargs) |
| |
| if match := re.search(r"Submitted \S+ job (\d+)", output): |
| job_id = int(match.group(1)) |
| properties["submitted-jobs"].append(job_id) |
| return job_id |
| else: |
| return 0 |
| |
| |
| # Returns results |
| def run_job(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the run_command results dictionary. |
| |
| If the srun command times out, it will automatically be cancelled when the |
| test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The srun run_command results dictionary. |
| |
| Example: |
| >>> run_job('-n 4 --output=output.txt ./my_executable', timeout=60) |
| {'command': 'srun -n 4 --output=output.txt ./my_executable', 'start_time': 1712276708.827, 'duration': 60.0, 'exit_code': 110, 'stdout': '', 'stderr': 'srun: Requested partition configuration not available now\nsrun: job 15 queued and waiting for resources\n'} |
| >>> run_job('-n 1 --error=error.txt ./my_executable', fatal=True) |
| {'command': 'srun -n 1 --error=error.txt my_executable', 'start_time': 1712277798.016, 'duration': 0.06, 'exit_code': 0, 'stdout': 'foo bar\n', 'stderr': ''} |
| """ |
| |
| results = run_command(f"srun {srun_args}", **run_command_kwargs) |
| |
| return results |
| |
| |
| # Return exit code |
| def run_job_exit(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the exit code. |
| |
| If the srun command times out, it will automatically be cancelled when the |
| test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The exit code from srun. |
| |
| Example: |
| >>> run_job_exit('-n 4 --output=output.txt ./my_executable', timeout=60) |
| 2 |
| >>> run_job_exit('-n 2 --error=error.txt ./my_executable', fatal=True) |
| 0 |
| """ |
| |
| results = run_job(srun_args, **run_command_kwargs) |
| |
| return results["exit_code"] |
| |
| |
| # Return output |
| def run_job_output(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the standard output. |
| |
| If the srun command times out, it will automatically be cancelled when the |
| test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The standard output from srun. |
| |
| Example: |
| >>> run_job_output('-n 4 ./my_executable', timeout=60) |
| 'stdout of the command' |
| >>> run_job_output('-n 2 --output=output.txt ./my_executable', fatal=True) |
| '' |
| """ |
| |
| results = run_job(srun_args, **run_command_kwargs) |
| |
| return results["stdout"] |
| |
| |
| # Return error |
| def run_job_error(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the standard error. |
| |
| If the srun command times out, it will automatically be cancelled when the |
| test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The standard error from srun. |
| |
| Example: |
| >>> run_job_error('-n 4 --output=output.txt ./my_executable', timeout=60) |
| 'stderr of the command' |
| >>> run_job_error('-n 200000 ./my_executable', fatal=True) # Will automatically fail the test due to resources |
| """ |
| |
| results = run_job(srun_args, **run_command_kwargs) |
| |
| return results["stderr"] |
| |
| |
| # Return job id |
| def submit_job_srun(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the job id. |
| |
| This function obtains the job id by adding the -v option to srun and parsing |
| out the job id. If the srun command times out, it will automatically be |
| cancelled when the test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The job id from srun. |
| |
| Example: |
| >>> submit_job_srun('-n 4 --output=output.txt ./my_executable') |
| 12345 |
| >>> submit_job_srun('-n 2 --output=output.txt ./my_executable', timeout=60) |
| 67890 |
| """ |
| |
| results = run_job(" ".join(["-v", srun_args]), **run_command_kwargs) |
| |
| if match := re.search(r"jobid (\d+)", results["stderr"]): |
| return int(match.group(1)) |
| else: |
| return 0 |
| |
| |
| # Return job id (command should not be interactive/shell) |
| def submit_job_salloc(salloc_args, **run_command_kwargs): |
| """Submits a job using salloc and returns the job id. |
| |
| The submitted job will automatically be cancelled when the test ends. |
| |
| Args: |
| salloc_args (string): The arguments to salloc. |
| |
| Returns: |
| The job id. |
| |
| Example: |
| >>> submit_job_salloc('-N 1 -t 60 --output=output.txt ./my_executable') |
| 12345 |
| >>> submit_job_salloc('-N 2 -t 120 --output=output.txt ./my_executable', timeout=60) |
| 67890 |
| """ |
| |
| results = run_command(f"salloc {salloc_args}", **run_command_kwargs) |
| if match := re.search(r"Granted job allocation (\d+)", results["stderr"]): |
| job_id = int(match.group(1)) |
| properties["submitted-jobs"].append(job_id) |
| return job_id |
| else: |
| return 0 |
| |
| |
| # Return job id |
| def submit_job(command, job_param, job, *, wrap_job=True, **run_command_kwargs): |
| """Submits a job using the given command and returns the job id. |
| |
| Args: |
| command (string): The command to submit the job (salloc, srun, sbatch). |
| job_param (string): The arguments to the job. |
| job (string): The command or job file to be executed. |
| wrap_job (boolean): If True, the job will be wrapped when the command is 'sbatch'. |
| |
| Returns: |
| The job id. |
| |
| Example: |
| >>> submit_job('salloc', '-N 1 -t 60', './my_executable', quiet=True) |
| 12345 |
| >>> submit_job('srun', '-N 2 -t 120', './my_executable', quiet=True) |
| 67890 |
| >>> submit_job('sbatch', '-N 1 -t 60', './my_script.sh', wrap_job=False, quiet=True) |
| 23456 |
| """ |
| |
| # Make sure command is a legal command to run a job |
| assert command in [ |
| "salloc", |
| "srun", |
| "sbatch", |
| ], f"Invalid command '{command}'. Should be salloc, srun, or sbatch." |
| |
| if command == "salloc": |
| return submit_job_salloc(f"{job_param} {job}", **run_command_kwargs) |
| elif command == "srun": |
| return submit_job_srun(f"{job_param} {job}", **run_command_kwargs) |
| elif command == "sbatch": |
| # If the job should be wrapped, do so before submitting |
| if wrap_job: |
| job = f"--wrap '{job}'" |
| return submit_job_sbatch(f"{job_param} {job}", **run_command_kwargs) |
| |
| |
| def run_job_nodes(srun_args, **run_command_kwargs): |
| """Runs a job using srun and returns the allocated node list. |
| |
| This function obtains the allocated node list by adding the -v option to |
| srun and parsing the verbose output. If the srun command times out, it will |
| automatically be cancelled when the test ends. |
| |
| Args: |
| srun_args (string): The arguments to srun. |
| |
| Returns: |
| The allocated node list for the job. |
| |
| Example: |
| >>> run_job_nodes('-N 2 hostname', quiet=True) |
| ['node001', 'node002'] |
| >>> run_job_nodes('-N 1 --exclude=node001 hostname', quiet=True) |
| ['node002'] |
| """ |
| |
| results = run_command(f"srun -v {srun_args}", **run_command_kwargs) |
| node_list = [] |
| if results["exit_code"] == 0: |
| if match := re.search(r"jobid \d+: nodes\(\d+\):`([^']+)'", results["stderr"]): |
| node_list = node_range_to_list(match.group(1)) |
| return node_list |
| |
| |
| def get_qos(name=None, **run_command_kwargs): |
| """Returns the QOSes in the system as a dictionary of dictionaries. |
| |
| Args: |
| name: The name of a specific QOS of which to get parameters. |
| |
| Returns: |
| A dictionary of dictionaries where the first level keys are the QOSes names |
| and with their values being a dictionary of configuration parameters for |
| the respective QOS. |
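| |
| Example (output abbreviated): |
| >>> get_qos() |
| {'normal': {'name': 'normal', ...}} |
| >>> get_qos('normal') |
| {'normal': {'name': 'normal', ...}} |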
| """ |
| |
| command = "sacctmgr --json show qos" |
| if name is not None: |
| command += f" {name}" |
| output = run_command_output(command, fatal=True, quiet=True, **run_command_kwargs) |
| |
| qos_dict = {} |
| qos_list = json.loads(output)["qos"] |
| for qos in qos_list: |
| qos_dict[qos["name"]] = qos |
| |
| return qos_dict |
| |
| |
| def get_jobs(job_id=None, dbd=False, **run_command_kwargs): |
| """Returns the jobs in the system as a dictionary of dictionaries. |
| |
| Args: |
| job_id (integer): The id of a specific job of which to get parameters. |
| dbd (boolean): If True, obtain the jobs from sacct instead of scontrol. |
| |
| Returns: |
| A dictionary of dictionaries where the first level keys are the job ids |
| and with their values being a dictionary of configuration parameters for |
| the respective job. |
| |
| Example: |
| >>> get_jobs() |
| {38: {'JobId': 38, 'JobName': 'wrap', 'UserId': 'atf(1002)', 'GroupId': 'atf(1002)', ...}, |
| 39: {'JobId': 39, 'JobName': 'wrap', 'UserId': 'atf(1002)', 'GroupId': 'atf(1002)', ...}} |
| >>> get_jobs(job_id='12345', quiet=True) |
| {12345: {'JobId': '12345', 'JobName': 'foo.sh', 'UserId': 'test(1003)', ...}} |
| """ |
| |
| jobs_dict = {} |
| |
| if dbd: |
| command = "sacct --json -X" |
| if job_id is not None: |
| command += f" -j {job_id}" |
| output = run_command_output( |
| command, fatal=True, quiet=True, **run_command_kwargs |
| ) |
| |
| jobs_list = json.loads(output)["jobs"] |
| for job in jobs_list: |
| jobs_dict[job["job_id"]] = job |
| |
| else: |
| command = "scontrol -d -o show jobs" |
| if job_id is not None: |
| command += f" {job_id}" |
| # TODO: Remove the extra t22858 debug handling and use fatal=True instead |
| result = run_command(command, fatal=False, **run_command_kwargs) |
| if result["exit_code"]: |
| logging.debug( |
| f"Fatal failure of {command}, probably due 'Unable to contact slurm controller' (t22858)" |
| ) |
| logging.debug("Getting gcore from slurmctld before fatal.") |
| gcore("slurmctld") |
| pytest.fail( |
| f"Command {command} failed with rc={result['exit_code']}: {result['stderr']}" |
| ) |
| |
| output = result["stdout"] |
| job_dict = {} |
| for line in output.splitlines(): |
| if line == "": |
| continue |
| |
| while match := re.search(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", line): |
| param_name, param_value = match.group(1), match.group(2) |
| |
| # Remove the consumed parameter from the line |
| line = re.sub(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", "", line) |
| |
| # Reformat the value if necessary |
| if is_integer(param_value): |
| param_value = int(param_value) |
| elif is_float(param_value): |
| param_value = float(param_value) |
| elif param_value == "(null)": |
| param_value = None |
| |
| # Add it to the temporary job dictionary |
| job_dict[param_name] = param_value |
| |
| # Add the job dictionary to the jobs dictionary |
| if job_dict: |
| jobs_dict[job_dict["JobId"]] = job_dict |
| |
| # Clear the job dictionary for use by the next job |
| job_dict = {} |
| |
| return jobs_dict |
| |
| |
| def get_steps(step_id=None, **run_command_kwargs): |
| """Returns the steps as a dictionary of dictionaries. |
| |
| Args: |
| step_id (string): The specific step ID to retrieve information for. If |
| not provided, information for all steps will be returned. |
| |
| Returns: |
| A dictionary of dictionaries where the first level keys are the step ids |
| and with their values being a dictionary of configuration parameters for |
| the respective step. |
| |
| Example: |
| >>> get_steps() |
| {'44.batch': {'StepId': '44.batch', 'UserId': 1002, 'StartTime': '2024-04-05T01:17:53', ...}, |
| '44.0': {'StepId': 44.0, 'UserId': 1002, 'StartTime': '2024-04-05T01:17:54', ...}, |
| '45.batch': {'StepId': '45.batch', 'UserId': 1002, 'StartTime': '2024-04-05T01:17:53', ...}, |
| '45.0': {'StepId': 45.0, 'UserId': 1002, 'StartTime': '2024-04-05T01:17:54', ...}} |
| >>> get_steps(step_id='123.0', quiet=True) |
| {'123.0': {'StepId': 123.0, 'UserId': 1002, 'StartTime': '2024-04-05T01:21:14', ...}} |
| """ |
| |
| steps_dict = {} |
| step_dict = {} |
| |
| command = "scontrol -d -o show steps" |
| if step_id is not None: |
| command += f" {step_id}" |
| result = run_command(command, **run_command_kwargs) |
| |
| if result["exit_code"]: |
| logging.debug("scontrol command failed, no steps returned") |
| return steps_dict |
| |
| output = result["stdout"] |
| for line in output.splitlines(): |
| if line == "": |
| continue |
| |
| while match := re.search(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", line): |
| param_name, param_value = match.group(1), match.group(2) |
| |
| # Remove the consumed parameter from the line |
| line = re.sub(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", "", line) |
| |
| # Reformat the value if necessary |
| if is_integer(param_value): |
| param_value = int(param_value) |
| elif is_float(param_value) and param_name != "StepId": |
| param_value = float(param_value) |
| elif param_value == "(null)": |
| param_value = None |
| |
| # Add it to the temporary step dictionary |
| step_dict[param_name] = param_value |
| |
| # Add the step dictionary to the steps dictionary |
| if step_dict: |
| steps_dict[str(step_dict["StepId"])] = step_dict |
| |
| # Clear the step dictionary for use by the next step |
| step_dict = {} |
| |
| return steps_dict |
| |
| |
| def get_job(job_id, quiet=False): |
| """Returns job information for a specific job as a dictionary. |
| |
| Args: |
| job_id (integer): The id of the job for which information is requested. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| A dictionary containing parameters for the specified job. If the job id |
| is not found, an empty dictionary is returned. |
| |
| Example: |
| >>> get_job(51) |
| {'JobId': 51, 'JobName': 'wrap', 'UserId': 'atf(1002)', 'GroupId': 'atf(1002)', ...} |
| >>> get_job(182, quiet=True) |
| {'JobId': 182, 'JobName': 'foo.sh', 'UserId': 'atf(1002)', 'GroupId': 'atf(1002)', ...} |
| """ |
| |
| jobs_dict = get_jobs(job_id, quiet=quiet) |
| |
| return jobs_dict[job_id] if job_id in jobs_dict else {} |
| |
| |
| def get_job_parameter(job_id, parameter_name, default=None, quiet=False): |
| """Returns the value of a specific parameter for a given job. |
| |
| Args: |
| job_id (integer): The id of the job for which the parameter value is requested. |
| parameter_name (string): The name of the parameter whose value is to be obtained. |
| default (string or None): The value to be returned if the parameter is not found. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| The value of the specified job parameter, or the default value if the |
| parameter is not found. |
| |
| Example: |
| >>> get_job_parameter(12345, 'UserId') |
| 'atf(1002)' |
| >>> get_job_parameter(67890, 'Partition', default='normal', quiet=True) |
| 'primary' |
| """ |
| |
| jobs_dict = get_jobs(quiet=quiet) |
| |
| if job_id in jobs_dict: |
| job_dict = jobs_dict[job_id] |
| else: |
| pytest.fail(f"Job ({job_id}) was not found in the job configuration") |
| |
| if parameter_name in job_dict: |
| return job_dict[parameter_name] |
| else: |
| return default |
| |
| |
| def get_job_id_from_array_task(array_job_id, array_task_id, fatal=False, quiet=True): |
| """Returns the raw job id of a task of a job array. |
| |
| Args: |
| array_job_id (integer): The id of the job array. |
| array_task_id (integer): The id of the task of the job array. |
| fatal (boolean): If True, fails if the raw job id is not found in the system. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| The raw job id of the given task of a job array, or 0 if not found. |
| |
| Example: |
| >>> get_job_id_from_array_task(234, 2) |
| 241 |
| """ |
| |
| jobs_dict = get_jobs(quiet=quiet) |
| for job_id, job_values in jobs_dict.items(): |
| if ( |
| job_values["ArrayJobId"] == array_job_id |
| and job_values["ArrayTaskId"] == array_task_id |
| ): |
| return job_id |
| |
| if fatal: |
| pytest.fail(f"{array_job_id}_{array_task_id} was not found in the system") |
| |
| return 0 |
| |
| |
| def get_step_parameter(step_id, parameter_name, default=None, quiet=False): |
| """Returns the value of a specific parameter for a given step. |
| |
| Args: |
| step_id (string): The id of the step for which the parameter value is requested. |
| parameter_name (string): The name of the parameter whose value is to be obtained. |
| default (string or None): The value to be returned if the parameter is not found. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| |
| Returns: |
| The value of the specified step parameter, or the default value if the |
| parameter is not found. |
| |
| Example: |
| >>> get_step_parameter("45.0", 'NodeList') |
| 'node[2,4]' |
| >>> get_step_parameter("60.1", 'Partition', default='normal', quiet=True) |
| 'primary' |
| """ |
| |
| steps_dict = get_steps(step_id, quiet=quiet) |
| |
| if step_id not in steps_dict: |
| logging.debug(f"Step ({step_id}) was not found in the step list") |
| return default |
| |
| step_dict = steps_dict[step_id] |
| if parameter_name in step_dict: |
| return step_dict[parameter_name] |
| else: |
| return default |
| |
| |
| def wait_for_node_state_any( |
| nodename, |
| desired_node_states, |
| timeout=default_polling_timeout, |
| poll_interval=None, |
| fatal=False, |
| reverse=False, |
| ): |
| """Wait for any of the specified node states to be reached. |
| |
| Polls the node state every poll interval seconds, waiting up to the timeout |
| for the specified node state to be reached. |
| |
| Args: |
| nodename (string): The name of the node whose state is being monitored. |
| desired_node_states (iterable): The states that the node is expected to reach. |
| timeout (integer): The number of seconds to wait before timing out. |
| poll_interval (float): Number of seconds between node state polls. |
| fatal (boolean): If True, a timeout will cause the test to fail. |
| reverse (boolean): If True, wait for the node to lose the desired state. |
| |
| Returns: |
| Boolean value indicating whether the node ever reached the desired state. |
| |
| Example: |
| >>> wait_for_node_state_any('node1', ['IDLE', 'ALLOCATED'], timeout=60, poll_interval=5) |
| True |
| >>> wait_for_node_state_any('node2', ['DOWN'], timeout=30, fatal=True) |
| False |
| """ |
| |
| state_set = frozenset(desired_node_states) |
| |
| def any_overlap(state): |
| return bool(state_set & set(state)) != reverse |
| |
| # Wrapper for the repeat_until command to do all our state checking for us |
| repeat_until( |
| lambda: get_node_parameter(nodename, "state"), |
| any_overlap, |
| timeout=timeout, |
| poll_interval=poll_interval, |
| fatal=fatal, |
| ) |
| |
| return any_overlap(get_node_parameter(nodename, "state")) |
| |
| |
| def wait_for_node_state( |
| nodename, |
| desired_node_state, |
| timeout=default_polling_timeout, |
| poll_interval=None, |
| fatal=False, |
| reverse=False, |
| ): |
| """Wait for a specified node state to be reached. |
| |
| Polls the node state every poll interval seconds, waiting up to the timeout |
| for the specified node state to be reached. |
| |
| Args: |
| nodename (string): The name of the node whose state is being monitored. |
| desired_node_state (string): The state that the node is expected to reach. |
| timeout (integer): The number of seconds to wait before timing out. |
| poll_interval (float): Number of seconds between node state polls. |
| fatal (boolean): If True, a timeout will cause the test to fail. |
| reverse (boolean): If True, wait for the node to lose the desired state. |
| |
| Returns: |
| Boolean value indicating whether the node ever reached the desired state. |
| |
| Example: |
| >>> wait_for_node_state('node1', 'IDLE', timeout=60, poll_interval=5) |
| True |
| >>> wait_for_node_state('node2', 'DOWN', timeout=30, fatal=True) |
| False |
| """ |
| |
| # Figure out if we're waiting for the desired_node_state to be present or to be gone |
| def has_state(state): |
| return desired_node_state in state |
| |
| def not_state(state): |
| return desired_node_state not in state |
| |
| # Wrapper for the repeat_until command to do all our state checking for us |
| repeat_until( |
| lambda: get_node_parameter(nodename, "state"), |
| not_state if reverse else has_state, |
| timeout=timeout, |
| poll_interval=poll_interval, |
| fatal=fatal, |
| ) |
| |
| return (desired_node_state in get_node_parameter(nodename, "state")) != reverse |
| |
| |
| def wait_for_step(job_id, step_id, **repeat_until_kwargs): |
| """Wait for the specified step of a job to be running. |
| |
| Continuously polls the step state until it becomes running or until a |
| timeout occurs. |
| |
| Args: |
| job_id (integer): The id of the job. |
| step_id (integer): The id of the step within the job. |
| |
| Returns: |
| A boolean value indicating whether the specified step is running or not. |
| |
| Example: |
| >>> wait_for_step(1234, 0, timeout=60, poll_interval=5, fatal=True) |
| True |
| >>> wait_for_step(5678, 1, timeout=30) |
| False |
| """ |
| |
| step_str = f"{job_id}.{step_id}" |
| return repeat_until( |
| lambda: run_command_output(f"scontrol -o show step {step_str}"), |
| lambda out: re.search(rf"StepId={step_str}", out) is not None, |
| **repeat_until_kwargs, |
| ) |
| |
| |
| def wait_for_step_accounted(job_id, step_id, **repeat_until_kwargs): |
| """Wait for specified job step to appear in accounting database (`sacct`). |
| |
| Continuously polls the database until the step is accounted for or until a |
| timeout occurs. |
| |
| Args: |
| job_id (integer): The id of the job. |
| step_id (integer): The id of the step within the job. |
| |
| Returns: |
| A boolean value indicating whether the specified step is accounted for |
| in the database or not. |
| |
| Example: |
| >>> wait_for_step_accounted(1234, 0, timeout=60, poll_interval=5, fatal=True) |
| True |
| >>> wait_for_step_accounted(5678, 1, timeout=30) |
| False |
| """ |
| |
| step_str = f"{job_id}.{step_id}" |
| return repeat_until( |
| lambda: run_command_output(f"sacct -j {job_id} -o jobid"), |
| lambda out: re.search(rf"{step_str}", out) is not None, |
| **repeat_until_kwargs, |
| ) |
| |
| |
| def wait_for_job_accounted(job_id, **repeat_until_kwargs): |
| """Wait for specified job to appear in accounting database (`sacct`). |
| |
| Continuously polls the database until the job is accounted for or until a |
| timeout occurs. |
| |
| Args: |
| job_id (integer): The id of the job. |
| |
| Returns: |
| A boolean value indicating whether the specified job is accounted for |
| in the database or not. |
| |
| Example: |
| >>> wait_for_job_accounted(1234, timeout=60, poll_interval=5, fatal=True) |
| True |
| >>> wait_for_job_accounted(5678, timeout=30) |
| False |
| """ |
| |
| return repeat_until( |
| lambda: run_command_output(f"sacct -Xj {job_id} -o JobID"), |
| lambda out: re.search(rf"{job_id}", out) is not None, |
| **repeat_until_kwargs, |
| ) |
| |
| |
| def wait_for_job_state( |
| job_id, |
| desired_job_state, |
| desired_reason=None, |
| timeout=default_polling_timeout, |
| poll_interval=None, |
| fatal=False, |
| quiet=False, |
| xfail=False, |
| ): |
| """Wait for the specified job to reach the desired state. |
| |
| Continuously polls the job state until the desired state is reached or until |
| a timeout occurs. |
| |
| Some supported job states are aggregate states, which may include multiple |
| discrete states. Some logic is built-in to fail if a job reaches a state |
| that makes the desired job state impossible to reach. |
| |
| Current supported aggregate states: |
| - DONE |
| |
| Args: |
| job_id (integer): The id of the job. |
| desired_job_state (string): The desired state of the job. |
| desired_reason (string): Optional reason to also match. |
| timeout (integer): The number of seconds to poll before timing out. |
| poll_interval (float): Time (in seconds) between job state polls. |
| fatal (boolean): If True, a timeout will cause the test to fail. |
| quiet (boolean): If True, logging is performed at the TRACE log level. |
| xfail (boolean): If True, the state (or reason) is not expected to be reached. |
| |
| Returns: |
| Boolean value indicating whether the job reached the desired state. |
| |
| Example: |
| >>> wait_for_job_state(1234, 'COMPLETED', timeout=300, poll_interval=10, fatal=True) |
| True |
| >>> wait_for_job_state(5678, 'RUNNING', timeout=60) |
| False |
| """ |
| |
| if poll_interval is None: |
| if timeout <= 5: |
| poll_interval = 0.1 |
| elif timeout <= 10: |
| poll_interval = 0.2 |
| else: |
| poll_interval = 1 |
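| # With these defaults, a 5 second timeout polls every 0.1s (up to ~50 polls), |
| # a 10 second timeout every 0.2s, and longer timeouts once per second. |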
| |
| if quiet: |
| log_level = logging.TRACE |
| else: |
| log_level = logging.DEBUG |
| |
| # We don't use repeat_until here because we support pseudo-job states and |
| # we want to allow early return (e.g. for a DONE state if we want RUNNING) |
| |
| xfail_str = "" |
| if xfail: |
| xfail_str = "not " |
| message = ( |
| f"Waiting for job ({job_id}) to {xfail_str}reach state {desired_job_state}" |
| ) |
| if desired_reason is not None: |
| message += f" and reason {desired_reason}" |
| logging.log(log_level, message) |
| |
| begin_time = time.time() |
| while time.time() < begin_time + timeout: |
| job_state = get_job_parameter( |
| job_id, "JobState", default="NOT_FOUND", quiet=True |
| ) |
| |
| message = f"Job ({job_id}) is in state {job_state}, but we are waiting for {desired_job_state}" |
| if job_state in [ |
| "NOT_FOUND", |
| "BOOT_FAIL", |
| "CANCELLED", |
| "COMPLETED", |
| "DEADLINE", |
| "FAILED", |
| "NODE_FAIL", |
| "OUT_OF_MEMORY", |
| "TIMEOUT", |
| "PREEMPTED", |
| ]: |
| if desired_job_state == "DONE" or job_state == desired_job_state: |
| message = f"Job ({job_id}) is in the {xfail_str}desired state {desired_job_state}" |
| reason = get_job_parameter( |
| job_id, "Reason", default="NOT_FOUND", quiet=True |
| ) |
| if desired_reason is None or reason == desired_reason: |
| if desired_reason is not None: |
| message += ( |
| f" with the {xfail_str}desired reason {desired_reason}" |
| ) |
| if not xfail: |
| logging.log(log_level, message) |
| else: |
| logging.warning(message) |
| return True |
| else: |
| message += ( |
| f", but with reason {reason} and we waited for {desired_reason}" |
| ) |
| |
| if not xfail: |
| if fatal: |
| pytest.fail(message) |
| else: |
| logging.warning(message) |
| else: |
| logging.log(log_level, message) |
| return False |
| elif job_state == desired_job_state: |
| message = ( |
| f"Job ({job_id}) is in the {xfail_str}desired state {desired_job_state}" |
| ) |
| reason = get_job_parameter( |
| job_id, "Reason", default="NOT_FOUND", quiet=True |
| ) |
| if desired_reason is None or reason == desired_reason: |
| if desired_reason is not None: |
| message += f" with the {xfail_str}desired reason {desired_reason}" |
| if not xfail: |
| logging.log(log_level, message) |
| else: |
| logging.warning(message) |
| return True |
| else: |
| message += ( |
| f", but with reason {reason} and we waited for {desired_reason}" |
| ) |
| |
| logging.log(log_level, message) |
| time.sleep(poll_interval) |
| |
| message = f"Job ({job_id}) did not reach the {desired_job_state} state" |
| if desired_reason is not None: |
| message += f" or the reason {desired_reason}" |
| message += f" within the {timeout} second(s) timeout" |
| if not xfail: |
| if fatal: |
| pytest.fail(message) |
| else: |
| logging.warning(message) |
| else: |
| logging.log(log_level, message) |
| |
| return False |
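| |
| # Sketch (job id hypothetical): the aggregate DONE state matches any of the |
| # terminal states checked above, so |
| #   wait_for_job_state(job_id, "DONE", fatal=True) |
| # blocks until the job finishes, regardless of whether it ends COMPLETED, |
| # FAILED, CANCELLED, etc. |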
| |
| |
| def check_steps_delayed(job_id, job_output, expected_delayed): |
| """Check the output file of a job for expected delayed steps. |
| |
| This function checks that the output file for a job contains the expected |
| pattern of delayed job steps. Note that at the time of writing, this |
| requires srun steps to have at least a verbosity level of "-vv" to log |
| their f"srun: Step completed in JobId={job_id}, retrying" notification. |
| |
| Args: |
| job_id (integer): The job ID that we're interested in. |
| job_output (string): The content of the output file of the job. |
| expected_delayed (integer): The initial number of delayed job steps. It |
| is verified that this initial number of job steps are delayed and |
| then this number of delayed job steps decrements one by one as |
| running job steps finish. |
| |
| Returns: |
| True if steps were delayed in the correct amounts and order, else False. |
| |
| Example: |
| >>> check_steps_delayed(123, "srun: Received task exit notification for 1 task of StepId=1.0 (status=0x0000).\nsrun: node1: task 0: Completed\nsrun: debug: task 0 done\nsrun: Step completed in JobId=1, retrying\nsrun: Step completed in JobId=1, retrying\nsrun: Step created for StepId=1.1\nsrun: Received task exit notification for 1 task of StepId=1.1 (status=0x0000).\nsrun: node2: task 1: Completed\nsrun: debug: task 1 done\nsrun: debug: IO thread exiting\nsrun: Step completed in JobId=1, retrying\nsrun: Step created for StepId=1.2", 2) |
| True |
| >>> check_steps_delayed(456, "srun: Received task exit notification for 1 task of StepId=1.0 (status=0x0000).\nsrun: node1: task 0: Completed\nsrun: debug: task 0 done\nsrun: Step completed in JobId=1, retrying\nsrun: Step completed in JobId=1, retrying\nsrun: Step created for StepId=1.1\nsrun: Received task exit notification for 1 task of StepId=1.1 (status=0x0000).\nsrun: node2: task 1: Completed\nsrun: debug: task 1 done\nsrun: debug: IO thread exiting", 2) |
| False |
| """ |
| |
| # Iterate through each group of expected delayed steps. For example, |
| # if there was a job that had 5 steps that could run in parallel but, due to |
| # resource constraints, only allowed 3 steps to run at a time, we would |
| # expect a group of 2 delayed job steps followed by a group of 1 delayed job |
| # steps. For this example job, expected_delayed=2. |
| # |
| # The idea of the for loop below is to iterate through each group of delayed |
| # job steps and replace the expected output as we go with re.sub. This |
| # ensures that the delayed job step groups occur in the correct order. |
| # |
| # Each regex pattern matches part of the pattern we'd expect to see in the |
| # output, replaces the matched text (see previous paragraph), and then makes |
| # sure there is still text left to match for the rest of the pattern. If the |
| # regex pattern doesn't match anything, then re.sub will match and replace |
| # all the rest of the output and leave job_output empty. |
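| # |
| # As a concrete sketch for expected_delayed=2 (line contents abbreviated, see |
| # the docstring example above): the first iteration (delayed_grp_size=2) |
| # expects one exit notification, two "Step completed ... retrying" lines and |
| # one "Step created" line; the second iteration (delayed_grp_size=1) expects |
| # another exit notification, a single "retrying" line and a final "Step |
| # created" line. |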
| for delayed_grp_size in range(expected_delayed, 0, -1): |
| # Match all lines before receiving an exit notification. This regex |
| # pattern will match any line that doesn't contain "srun: Received task |
| # exit notification". |
| before_start_pattern = r"(^((?!srun: Received task exit notification).)*$\n)*" |
| job_output = re.sub(before_start_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {before_start_pattern}") |
| return False |
| |
| # Match receiving the next exit notification. This regex pattern will |
| # match the line where the exit notification is received when a step |
| # that was already running finishes. |
| exit_pattern = rf"srun: Received task exit notification for [0-9]+ task of StepId={job_id}\.[0-9]+ \(status=0x[0-9A-Fa-f]+\)\.\n" |
| job_output = re.sub(exit_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {exit_pattern}") |
| return False |
| |
| # Match lines we don't want before a step completion. After the exit |
| # notification, we now match all lines that don't contain "srun: Step |
| # completed". Sometimes an exit notification can be received multiple |
| # times and any redundant exit notifications are also matched by this |
| # pattern. |
| before_completed_pattern = r"(^((?!srun: Step completed).)*$\n)*" |
| job_output = re.sub(before_completed_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {before_completed_pattern}") |
| return False |
| |
| # Match number of lines retrying to start a delayed job step. Note that |
| # this pattern searches for steps retrying "delayed_grp_size" number of |
| # times. This is because every step that is delayed retries every time a |
| # previously running step finishes. |
| completed_pattern = rf"(srun: Step completed in JobId={job_id}, retrying\n){{{delayed_grp_size}}}" |
| job_output = re.sub(completed_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {completed_pattern}") |
| return False |
| |
| # Match lines we don't want before a step creation. Due to steps running |
| # in parallel, other lines of text can be output from already running |
| # steps before we're told a new step has been created. This regex |
| # pattern matches all lines that don't contain "srun: Step created". |
| before_created_pattern = r"(^((?!srun: Step created).)*$\n)*" |
| job_output = re.sub(before_created_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {before_created_pattern}") |
| return False |
| |
| # Match the step creation line for the delayed step |
| created_pattern = rf"srun: Step created for StepId={job_id}\.[0-9]+" |
| job_output = re.sub(created_pattern, "", job_output, 1, re.MULTILINE) |
| if not job_output: |
| logging.error(f"Pattern not found: {created_pattern}") |
| return False |
| |
| return True |
| |
| |
| def create_node(node_dict): |
| """Creates a node with the properties described by the supplied dictionary. |
| |
| This function is currently only used as a helper function within other |
| library functions (e.g. require_nodes). It modifies the Slurm configuration |
| file and restarts the relevant Slurm daemons. A backup is automatically |
| created, and the original configuration is restored after the test |
| completes. This function may only be used in auto-config mode. |
| |
| Args: |
| node_dict (dictionary): A dictionary containing the desired node |
| properties. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> create_node({'NodeName': 'new_node', 'CPUs': 4, 'RealMemory': 16384}) |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to add a node") |
| |
| config_file = f"{properties['slurm-config-dir']}/slurm.conf" |
| |
| # Read the original slurm.conf into a list of lines |
| output = run_command_output( |
| f"cat {config_file}", user=properties["slurm-user"], quiet=True |
| ) |
| original_config_lines = output.splitlines() |
| new_config_lines = original_config_lines.copy() |
| |
| # Locate the last NodeName definition |
| last_node_line_index = 0 |
| for line_index in range(len(original_config_lines)): |
| line = original_config_lines[line_index] |
| |
| if re.search(r"(?i)^ *NodeName", line) is not None: |
| last_node_line_index = line_index |
| if last_node_line_index == 0: |
| last_node_line_index = line_index |
| |
| # Build up the new node line |
| node_line = "" |
| if "NodeName" in node_dict: |
| node_line = f"NodeName={node_dict['NodeName']}" |
| node_range = node_dict.pop("NodeName") |
| if "Port" in node_dict: |
| node_line += f" Port={node_dict['Port']}" |
| node_dict.pop("Port") |
| for parameter_name, parameter_value in sorted(node_dict.items()): |
| node_line += f" {parameter_name}={parameter_value}" |
| |
| # Add the new node line |
| new_config_lines.insert(last_node_line_index + 1, node_line) |
| |
| # Write the config file back out with the modifications |
| backup_config_file("slurm") |
| new_config_string = "\n".join(new_config_lines) |
| run_command( |
| f"echo '{new_config_string}' > {config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Create the required node directories |
| for node_name in node_range_to_list(node_range): |
| spool_dir = properties["slurm-spool-dir"].replace("%n", node_name) |
| tmpfs_dir = properties["slurm-tmpfs"].replace("%n", node_name) |
| properties["nodes"].append(node_name) |
| |
| run_command(f"sudo mkdir -p {spool_dir}", fatal=True, quiet=True) |
| run_command(f"sudo mkdir -p {tmpfs_dir}", fatal=True, quiet=True) |
| |
| # Restart slurm if it is already running |
| if is_slurmctld_running(quiet=True): |
| restart_slurm(quiet=True) |
| |
| |
| # requirements_list is a list of (parameter_name, parameter_value) tuples. |
| # Uses non-live node info because it must copy from an existing node config line. |
| # We implemented requirements_list as a list of tuples so that this could |
| # later be extended to include a comparator, etc. |
| # atf.require_nodes(1, [('CPUs', 4), ('RealMemory', 40)]) |
| # atf.require_nodes(2, [('Gres', 'gpu:1,mps:100')]) |
| def require_nodes(requested_node_count, requirements_list=[]): |
| """Ensure that a requested number of nodes have the required properties. |
| |
| In local-config mode, the test is skipped if an insufficient number of |
| nodes possess the required properties. |
| In auto-config mode, nodes are created as needed with the required |
| properties. |
| |
| Args: |
| requested_node_count (integer): Number of required nodes. |
| requirements_list (list of tuples): List of (parameter_name, |
| parameter_value) tuples. |
| |
| Currently supported node requirement types include: |
| CPUs |
| Cores |
| RealMemory |
| Gres |
| Features |
| |
| Other node requirement types will still be appended to the node definition, |
| but unsupported parameters could prevent Slurm from starting. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> require_nodes(2, [('CPUs', 4), ('RealMemory', 40)]) |
| >>> require_nodes(2, [('CPUs', 2), ('RealMemory', 30), ('Features', 'gpu,mpi')]) |
| >>> require_nodes(2, [('CPUs', 4), ('Sockets', 1)]) |
| """ |
| |
| # If using local-config and slurm is running, use live node information |
| # so that a test is not incorrectly skipped when slurm derives a non-single |
| # CPUTot while the slurm.conf does not contain a CPUs property. |
| if not properties["auto-config"] and is_slurmctld_running(quiet=True): |
| live = True |
| else: |
| live = False |
| |
| # This should return separate nodes and a DEFAULT (unless live) |
| nodes_dict = get_nodes(live=live, quiet=True) |
| original_nodes = {} |
| default_node = {} |
| |
| # Instantiate original node names from nodes_dict |
| for node_name in nodes_dict: |
| # Split out the default node |
| if node_name == "DEFAULT": |
| default_node = nodes_dict[node_name] |
| else: |
| original_nodes[node_name] = {} |
| |
| # Populate with any default parameters |
| if default_node: |
| for node_name in original_nodes: |
| for parameter_name, parameter_value in default_node.items(): |
| if parameter_name.lower() != "nodename": |
| original_nodes[node_name][parameter_name] = parameter_value |
| |
| # Merge in parameters from nodes_dict |
| for node_name in original_nodes: |
| for parameter_name, parameter_value in nodes_dict[node_name].items(): |
| if parameter_name.lower() != "nodename": |
| # Translate CPUTot to CPUs for screening qualifying nodes |
| if parameter_name.lower() == "cputot": |
| parameter_name = "CPUs" |
| original_nodes[node_name][parameter_name] = parameter_value |
| |
| # Check to see how many qualifying nodes we have |
| qualifying_node_count = 0 |
| node_count = 0 |
| nonqualifying_node_count = 0 |
| first_node_name = "" |
| first_qualifying_node_name = "" |
| node_indices = {} |
| augmentation_dict = {} |
| for node_name in sorted(original_nodes): |
| lower_node_dict = dict( |
| (key.lower(), value) for key, value in original_nodes[node_name].items() |
| ) |
| node_count += 1 |
| |
| if node_count == 1: |
| first_node_name = node_name |
| |
| # Build up node indices for use when having to create new nodes |
| match = re.search(r"^(.*?)(\d*)$", node_name) |
| node_prefix, node_index = match.group(1), match.group(2) |
| if node_index == "": |
| node_indices[node_prefix] = node_indices.get(node_prefix, []) |
| else: |
| node_indices[node_prefix] = node_indices.get(node_prefix, []) + [ |
| int(node_index) |
| ] |
| |
| node_qualifies = True |
| for requirement_tuple in requirements_list: |
| parameter_name, parameter_value = requirement_tuple[0:2] |
| if parameter_name in ["CPUs", "RealMemory"]: |
| if parameter_name.lower() in lower_node_dict: |
| if lower_node_dict[parameter_name.lower()] < parameter_value: |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = parameter_value |
| else: |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = parameter_value |
| elif parameter_name == "Cores": |
| boards = lower_node_dict.get("boards", 1) |
| sockets_per_board = lower_node_dict.get("socketsperboard", 1) |
| cores_per_socket = lower_node_dict.get("corespersocket", 1) |
| sockets = boards * sockets_per_board |
| cores = sockets * cores_per_socket |
| if cores < parameter_value: |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict["CoresPerSocket"] = math.ceil( |
| parameter_value / sockets |
| ) |
| elif parameter_name == "Gres": |
| if parameter_name.lower() in lower_node_dict: |
| gres_list = parameter_value.split(",") |
| for gres_value in gres_list: |
| if match := re.search(r"^(\w+):(\d+)$", gres_value): |
| (required_gres_name, required_gres_value) = ( |
| match.group(1), |
| match.group(2), |
| ) |
| else: |
| pytest.fail( |
| "Gres requirement must be of the form <name>:<count>" |
| ) |
| if match := re.search( |
| rf"{required_gres_name}:(\d+)", |
| lower_node_dict[parameter_name.lower()], |
| ): |
| if int(match.group(1)) < int(required_gres_value): |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = gres_value |
| else: |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = gres_value |
| else: |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = parameter_value |
| elif parameter_name == "Features": |
| required_features = set(parameter_value.split(",")) |
| node_features = set(lower_node_dict.get("features", "").split(",")) |
| if not required_features.issubset(node_features): |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if nonqualifying_node_count == 1: |
| augmentation_dict[parameter_name] = parameter_value |
| else: |
| logging.debug( |
| f"{parameter_name} is not a supported node requirement type." |
| ) |
| logging.debug( |
| f"{parameter_name}={parameter_value} will be added anyways!" |
| ) |
| augmentation_dict[parameter_name] = parameter_value |
| if node_qualifies: |
| node_qualifies = False |
| nonqualifying_node_count += 1 |
| if node_qualifies: |
| qualifying_node_count += 1 |
| if first_qualifying_node_name == "": |
| first_qualifying_node_name = node_name |
| |
| # Not enough qualifying nodes |
| if qualifying_node_count < requested_node_count: |
| # If auto-config, configure what is required |
| if properties["auto-config"]: |
| # Create new nodes to meet requirements ignoring default node0 |
| new_node_count = requested_node_count |
| |
| # If we already have a qualifying node, we will use it as the template |
| if qualifying_node_count > 0: |
| template_node_name = first_qualifying_node_name |
| template_node = nodes_dict[template_node_name].copy() |
| # Otherwise we will use the first node as a template and augment it |
| else: |
| template_node_name = first_node_name |
| template_node = nodes_dict[template_node_name].copy() |
| for parameter_name, parameter_value in augmentation_dict.items(): |
| template_node[parameter_name] = parameter_value |
| |
| base_port = int(nodes_dict[template_node_name]["Port"]) |
| |
| # Build up a list of available new indices starting after the template |
| match = re.search(r"^(.*?)(\d*)$", template_node_name) |
| template_node_prefix, template_node_index = match.group(1), int( |
| match.group(2) |
| ) |
| used_indices = sorted(node_indices[template_node_prefix]) |
| new_indices = [] |
| new_index = template_node_index |
| for i in range(new_node_count): |
| new_index += 1 |
| while new_index in used_indices: |
| new_index += 1 |
| new_indices.append(new_index) |
| |
| # Create a new aggregate node |
| # Later, we could consider collapsing the node into the template node if unmodified |
| new_node_dict = template_node.copy() |
| if new_node_count == 1: |
| new_node_dict["NodeName"] = template_node_prefix + str(new_indices[0]) |
| new_node_dict["Port"] = base_port - template_node_index + new_indices[0] |
| else: |
| new_node_dict["NodeName"] = ( |
| f"{template_node_prefix}[{list_to_range(new_indices)}]" |
| ) |
| new_node_dict["Port"] = list_to_range( |
| list( |
| map(lambda x: base_port - template_node_index + x, new_indices) |
| ) |
| ) |
| create_node(new_node_dict) |
| |
| # If local-config, skip |
| else: |
| message = f"This test requires {requested_node_count} nodes" |
| if requirements_list: |
| message += f" with {requirements_list}" |
| pytest.skip(message, allow_module_level=True) |
| |
| |
| def make_bash_script(script_name, script_contents): |
| """Creates an executable Bash script with the specified contents. |
| |
| Args: |
| script_name (string): Name of the script to create. |
| script_contents (string): Contents of the script. |
| |
| Returns: |
| None |
| |
| Example: |
| >>> make_bash_script("my_script.sh", "echo 'Hello, World!'") # This creates an executable Bash script named "my_script.sh" with the contents "echo 'Hello, World!'" |
| """ |
| |
| with open(script_name, "w") as f: |
| f.write("#!/bin/bash\n") |
| f.write(script_contents) |
| os.chmod(script_name, 0o0700) |
| |
| |
| def wait_for_file(file_name, **repeat_until_kwargs): |
| """Waits for the specified file to be present. |
| |
| This function waits up to the specified timeout seconds for the file to be |
| present, polling every poll_interval seconds. The default timeout and |
| poll_interval are inherited from repeat_until. |
| |
| Args: |
| file_name (string): The file name. |
| |
| Returns: |
| True if the file was found, False otherwise. |
| |
| Example: |
| >>> wait_for_file("my_file.txt", timeout=30, poll_interval=0.5) |
| True |
| """ |
| |
| logging.debug(f"Waiting for file ({file_name}) to be present") |
| return repeat_until( |
| lambda: os.path.isfile(file_name), lambda exists: exists, **repeat_until_kwargs |
| ) |
| |
| |
| # Assuming this will only be called internally after validating accounting is configured and auto-config is set |
| def backup_accounting_database(): |
| """Backs up the accounting database. |
| |
| This function may only be used in auto-config mode. The database dump is |
| automatically restored when the test ends. |
| |
| Args: |
| None |
| |
| Returns: |
| None |
| |
| Example: |
| >>> backup_accounting_database() # Backs up Slurm accounting database to file in the test's temporary directory. |
| """ |
| |
| if not properties["auto-config"]: |
| return |
| |
| mysqldump_path = shutil.which("mysqldump") |
| if mysqldump_path is None: |
| pytest.fail( |
| "Unable to backup the accounting database. mysqldump was not found in your path" |
| ) |
| mysql_path = shutil.which("mysql") |
| if mysql_path is None: |
| pytest.fail( |
| "Unable to backup the accounting database. mysql was not found in your path" |
| ) |
| |
| sql_dump_file = str(module_tmp_path / "../../slurm_acct_db.sql") |
| |
| # If a dump already exists, issue a warning and return (honor existing dump) |
| if os.path.isfile(sql_dump_file): |
| logging.warning(f"Dump file already exists ({sql_dump_file})") |
| return |
| |
| slurmdbd_dict = get_config(live=False, source="slurmdbd", quiet=True) |
| database_host, database_port, database_name, database_user, database_password = ( |
| slurmdbd_dict.get(key) |
| for key in [ |
| "StorageHost", |
| "StoragePort", |
| "StorageLoc", |
| "StorageUser", |
| "StoragePass", |
| ] |
| ) |
| |
| mysql_options = "" |
| if database_host: |
| mysql_options += f" -h {database_host}" |
| if database_port: |
| mysql_options += f" -P {database_port}" |
| if database_user: |
| mysql_options += f" -u {database_user}" |
| else: |
| mysql_options += f" -u {properties['slurm-user']}" |
| if database_password: |
| mysql_options += f" -p {database_password}" |
| |
| if not database_name: |
| database_name = "slurm_acct_db" |
| |
| mysql_command = f"{mysql_path} {mysql_options} -e \"USE '{database_name}'\"" |
| if run_command_exit(mysql_command, quiet=True) != 0: |
| logging.debug(f"Slurm accounting database ({database_name}) is not present") |
| else: |
| mysqldump_command = ( |
| f"{mysqldump_path} {mysql_options} {database_name} > {sql_dump_file}" |
| ) |
| run_command( |
| mysqldump_command, fatal=True, quiet=False, timeout=default_sql_cmd_timeout |
| ) |
| |
| |
| def restore_accounting_database(): |
| """Restores the accounting database from the backup. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| None |
| |
| Returns: |
| None |
| |
| Example: |
| >>> restore_accounting_database() # Restores Slurm accounting database from previously created backup. |
| """ |
| |
| if not properties["auto-config"]: |
| return |
| |
| mysql_path = shutil.which("mysql") |
| if mysql_path is None: |
| pytest.fail( |
| "Unable to restore the accounting database. mysql was not found in your path" |
| ) |
| |
| slurmdbd_dict = get_config(live=False, source="slurmdbd", quiet=True) |
| database_host, database_port, database_name, database_user, database_password = ( |
| slurmdbd_dict.get(key) |
| for key in [ |
| "StorageHost", |
| "StoragePort", |
| "StorageLoc", |
| "StorageUser", |
| "StoragePass", |
| ] |
| ) |
| if not database_name: |
| database_name = "slurm_acct_db" |
| |
| base_command = mysql_path |
| if database_host: |
| base_command += f" -h {database_host}" |
| if database_port: |
| base_command += f" -P {database_port}" |
| if database_user: |
| base_command += f" -u {database_user}" |
| else: |
| base_command += f" -u {properties['slurm-user']}" |
| if database_password: |
| base_command += f" -p {database_password}" |
| |
| # If DB exists, drop it and try to restore the dump file |
| mysql_command = f"{base_command} -e \"USE '{database_name}'\"" |
| if run_command_exit(mysql_command, quiet=True) == 0: |
| run_command( |
| f'{base_command} -e "drop database {database_name}"', |
| fatal=True, |
| quiet=False, |
| timeout=default_sql_cmd_timeout, |
| ) |
| |
| sql_dump_file = str(module_tmp_path / "../../slurm_acct_db.sql") |
| |
| # If the dump file doesn't exist, it has probably already been |
| # restored by a previous call to restore_accounting_database |
| if not os.path.isfile(sql_dump_file): |
| logging.debug( |
| f"Slurm accounting database backup ({sql_dump_file}) is s not present. It has probably already been restored." |
| ) |
| return |
| |
| dump_stat = os.stat(sql_dump_file) |
| if not (dump_stat.st_size == 0 and dump_stat.st_mode & stat.S_ISVTX): |
| run_command( |
| f'{base_command} -e "create database {database_name}"', |
| fatal=True, |
| quiet=False, |
| ) |
| run_command( |
| f"{base_command} {database_name} < {sql_dump_file}", |
| fatal=True, |
| quiet=False, |
| timeout=default_sql_cmd_timeout, |
| ) |
| |
| # In either case, remove the dump file |
| run_command(f"rm -f {sql_dump_file}", fatal=True, quiet=False) |
| |
| |
| def run_check_test(source_file, build_args=""): |
| """Compiles and runs a libcheck test |
| Args: |
| source_file (string): The name of the source file, relative to testsuite_check_dir. |
| build_args (string): Additional string to be appended to the build command. |
| """ |
| |
| import xmltodict |
| |
| check_test = ( |
| f"{module_tmp_path}/{os.path.splitext(os.path.basename(source_file))[0]}" |
| ) |
| xml_test = check_test + ".xml" |
| |
| compile_against_libslurm( |
| f"{properties['testsuite_check_dir']}/{source_file}", |
| check_test, |
| full=True, |
| build_args=build_args + " -lcheck -lm -lsubunit", |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Run the libcheck test setting an xml output |
| result = run_command( |
| check_test, quiet=True, env_vars=f"CK_XML_LOG_FILE_NAME={xml_test}" |
| ) |
| |
| # Parse the xml output |
| if not os.path.exists(xml_test): |
| pytest.fail(f"Test results not found: {xml_test}") |
| |
| with open(xml_test) as f: |
| xml_data = xmltodict.parse(f.read()) |
| |
| logging.info(f"{result['stdout']}") |
| if result["exit_code"]: |
| logging.error(f"\n{result['stderr']}") |
| |
| return xml_data["testsuites"]["suite"] |
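| |
| # Sketch (source file name hypothetical): a test would typically call |
| #   suite = run_check_test("check_foo.c") |
| # and then inspect the returned mapping (xmltodict's rendering of the <suite> |
| # element) for failed cases; the exact keys depend on the libcheck XML report |
| # format. |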
| |
| |
| def compile_against_libslurm( |
| source_file, |
| dest_file, |
| build_args="", |
| full=False, |
| shared=False, |
| new_prefixes=False, |
| **run_command_kwargs, |
| ): |
| """Compiles a test program against either libslurm.so or libslurmfull.so. |
| |
| This function compiles the specified source file against the Slurm library, |
| either libslurm.so or libslurmfull.so, and creates the target binary file. |
| |
| Args: |
| source_file (string): The name of the source file. |
| dest_file (string): The name of the target binary file. |
| build_args (string): Additional string to be appended to the build command. |
| full (boolean): Use libslurmfull.so instead of libslurm.so. |
| shared (boolean): Produces a shared library (adds the -shared compiler option |
| and adds a .so suffix to the output file name). |
| new_prefixes (boolean): Use the new-slurm-prefix, new-source-prefix, and |
| new-build-prefix properties instead of the default Slurm directories. |
| **run_command_kwargs: Auxiliary arguments to be passed to the |
| run_command function (e.g., quiet, fatal, timeout, etc.). |
| |
| Returns: |
| None |
| |
| Example: |
| >>> compile_against_libslurm("my_test.c", "my_test", build_args="-Wall -Werror") |
| """ |
| |
| slurm_prefix = properties["slurm-prefix"] |
| slurm_source = properties["slurm-source-dir"] |
| slurm_build = properties["slurm-build-dir"] |
| if new_prefixes: |
| slurm_prefix = properties["new-slurm-prefix"] |
| slurm_source = properties["new-source-prefix"] |
| slurm_build = properties["new-build-prefix"] |
| |
| if full: |
| slurm_library = "slurmfull" |
| else: |
| slurm_library = "slurm" |
| if os.path.isfile(f"{slurm_prefix}/lib64/slurm/lib{slurm_library}.so"): |
| lib_dir = "lib64" |
| else: |
| lib_dir = "lib" |
| if full: |
| lib_path = f"{slurm_prefix}/{lib_dir}/slurm" |
| else: |
| lib_path = f"{slurm_prefix}/{lib_dir}" |
| |
| command = f"gcc {source_file} -g -pthread" |
| if shared: |
| command += " -fPIC -shared" |
| command += f" -o {dest_file}" |
| command += f" -I{slurm_source} -I{slurm_build} -I{slurm_prefix}/include -Wl,-rpath={lib_path} -L{lib_path} -l{slurm_library} -lresolv" |
| if build_args != "": |
| command += f" {build_args}" |
| run_command(command, **run_command_kwargs) |
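| |
| # Sketch (file names hypothetical): building a shared object against |
| # libslurmfull.so might look like |
| #   compile_against_libslurm("my_plugin.c", "my_plugin.so", full=True, |
| #                            shared=True, fatal=True, quiet=True) |
| # which adds -fPIC -shared to the gcc command and links against the plugin |
| # library directory. |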
| |
| |
| def get_partitions(**run_command_kwargs): |
| """Returns the Slurm partition configuration as a dictionary of dictionaries. |
| |
| Args: |
| **run_command_kwargs: Auxiliary arguments to be passed to the |
| run_command function (e.g., quiet, fatal, timeout, etc.). |
| |
| Returns: |
| A dictionary of dictionaries, where the first-level keys are the |
| partition names, and the values are dictionaries containing the |
| configuration parameters for the respective partitions. |
| |
| Example: |
| >>> get_partitions(quiet=True) |
| {'partition1': {'PartitionName': 'partition1', 'AllowGroups': 'ALL', 'Defaults': 'YES', ...}, |
| 'partition2': {'PartitionName': 'partition2', 'AllowGroups': 'group1,group2', 'Defaults': 'YES', ...}} |
| """ |
| |
| partitions_dict = {} |
| |
| output = run_command_output( |
| "scontrol show partition -o", fatal=True, **run_command_kwargs |
| ) |
| |
| partition_dict = {} |
| for line in output.splitlines(): |
| if line == "": |
| continue |
| |
| while match := re.search(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", line): |
| param_name, param_value = match.group(1), match.group(2) |
| |
| # Remove the consumed parameter from the line |
| line = re.sub(r"^ *([^ =]+)=(.*?)(?= +[^ =]+=| *$)", "", line) |
| |
| # Reformat the value if necessary |
| if is_integer(param_value): |
| param_value = int(param_value) |
| elif is_float(param_value): |
| param_value = float(param_value) |
| elif param_value == "(null)": |
| param_value = None |
| |
| # Add it to the temporary partition dictionary |
| partition_dict[param_name] = param_value |
| |
| # Add the partition dictionary to the partitions dictionary |
| partitions_dict[partition_dict["PartitionName"]] = partition_dict |
| |
| # Clear the partition dictionary for use by the next partition |
| partition_dict = {} |
| |
| return partitions_dict |
| |
| |
| def get_partition_parameter(partition_name, parameter_name, default=None): |
| """Obtains the value for a Slurm partition configuration parameter. |
| |
| This function retrieves the value of the specified parameter for the given |
| partition. If the parameter is not present, the default value is returned. |
| |
| Args: |
| partition_name (string): The name of the partition. |
| parameter_name (string): The name of the parameter to retrieve. |
| default (string or None): The default value to return if the parameter is |
| not found. |
| |
| Returns: |
| The value of the specified partition parameter, or the default if not |
| found. |
| |
| Example: |
| >>> get_partition_parameter('my_partition', 'AllowAccounts') |
| 'ALL' |
| >>> get_partition_parameter('second_partition', 'DefaultTime', '00:30:00') |
| '00:30:00' |
| """ |
| |
| partitions_dict = get_partitions() |
| |
| if partition_name in partitions_dict: |
| partition_dict = partitions_dict[partition_name] |
| else: |
| pytest.fail( |
| f"Partition ({partition_name}) was not found in the partition configuration" |
| ) |
| |
| if parameter_name in partition_dict: |
| return partition_dict[parameter_name] |
| else: |
| return default |
| |
| |
| def set_partition_parameter(partition_name, new_parameter_name, new_parameter_value): |
| """Sets the value of the specified partition configuration parameter. |
| |
| This function sets the specified partition property and reconfigures |
| the slurm daemons. A backup is automatically created and the |
| original configuration is restored after the test completes. |
| |
| This function may only be used in auto-config mode. |
| |
| Args: |
| partition_name (string): The partition name. |
| new_parameter_name (string): The parameter name. |
| new_parameter_value (string): The parameter value. |
| Use a value of None to unset a partition parameter. |
| |
| Example: |
| >>> set_partition_parameter('partition1', 'MaxTime', 'INFINITE') |
| """ |
| |
| if not properties["auto-config"]: |
| require_auto_config("wants to modify partition parameters") |
| |
| config_file = f"{properties['slurm-config-dir']}/slurm.conf" |
| |
| # Read the original slurm.conf into a list of lines |
| output = run_command_output( |
| f"cat {config_file}", user=properties["slurm-user"], quiet=True |
| ) |
| original_config_lines = output.splitlines() |
| new_config_lines = original_config_lines.copy() |
| |
| # Locate the partition among the various Partition definitions |
| found_partition_name = False |
| for line_index in range(len(original_config_lines)): |
| line = original_config_lines[line_index] |
| |
| words = re.split(r" +", line.strip()) |
| if len(words) < 1: |
| continue |
| if words[0][0] == "#": |
| continue |
| parameter_name, parameter_value = words[0].split("=", 1) |
| if parameter_name.lower() != "partitionname": |
| continue |
| |
| if parameter_value == partition_name: |
| # We found a matching PartitionName line |
| found_partition_name = True |
| |
| # Read in the partition parameters |
| original_partition_parameters = collections.OrderedDict() |
| for word in words[1:]: |
| parameter_name, parameter_value = word.split("=", 1) |
| original_partition_parameters[parameter_name] = parameter_value |
| |
| # Delete the original partition definition |
| new_config_lines.pop(line_index) |
| |
| # Add the modified definition for the specified partition |
| modified_partition_parameters = original_partition_parameters.copy() |
| if new_parameter_value is None: |
| if new_parameter_name in modified_partition_parameters: |
| del modified_partition_parameters[new_parameter_name] |
| else: |
| modified_partition_parameters[new_parameter_name] = new_parameter_value |
| modified_partition_line = f"PartitionName={partition_name}" |
| for ( |
| parameter_name, |
| parameter_value, |
| ) in modified_partition_parameters.items(): |
| modified_partition_line += f" {parameter_name}={parameter_value}" |
| new_config_lines.insert(line_index, modified_partition_line) |
| |
| break |
| |
| if not found_partition_name: |
| pytest.fail( |
| f"Invalid partition name specified in set_partition_parameter(). Partition {partition_name} does not exist" |
| ) |
| |
| # Write the config file back out with the modifications |
| backup_config_file("slurm") |
| new_config_string = "\n".join(new_config_lines) |
| run_command( |
| f"echo '{new_config_string}' > {config_file}", |
| user=properties["slurm-user"], |
| fatal=True, |
| quiet=True, |
| ) |
| |
| # Reconfigure slurm controller if it is already running |
| if is_slurmctld_running(quiet=True): |
| run_command("scontrol reconfigure", user=properties["slurm-user"], quiet=True) |
| |
| |
| def default_partition(): |
| """Returns the name of the default Slurm partition. |
| |
| This function retrieves the Slurm partition configuration and returns the |
| name of the partition that is marked as the default. |
| |
| Args: |
| None |
| |
| Returns: |
| The name of the default Slurm partition. |
| |
| Example: |
| >>> default_partition() |
| 'my_default_partition' |
| """ |
| |
| partitions_dict = get_partitions() |
| |
| for partition_name in partitions_dict: |
| if partitions_dict[partition_name]["Default"] == "YES": |
| return partition_name |
| |
| |
| # This is supplied for ease-of-use in test development only. |
| # Tests should not use this permanently. Use logging.debug() instead. |
| def log_debug(msg): |
| logging.debug(msg) |
| |
| |
| ############################################################################## |
| # ATF module initialization |
| ############################################################################## |
| |
| |
| # This is a logging filter that adds a new LogRecord traceback attribute |
| class TraceBackFilter(logging.Filter): |
| def filter(self, record): |
| call_stack = [] |
| within_atf_context = False |
| |
| for frame_summary in (traceback.extract_stack())[-5::-1]: |
| if within_atf_context: |
| if "testsuite/python" not in frame_summary.filename: |
| break |
| else: |
| if "testsuite/python" in frame_summary.filename: |
| within_atf_context = True |
| else: |
| continue |
| |
| function = frame_summary.name |
| short_filename = frame_summary.filename.rpartition("testsuite/python/")[2] |
| lineno = frame_summary.lineno |
| |
| call_stack.append(f"{function}@{short_filename}:{lineno}") |
| |
| record.traceback = ",".join(call_stack) |
| |
| return True |
| |
| |
| # Add a new traceback LogRecord attribute |
| logging.getLogger().addFilter(TraceBackFilter()) |
| |
| # Add a custom TRACE logging level |
| # This has to be done early enough to allow pytest --log-level=TRACE to be used |
| logging.TRACE = logging.NOTSET + 5 |
| logging.addLevelName(logging.TRACE, "TRACE") |
| |
| |
| def _trace(message, *args, **kwargs): |
| logging.log(logging.TRACE, message, *args, **kwargs) |
| |
| |
| logging.trace = _trace |
| logging.getLogger().trace = _trace |
| |
| # Add a custom NOTE logging level in between INFO and DEBUG |
| logging.NOTE = logging.DEBUG + 5 |
| logging.addLevelName(logging.NOTE, "NOTE") |
| |
| |
| def _note(message, *args, **kwargs): |
| logging.log(logging.NOTE, message, *args, **kwargs) |
| |
| |
| logging.note = _note |
| logging.getLogger().note = _note |
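| |
| # Example usage of the custom levels (messages hypothetical): |
| #   logging.trace("very verbose polling detail") |
| #   logging.note("a milestone between DEBUG and INFO") |
| # Both helpers route through logging.log with the corresponding numeric level. |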
| |
| # The module-level temporary directory is initialized in conftest.py |
| module_tmp_path = None |
| |
| # Instantiate and populate testrun-level properties |
| properties = {} |
| |
| # Initialize directory properties |
| properties["testsuite_base_dir"] = str(pathlib.Path(__file__).resolve().parents[2]) |
| properties["testsuite_python_lib"] = properties["testsuite_base_dir"] + "/python/lib" |
| properties["slurm-source-dir"] = str(pathlib.Path(__file__).resolve().parents[3]) |
| properties["slurm-build-dir"] = properties["slurm-source-dir"] |
| properties["slurm-prefix"] = "/usr/local" |
| properties["testsuite_scripts_dir"] = ( |
| properties["testsuite_base_dir"] + "/python/scripts" |
| ) |
| properties["testsuite_check_dir"] = properties["testsuite_base_dir"] + "/python/check" |
| properties["influxdb_host"] = "localhost" |
| properties["influxdb_port"] = 8086 |
| properties["influxdb_db"] = "slurm" |
| |
| # Override directory properties with values from testsuite.conf file |
| testsuite_config = {} |
| # The default location for the testsuite.conf file (in SRCDIR/testsuite) |
| # can be overridden with the SLURM_TESTSUITE_CONF environment variable. |
| testsuite_config_file = os.getenv( |
| "SLURM_TESTSUITE_CONF", f"{properties['testsuite_base_dir']}/testsuite.conf" |
| ) |
| if not os.path.isfile(testsuite_config_file): |
| pytest.fail( |
| f"The unified testsuite configuration file (testsuite.conf) was not found. This file can be created from a copy of the autogenerated sample found in BUILDDIR/testsuite/testsuite.conf.sample. By default, this file is expected to be found in SRCDIR/testsuite ({properties['testsuite_base_dir']}). If placed elsewhere, set the SLURM_TESTSUITE_CONF environment variable to the full path of your testsuite.conf file." |
| ) |
| with open(testsuite_config_file, "r") as f: |
| for line in f.readlines(): |
| if match := re.search(r"^\s*(\w+)\s*=\s*(.*)$", line): |
| testsuite_config[match.group(1).lower()] = match.group(2) |
| if "slurmsourcedir" in testsuite_config: |
| properties["slurm-source-dir"] = testsuite_config["slurmsourcedir"] |
| if "slurmbuilddir" in testsuite_config: |
| properties["slurm-build-dir"] = testsuite_config["slurmbuilddir"] |
| if "slurminstalldir" in testsuite_config: |
| properties["slurm-prefix"] = testsuite_config["slurminstalldir"] |
| if "slurmconfigdir" in testsuite_config: |
| properties["slurm-config-dir"] = testsuite_config["slurmconfigdir"] |
| |
| if "influxdb_host" in testsuite_config: |
| properties["influxdb_host"] = properties["influxdb_host"] |
| if "influxdb_port" in testsuite_config: |
| properties["influxdb_host"] = properties["influxdb_port"] |
| if "influxdb_db" in testsuite_config: |
| properties["influxdb_db"] = properties["influxdb_db"] |
| |
| # Set derived directory properties |
| # The environment (e.g. PATH, SLURM_CONF) overrides the configuration. |
| # If the Slurm clients and daemons are not in the current PATH |
| # but can be found using the configured SlurmInstallDir, add the |
| # derived bin and sbin dir to the current PATH. |
| properties["slurm-bin-dir"] = f"{properties['slurm-prefix']}/bin" |
| if squeue_path := shutil.which("squeue"): |
| properties["slurm-bin-dir"] = os.path.dirname(squeue_path) |
| elif os.access(f"{properties['slurm-bin-dir']}/squeue", os.X_OK): |
| os.environ["PATH"] += ":" + properties["slurm-bin-dir"] |
| properties["slurm-sbin-dir"] = f"{properties['slurm-prefix']}/sbin" |
| if slurmctld_path := shutil.which("slurmctld"): |
| properties["slurm-sbin-dir"] = os.path.dirname(slurmctld_path) |
| elif os.access(f"{properties['slurm-sbin-dir']}/slurmctld", os.X_OK): |
| os.environ["PATH"] += ":" + properties["slurm-sbin-dir"] |
| properties["slurm-config-dir"] = re.sub( |
| r"\${prefix}", properties["slurm-prefix"], properties["slurm-config-dir"] |
| ) |
| if slurm_conf_path := os.getenv("SLURM_CONF"): |
| properties["slurm-config-dir"] = os.path.dirname(slurm_conf_path) |
| |
| # Derive the slurm-user value |
| properties["slurm-user"] = "root" |
| slurm_config_file = f"{properties['slurm-config-dir']}/slurm.conf" |
| if not os.path.isfile(slurm_config_file): |
| pytest.fail( |
| f"The python testsuite was expecting your slurm.conf to be found in {properties['slurm-config-dir']}. Please create it or use the SLURM_CONF environment variable to indicate its location." |
| ) |
| if os.access(slurm_config_file, os.R_OK): |
| with open(slurm_config_file, "r") as f: |
| for line in f.readlines(): |
| if match := re.search(r"^\s*(?i:SlurmUser)\s*=\s*(.*)$", line): |
| properties["slurm-user"] = match.group(1) |
| else: |
| # slurm.conf is not readable as test-user. We will try reading it as root |
| results = run_command( |
| f"grep -i SlurmUser {slurm_config_file}", user="root", quiet=True |
| ) |
| if results["exit_code"] == 0: |
| pytest.fail(f"Unable to read {slurm_config_file}") |
| for line in results["stdout"].splitlines(): |
| if match := re.search(r"^\s*(?i:SlurmUser)\s*=\s*(.*)$", line): |
| properties["slurm-user"] = match.group(1) |
| |
| properties["submitted-jobs"] = [] |
| properties["test-user"] = pwd.getpwuid(os.getuid()).pw_name |
| properties["auto-config"] = False |
| properties["allow-slurmdbd-modify"] = False |
| properties["slurmrestd-started"] = False |
| |
| # Instantiate a nodes dictionary. These are populated in require_slurm_running. |
| nodes = {} |
| |
| # Check if user has sudo privileges |
| results = subprocess.run( |
| "sudo -ln | grep -q '(ALL.*) NOPASSWD: ALL'", |
| shell=True, |
| capture_output=True, |
| text=True, |
| ) |
| if results.returncode == 0: |
| properties["sudo-rights"] = True |
| else: |
| properties["sudo-rights"] = False |