blob: a2528341259e7a80c712901c316e0aea76662cf8 [file] [log] [blame] [edit]
############################################################################
# Copyright (C) SchedMD LLC.
############################################################################
import atf
from datetime import datetime
import logging
import math
import pytest
import re
import threading
import time
# Global variables
# Name of the accounting cluster created for this test; the per-cluster tables
# populated below are all prefixed with it.
cluster = "test_cluster1"
# Rows inserted for each purge type, split evenly over that type's tables unless
# a table overrides it with its own "row_count".
row_count_total = 500_000
# Default time limit (seconds) for a purge type's rollup/purge to finish.
max_perf_time_default = 60
# Per-purge-type description of the rows to generate before triggering a purge.
# Entry keys:
#   "type"          - the X in the slurmdbd Purge<X>After option; the test skips
#                     the entry when that option is not configured
#   "tables"        - tables to populate, each a dict with:
#       "name"          - table name
#       "sql"           - text appended to "insert into <name> " and executed via
#                         the statement_repeat stored procedure; @seq is presumably
#                         the loop value that procedure supplies (TODO confirm)
#       "from_template" - optional; when true the statement is run for the
#                         sequence range 1..1 (an insert ... select copying an
#                         already populated table), so the source table must be
#                         listed earlier in the same entry
#       "minversion"    - optional version tuple; the table is skipped when
#                         sbin/slurmdbd reports an older version
#       "row_count"     - optional fixed row count instead of an even share of
#                         row_count_total
#   "max_perf_time" - seconds the rollup (and therefore the purge) may take
table_gen_info = [
    {
        "type": "Event",
        "tables": [
            {
                "name": f"{cluster}_event_table",
                "sql": "(time_start,time_end,reason) values (@seq,@seq,'')",
                # Ex. override row count and set fixed amount if needed
                # 'row_count' : 100,
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "Job",
        "tables": [
            {
                "name": f"{cluster}_job_table",
                "sql": "(cpus_req,job_name,id_assoc,id_job,id_resv,id_wckey,id_user,id_group,het_job_id,het_job_offset,state_reason_prev,nodes_alloc,\\`partition\\`,priority,state,time_end,env_hash_inx,script_hash_inx) values (0,'',0,@seq,0,0,0,0,0,0,0,0,'',0,0,@seq,@seq,@seq)",
            },
            # job_table will be used to seed remaining job tables to speed things up
            {
                "name": f"{cluster}_job_env_table",
                "sql": f"(hash_inx,env_hash) select env_hash_inx,env_hash_inx from {cluster}_job_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_job_script_table",
                "sql": f"(hash_inx,script_hash) select script_hash_inx,script_hash_inx from {cluster}_job_table",
                "from_template": True,
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "Resv",
        "tables": [
            {
                "name": f"{cluster}_resv_table",
                "sql": "(id_resv,time_end,resv_name) values (@seq,@seq,'')",
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "Step",
        "tables": [
            {
                "name": f"{cluster}_step_table",
                "sql": "(job_db_inx,id_step,nodelist,nodes_alloc,state,step_name,task_cnt,time_end) values (@seq,@seq,'',0,0,'',0,@seq)",
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "Suspend",
        "tables": [
            {
                "name": f"{cluster}_suspend_table",
                "sql": "(job_db_inx,time_end,id_assoc) values (@seq,@seq,0)",
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "TXN",
        "tables": [
            # txn_table is shared by all clusters (no cluster prefix); rows are
            # tagged with this test's cluster name instead
            {
                "name": "txn_table",
                "sql": f"(timestamp,cluster,action,name,actor) values (@seq,'{cluster}',0,'','')",
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
    {
        "type": "Usage",
        "tables": [
            # qos usage tables added in 24.11 but not purged until 25.05.3, list them first for row count calculation later
            {
                "name": f"{cluster}_qos_usage_hour_table",
                "sql": "(creation_time,mod_time,id,time_start) values (@seq,@seq,1,@seq)",
                "minversion": (25, 5, 3),
            },
            # qos_usage_hour_table will be used to seed remaining qos usage tables to speed things up
            {
                "name": f"{cluster}_qos_usage_day_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_qos_usage_hour_table",
                "from_template": True,
                "minversion": (25, 5, 3),
            },
            {
                "name": f"{cluster}_qos_usage_month_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_qos_usage_hour_table",
                "from_template": True,
                "minversion": (25, 5, 3),
            },
            {
                "name": f"{cluster}_assoc_usage_hour_table",
                "sql": "(creation_time,mod_time,id,time_start) values (@seq,@seq,@seq,@seq)",
            },
            # assoc_usage_hour_table will be used to seed remaining assoc usage tables to speed things up
            {
                "name": f"{cluster}_assoc_usage_day_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_assoc_usage_month_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_wckey_usage_hour_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_wckey_usage_day_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_wckey_usage_month_table",
                "sql": f"(creation_time,mod_time,id,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            # the TRES usage tables key on id_tres rather than id
            {
                "name": f"{cluster}_usage_hour_table",
                "sql": f"(creation_time,mod_time,id_tres,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_usage_day_table",
                "sql": f"(creation_time,mod_time,id_tres,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
            {
                "name": f"{cluster}_usage_month_table",
                "sql": f"(creation_time,mod_time,id_tres,time_start) select creation_time,mod_time,id,time_start from {cluster}_assoc_usage_hour_table",
                "from_template": True,
            },
        ],
        "max_perf_time": max_perf_time_default,
    },
]
@pytest.fixture(scope="module", autouse=True)
def setup():
    """Declare the accounting/slurmdbd settings this module needs, then start Slurm.

    Every purge type exercised by the test gets Purge<type>After=1h so that
    rows stamped two hours in the past are eligible for purging.
    """
    atf.require_accounting(modify=True)
    atf.require_config_parameter_includes("AccountingStorageEnforce", "associations")
    atf.require_config_parameter("AllowNoDefAcct", None, source="slurmdbd")

    # To skip a purge type, comment out its name here; the test then finds the
    # Purge<type>After option unset and skips that type.
    purge_types = (
        "Event",
        "Job",
        "Resv",
        "Step",
        "Suspend",
        "TXN",
        "Usage",
    )
    for purge_type in purge_types:
        atf.require_config_parameter(
            f"Purge{purge_type}After", "1h", source="slurmdbd"
        )

    # debug2 is all that is really required (presumably so the rollup
    # begin/end lines the test greps for reach the log)
    atf.require_config_parameter("DebugLevel", "debug2", source="slurmdbd")
    # Extra tracing, handy when debugging:
    # atf.require_config_parameter("DebugLevel", "debug4", source="slurmdbd")
    # atf.require_config_parameter("DebugFlags", f"{atf.get_config(live=False, source='slurmdbd', quiet=True)['DebugFlags']},DB_USAGE,DB_ARCHIVE", source="slurmdbd")

    # Start Slurm so the accounting database exists before the test touches it.
    atf.require_slurm_running()
@pytest.fixture(scope="function", autouse=True)
def setup_db(sql_statement_repeat):
    """Add the test cluster to accounting before each test and remove it afterwards.

    ``sql_statement_repeat`` is not used here; requesting it makes pytest set
    up that fixture (defined outside this file) before the cluster is added.
    Adding the cluster is fatal on failure; removing it is not.
    """
    slurm_user = atf.properties["slurm-user"]

    def sacctmgr_cluster(action, **extra_kwargs):
        # Add quiet=True to extra_kwargs to silence command logging.
        atf.run_command(
            f"sacctmgr -i {action} cluster {cluster}",
            user=slurm_user,
            **extra_kwargs,
        )

    sacctmgr_cluster("add", fatal=True)
    yield
    sacctmgr_cluster("remove")
def run_command_bg(command, result, index, **run_command_kwargs):
    """Run ``atf.run_command`` and store its return value in ``result[index]``.

    Intended as a ``threading.Thread`` target. A thread target's return value
    is discarded, so the caller passes a mutable sequence and reads the slot
    after ``join()``. If ``atf.run_command`` raises, the slot is left unchanged.
    """
    command_result = atf.run_command(command, **run_command_kwargs)
    result[index] = command_result
def _supported_tables(tables):
    """Return the entries of *tables* that apply to the installed slurmdbd.

    A table carrying a ``"minversion"`` key is dropped when
    ``atf.get_version("sbin/slurmdbd")`` compares lower than it. The version is
    only looked up when at least one table declares a minimum version.
    """
    if not any("minversion" in table_info for table_info in tables):
        return list(tables)
    dbd_version = atf.get_version("sbin/slurmdbd")
    return [
        table_info
        for table_info in tables
        if "minversion" not in table_info
        or not dbd_version < table_info["minversion"]
    ]


def _sum_table_rows(sql_statement_repeat, table_names, user):
    """Return the combined ``count(*)`` of every table in *table_names*.

    *sql_statement_repeat* is the command prefix provided by the fixture of the
    same name (presumably a mysql client invocation). The query runs as *user*
    and a failure is fatal.
    """
    count_sql = "+".join(f"(select count(*) from {name})" for name in table_names)
    mysql_command = sql_statement_repeat + f' -Ns -e "select {count_sql}"'
    return int(atf.run_command_output(mysql_command, user=user, fatal=True))


def _rollup_seconds(log_output, beg_pat, end_pat):
    """Return the seconds between the rollup begin and end log lines, or None.

    *log_output* must contain a line with *beg_pat* followed by a line with
    *end_pat* (case-insensitive), each carrying a timestamp such as
    ``2024-01-02T03:04:05.678``. Returns None when no such pair is found.
    """
    stamp = r"(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d\d\d)"
    match = re.search(
        rf"{stamp}.*{re.escape(beg_pat)}.*\s*.*{stamp}.*{re.escape(end_pat)}",
        log_output,
        re.IGNORECASE,
    )
    if match is None:
        return None
    start_obj, end_obj = (
        datetime.strptime(group, "%Y-%m-%dT%H:%M:%S.%f") for group in match.groups()
    )
    return (end_obj - start_obj).total_seconds()


def test_purge_slurm_db_tables(sql_statement_repeat):
    """Test purging large number of rows

    For every purge type whose Purge<type>After option is configured:
    stop slurmdbd, bulk-insert rows stamped two hours in the past, rewind the
    cluster's last_ran_table so the hourly rollup runs on restart, restart
    slurmdbd, and require the rollup (which performs the purge) to log its begin
    and end lines within the entry's ``max_perf_time``. The rows removed and
    the resulting rate are logged.
    """
    # rollup beginning/ending log signatures
    rollup_beg_pat = "running rollup"
    rollup_end_pat = "everything rolled"
    slurm_user = atf.properties["slurm-user"]
    dbd_log = atf.get_config_parameter(
        "LogFile", live=False, quiet=True, source="slurmdbd"
    )

    for entry in table_gen_info:
        purge_type = entry["type"]
        max_perf_time = entry["max_perf_time"]
        if (
            atf.get_config_parameter(
                f"Purge{purge_type}After", live=False, quiet=True, source="slurmdbd"
            )
            is None
        ):
            continue

        # Decide up front which tables take part, so every table is sized from
        # the final table count no matter where version-gated tables are listed.
        tables = _supported_tables(entry["tables"])
        if not tables:
            logging.info(
                f"No {purge_type} tables apply to this slurmdbd version, skipping"
            )
            continue
        table_count = len(tables)
        table_names = [table_info["name"] for table_info in tables]

        # stop so db can be altered
        atf.stop_slurmdbd()

        # make entries 2hrs old
        two_hours_ago = int(time.time()) - 2 * 3600
        for table_info in tables:
            # if row_count was given then use it, otherwise make each table equal size
            if "row_count" in table_info:
                row_count = int(table_info["row_count"])
            else:
                row_count = int(math.ceil(row_count_total / table_count))

            if table_info.get("from_template"):
                # data copied from another, already populated table by a single
                # insert ... select, so there is no need to repeat the insert
                seq_start = seq_end = 1
            else:
                # consecutive @seq values (used as timestamps) ending two hours ago
                seq_start = two_hours_ago - row_count + 1
                seq_end = two_hours_ago

            mysql_command = sql_statement_repeat
            mysql_command += f" -e \"call statement_repeat(\\\"insert into {table_info['name']} {table_info['sql']}\\\", {seq_start}, {seq_end}, 1, 1)\""
            logging.info(
                f"Populating database with {row_count} rows in {table_info['name']}"
            )
            atf.run_command(mysql_command, user=slurm_user, fatal=True)

        # save starting sum of table row counts
        starting_row_count = _sum_table_rows(
            sql_statement_repeat, table_names, slurm_user
        )

        # fabricate "last ran" time of 2hrs ago so hourly rollup/purge will be triggered
        mysql_command = (
            sql_statement_repeat
            + f' -e "delete ignore from {cluster}_last_ran_table; insert into {cluster}_last_ran_table (hourly_rollup, daily_rollup, monthly_rollup) values ({two_hours_ago}, {two_hours_ago}, {two_hours_ago})"'
        )
        atf.run_command(mysql_command, user=slurm_user, fatal=True)

        logging.info(
            f"Waiting {max_perf_time}s for {purge_type} purge to complete (happens as part of rollup)"
        )

        # start watching the log before starting slurmdbd so all desired output can be captured
        results = [None]
        index = 0
        tail_thread = threading.Thread(
            target=run_command_bg,
            args=(
                # use "-n 1" so we don't match rollup_end_pat from previous run
                f"tail -n 1 -F {dbd_log} | grep -E --line-buffered -i -m2 '{rollup_beg_pat}|{rollup_end_pat}'",
                results,
                index,
            ),
            kwargs={
                "timeout": max_perf_time,
                "quiet": True,
                "user": slurm_user,
            },
        )
        tail_thread.start()
        # wait for thread to start
        while tail_thread.ident is None:
            time.sleep(0.1)
        atf.start_slurmdbd()
        # wait for rollup (purge) to finish or timeout
        tail_thread.join()

        tail_result = results[index]
        # run_command_bg leaves the slot as None when the command raised
        assert (
            tail_result is not None
        ), f"Failed to watch {dbd_log} for {purge_type} rollup messages"
        logging.info(
            f"Rollup lines identified in {dbd_log}:\n{tail_result['stdout'].rstrip()}"
        )
        time_sec = _rollup_seconds(
            tail_result["stdout"], rollup_beg_pat, rollup_end_pat
        )
        assert (
            time_sec is not None
        ), f"slurmdbd should be able to purge {purge_type} tables in less than {max_perf_time}s"

        # count the actual number of rows removed by the purge
        ending_row_count = _sum_table_rows(
            sql_statement_repeat, table_names, slurm_user
        )
        rows_removed = starting_row_count - ending_row_count
        # both log lines can carry the same millisecond timestamp; don't divide by 0
        rate = int(rows_removed / time_sec) if time_sec > 0 else "n/a"

        # debug: list the populated tables that still report rows
        # (information_schema table_rows may be an estimate, e.g. for InnoDB).
        # Exact names are used because a LIKE pattern such as '%_txn_%' cannot
        # match txn_table and would also pick up other clusters' tables.
        names_sql = ",".join(f"'{name}'" for name in table_names)
        mysql_command = (
            sql_statement_repeat
            + f" -e \"select rpad(table_name,40,' '),lpad(table_rows,6,' ') from information_schema.tables where table_schema=database() and table_name in ({names_sql}) and table_rows>0\""
        )
        atf.run_command_output(mysql_command, user=slurm_user)

        logging.info(
            f"{purge_type} purge took {time_sec}s to remove {rows_removed} rows from {table_count} tables ({rate} rows/sec)"
        )