blob: 6b12585419029fa023079cf69826bb7c5f611df0 [file] [log] [blame]
/*****************************************************************************\
* as_mysql_rollup.c - functions for rolling up data for associations
* and machines from the as_mysql storage.
*****************************************************************************
* Copyright (C) 2004-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Danny Auble <da@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "as_mysql_rollup.h"
#include "as_mysql_archive.h"
#include "src/common/parse_time.h"
#include "src/common/slurm_time.h"
/* Kind of time being accumulated by _add_time_tres(). */
enum {
	TIME_ALLOC,	/* time allocated to jobs */
	TIME_DOWN,	/* down time */
	TIME_PDOWN,	/* planned down time (e.g. maintenance) */
	TIME_RESV	/* reserved time */
};

/* Which hourly usage table family _create_id_usage_insert() targets. */
enum {
	ASSOC_TABLES,
	QOS_TABLES,
	WCKEY_TABLES
};
/*
 * Per-TRES rollup accumulator.  All time_* fields are TRES-seconds
 * (seconds multiplied by the TRES count) for the current period.
 */
typedef struct {
	uint64_t count;		/* TRES count the usage was gathered with */
	uint32_t id;		/* TRES id */
	uint64_t time_alloc;	/* seconds allocated to jobs */
	uint64_t time_down;	/* down seconds */
	uint64_t time_idle;	/* idle seconds (derived at insert time) */
	uint64_t time_over;	/* over-committed seconds */
	uint64_t time_pd;	/* planned-down seconds */
	uint64_t time_resv;	/* reserved seconds */
	uint64_t total_time;	/* total possible seconds (count * wall) */
} local_tres_usage_t;
/* Usage for a single association/QOS/wckey id. */
typedef struct {
	int id;			/* assoc/qos/wckey id */
	int id_alt;		/* secondary id paired with id; used to
				 * distinguish entries in
				 * _find_id_alt_usage() */
	list_t *loc_tres;	/* list of local_tres_usage_t */
} local_id_usage_t;
/* Cluster-wide usage gathered over one rollup period. */
typedef struct {
	time_t end;		/* end of the covered window */
	int id;			/* only needed for reservations */
	list_t *loc_tres;	/* list of local_tres_usage_t */
	time_t start;		/* start of the covered window */
} local_cluster_usage_t;
/* Usage gathered for a single reservation during the rollup period. */
typedef struct {
	time_t end;		/* end, clamped to the rollup period */
	uint32_t flags;		/* RESERVE_FLAG_* bits */
	int id;			/* reservation id */
	hostlist_t *hl;		/* nodes in the reservation */
	list_t *local_assocs;	/* list of assocs to spread unused time
				   over of type local_id_usage_t */
	list_t *loc_tres;	/* list of local_tres_usage_t */
	time_t orig_start;	/* unclamped start; used when writing back
				   unused_wall */
	time_t start;		/* start, clamped to the rollup period */
	double unused_wall;	/* wall seconds not consumed by jobs */
} local_resv_usage_t;
/* list_t destructor: release one local_tres_usage_t element. */
static void _destroy_local_tres_usage(void *object)
{
	local_tres_usage_t *usage = object;

	if (!usage)
		return;

	xfree(usage);
}
/* list_t destructor: release a local_id_usage_t and its TRES list. */
static void _destroy_local_id_usage(void *object)
{
	local_id_usage_t *usage = object;

	if (!usage)
		return;

	FREE_NULL_LIST(usage->loc_tres);
	xfree(usage);
}
/* list_t destructor: release a local_cluster_usage_t and its TRES list. */
static void _destroy_local_cluster_usage(void *object)
{
	local_cluster_usage_t *usage = object;

	if (!usage)
		return;

	FREE_NULL_LIST(usage->loc_tres);
	xfree(usage);
}
/* list_t destructor: release a local_resv_usage_t and everything it owns. */
static void _destroy_local_resv_usage(void *object)
{
	local_resv_usage_t *usage = object;

	if (!usage)
		return;

	FREE_NULL_HOSTLIST(usage->hl);
	FREE_NULL_LIST(usage->local_assocs);
	FREE_NULL_LIST(usage->loc_tres);
	xfree(usage);
}
/* list_find_first() callback: match a local_tres_usage_t by TRES id. */
static int _find_loc_tres(void *x, void *key)
{
	local_tres_usage_t *tres = x;
	uint32_t want_id = *(uint32_t *) key;

	return (tres->id == want_id);
}
/* list_find_first() callback: match a local_id_usage_t by its id. */
static int _find_id_usage(void *x, void *key)
{
	local_id_usage_t *usage = x;
	uint32_t want_id = *(uint32_t *) key;

	return (usage->id == want_id);
}
/*
 * list_find_first() callback: match a local_id_usage_t on both its id
 * and its alternate id (key is itself a local_id_usage_t template).
 */
static int _find_id_alt_usage(void *x, void *key)
{
	local_id_usage_t *candidate = x;
	local_id_usage_t *want = key;

	return ((candidate->id == want->id) &&
		(candidate->id_alt == want->id_alt));
}
/*
 * Subtract "seconds" worth of each job TRES (seconds * count) from the
 * cluster's matching total_time accumulators, clamping at zero so the
 * unsigned counters never underflow.
 */
static void _remove_job_tres_time_from_cluster(list_t *c_tres, list_t *j_tres,
					       int seconds)
{
	list_itr_t *itr;
	local_tres_usage_t *c_ent, *j_ent;
	uint64_t decr;

	/* Nothing to do for empty lists or a non-positive interval. */
	if ((seconds <= 0) || !c_tres || !j_tres ||
	    !list_count(c_tres) || !list_count(j_tres))
		return;

	itr = list_iterator_create(c_tres);
	while ((c_ent = list_next(itr))) {
		j_ent = list_find_first(j_tres, _find_loc_tres, &c_ent->id);
		if (!j_ent)
			continue;
		decr = seconds * j_ent->count;
		if (decr >= c_ent->total_time)
			c_ent->total_time = 0;
		else
			c_ent->total_time -= decr;
	}
	list_iterator_destroy(itr);
}
/*
 * Add "time" seconds of usage of the given type (TIME_*) to the entry
 * for TRES "id" in tres_list, creating the entry if needed.
 *
 * When times_count is set, "time" is multiplied by the entry's existing
 * count first, and nothing is added (NULL is returned) if the TRES is
 * not already present or has a zero count.
 *
 * Returns the (possibly newly created) list entry, or NULL when nothing
 * was added.
 */
static local_tres_usage_t *_add_time_tres(list_t *tres_list, int type,
					  uint32_t id, uint64_t time,
					  bool times_count)
{
	local_tres_usage_t *loc_tres;

	/* Energy TRES could have a NO_VAL64, we want to skip those as it is the
	 * same as a 0 since nothing was gathered.
	 */
	if (!time || (time == NO_VAL64))
		return NULL;

	loc_tres = list_find_first(tres_list, _find_loc_tres, &id);

	if (!loc_tres) {
		/* No existing count to multiply against. */
		if (times_count)
			return NULL;
		loc_tres = xmalloc(sizeof(local_tres_usage_t));
		loc_tres->id = id;
		list_append(tres_list, loc_tres);
	}

	if (times_count) {
		/* A zero count would zero out the contribution anyway. */
		if (!loc_tres->count)
			return NULL;
		time *= loc_tres->count;
	}

	switch (type) {
	case TIME_ALLOC:
		loc_tres->time_alloc += time;
		break;
	case TIME_DOWN:
		loc_tres->time_down += time;
		break;
	case TIME_PDOWN:
		loc_tres->time_pd += time;
		break;
	case TIME_RESV:
		loc_tres->time_resv += time;
		break;
	default:
		error("_add_time_tres: unknown type %d given", type);
		xassert(0);
		break;
	}

	return loc_tres;
}
/*
 * For every TRES in tres_list_in, add usage of the given type to the
 * matching entry in tres_list_out.  A non-zero time_in overrides each
 * entry's own total_time as the amount added.
 */
static void _add_time_tres_list(list_t *tres_list_out, list_t *tres_list_in,
				int type, uint64_t time_in, bool times_count)
{
	list_itr_t *in_itr;
	local_tres_usage_t *tres;

	xassert(tres_list_in);
	xassert(tres_list_out);

	in_itr = list_iterator_create(tres_list_in);
	while ((tres = list_next(in_itr))) {
		uint64_t use_time = time_in ? time_in : tres->total_time;

		_add_time_tres(tres_list_out, type, tres->id,
			       use_time, times_count);
	}
	list_iterator_destroy(in_itr);
}
/*
 * Job usage is a ratio of its tres to the reservation's tres:
 * Unused wall = unused wall - job_seconds * job_tres / resv_tres
 *
 * The ratio is taken from the first TRES type (with a non-zero count)
 * present in both the reservation and the job.  Always returns
 * SLURM_SUCCESS.
 */
static int _update_unused_wall(local_resv_usage_t *r_usage, list_t *job_tres,
			       int job_seconds)
{
	list_itr_t *resv_itr;
	local_tres_usage_t *loc_tres;
	uint32_t resv_tres_id;
	uint64_t resv_tres_count;
	double tres_ratio = 0.0;

	/* Get TRES counts. Make sure the TRES types match. */
	resv_itr = list_iterator_create(r_usage->loc_tres);
	while ((loc_tres = list_next(resv_itr))) {
		/* Avoid dividing by zero. */
		if (!loc_tres->count)
			continue;
		resv_tres_id = loc_tres->id;
		resv_tres_count = loc_tres->count;
		/* NOTE: loc_tres is reused below to hold the job's
		 * matching entry; the saved id/count above keep the
		 * reservation's values. */
		if ((loc_tres = list_find_first(job_tres,
						_find_loc_tres,
						&resv_tres_id))) {
			tres_ratio = (double)loc_tres->count /
				(double)resv_tres_count;
			break;
		}
	}
	list_iterator_destroy(resv_itr);

	/*
	 * Here we are converting TRES seconds to wall seconds. This is needed
	 * to determine how much time is actually idle in the reservation.
	 */
	r_usage->unused_wall -= (double)job_seconds * tres_ratio;

	if (r_usage->unused_wall < 0) {
		/*
		 * With a Flex reservation you can easily have more time than is
		 * possible.  Just print this debug3 warning if it happens.
		 */
		debug3("Unused wall is less than zero; this should never happen outside a Flex reservation. Setting it to zero for resv id = %d, start = %ld.",
		       r_usage->id, r_usage->orig_start);
		r_usage->unused_wall = 0;
	}
	return SLURM_SUCCESS;
}
/*
 * Fold job TRES alloc time into the cluster's accumulators for the TRES
 * ids the cluster already tracks; unmatched job TRES are ignored.
 */
static void _add_job_alloc_time_to_cluster(list_t *c_tres_list, list_t *j_tres)
{
	local_tres_usage_t *c_ent, *j_ent;
	list_itr_t *itr = list_iterator_create(c_tres_list);

	while ((c_ent = list_next(itr))) {
		j_ent = list_find_first(j_tres, _find_loc_tres, &c_ent->id);
		if (j_ent)
			c_ent->time_alloc += j_ent->time_alloc;
	}
	list_iterator_destroy(itr);
}
/*
 * Record "seconds" of wall time for TRES "id" at the given count,
 * creating the accumulator on first sight.  The count is overwritten
 * (last value wins) while total_time keeps accumulating.
 */
static void _setup_cluster_tres(list_t *tres_list, uint32_t id,
				uint64_t count, int seconds)
{
	local_tres_usage_t *tres;

	tres = list_find_first(tres_list, _find_loc_tres, &id);
	if (!tres) {
		/* First time this TRES id shows up in the list. */
		tres = xmalloc(sizeof(local_tres_usage_t));
		tres->id = id;
		list_append(tres_list, tres);
	}

	tres->count = count;
	tres->total_time += seconds * tres->count;
}
/*
 * Parse a TRES string of the form "id=count,id=count,..." and fold each
 * entry into tres_list via _setup_cluster_tres(), crediting "seconds"
 * of wall time to each.  TRES_NODE entries are skipped (see comment in
 * the loop).  Parsing stops at the first malformed entry.
 */
static void _add_tres_2_list(list_t *tres_list, char *tres_str, int seconds)
{
	char *tmp_str = tres_str;
	int id;
	uint64_t count;

	xassert(tres_list);

	if (!tres_str || !tres_str[0])
		return;

	while (tmp_str) {
		id = atoi(tmp_str);
		if (id < 1) {
			error("_add_tres_2_list: no id "
			      "found at %s instead", tmp_str);
			break;
		}

		/* We don't run rollup on a node basis
		 * because they are shared resources on
		 * many systems so it will almost always
		 * have over committed resources.
		 */
		if (id != TRES_NODE) {
			if (!(tmp_str = strchr(tmp_str, '='))) {
				error("_add_tres_2_list: no value found");
				xassert(0);
				break;
			}
			count = slurm_atoull(++tmp_str);
			_setup_cluster_tres(tres_list, id, count, seconds);
		}

		/* Advance past the next comma, if any. */
		if (!(tmp_str = strchr(tmp_str, ',')))
			break;
		tmp_str++;
	}

	return;
}
/*
 * Drain j_tres_list into a_tres_list: TRES ids already present have
 * their time_alloc merged (and the job entry freed); new ids have their
 * entries transferred wholesale.  j_tres_list is empty on return.
 */
static void _add_job_alloc_time_to_assoc(list_t *a_tres_list,
					 list_t *j_tres_list)
{
	local_tres_usage_t *loc_a_tres, *loc_j_tres;

	/*
	 * NOTE: You have to use slurm_list_pop here, since
	 * mysql is exporting something of the same type as a
	 * macro, which messes everything up
	 * (my_list.h is the bad boy).
	 */
	while ((loc_j_tres = slurm_list_pop(j_tres_list))) {
		if (!(loc_a_tres = list_find_first(
			      a_tres_list, _find_loc_tres, &loc_j_tres->id))) {
			/*
			 * New TRES we haven't seen before in this association
			 * just transfer it over.
			 */
			list_append(a_tres_list, loc_j_tres);
			continue;
		}
		loc_a_tres->time_alloc += loc_j_tres->time_alloc;
		_destroy_local_tres_usage(loc_j_tres);
	}
}
/*
 * Hand the TRES list in *loc_tres over to "usage".  The list is adopted
 * outright when the usage record has none yet, otherwise merged in.
 * In every case *loc_tres is NULL (and destroyed) on return.
 */
static void _transfer_loc_tres(list_t **loc_tres, local_id_usage_t *usage)
{
	if (!usage || !*loc_tres) {
		/* No destination (or nothing to move): just drop it. */
		FREE_NULL_LIST(*loc_tres);
		return;
	}

	if (usage->loc_tres) {
		/* Merge into the existing list, then free the source. */
		_add_job_alloc_time_to_assoc(usage->loc_tres, *loc_tres);
		FREE_NULL_LIST(*loc_tres);
	} else {
		/* Adopt the list wholesale. */
		usage->loc_tres = *loc_tres;
		*loc_tres = NULL;
	}
}
/*
 * Parse a TRES string ("id=count,...") and add "seconds" worth of usage
 * of the given TIME_* type for each TRES to tres_list through
 * _add_time_tres().
 *
 * suspend_seconds is subtracted from CPU time only, since only CPUs sit
 * idle while a job is suspended.  Energy is a per-job total, so it is
 * not multiplied by the elapsed seconds.  Parsing stops at the first
 * malformed entry.
 */
static void _add_tres_time_2_list(list_t *tres_list, char *tres_str,
				  int type, int seconds, int suspend_seconds,
				  bool times_count)
{
	char *tmp_str = tres_str;
	int id;
	uint64_t time, count;
	local_tres_usage_t *loc_tres;

	xassert(tres_list);

	if (!tres_str || !tres_str[0])
		return;

	while (tmp_str) {
		int loc_seconds = seconds;

		id = atoi(tmp_str);
		if (id < 1) {
			error("_add_tres_time_2_list: no id "
			      "found at %s", tmp_str);
			break;
		}
		if (!(tmp_str = strchr(tmp_str, '='))) {
			error("_add_tres_time_2_list: no value found for "
			      "id %d '%s'", id, tres_str);
			xassert(0);
			break;
		}

		/* Take away suspended time from TRES that are idle when the
		 * job was suspended, currently only CPU's fill that bill.
		 */
		if (suspend_seconds && (id == TRES_CPU)) {
			loc_seconds -= suspend_seconds;
			if (loc_seconds < 1)
				loc_seconds = 0;
		}

		time = count = slurm_atoull(++tmp_str);
		/* ENERGY is already totalled for the entire job so don't
		 * multiple with time.
		 */
		if (id != TRES_ENERGY)
			time *= loc_seconds;

		loc_tres = _add_time_tres(tres_list, type, id,
					  time, times_count);

		/* Remember the raw count for later ratio calculations. */
		if (loc_tres && !loc_tres->count)
			loc_tres->count = count;

		if (!(tmp_str = strchr(tmp_str, ',')))
			break;
		tmp_str++;
	}

	return;
}
/*
 * Run archiving/purging for one cluster if archive_data was requested.
 * purge_period selects which of the slurmdbd.conf purge settings apply
 * this pass; the others are disabled with NO_VAL.  Returns
 * SLURM_SUCCESS, or the error from the archive call.
 */
static int _process_purge(mysql_conn_t *mysql_conn,
			  char *cluster_name,
			  uint16_t archive_data,
			  uint32_t purge_period)
{
	int rc = SLURM_SUCCESS;
	slurmdb_archive_cond_t arch_cond;
	slurmdb_job_cond_t job_cond;

	/* if we didn't ask for archive data return here and don't do
	   anything extra just rollup */
	if (!archive_data)
		return SLURM_SUCCESS;

	/* Archiving requires slurmdbd configuration. */
	if (!slurmdbd_conf)
		return SLURM_SUCCESS;

	memset(&job_cond, 0, sizeof(job_cond));
	memset(&arch_cond, 0, sizeof(arch_cond));
	arch_cond.archive_dir = slurmdbd_conf->archive_dir;
	arch_cond.archive_script = slurmdbd_conf->archive_script;

	/* Enable each purge type only when it matches this period. */
	if (purge_period & slurmdbd_conf->purge_event)
		arch_cond.purge_event = slurmdbd_conf->purge_event;
	else
		arch_cond.purge_event = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_job)
		arch_cond.purge_job = slurmdbd_conf->purge_job;
	else
		arch_cond.purge_job = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_resv)
		arch_cond.purge_resv = slurmdbd_conf->purge_resv;
	else
		arch_cond.purge_resv = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_step)
		arch_cond.purge_step = slurmdbd_conf->purge_step;
	else
		arch_cond.purge_step = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_suspend)
		arch_cond.purge_suspend = slurmdbd_conf->purge_suspend;
	else
		arch_cond.purge_suspend = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_txn)
		arch_cond.purge_txn = slurmdbd_conf->purge_txn;
	else
		arch_cond.purge_txn = NO_VAL;
	if (purge_period & slurmdbd_conf->purge_usage)
		arch_cond.purge_usage = slurmdbd_conf->purge_usage;
	else
		arch_cond.purge_usage = NO_VAL;

	/* Restrict the archive run to this one cluster. */
	job_cond.cluster_list = list_create(NULL);
	list_append(job_cond.cluster_list, cluster_name);

	arch_cond.job_cond = &job_cond;
	rc = as_mysql_jobacct_process_archive(mysql_conn, &arch_cond);
	FREE_NULL_LIST(job_cond.cluster_list);

	return rc;
}
/*
 * Sanity-check one cluster TRES accumulator for the hour and append its
 * row to the bulk insert into cluster_hour_table being built in *query
 * (the insert statement itself is started when *query is still NULL).
 *
 * Derivation: time_idle = total_time - (alloc+down+pdown) - resv, with
 * alloc/down/pdown clamped so they never exceed total_time, and any
 * negative idle time shifted into reserved/over-commit columns.
 */
static void _setup_cluster_tres_usage(mysql_conn_t *mysql_conn,
				      char *cluster_name,
				      time_t curr_start, time_t curr_end,
				      time_t now, time_t use_start,
				      local_tres_usage_t *loc_tres,
				      char **query)
{
	char start_char[256], end_char[256];
	uint64_t total_used;

	if (!loc_tres)
		return;

	/* Now put the lists into the usage tables */

	/* sanity check to make sure we don't have more
	   allocated cpus than possible. */
	if (loc_tres->total_time
	    && (loc_tres->total_time < loc_tres->time_alloc)) {
		slurm_make_time_str(&curr_start, start_char,
				    sizeof(start_char));
		slurm_make_time_str(&curr_end, end_char,
				    sizeof(end_char));
		error("We have more allocated time than is possible (%"PRIu64" > %"PRIu64") for cluster %s(%"PRIu64") from %s - %s tres %u (this may happen if oversubscription of resources is allowed without Gang)",
		      loc_tres->time_alloc, loc_tres->total_time,
		      cluster_name, loc_tres->count,
		      start_char, end_char, loc_tres->id);
		loc_tres->time_alloc = loc_tres->total_time;
	}

	total_used = loc_tres->time_alloc +
		loc_tres->time_down + loc_tres->time_pd;

	/* Make sure the total time we care about
	   doesn't go over the limit */
	if (loc_tres->total_time && (loc_tres->total_time < total_used)) {
		int64_t overtime;

		slurm_make_time_str(&curr_start, start_char,
				    sizeof(start_char));
		slurm_make_time_str(&curr_end, end_char,
				    sizeof(end_char));
		error("We have more time than is possible (%"PRIu64"+%"PRIu64"+%"PRIu64")(%"PRIu64") > %"PRIu64" for cluster %s(%"PRIu64") from %s - %s tres %u (this may happen if oversubscription of resources is allowed without Gang)",
		      loc_tres->time_alloc, loc_tres->time_down,
		      loc_tres->time_pd, total_used,
		      loc_tres->total_time,
		      cluster_name, loc_tres->count,
		      start_char, end_char, loc_tres->id);

		/* First figure out how much actual down time
		   we have and then how much
		   planned down time we have. */
		overtime = (int64_t)(loc_tres->total_time -
				     (loc_tres->time_alloc +
				      loc_tres->time_down));
		if (overtime < 0) {
			/* Trim down time first. */
			loc_tres->time_down += overtime;
			if ((int64_t)loc_tres->time_down < 0)
				loc_tres->time_down = 0;
		}

		overtime = (int64_t)(loc_tres->total_time -
				     (loc_tres->time_alloc +
				      loc_tres->time_down +
				      loc_tres->time_pd));
		if (overtime < 0) {
			/* Then trim planned down time. */
			loc_tres->time_pd += overtime;
			if ((int64_t)loc_tres->time_pd < 0)
				loc_tres->time_pd = 0;
		}

		total_used = loc_tres->time_alloc +
			loc_tres->time_down + loc_tres->time_pd;
	}

	loc_tres->time_idle = loc_tres->total_time -
		total_used - loc_tres->time_resv;
	/* sanity check just to make sure we have a
	 * legitimate time after we calculated
	 * idle/reserved time put extra in the over
	 * commit field
	 */
	if ((int64_t)loc_tres->time_idle < 0) {
		/* Negative idle: shrink reserved time by the deficit and
		 * count the remainder as over-committed time. */
		loc_tres->time_resv += (int64_t)loc_tres->time_idle;
		loc_tres->time_over -= (int64_t)loc_tres->time_idle;
		loc_tres->time_idle = 0;
		if ((int64_t)loc_tres->time_resv < 0)
			loc_tres->time_resv = 0;
	}

	/* Continue an already-started bulk insert, or start a new one. */
	if (*query)
		xstrfmtcat(*query, ", (%ld, %ld, %ld, %u, %"PRIu64", "
			   "%"PRIu64", %"PRIu64", %"PRIu64", "
			   "%"PRIu64", %"PRIu64", %"PRIu64")",
			   now, now, use_start, loc_tres->id,
			   loc_tres->count,
			   loc_tres->time_alloc,
			   loc_tres->time_down,
			   loc_tres->time_pd,
			   loc_tres->time_idle,
			   loc_tres->time_over,
			   loc_tres->time_resv);
	else
		xstrfmtcat(*query, "insert into \"%s_%s\" "
			   "(creation_time, mod_time, "
			   "time_start, id_tres, count, "
			   "alloc_secs, down_secs, pdown_secs, "
			   "idle_secs, over_secs, plan_secs) "
			   "values (%ld, %ld, %ld, %u, %"PRIu64", "
			   "%"PRIu64", %"PRIu64", %"PRIu64", "
			   "%"PRIu64", %"PRIu64", %"PRIu64")",
			   cluster_name, cluster_hour_table,
			   now, now,
			   use_start, loc_tres->id,
			   loc_tres->count,
			   loc_tres->time_alloc,
			   loc_tres->time_down,
			   loc_tres->time_pd,
			   loc_tres->time_idle,
			   loc_tres->time_over,
			   loc_tres->time_resv);
	return;
}
/*
 * Write the hourly cluster usage rows built from c_usage into the
 * database as one bulk upsert.  Returns SLURM_SUCCESS, or the error
 * from mysql_db_query().
 */
static int _process_cluster_usage(mysql_conn_t *mysql_conn,
				  char *cluster_name,
				  time_t curr_start, time_t curr_end,
				  time_t now, local_cluster_usage_t *c_usage)
{
	int rc = SLURM_SUCCESS;
	char *query = NULL;
	list_itr_t *itr;
	local_tres_usage_t *loc_tres;

	if (!c_usage)
		return rc;
	/* Now put the lists into the usage tables */

	xassert(c_usage->loc_tres);
	itr = list_iterator_create(c_usage->loc_tres);
	while ((loc_tres = list_next(itr))) {
		/* Each call appends one row to the insert in "query". */
		_setup_cluster_tres_usage(mysql_conn, cluster_name,
					  curr_start, curr_end, now,
					  c_usage->start, loc_tres, &query);
	}
	list_iterator_destroy(itr);

	/* No TRES rows were generated, nothing to send. */
	if (!query)
		return rc;

	xstrfmtcat(query,
		   " on duplicate key update "
		   "mod_time=%ld, count=VALUES(count), "
		   "alloc_secs=VALUES(alloc_secs), "
		   "down_secs=VALUES(down_secs), "
		   "pdown_secs=VALUES(pdown_secs), "
		   "idle_secs=VALUES(idle_secs), "
		   "over_secs=VALUES(over_secs), "
		   "plan_secs=VALUES(plan_secs)",
		   now);

	/* Spacing out the inserts here instead of doing them
	   all at once in the end proves to be faster.  Just FYI
	   so we don't go testing again and again.
	*/
	DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
	rc = mysql_db_query(mysql_conn, query);
	xfree(query);
	if (rc != SLURM_SUCCESS)
		error("Couldn't add cluster hour rollup");

	return rc;
}
/*
 * Append a bulk upsert of hourly alloc_secs rows for one
 * association/QOS/wckey id (selected by "type") to *query, targeting
 * the matching *_hour_table.  Logs and returns without touching *query
 * on an unknown type or an id with no TRES usage.
 *
 * Fixes vs. previous revision: removed the unreachable "break" after
 * "return" in the default case; use true/false for the bool flag.
 */
static void _create_id_usage_insert(char *cluster_name, int type,
				    time_t curr_start, time_t now,
				    local_id_usage_t *id_usage,
				    char **query)
{
	local_tres_usage_t *loc_tres;
	list_itr_t *itr;
	bool first;
	char *table = NULL, *id_name = NULL;

	xassert(query);

	switch (type) {
	case ASSOC_TABLES:
		id_name = "id_assoc";
		table = assoc_hour_table;
		break;
	case QOS_TABLES:
		id_name = "id_qos";
		table = qos_hour_table;
		break;
	case WCKEY_TABLES:
		id_name = "id_wckey";
		table = wckey_hour_table;
		break;
	default:
		error("_create_id_usage_insert: unknown type %d", type);
		return;
	}

	if (!id_usage->loc_tres || !list_count(id_usage->loc_tres)) {
		error("%s %d doesn't have any tres", id_name, id_usage->id);
		return;
	}

	first = true;
	itr = list_iterator_create(id_usage->loc_tres);
	while ((loc_tres = list_next(itr))) {
		if (first) {
			/* Start the insert statement with the first row. */
			xstrfmtcat(*query,
				   "insert into \"%s_%s\" "
				   "(creation_time, mod_time, id, id_alt, "
				   "time_start, id_tres, alloc_secs) "
				   "values (%ld, %ld, %u, %u, "
				   "%ld, %u, %"PRIu64")",
				   cluster_name, table, now, now,
				   id_usage->id, id_usage->id_alt,
				   curr_start, loc_tres->id,
				   loc_tres->time_alloc);
			first = false;
		} else {
			/* Subsequent rows only add a values tuple. */
			xstrfmtcat(*query,
				   ", (%ld, %ld, %u, %u, %ld, %u, %"PRIu64")",
				   now, now,
				   id_usage->id, id_usage->id_alt,
				   curr_start, loc_tres->id,
				   loc_tres->time_alloc);
		}
	}
	list_iterator_destroy(itr);

	xstrfmtcat(*query,
		   " on duplicate key update mod_time=%ld, "
		   "alloc_secs=VALUES(alloc_secs);", now);
}
/*
 * list_for_each() callback: fold one reservation's TRES time into the
 * cluster usage record passed as arg.  Maintenance reservations count
 * as planned down time, all others as allocated time; IGNORE_JOBS
 * reservations are skipped entirely.  Always returns SLURM_SUCCESS.
 */
static int _add_resv_usage_to_cluster(void *object, void *arg)
{
	local_resv_usage_t *r_usage = (local_resv_usage_t *)object;
	local_cluster_usage_t *c_usage = (local_cluster_usage_t *)arg;

	xassert(c_usage);

	/*
	 * Only record time for the clusters that have
	 * registered, or if a reservation has the IGNORE_JOBS
	 * flag we don't have an easy way to distinguish the
	 * cpus a job not running in the reservation, but on
	 * it's cpus.
	 * We still need them for figuring out unused wall time,
	 * but for cluster utilization we will just ignore them.
	 */
	if (r_usage->flags & RESERVE_FLAG_IGN_JOBS)
		return SLURM_SUCCESS;

	/*
	 * Since this reservation was added to the
	 * cluster and only certain people could run
	 * there we will use this as allocated time on
	 * the system.  If the reservation was a
	 * maintenance then we add the time to planned
	 * down time.
	 */
	_add_time_tres_list(c_usage->loc_tres,
			    r_usage->loc_tres,
			    (r_usage->flags & RESERVE_FLAG_MAINT) ?
			    TIME_PDOWN : TIME_ALLOC, 0, 0);

	return SLURM_SUCCESS;
}
/*
 * Build the cluster-wide usage record for one rollup period from the
 * event table.
 *
 * Cluster-registration rows (empty node_name) establish the cluster's
 * TRES counts and total time; rows with a non-zero state while
 * registering mean the slurmctld was down and go onto
 * cluster_down_list instead.  Node events are charged as down (or
 * planned-down for FUTURE/powered-down nodes), with overlap against
 * reservations and controller-down windows compensated for.  Finally
 * each reservation's time is folded into the cluster via
 * _add_resv_usage_to_cluster().
 *
 * Returns the new cluster usage record (caller frees), or NULL if no
 * registration row was found or the query failed.
 *
 * Fix vs. previous revision: the reservation iterator (r_itr) was
 * created but never destroyed, leaking one iterator per call.
 */
static local_cluster_usage_t *_setup_cluster_usage(mysql_conn_t *mysql_conn,
						   char *cluster_name,
						   time_t curr_start,
						   time_t curr_end,
						   list_t *resv_usage_list,
						   list_t *cluster_down_list,
						   int dims)
{
	local_cluster_usage_t *c_usage = NULL;
	char *query = NULL;
	MYSQL_RES *result = NULL;
	MYSQL_ROW row;
	int i = 0;
	list_itr_t *d_itr = NULL;
	list_itr_t *r_itr = NULL;
	local_cluster_usage_t *loc_c_usage;
	local_resv_usage_t *loc_r_usage;
	char *event_req_inx[] = {
		"node_name",
		"time_start",
		"time_end",
		"state",
		"tres",
	};
	char *event_str = NULL;
	enum {
		EVENT_REQ_NAME,
		EVENT_REQ_START,
		EVENT_REQ_END,
		EVENT_REQ_STATE,
		EVENT_REQ_TRES,
		EVENT_REQ_COUNT
	};

	xstrfmtcat(event_str, "%s", event_req_inx[i]);
	for (i = 1; i < EVENT_REQ_COUNT; i++)
		xstrfmtcat(event_str, ", %s", event_req_inx[i]);

	/* first get the events during this time.  All that is
	 * except things with the maintenance flag set in the
	 * state.  We handle those later with the reservations.
	 */
	query = xstrdup_printf("select %s from \"%s_%s\" where "
			       "!(state & %"PRIu64") && (time_start < %ld "
			       "&& (time_end >= %ld "
			       "|| time_end = 0)) "
			       "order by node_name, time_start",
			       event_str, cluster_name, event_table,
			       NODE_STATE_MAINT,
			       curr_end, curr_start);
	xfree(event_str);

	DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
	if (!(result = mysql_db_query_ret(mysql_conn, query, 0))) {
		xfree(query);
		return NULL;
	}
	xfree(query);

	d_itr = list_iterator_create(cluster_down_list);
	r_itr = list_iterator_create(resv_usage_list);
	while ((row = mysql_fetch_row(result))) {
		time_t row_start = slurm_atoul(row[EVENT_REQ_START]);
		time_t row_end = slurm_atoul(row[EVENT_REQ_END]);
		uint16_t state = slurm_atoul(row[EVENT_REQ_STATE]);
		time_t local_start, local_end;
		int seconds, resv_seconds;

		/* Clamp the event window to this rollup period. */
		if (row_start < curr_start)
			row_start = curr_start;
		if (!row_end || row_end > curr_end)
			row_end = curr_end;

		/* Don't worry about it if the time is less
		 * than 1 second.
		 */
		if ((seconds = (row_end - row_start)) < 1)
			continue;

		/* this means we are a cluster registration
		   entry */
		if (!row[EVENT_REQ_NAME][0]) {
			local_cluster_usage_t *loc_c_usage;

			/* if the cpu count changes we will
			 * only care about the last cpu count but
			 * we will keep a total of the time for
			 * all cpus to get the correct cpu time
			 * for the entire period.
			 */
			if (state || !c_usage) {
				loc_c_usage = xmalloc(
					sizeof(local_cluster_usage_t));
				loc_c_usage->start = row_start;
				loc_c_usage->loc_tres =
					list_create(_destroy_local_tres_usage);
				/* If this has a state it
				   means the slurmctld went
				   down and we should put this
				   on the list and remove any
				   jobs from this time that
				   were running later.
				*/
				if (state)
					list_append(cluster_down_list,
						    loc_c_usage);
				else
					c_usage = loc_c_usage;
			} else
				loc_c_usage = c_usage;

			loc_c_usage->end = row_end;

			_add_tres_2_list(loc_c_usage->loc_tres,
					 row[EVENT_REQ_TRES], seconds);
			continue;
		}

		/*
		 * Only record down time for the cluster we
		 * are looking for.  If it was during this
		 * time period we would already have it.
		 */
		if (!c_usage)
			continue;

		resv_seconds = 0;
		/*
		 * Now switch this time from any non-maint
		 * reservations that may have had the node
		 * allocated during this time.
		 */
		list_iterator_reset(r_itr);
		while ((loc_r_usage = list_next(r_itr))) {
			time_t temp_end = row_end;
			time_t temp_start = row_start;
			list_t *loc_tres = NULL;

			/* Skip reservations not covering this node. */
			if (hostlist_find_dims(loc_r_usage->hl,
					       row[EVENT_REQ_NAME], dims)
			    < 0)
				continue;

			/* Intersect the event with the reservation. */
			if (loc_r_usage->start > temp_start)
				temp_start = loc_r_usage->start;
			if (loc_r_usage->end < temp_end)
				temp_end = loc_r_usage->end;

			if ((resv_seconds = (temp_end - temp_start)) < 1)
				continue;

			loc_tres = list_create(_destroy_local_tres_usage);

			/* Charge the overlap as (planned) down time... */
			_add_tres_time_2_list(loc_tres,
					      row[EVENT_REQ_TRES],
					      loc_r_usage->flags &
					      RESERVE_FLAG_MAINT ?
					      TIME_PDOWN : TIME_DOWN,
					      resv_seconds,
					      0, 0);
			_add_tres_time_2_list(c_usage->loc_tres,
					      row[EVENT_REQ_TRES],
					      loc_r_usage->flags &
					      RESERVE_FLAG_MAINT ?
					      TIME_PDOWN : TIME_DOWN,
					      resv_seconds,
					      0, 0);

			/* ...and take it back out of the reservation. */
			_remove_job_tres_time_from_cluster(
				loc_r_usage->loc_tres,
				loc_tres, resv_seconds);
			FREE_NULL_LIST(loc_tres);
		}

		/* Clamp to the window the cluster was registered for. */
		local_start = row_start;
		local_end = row_end;
		if (local_start < c_usage->start)
			local_start = c_usage->start;
		if (local_end > c_usage->end)
			local_end = c_usage->end;

		/* Don't worry about it if the time is less than 1 second. */
		if ((seconds = (local_end - local_start)) < 1)
			continue;

		/* Time already charged against reservations above. */
		seconds -= resv_seconds;
		if (seconds > 0) {
			if (((state & NODE_STATE_BASE) == NODE_STATE_FUTURE) ||
			    (state & NODE_STATE_POWERED_DOWN))
				_add_tres_time_2_list(c_usage->loc_tres,
						      row[EVENT_REQ_TRES],
						      TIME_PDOWN,
						      seconds, 0, 0);
			else
				_add_tres_time_2_list(c_usage->loc_tres,
						      row[EVENT_REQ_TRES],
						      TIME_DOWN,
						      seconds, 0, 0);
		}

		/*
		 * Now remove this time if there was a
		 * disconnected slurmctld during the down time.
		 */
		list_iterator_reset(d_itr);
		while ((loc_c_usage = list_next(d_itr))) {
			time_t temp_end = row_end;
			time_t temp_start = row_start;

			if (loc_c_usage->start > temp_start)
				temp_start = loc_c_usage->start;
			if (loc_c_usage->end < temp_end)
				temp_end = loc_c_usage->end;
			seconds = (temp_end - temp_start);
			if (seconds < 1)
				continue;

			_remove_job_tres_time_from_cluster(
				loc_c_usage->loc_tres,
				c_usage->loc_tres, seconds);
		}
	}
	mysql_free_result(result);

	list_iterator_destroy(d_itr);
	/* Bug fix: r_itr was previously leaked here. */
	list_iterator_destroy(r_itr);

	if (c_usage)
		(void) list_for_each(resv_usage_list,
				     _add_resv_usage_to_cluster,
				     c_usage);

	return c_usage;
}
/*
 * Load every reservation overlapping [curr_start, curr_end) from the
 * cluster's resv table into resv_usage_list as local_resv_usage_t
 * records, clamping start/end to the period and seeding unused_wall
 * from any previously rolled-up value.  Returns SLURM_SUCCESS, or
 * SLURM_ERROR on query failure.
 */
static int _setup_resv_usage(mysql_conn_t *mysql_conn,
			     char *cluster_name,
			     time_t curr_start,
			     time_t curr_end,
			     list_t *resv_usage_list,
			     int dims)
{
	MYSQL_RES *result = NULL;
	MYSQL_ROW row;
	int i;
	char *query;
	char *resv_str = NULL;
	local_resv_usage_t *r_usage = NULL;
	char *resv_req_inx[] = {
		"id_resv",
		"assoclist",
		"flags",
		"nodelist",
		"tres",
		"time_start",
		"time_end",
		"time_force",
		"unused_wall"
	};
	enum {
		RESV_REQ_ID,
		RESV_REQ_ASSOCS,
		RESV_REQ_FLAGS,
		RESV_REQ_NODES,
		RESV_REQ_TRES,
		RESV_REQ_START,
		RESV_REQ_END,
		RESV_REQ_FORCE,
		RESV_REQ_UNUSED,
		RESV_REQ_COUNT
	};

	/* now get the reservations during this time */
	i=0;
	xstrfmtcat(resv_str, "%s", resv_req_inx[i]);
	for(i=1; i<RESV_REQ_COUNT; i++)
		xstrfmtcat(resv_str, ", %s", resv_req_inx[i]);

	query = xstrdup_printf("select %s from \"%s_%s\" where "
			       "(time_start < %ld && time_end >= %ld) "
			       "order by time_start",
			       resv_str, cluster_name, resv_table,
			       curr_end, curr_start);
	xfree(resv_str);

	DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
	result = mysql_db_query_ret(mysql_conn, query, 0);
	xfree(query);
	if (!result)
		return SLURM_ERROR;

	/*
	 * If a reservation overlaps another reservation we
	 * total up everything here as if they didn't but when
	 * calculating the total time for a cluster we will
	 * remove the extra time received.  This may result in
	 * unexpected results with association based reports
	 * since the association is given the total amount of
	 * time of each reservation, thus equaling more time
	 * than is available.  Job/Cluster/Reservation reports
	 * should be fine though since we really don't over
	 * allocate resources.  The issue with us not being
	 * able to handle overlapping reservations here is
	 * unless the reservation completely overlaps the
	 * other reservation we have no idea how many cpus
	 * should be removed since this could be a
	 * heterogeneous system.  This same problem exists
	 * when a reservation is created with the ignore_jobs
	 * option which will allow jobs to continue to run in the
	 * reservation that aren't suppose to.
	 */
	while ((row = mysql_fetch_row(result))) {
		time_t row_start = slurm_atoul(row[RESV_REQ_START]);
		time_t row_end = slurm_atoul(row[RESV_REQ_END]);
		time_t row_force = slurm_atoul(row[RESV_REQ_FORCE]);
		int unused;
		int resv_seconds;
		time_t orig_start = row_start;

		if (row_start >= curr_start) {
			/*
			 * This is the first time we are seeing this
			 * reservation, so set our unused to be 0.
			 * This is mostly helpful when
			 * rerolling set it back to 0.
			 */
			unused = 0;
		} else
			unused = slurm_atoul(row[RESV_REQ_UNUSED]);

		/* A forced start takes precedence over the stored start. */
		if (row_force > row_start)
			row_start = row_force;

		/* Clamp the reservation to the rollup period. */
		if (row_start <= curr_start)
			row_start = curr_start;

		if (!row_end || row_end > curr_end)
			row_end = curr_end;

		/* Don't worry about it if the time is less
		 * than 1 second.
		 */
		if ((resv_seconds = (row_end - row_start)) < 1)
			continue;

		r_usage = xmalloc(sizeof(local_resv_usage_t));
		r_usage->flags = slurm_atoul(row[RESV_REQ_FLAGS]);
		r_usage->id = slurm_atoul(row[RESV_REQ_ID]);

		r_usage->local_assocs = list_create(xfree_ptr);
		slurm_addto_char_list(r_usage->local_assocs,
				      row[RESV_REQ_ASSOCS]);
		r_usage->loc_tres =
			list_create(_destroy_local_tres_usage);

		_add_tres_2_list(r_usage->loc_tres,
				 row[RESV_REQ_TRES], resv_seconds);

		/*
		 * Original start is needed when updating the
		 * reservation's unused_wall later on.
		 */
		r_usage->orig_start = orig_start;
		r_usage->start = row_start;
		r_usage->end = row_end;
		r_usage->unused_wall = unused + resv_seconds;
		r_usage->hl = hostlist_create_dims(row[RESV_REQ_NODES], dims);
		list_append(resv_usage_list, r_usage);
	}
	mysql_free_result(result);

	return SLURM_SUCCESS;
}
/*
 * Credit reserved (planned) CPU time to the cluster for a job that was
 * eligible but not yet running during this rollup period.  The span is
 * the intersection of [job_eligible, job_start) with the cluster's
 * covered window; pending array tasks multiply it since they are not
 * yet individual job records.
 *
 * Fix vs. previous revision: eligible_start/eligible_end were declared
 * "int" while holding time_t values, truncating on 64-bit time_t.
 */
static void _add_planned_time(local_cluster_usage_t *c_usage, time_t job_start,
			      time_t job_eligible, uint32_t array_pending,
			      uint32_t row_rcpu)
{
	time_t eligible_start, eligible_end;
	int64_t loc_seconds = 0;

	/* Jobs started before this period were handled earlier. */
	if (!c_usage || (job_start && (job_start < c_usage->start)))
		return;

	eligible_start = MAX(job_eligible, c_usage->start);
	eligible_end = job_start ? MIN(job_start, c_usage->end) : c_usage->end;

	loc_seconds = (eligible_end - eligible_start);

	if (loc_seconds <= 0)
		return;

	/*
	 * If we have pending jobs in an array
	 * they haven't been inserted into the
	 * database yet as proper job records,
	 * so handle them here.
	 */
	if (array_pending)
		loc_seconds *= array_pending;

	_add_time_tres(c_usage->loc_tres, TIME_RESV, TRES_CPU,
		       loc_seconds * (uint64_t) row_rcpu, 0);
}
/*
 * Return the QOS usage record matching id_usage's (id, id_alt) pair,
 * creating and appending a new one to qos_usage_list if none exists yet.
 *
 * curr_q_usage is the caller's cached record from the previous call; when
 * it still matches it is returned immediately without a list search.
 */
static local_id_usage_t *_check_q_usage(list_t *qos_usage_list,
					local_id_usage_t *curr_q_usage,
					local_id_usage_t *id_usage)
{
	local_id_usage_t *found;

	xassert(qos_usage_list);
	xassert(id_usage);

	/* Fast path: the cached record still matches. */
	if (curr_q_usage && _find_id_alt_usage(curr_q_usage, id_usage))
		return curr_q_usage;

	found = list_find_first(qos_usage_list, _find_id_alt_usage, id_usage);
	if (found)
		return found;

	/* Not seen before: create a record keyed on (id, id_alt). */
	found = xmalloc(sizeof(*found));
	found->id = id_usage->id;
	found->id_alt = id_usage->id_alt;
	found->loc_tres = list_create(_destroy_local_tres_usage);
	list_append(qos_usage_list, found);

	return found;
}
/*
 * Roll one cluster's raw accounting data (job, suspend, reservation and
 * event tables) up into the hourly usage tables, one 3600-second window
 * at a time from 'start' to 'end'.
 *
 * For each hour this builds per-association, per-QOS, per-wckey and
 * per-cluster TRES usage, distributes unused reservation time across the
 * associations allowed in each reservation, applies slurmctld-down time,
 * and inserts/updates the hourly rollup rows.  On success the transaction
 * is committed and hourly purge/archive is run.
 *
 * Returns SLURM_SUCCESS or SLURM_ERROR.
 */
extern int as_mysql_hourly_rollup(mysql_conn_t *mysql_conn,
				  char *cluster_name,
				  time_t start, time_t end,
				  uint16_t archive_data)
{
	int rc = SLURM_SUCCESS;
	int add_sec = 3600;	/* hourly rollup: step one hour per pass */
	int i=0, dims;
	time_t now = time(NULL);
	time_t curr_start = start;
	time_t curr_end = curr_start + add_sec;
	char *query = NULL;
	MYSQL_RES *result = NULL;
	MYSQL_ROW row;
	list_itr_t *a_itr = NULL;
	list_itr_t *c_itr = NULL;
	list_itr_t *q_itr = NULL;
	list_itr_t *w_itr = NULL;
	list_itr_t *r_itr = NULL;
	/* Per-hour accumulators; flushed and re-filled for every window. */
	list_t *assoc_usage_list = list_create(_destroy_local_id_usage);
	list_t *cluster_down_list = list_create(_destroy_local_cluster_usage);
	list_t *qos_usage_list = list_create(_destroy_local_id_usage);
	list_t *wckey_usage_list = list_create(_destroy_local_id_usage);
	list_t *resv_usage_list = list_create(_destroy_local_resv_usage);
	uint16_t track_wckey = slurm_get_track_wckey();
	local_cluster_usage_t *loc_c_usage = NULL;
	local_cluster_usage_t *c_usage = NULL;
	local_resv_usage_t *r_usage = NULL;
	local_id_usage_t *a_usage = NULL;
	local_id_usage_t *q_usage = NULL;
	local_id_usage_t *w_usage = NULL;
	/* char start_char[20], end_char[20]; */
	/* Column list for the job query; must stay in sync with the
	 * JOB_REQ_* enum below. */
	char *job_req_inx[] = {
		"job.job_db_inx",
//		"job.id_job",
		"job.id_assoc",
		"job.id_qos",
		"job.id_wckey",
		"job.array_task_pending",
		"job.time_eligible",
		"job.time_start",
		"job.time_end",
		"job.time_suspended",
		"job.cpus_req",
		"job.id_resv",
		"job.tres_alloc"
	};
	char *job_str = NULL;
	enum {
		JOB_REQ_DB_INX,
//		JOB_REQ_JOBID,
		JOB_REQ_ASSOCID,
		JOB_REQ_QOSID,
		JOB_REQ_WCKEYID,
		JOB_REQ_ARRAY_PENDING,
		JOB_REQ_ELG,
		JOB_REQ_START,
		JOB_REQ_END,
		JOB_REQ_SUSPENDED,
		JOB_REQ_RCPU,
		JOB_REQ_RESVID,
		JOB_REQ_TRES,
		JOB_REQ_COUNT
	};
	/* Column list for the suspend-time query; must stay in sync with
	 * the SUSPEND_REQ_* enum below. */
	char *suspend_req_inx[] = {
		"time_start",
		"time_end"
	};
	char *suspend_str = NULL;
	enum {
		SUSPEND_REQ_START,
		SUSPEND_REQ_END,
		SUSPEND_REQ_COUNT
	};

	/* Build the comma-separated select lists once, up front. */
	i=0;
	xstrfmtcat(job_str, "%s", job_req_inx[i]);
	for(i=1; i<JOB_REQ_COUNT; i++) {
		xstrfmtcat(job_str, ", %s", job_req_inx[i]);
	}

	i=0;
	xstrfmtcat(suspend_str, "%s", suspend_req_inx[i]);
	for(i=1; i<SUSPEND_REQ_COUNT; i++) {
		xstrfmtcat(suspend_str, ", %s", suspend_req_inx[i]);
	}

	/* We need to figure out the dimensions of this cluster */
	query = xstrdup_printf("select dimensions from %s where name='%s'",
			       cluster_table, cluster_name);
	DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
	result = mysql_db_query_ret(mysql_conn, query, 0);
	xfree(query);

	if (!result) {
		error("%s: error querying cluster_table", __func__);
		rc = SLURM_ERROR;
		goto end_it;
	}
	row = mysql_fetch_row(result);
	if (!row) {
		error("%s: no cluster by name %s known",
		      __func__, cluster_name);
		rc = SLURM_ERROR;
		goto end_it;
	}

	dims = atoi(row[0]);
	mysql_free_result(result);

	/* info("begin start %s", slurm_ctime2(&curr_start)); */
	/* info("begin end %s", slurm_ctime2(&curr_end)); */
	a_itr = list_iterator_create(assoc_usage_list);
	c_itr = list_iterator_create(cluster_down_list);
	w_itr = list_iterator_create(wckey_usage_list);
	r_itr = list_iterator_create(resv_usage_list);
	q_itr = list_iterator_create(qos_usage_list);

	/* Main loop: process one hour window per iteration. */
	while (curr_start < end) {
		/* NOTE(review): last_id is int but assoc ids are uint32_t;
		 * ids above INT_MAX would wrap in the comparison below —
		 * presumably never happens in practice, but worth
		 * confirming. */
		int last_id = -1;
		int last_wckeyid = -1;

		DB_DEBUG(DB_USAGE, mysql_conn->conn,
		         "%s curr hour is now %ld-%ld",
		         cluster_name, curr_start, curr_end);
		/* info("start %s", slurm_ctime2(&curr_start)); */
		/* info("end %s", slurm_ctime2(&curr_end)); */

		/* Load reservations overlapping this window, then the
		 * cluster-level usage (including any down time). */
		if ((rc = _setup_resv_usage(mysql_conn, cluster_name,
					    curr_start, curr_end,
					    resv_usage_list, dims))
		    != SLURM_SUCCESS)
			goto end_it;

		c_usage = _setup_cluster_usage(mysql_conn, cluster_name,
					       curr_start, curr_end,
					       resv_usage_list,
					       cluster_down_list,
					       dims);

		if (c_usage)
			xassert(c_usage->loc_tres);

		/* now get the jobs during this time only */
		query = xstrdup_printf("select %s from \"%s_%s\" as job "
				       "FORCE INDEX (rollup) "
				       "where (job.time_eligible && "
				       "job.time_eligible < %ld && "
				       "(job.time_end >= %ld || "
				       "job.time_end = 0)) "
				       "group by job.job_db_inx "
				       "order by job.id_assoc, "
				       "job.time_eligible",
				       job_str, cluster_name, job_table,
				       curr_end, curr_start);

		DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
		if (!(result = mysql_db_query_ret(
			      mysql_conn, query, 0))) {
			rc = SLURM_ERROR;
			goto end_it;
		}
		xfree(query);

		/* Accumulate usage from every job overlapping the window. */
		while ((row = mysql_fetch_row(result))) {
			//uint32_t job_id = slurm_atoul(row[JOB_REQ_JOBID]);
			uint32_t assoc_id = slurm_atoul(row[JOB_REQ_ASSOCID]);
			uint32_t qos_id = slurm_atoul(row[JOB_REQ_QOSID]);
			uint32_t wckey_id = slurm_atoul(row[JOB_REQ_WCKEYID]);
			uint32_t array_pending =
				slurm_atoul(row[JOB_REQ_ARRAY_PENDING]);
			uint32_t resv_id = slurm_atoul(row[JOB_REQ_RESVID]);
			time_t row_eligible = slurm_atoul(row[JOB_REQ_ELG]);
			time_t row_start = slurm_atoul(row[JOB_REQ_START]);
			time_t row_end = slurm_atoul(row[JOB_REQ_END]);
			uint32_t row_rcpu = slurm_atoul(row[JOB_REQ_RCPU]);
			list_t *loc_tres = NULL;
			int loc_seconds = 0;
			int seconds = 0, suspend_seconds = 0;
			local_id_usage_t id_usage = {
				.id = assoc_id,
				.id_alt = qos_id,
			};

			/* Clamp the job's run span to this hour window. */
			if (row_start && (row_start < curr_start))
				row_start = curr_start;

			if (!row_start && row_end)
				row_start = row_end;

			if (!row_end || row_end > curr_end)
				row_end = curr_end;

			/* Never started (or zero-length) in this window:
			 * only planned/cluster bookkeeping applies. */
			if (!row_start || ((row_end - row_start) < 1))
				goto calc_cluster;

			seconds = (row_end - row_start);

			if (slurm_atoul(row[JOB_REQ_SUSPENDED])) {
				MYSQL_RES *result2 = NULL;
				MYSQL_ROW row2;
				/* get the suspended time for this job */
				query = xstrdup_printf(
					"select %s from \"%s_%s\" where "
					"(time_start < %ld && (time_end >= %ld "
					"|| time_end = 0)) && job_db_inx=%s "
					"order by time_start",
					suspend_str, cluster_name,
					suspend_table,
					curr_end, curr_start,
					row[JOB_REQ_DB_INX]);

				debug4("%d(%s:%d) query\n%s",
				       mysql_conn->conn, THIS_FILE,
				       __LINE__, query);
				if (!(result2 = mysql_db_query_ret(
					      mysql_conn,
					      query, 0))) {
					rc = SLURM_ERROR;
					mysql_free_result(result);
					goto end_it;
				}
				xfree(query);

				/* Sum suspended intervals clamped to the
				 * job's span within this window. */
				while ((row2 = mysql_fetch_row(result2))) {
					int tot_time = 0;
					time_t local_start = slurm_atoul(
						row2[SUSPEND_REQ_START]);
					time_t local_end = slurm_atoul(
						row2[SUSPEND_REQ_END]);

					if (!local_start)
						continue;

					if (row_start > local_start)
						local_start = row_start;
					if (!local_end || row_end < local_end)
						local_end = row_end;
					tot_time = (local_end - local_start);

					if (tot_time > 0)
						suspend_seconds += tot_time;
				}
				mysql_free_result(result2);
			}

			/*
			 * Do the qos calculation check the assoc_id now since
			 * it will change in the next if
			 */
			q_usage = _check_q_usage(qos_usage_list, q_usage,
						 &id_usage);

			/* Rows are ordered by id_assoc, so a new id means a
			 * new association record. */
			if (last_id != assoc_id) {
				a_usage = xmalloc(sizeof(local_id_usage_t));
				a_usage->id = assoc_id;
				list_append(assoc_usage_list, a_usage);
				last_id = assoc_id;
				/* a_usage->loc_tres is made later,
				   don't do it here.
				*/
			}

			/* Short circuit this so so we don't get a pointer. */
			if (!track_wckey)
				last_wckeyid = wckey_id;

			/* do the wckey calculation */
			if (last_wckeyid != wckey_id) {
				list_iterator_reset(w_itr);
				while ((w_usage = list_next(w_itr)))
					if (w_usage->id == wckey_id)
						break;

				if (!w_usage) {
					w_usage = xmalloc(
						sizeof(local_id_usage_t));
					w_usage->id = wckey_id;
					list_append(wckey_usage_list,
						    w_usage);
					w_usage->loc_tres = list_create(
						_destroy_local_tres_usage);
				}

				last_wckeyid = wckey_id;
			}

			/* do the cluster allocated calculation */
		calc_cluster:

			/*
			 * We need to have this clean for each job
			 * since we add the time to the cluster individually.
			 */
			loc_tres = list_create(_destroy_local_tres_usage);

			_add_tres_time_2_list(loc_tres, row[JOB_REQ_TRES],
					      TIME_ALLOC, seconds,
					      suspend_seconds, 0);
			if (q_usage)
				_add_tres_time_2_list(q_usage->loc_tres,
						      row[JOB_REQ_TRES],
						      TIME_ALLOC, seconds,
						      suspend_seconds, 0);
			if (w_usage)
				_add_tres_time_2_list(w_usage->loc_tres,
						      row[JOB_REQ_TRES],
						      TIME_ALLOC, seconds,
						      suspend_seconds, 0);

			/*
			 * Now figure out there was a disconnected
			 * slurmctld during this job.
			 */
			list_iterator_reset(c_itr);
			while ((loc_c_usage = list_next(c_itr))) {
				int temp_end = row_end;
				int temp_start = row_start;
				if (loc_c_usage->start > temp_start)
					temp_start = loc_c_usage->start;
				if (loc_c_usage->end < temp_end)
					temp_end = loc_c_usage->end;
				loc_seconds = (temp_end - temp_start);
				if (loc_seconds < 1)
					continue;

				_remove_job_tres_time_from_cluster(
					loc_c_usage->loc_tres,
					loc_tres,
					loc_seconds);
				/* info("Job %u was running for " */
				/*      "%d seconds while " */
				/*      "cluster %s's slurmctld " */
				/*      "wasn't responding", */
				/*      job_id, loc_seconds, cluster_name); */
			}

			/* first figure out the reservation */
			if (resv_id) {
				/*
				 * Since we have already added the entire
				 * reservation as used time on the cluster we
				 * only need to calculate the used time for the
				 * reservation and then divy up the unused time
				 * over the associations able to run in the
				 * reservation. Since the job was to run, or ran
				 * a reservation we don't care about eligible
				 * time since that could totally skew the
				 * clusters reserved time since the job may be
				 * able to run outside of the reservation.
				 */
				list_iterator_reset(r_itr);
				while ((r_usage = list_next(r_itr))) {
					int temp_end, temp_start;
					/*
					 * since the reservation could have
					 * changed in some way, thus making a
					 * new reservation record in the
					 * database, we have to make sure all
					 * of the reservations are checked to
					 * see if such a thing has happened
					 */
					if (r_usage->id != resv_id)
						continue;

					if (r_usage->flags &
					    RESERVE_FLAG_IGN_JOBS) {
						_add_planned_time(
							c_usage,
							MIN(row_start,
							    r_usage->end),
							MAX(row_eligible,
							    r_usage->start),
							array_pending,
							row_rcpu);
					}

					temp_end = row_end;
					temp_start = row_start;
					if (r_usage->start > temp_start)
						temp_start =
							r_usage->start;
					if (r_usage->end < temp_end)
						temp_end = r_usage->end;
					loc_seconds = (temp_end - temp_start);

					if (loc_seconds <= 0)
						continue;

					if (c_usage &&
					    (r_usage->flags &
					     RESERVE_FLAG_IGN_JOBS))
						/*
						 * job usage was not
						 * bundled with resv
						 * usage so need to
						 * account for it
						 * individually here
						 */
						_add_tres_time_2_list(
							c_usage->loc_tres,
							row[JOB_REQ_TRES],
							TIME_ALLOC,
							loc_seconds,
							0, 0);

					_add_time_tres_list(
						r_usage->loc_tres,
						loc_tres, TIME_ALLOC,
						loc_seconds, 1);

					if ((rc = _update_unused_wall(
						     r_usage,
						     loc_tres,
						     loc_seconds))
					    != SLURM_SUCCESS)
						goto end_it;
				}

				/* Reservation jobs: usage goes to the assoc,
				 * not directly to the cluster totals. */
				_transfer_loc_tres(&loc_tres, a_usage);
				continue;
			}

			if (c_usage && row_start && (seconds > 0)) {
				/* info("%d assoc %d adds " */
				/*      "(%d)(%d-%d) * %d = %d " */
				/*      "to %d", */
				/*      job_id, */
				/*      a_usage->id, */
				/*      seconds, */
				/*      row_end, row_start, */
				/*      row_acpu, */
				/*      seconds * row_acpu, */
				/*      row_acpu); */
				_add_job_alloc_time_to_cluster(
					c_usage->loc_tres,
					loc_tres);
			}

			/*
			 * The loc_tres isn't needed after this so transfer to
			 * the association and go on our merry way.
			 */
			_transfer_loc_tres(&loc_tres, a_usage);

			/* Add eligible-but-waiting time as planned time. */
			_add_planned_time(c_usage, row_start, row_eligible,
					  array_pending, row_rcpu);
		}
		mysql_free_result(result);

		/* now figure out how much more to add to the
		   associations that could had run in the reservation
		*/
		query = NULL;
		list_iterator_reset(r_itr);
		while ((r_usage = list_next(r_itr))) {
			list_itr_t *t_itr;
			local_tres_usage_t *loc_tres;

			/* Persist the accumulated unused wall time keyed by
			 * the reservation's original start time. */
			xstrfmtcat(query, "update \"%s_%s\" set unused_wall=%f where id_resv=%u and time_start=%ld;",
				   cluster_name, resv_table,
				   r_usage->unused_wall, r_usage->id,
				   r_usage->orig_start);

			if (!r_usage->loc_tres ||
			    !list_count(r_usage->loc_tres))
				continue;

			t_itr = list_iterator_create(r_usage->loc_tres);
			while ((loc_tres = list_next(t_itr))) {
				int64_t idle = loc_tres->total_time -
					loc_tres->time_alloc;
				char *assoc = NULL;
				list_itr_t *tmp_itr = NULL;
				int assoc_cnt, resv_unused_secs;

				if (idle <= 0)
					break; /* since this will be
						* the same for all TRES */

				/* now divide that time by the number of
				   associations in the reservation and add
				   them to each association */
				resv_unused_secs = idle;
				assoc_cnt = list_count(r_usage->local_assocs);
				if (assoc_cnt)
					resv_unused_secs /= assoc_cnt;
				/* info("resv %d got %d seconds for TRES %u " */
				/*      "for %d assocs", */
				/*      r_usage->id, resv_unused_secs, */
				/*      loc_tres->id, */
				/*      list_count(r_usage->local_assocs)); */
				tmp_itr = list_iterator_create(
					r_usage->local_assocs);
				while ((assoc = list_next(tmp_itr))) {
					uint32_t associd = slurm_atoul(assoc);
					slurmdb_assoc_rec_t assoc_rec = {
						.cluster = cluster_name,
						.id = associd,
					};
					slurmdb_assoc_rec_t *assoc_ptr = NULL;
					slurmdb_qos_rec_t qos_rec = { 0 };
					local_id_usage_t id_usage = {
						.id = associd,
					};

					/*
					 * Figure out the closest to correct QOS
					 */
					(void) assoc_mgr_fill_in_assoc(
						mysql_conn,
						&assoc_rec,
						ACCOUNTING_ENFORCE_ASSOCS,
						&assoc_ptr, false);
					assoc_mgr_get_default_qos_info(
						assoc_ptr, &qos_rec);
					if (!qos_rec.id)
						assoc_mgr_fill_in_qos(
							mysql_conn, &qos_rec,
							ACCOUNTING_ENFORCE_QOS,
							NULL, false);
					id_usage.id_alt = qos_rec.id;

					if (id_usage.id_alt) {
						q_usage = _check_q_usage(
							qos_usage_list, q_usage,
							&id_usage);

						_add_time_tres(
							q_usage->loc_tres,
							TIME_ALLOC,
							loc_tres->id,
							resv_unused_secs,
							0);
					}

					if ((last_id != associd) &&
					    !(a_usage = list_find_first(
						      assoc_usage_list,
						      _find_id_usage,
						      &associd))) {
						a_usage = xmalloc(
							sizeof(local_id_usage_t));
						a_usage->id = associd;
						list_append(assoc_usage_list,
							    a_usage);
						a_usage->loc_tres = list_create(
							_destroy_local_tres_usage);
					}
					last_id = associd;

					_add_time_tres(a_usage->loc_tres,
						       TIME_ALLOC, loc_tres->id,
						       resv_unused_secs, 0);
				}
				list_iterator_destroy(tmp_itr);
			}
			list_iterator_destroy(t_itr);
		}

		if (query) {
			DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s",
			         query);
			rc = mysql_db_query(mysql_conn, query);
			xfree(query);
			if (rc != SLURM_SUCCESS) {
				error("couldn't update reservations with unused time");
				goto end_it;
			}
		}

		/* now apply the down time from the slurmctld disconnects */
		if (c_usage) {
			list_iterator_reset(c_itr);
			while ((loc_c_usage = list_next(c_itr))) {
				local_tres_usage_t *loc_tres;
				list_itr_t *tmp_itr = list_iterator_create(
					loc_c_usage->loc_tres);
				while ((loc_tres = list_next(tmp_itr)))
					_add_time_tres(c_usage->loc_tres,
						       TIME_DOWN,
						       loc_tres->id,
						       loc_tres->total_time,
						       0);
				list_iterator_destroy(tmp_itr);
			}

			if ((rc = _process_cluster_usage(
				     mysql_conn, cluster_name, curr_start,
				     curr_end, now, c_usage))
			    != SLURM_SUCCESS) {
				goto end_it;
			}
		}

		/* Flush the per-hour association rollup rows. */
		list_iterator_reset(a_itr);
		while ((a_usage = list_next(a_itr)))
			_create_id_usage_insert(cluster_name, ASSOC_TABLES,
						curr_start, now,
						a_usage, &query);
		if (query) {
			DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s",
			         query);
			rc = mysql_db_query(mysql_conn, query);
			xfree(query);
			if (rc != SLURM_SUCCESS) {
				error("Couldn't add assoc hour rollup");
				goto end_it;
			}
		}

		/* Flush the per-hour QOS rollup rows. */
		list_iterator_reset(q_itr);
		while ((q_usage = list_next(q_itr)))
			_create_id_usage_insert(cluster_name, QOS_TABLES,
						curr_start, now,
						q_usage, &query);
		if (query) {
			DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s",
			         query);
			rc = mysql_db_query(mysql_conn, query);
			xfree(query);
			if (rc != SLURM_SUCCESS) {
				error("Couldn't add qos hour rollup");
				goto end_it;
			}
		}

		if (!track_wckey)
			goto end_loop;

		/* Flush the per-hour wckey rollup rows. */
		list_iterator_reset(w_itr);
		while ((w_usage = list_next(w_itr)))
			_create_id_usage_insert(cluster_name, WCKEY_TABLES,
						curr_start, now,
						w_usage, &query);
		if (query) {
			DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s",
			         query);
			rc = mysql_db_query(mysql_conn, query);
			xfree(query);
			if (rc != SLURM_SUCCESS) {
				error("Couldn't add wckey hour rollup");
				goto end_it;
			}
		}

	end_loop:
		/* Reset all per-hour state before advancing the window. */
		_destroy_local_cluster_usage(c_usage);

		c_usage = NULL;
		r_usage = NULL;
		a_usage = NULL;
		q_usage = NULL;
		w_usage = NULL;

		list_flush(assoc_usage_list);
		list_flush(cluster_down_list);
		list_flush(qos_usage_list);
		list_flush(wckey_usage_list);
		list_flush(resv_usage_list);
		curr_start = curr_end;
		curr_end = curr_start + add_sec;
	}
end_it:
	xfree(query);
	xfree(suspend_str);
	xfree(job_str);
	_destroy_local_cluster_usage(c_usage);

	if (a_itr)
		list_iterator_destroy(a_itr);
	if (c_itr)
		list_iterator_destroy(c_itr);
	if (q_itr)
		list_iterator_destroy(q_itr);
	if (w_itr)
		list_iterator_destroy(w_itr);
	if (r_itr)
		list_iterator_destroy(r_itr);

	FREE_NULL_LIST(assoc_usage_list);
	FREE_NULL_LIST(cluster_down_list);
	FREE_NULL_LIST(qos_usage_list);
	FREE_NULL_LIST(wckey_usage_list);
	FREE_NULL_LIST(resv_usage_list);

	/* info("stop start %s", slurm_ctime2(&curr_start)); */
	/* info("stop end %s", slurm_ctime2(&curr_end)); */

	/* go check to see if we archive and purge */

	if (rc == SLURM_SUCCESS) {
		if (mysql_db_commit(mysql_conn)) {
			char start[25], end[25];
			error("Couldn't commit cluster (%s) "
			      "hour rollup for %s - %s",
			      cluster_name, slurm_ctime2_r(&curr_start, start),
			      slurm_ctime2_r(&curr_end, end));
			rc = SLURM_ERROR;
		} else
			rc = _process_purge(mysql_conn, cluster_name,
					    archive_data, SLURMDB_PURGE_HOURS);
	}

	return rc;
}
/*
 * Roll hourly usage up into daily tables (run_month == false) or daily
 * usage up into monthly tables (run_month == true) for one cluster,
 * covering [start, end).
 *
 * Each pass aggregates one calendar day or month for the association,
 * QOS, cluster and (optionally) wckey tables via INSERT ... SELECT ...
 * ON DUPLICATE KEY UPDATE, then runs the matching purge/archive step.
 *
 * Returns SLURM_SUCCESS or SLURM_ERROR.
 */
extern int as_mysql_nonhour_rollup(mysql_conn_t *mysql_conn,
				   bool run_month,
				   char *cluster_name,
				   time_t start, time_t end,
				   uint16_t archive_data)
{
	/* can't just add 86400 since daylight savings starts and ends every
	 * once in a while
	 */
	int rc = SLURM_SUCCESS;
	struct tm start_tm;
	time_t curr_start = start;
	time_t curr_end;
	time_t now = time(NULL);
	char *query = NULL;
	uint16_t track_wckey = slurm_get_track_wckey();
	char *unit_name;

	while (curr_start < end) {
		/* Compute the next local-time day/month boundary so DST
		 * shifts are handled correctly (see comment above). */
		if (!localtime_r(&curr_start, &start_tm)) {
			error("Couldn't get localtime from start %ld",
			      curr_start);
			return SLURM_ERROR;
		}
		start_tm.tm_sec = 0;
		start_tm.tm_min = 0;
		start_tm.tm_hour = 0;

		if (run_month) {
			unit_name = "month";
			start_tm.tm_mday = 1;
			start_tm.tm_mon++;
		} else {
			unit_name = "day";
			start_tm.tm_mday++;
		}

		curr_end = slurm_mktime(&start_tm);

		DB_DEBUG(DB_USAGE, mysql_conn->conn,
		         "curr %s is now %ld-%ld",
		         unit_name, curr_start, curr_end);
		/* info("start %s", slurm_ctime2(&curr_start)); */
		/* info("end %s", slurm_ctime2(&curr_end)); */
		/* Association rollup: SUM from the finer-grained table into
		 * the coarser one; @ASUM is a MySQL user variable so the
		 * same sum can be reused in the ON DUPLICATE KEY branch. */
		query = xstrdup_printf(
			"insert into \"%s_%s\" (creation_time, mod_time, id, "
			"id_alt, id_tres, time_start, alloc_secs) "
			"select %ld, %ld, id, id_alt, id_tres, "
			"%ld, @ASUM:=SUM(alloc_secs) from \"%s_%s\" where "
			"(time_start < %ld && time_start >= %ld) "
			"group by id, id_alt, id_tres on duplicate key update "
			"mod_time=%ld, alloc_secs=@ASUM;",
			cluster_name,
			run_month ? assoc_month_table : assoc_day_table,
			now, now, curr_start,
			cluster_name,
			run_month ? assoc_day_table : assoc_hour_table,
			curr_end, curr_start, now);

		/* QOS rollup: same pattern as the association rollup. */
		xstrfmtcat(
			query,
			"insert into \"%s_%s\" (creation_time, mod_time, id, "
			"id_alt, id_tres, time_start, alloc_secs) "
			"select %ld, %ld, id, id_alt, id_tres, "
			"%ld, @ASUM:=SUM(alloc_secs) from \"%s_%s\" where "
			"(time_start < %ld && time_start >= %ld) "
			"group by id, id_alt, id_tres on duplicate key update "
			"mod_time=%ld, alloc_secs=@ASUM;",
			cluster_name,
			run_month ? qos_month_table : qos_day_table,
			now, now, curr_start,
			cluster_name,
			run_month ? qos_day_table : qos_hour_table,
			curr_end, curr_start, now);

		/* We group on deleted here so if there are no entries
		   we don't get an error, just nothing is returned.
		   Else we get a bunch of NULL's
		*/
		xstrfmtcat(query,
			   "insert into \"%s_%s\" (creation_time, "
			   "mod_time, time_start, id_tres, count, "
			   "alloc_secs, down_secs, pdown_secs, "
			   "idle_secs, over_secs, plan_secs) "
			   "select %ld, %ld, "
			   "%ld, id_tres, @CPU:=MAX(count), "
			   "@ASUM:=SUM(alloc_secs), "
			   "@DSUM:=SUM(down_secs), "
			   "@PDSUM:=SUM(pdown_secs), "
			   "@ISUM:=SUM(idle_secs), "
			   "@OSUM:=SUM(over_secs), "
			   "@PSUM:=SUM(plan_secs) from \"%s_%s\" where "
			   "(time_start < %ld && time_start >= %ld) "
			   "group by deleted, id_tres "
			   "on duplicate key update "
			   "mod_time=%ld, count=@CPU, "
			   "alloc_secs=@ASUM, down_secs=@DSUM, "
			   "pdown_secs=@PDSUM, idle_secs=@ISUM, "
			   "over_secs=@OSUM, plan_secs=@PSUM;",
			   cluster_name,
			   run_month ? cluster_month_table : cluster_day_table,
			   now, now, curr_start,
			   cluster_name,
			   run_month ? cluster_day_table : cluster_hour_table,
			   curr_end, curr_start, now);

		/* Wckey rollup only when wckey tracking is enabled. */
		if (track_wckey) {
			xstrfmtcat(query,
				   "insert into \"%s_%s\" (creation_time, "
				   "mod_time, id, id_alt, id_tres, time_start, "
				   "alloc_secs) "
				   "select %ld, %ld, "
				   "id, id_alt, id_tres, %ld, "
				   "@ASUM:=SUM(alloc_secs) "
				   "from \"%s_%s\" where (time_start < %ld && "
				   "time_start >= %ld) "
				   "group by id, id_alt, id_tres "
				   "on duplicate key update "
				   "mod_time=%ld, alloc_secs=@ASUM;",
				   cluster_name,
				   run_month ? wckey_month_table :
				   wckey_day_table,
				   now, now, curr_start,
				   cluster_name,
				   run_month ? wckey_day_table :
				   wckey_hour_table,
				   curr_end, curr_start, now);
		}

		/* All statements for this day/month run in one batch. */
		DB_DEBUG(DB_USAGE, mysql_conn->conn, "query\n%s", query);
		rc = mysql_db_query(mysql_conn, query);
		xfree(query);
		if (rc != SLURM_SUCCESS) {
			error("Couldn't add %s rollup", unit_name);
			return SLURM_ERROR;
		}

		curr_start = curr_end;
	}

	/* info("stop start %s", slurm_ctime2(&curr_start)); */
	/* info("stop end %s", slurm_ctime2(&curr_end)); */

	/* go check to see if we archive and purge */
	rc = _process_purge(mysql_conn, cluster_name, archive_data,
			    run_month ? SLURMDB_PURGE_MONTHS :
			    SLURMDB_PURGE_DAYS);

	return rc;
}