blob: 25fb038c3b245cc5d9bf9e2c6d08c0fe868e195b [file] [edit]
############################################################################
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved
############################################################################
import csv
import logging
from pathlib import Path
import atf
import pytest
# Columns whose values together identify one row (see row_identity).
id_cols = ["Step", "Node", "Series"]

# Columns that Node:Totals is expected to report as Min_/Max_/Sum_/Avg_ variants.
summarized_cols = [
    "EpochTime",
    "CPUFrequency",
    "CPUUtilization",
    "CPUTime",
    "GPUMemMB",
    "GPUUtilization",
    "RSS",
    "VMSize",
    "Pages",
    "ReadMB",
    "WriteMB",
]

# Subset of summarized_cols that the tests parse with int() instead of float().
int_value_cols = [
    "EpochTime",
    "CPUFrequency",
    "GPUMemMB",
    "RSS",
    "VMSize",
    "Pages",
]

# Columns expected from `sh5util -L` and from the Node:TimeSeries extract.
listed_cols = [*summarized_cols, "ElapsedTime"]
@pytest.fixture(scope="module")
def hdf5_dir():
    """Absolute path of the "h5" directory under the current working directory."""
    profile_root = Path("h5")
    return profile_root.absolute()
@pytest.fixture(scope="module", autouse=True)
def setup(hdf5_dir):
    """Declare this module's requirements through atf.

    Requires two nodes, the three configuration parameters listed below
    (ProfileHDF5Dir pointing at ``hdf5_dir``), and a running Slurm.
    """
    atf.require_nodes(2)

    # (parameter name, value, extra keyword arguments), applied in this order.
    required_parameters = (
        ("AcctGatherProfileType", "acct_gather_profile/hdf5", {}),
        ("ProfileHDF5Dir", hdf5_dir, {"source": "acct_gather"}),
        ("JobAcctGatherType", "jobacct_gather/cgroup", {}),
    )
    for name, value, extra in required_parameters:
        atf.require_config_parameter(name, value, **extra)

    atf.require_slurm_running()
@pytest.fixture(scope="module")
def user_profile_dir(hdf5_dir):
    """Subdirectory of ``hdf5_dir`` named after the "test-user" atf property."""
    test_user = atf.properties["test-user"]
    return hdf5_dir / test_user
@pytest.fixture(scope="module")
def job_id(user_profile_dir):
    """Generate a profile and return the job id"""
    srun_args = "--acctg-freq=task=2 --profile=task -N2 --ntasks-per-node=1 sleep 6"
    submitted = atf.submit_job_srun(srun_args, fatal=True)

    yield submitted

    # Teardown: best-effort removal of this job's profile files.
    cleanup = f"rm -f {user_profile_dir}/{submitted}_*.h5"
    atf.run_command(cleanup, fatal=False, quiet=True)
def get_node_profiles(user_profile_dir, job_id):
    """Return the sorted paths that ``ls -1`` prints for ``<job_id>_*.h5``.

    The listing is run non-fatally; each output line becomes one ``Path``.
    """
    listing = atf.run_command_output(
        f"ls -1 {user_profile_dir}/{job_id}_*.h5", fatal=False, quiet=True
    )
    found = [Path(line) for line in listing.splitlines()]
    found.sort()
    return found
def profile_test(args):
    """Return True when the command ``test <args>`` exits with status 0."""
    exit_code = atf.run_command_exit(f"test {args}", quiet=True)
    return exit_code == 0
def wait_for_node_profiles(user_profile_dir, job_id, expected_count=2):
    """Poll until the job has exactly ``expected_count`` profile files.

    The directory is re-listed once per iteration of ``atf.timer()``; how many
    iterations that yields and how long it waits between them is decided by
    atf and is not visible here.

    Args:
        user_profile_dir: Directory in which the profiles are looked up.
        job_id: Job whose ``<job_id>_*.h5`` files are counted.
        expected_count: Number of profile files to wait for. Defaults to 2,
            matching the two-node job this module submits.

    Returns:
        The sorted list of profile paths from the first listing that has
        exactly ``expected_count`` entries.

    Fails the test via ``pytest.fail`` when the timer runs out first; the
    message includes the last listing that was seen.
    """
    profiles = None
    for _ in atf.timer():
        profiles = get_node_profiles(user_profile_dir, job_id)
        if len(profiles) == expected_count:
            return profiles
    pytest.fail(f"Unable to get {expected_count} profiles: {profiles}")
@pytest.fixture(scope="module")
def node_profiles(user_profile_dir, job_id):
    """Profile paths of the job, as returned by wait_for_node_profiles."""
    found = wait_for_node_profiles(user_profile_dir, job_id)
    return found
@pytest.fixture(scope="module")
def hdf5_file(job_id, node_profiles):
    """Run ``sh5util --savefiles`` for the job and return the output path.

    The output is ``test.h5`` resolved against the current working directory.
    ``node_profiles`` is not used in the body; presumably it is requested so
    the per-node profiles exist before sh5util runs.
    """
    merged = Path("test.h5").absolute()
    merge_cmd = f"sh5util --savefiles -j {job_id} -o {merged}"
    atf.run_command(merge_cmd, fatal=True)
    return merged
def assert_schema(field_names, expected_cols):
    """Assert that every expected column is present in ``field_names``.

    Extra columns in ``field_names`` are allowed.

    Args:
        field_names: Iterable of column names that were actually found.
        expected_cols: Iterable of column names that must be present.

    Raises:
        AssertionError: If any expected column is missing. The message lists
            the missing columns in the order they appear in ``expected_cols``
            so that failures read the same from run to run.
    """
    present = set(field_names)
    missing_cols = [col for col in expected_cols if col not in present]
    assert not missing_cols, f"Missing expected column(s): {missing_cols}"
def read_sh5util_csv(job_id, hdf5_file, opts, expected_cols):
    """Run sh5util on the Tasks series of a job and return the CSV rows.

    Executes ``sh5util -i <hdf5_file> -o output -j <job_id> -s Tasks <opts>``
    and parses the file ``output`` in the current working directory.

    Args:
        job_id: Job id passed to ``-j``.
        hdf5_file: Input file passed to ``-i``.
        opts: Additional sh5util options, appended verbatim to the command.
        expected_cols: Column names that must appear in the CSV header.

    Returns:
        A non-empty list with one dict per data row, keyed by column name, as
        produced by ``csv.DictReader``.

    Raises:
        AssertionError: If the file has no header, lacks an expected column,
            or contains no data rows.
        FileNotFoundError: If sh5util did not create the output file.
    """
    output_file = "output"

    # Every call writes to the same path. Remove whatever an earlier call left
    # behind so a run that writes nothing cannot be satisfied by stale data.
    try:
        Path(output_file).unlink()
    except FileNotFoundError:
        pass

    cmd = f"sh5util -i {hdf5_file} -o {output_file} -j {job_id} -s Tasks {opts}"
    atf.run_command(cmd, fatal=True)

    with open(output_file, newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        cols = reader.fieldnames
        logging.debug("CSV cols: %s", cols)
        assert cols, f"Output file ({output_file}) is empty"
        assert_schema(cols, expected_cols)
        rows = list(reader)

    logging.debug("CSV contents: %s", rows)
    assert rows, f"No data in output file ({output_file})"
    return rows
@pytest.fixture(scope="module")
def time_series_rows(job_id, hdf5_file):
    """Rows of the ``-E --level=Node:TimeSeries`` extract of the job.

    The header must contain the id columns and every listed column.
    """
    expected = [*id_cols, *listed_cols]
    return read_sh5util_csv(job_id, hdf5_file, "-E --level=Node:TimeSeries", expected)
@pytest.fixture(scope="module")
def totals_rows(job_id, hdf5_file):
    """Rows of the ``-E --level=Node:Totals`` extract of the job.

    The header must contain the id columns, ElapsedTime, and a
    Min_/Max_/Sum_/Avg_ column for every summarized column.
    """
    stat_prefixes = ("Min", "Max", "Sum", "Avg")
    summary_cols = [
        f"{prefix}_{name}" for name in summarized_cols for prefix in stat_prefixes
    ]
    expected = [*id_cols, "ElapsedTime", *summary_cols]
    return read_sh5util_csv(job_id, hdf5_file, "-E --level=Node:Totals", expected)
def test_profile_dir(hdf5_dir, user_profile_dir, job_id):
    """Profiles land in the user's subdirectory"""
    found = wait_for_node_profiles(user_profile_dir, job_id)

    is_directory = profile_test(f"-d {user_profile_dir}")
    assert is_directory

    assert len(found) == 2, "Expected two profiles for a two node job"

    # A fresh listing must agree with what the wait returned.
    relisted = get_node_profiles(user_profile_dir, job_id)
    assert found == relisted
def test_merge(hdf5_file, node_profiles):
    """Merging profiles produces a new, non-empty output file and saves inputs"""
    assert hdf5_file.exists(), f"Merge did not create {hdf5_file}"

    merged_size = hdf5_file.stat().st_size
    assert merged_size > 0, f"Merge created empty {hdf5_file}"

    # The per-node inputs must still be regular files after the merge.
    for saved in node_profiles:
        still_there = profile_test(f"-f {saved}")
        assert still_there, f"Profile {saved} was not saved"
def test_hdf5_utils(node_profiles, hdf5_file):
    """Generated hdf5 files are compatible with common hdf5 utilities"""
    atf.require_tool("h5dump")

    # Check each per-node profile first, then the merged file.
    for target in [*node_profiles, hdf5_file]:
        status = atf.run_command_exit(f"h5dump -H {target}")
        assert status == 0
def test_listed_fields(job_id, hdf5_file):
    """Listing fields produces the correct schema"""
    list_cmd = f"sh5util -i {hdf5_file} -j {job_id} --series=Tasks -L"
    listing = atf.run_command_output(list_cmd, fatal=True)

    # Each line of the stripped output is treated as one field name.
    reported = listing.strip().splitlines()
    assert_schema(reported, listed_cols)
def assert_count(values, count, description):
    """Assert that ``values`` holds exactly ``count`` items.

    ``description`` names the kind of item in the failure message.
    """
    seen = len(values)
    assert seen == count, f"Expected {count} {description}, saw: {seen} ({values})"
def row_identity(row):
    """Return the row's values for the id columns, in id_cols order, as a tuple."""
    parts = []
    for col in id_cols:
        parts.append(row[col])
    return tuple(parts)
def grouped_by_id(rows):
    """Group rows by their identity tuple.

    Returns a dict mapping each identity to the list of rows that share it;
    rows keep their input order within each list.
    """
    buckets = {}
    for entry in rows:
        buckets.setdefault(row_identity(entry), []).append(entry)
    return buckets
def assert_close(field, metric, identity, observed_value, expected_value, sample_count):
    """Assert that an observed value matches the expected one.

    Integer expectations are compared exactly. Anything else is compared
    through ``pytest.approx`` with a relative tolerance of 1e-12 and an
    absolute tolerance of ``max(1e-5, sample_count * 1e-6)``.

    ``field``, ``metric`` and ``identity`` only label the failure message.
    """
    target = expected_value
    if not isinstance(expected_value, int):
        tolerance = max(1e-5, sample_count * 1e-6)
        target = pytest.approx(expected_value, rel=1e-12, abs=tolerance)
    assert observed_value == target, f"{metric}_{field} for {identity}"
def assert_ids(ids):
    """Assert the identities cover one step, two nodes and two series.

    ``ids`` is iterated as (step, node, series) tuples.
    """
    # Collect the distinct values at each tuple position before checking any.
    distinct = [{identity[position] for identity in ids} for position in range(3)]
    expected_counts = (1, 2, 2)
    descriptions = ("Step(s)", "Node(s)", "Series")
    for values, count, description in zip(distinct, expected_counts, descriptions):
        assert_count(values, count, description)
@pytest.mark.parametrize("field", summarized_cols)
def test_totals_match_time_series(field, time_series_rows, totals_rows):
    """Node:Totals values match the extracted Node:TimeSeries samples"""
    samples_by_id = grouped_by_id(time_series_rows)
    totals_by_id = {}
    for totals_row in totals_rows:
        totals_by_id[row_identity(totals_row)] = totals_row

    # Both extracts must describe exactly the same identities.
    assert samples_by_id.keys() == totals_by_id.keys()
    assert_ids(samples_by_id)

    # Integer-valued fields are parsed with int(), all others with float().
    integral = field in int_value_cols
    parse_sample = int if integral else float

    for identity, samples in samples_by_id.items():
        values = [parse_sample(sample[field]) for sample in samples]
        total = sum(values)
        expected_by_metric = (
            ("Min", min(values)),
            ("Max", max(values)),
            ("Sum", total),
            ("Avg", total / len(values)),
        )
        for metric, expected in expected_by_metric:
            raw = totals_by_id[identity][f"{metric}_{field}"]
            # Avg is parsed as a float even for integer-valued fields.
            parse_total = int if integral and metric != "Avg" else float
            assert_close(
                field, metric, identity, parse_total(raw), expected, len(values)
            )
@pytest.mark.xfail(
    atf.get_version("bin/sh5util") < (26, 5),
    reason="Ticket 25241: Item extract 'Malformed file' error fixed in sh5util >= 26.05",
)
def test_item_extract(job_id, hdf5_file, time_series_rows):
    """Item-extract mode data is consistent"""
    node_names = sorted(set(row["Node"] for row in time_series_rows))

    # Fixed summary columns followed by one column per node name.
    expected_cols = [
        "MinNode",
        "MinValue",
        "MaxNode",
        "MaxValue",
        "Sum",
        "Avg",
        "NumNodes",
    ]
    expected_cols.extend(node_names)

    for data_col in summarized_cols:
        extracted = read_sh5util_csv(
            job_id, hdf5_file, f"-I --data={data_col}", expected_cols
        )
        for row in extracted:
            num_nodes = row["NumNodes"]
            failure = f"{data_col} item extract has invalid NumNodes: {num_nodes}"
            # Must be a plain non-negative integer string ...
            assert num_nodes.isdigit(), failure
            # ... between 1 and the number of nodes seen in the time series.
            assert 1 <= int(num_nodes) <= len(node_names), failure