blob: ada20f73b2fa1292c43d325ad6a7aa0d8d461ef3 [file]
/*****************************************************************************\
* job_submit_pbs.c - Translate PBS job options specifications to the Slurm
* equivalents, particularly job dependencies.
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <dlfcn.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/slurm_xlator.h"
#include "src/common/threadpool.h"
#include "src/common/xstring.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/slurmctld.h"
/* Set to 1 to log each job's dependency string before and after PBS
 * translation in _xlate_dependency().
 * NOTE(review): identifiers starting with '_' + uppercase are reserved for
 * the C implementation; consider renaming (all uses must change together). */
#define _DEBUG 0
/* Required Slurm plugin symbols: */
const char plugin_name[] = "Job submit PBS plugin";
const char plugin_type[] = "job_submit/pbs";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
/* Serializes _xlate_before()'s edits of other jobs' dependency strings and
 * "on:" counts. Those edits are made while holding only a job read lock, so
 * this mutex keeps concurrent submissions from updating the same job at
 * once. */
static pthread_mutex_t depend_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Plugin load hook. Nothing needs to be set up when the plugin is loaded,
 * so loading always succeeds.
 */
extern int init(void)
{
	const int rc = SLURM_SUCCESS;

	return rc;
}
/* Plugin unload hook; there is nothing to tear down. */
extern void fini(void)
{
}
/*
 * Append a private copy of new_env (a "NAME=value" string) to the job's
 * environment array and bump env_size.
 *
 * Nothing is added when new_env is NULL or when the job has no environment
 * array (interactive jobs).
 */
static void _add_env(job_desc_msg_t *job_desc, char *new_env)
{
	uint32_t slot;

	if (!new_env || !job_desc->environment)
		return; /* Nothing we can do for interactive jobs */

	slot = job_desc->env_size;
	/* Room for the new entry plus one trailing slot; presumably xrealloc()
	 * zero-fills the growth so the array stays NULL terminated - confirm */
	xrealloc(job_desc->environment, sizeof(char *) * (slot + 2));
	job_desc->environment[slot] = xstrdup(new_env);
	job_desc->env_size = slot + 1;
}
/*
 * Append "key=val" to the job's environment array.
 *
 * Nothing is added when key or val is NULL or when the job has no
 * environment array (interactive jobs).
 */
static void _add_env2(job_desc_msg_t *job_desc, char *key, char *val)
{
	char *pair = NULL;

	if (!key || !val || !job_desc->environment)
		return; /* Nothing we can do for interactive jobs */

	xstrfmtcat(pair, "%s=%s", key, val);
	_add_env(job_desc, pair);	/* _add_env() stores its own copy */
	xfree(pair);
}
/*
 * Decrement the "on:<count>" hold counter stored in a job's comment,
 * rewriting it in place.
 *
 * The new value is right-aligned in exactly as many characters as the old
 * field occupied (e.g. "on:10" becomes "on: 9"), so the rest of the comment
 * does not move. A count that is already 0 or negative is left as is.
 *
 * A comment without "on:" is logged and ignored. A field with no number is
 * ignored. A field too long to rewrite through the local buffer is logged
 * and ignored: rewriting only its prefix would leave stale trailing digits
 * that re-parse as a wrong count.
 */
static void _decr_depend_cnt(job_record_t *job_ptr)
{
	char buf[16], *end_ptr = NULL, *tok = NULL;
	long cnt, field_len;
	int printed;

	if (job_ptr->comment)
		tok = strstr(job_ptr->comment, "on:");
	if (!tok) {
		info("%s: invalid job depend before option on job %u",
		     plugin_type, job_ptr->job_id);
		return;
	}

	cnt = strtol(tok + 3, &end_ptr, 10);
	field_len = (long) (end_ptr - (tok + 3));
	if (field_len <= 0)
		return;	/* nothing was parsed, nothing to rewrite */
	if (field_len >= (long) sizeof(buf)) {
		info("%s: job depend count on job %u too long to update",
		     plugin_type, job_ptr->job_id);
		return;
	}

	if (cnt > 0)
		cnt--;

	/* Pad to the original field width so only those bytes change. */
	printed = snprintf(buf, sizeof(buf), "%*ld", (int) field_len, cnt);
	if (printed != field_len)
		return;	/* defensive: new value does not fit the old field */
	memcpy(tok + 3, buf, (size_t) field_len);
}
/*
 * Detached worker started by _xlate_before() for a job whose dependency
 * string was just rewritten.
 *
 * We can not invoke update_job_dependency() until the new job record has
 * been created, hence this thread sleeps briefly and then, holding the job
 * write lock:
 *  - re-parses the job's dependency string when the job's comment carries
 *    an "on:<count>" marker, and
 *  - calls set_job_prio() when that count is 0 (or when no count is found).
 *
 * args - the job_record_t * to update.
 * Returns NULL.
 */
static void *_dep_agent(void *args)
{
	/* Locks: Read config, write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
	job_record_t *job_ptr = (job_record_t *) args;
	char *end_ptr = NULL, *tok;
	int cnt = 0;

	usleep(100000);
	lock_slurmctld(job_write_lock);

	/*
	 * The record may have gone away while we slept. Never touch it (in
	 * particular, never call set_job_prio() on it) unless it still looks
	 * like a live job record.
	 * NOTE(review): this is best-effort; the pointer is not re-looked-up by
	 * job id, so a purged record is still dereferenced to read its magic.
	 */
	if (!job_ptr || (job_ptr->magic != JOB_MAGIC)) {
		unlock_slurmctld(job_write_lock);
		return NULL;
	}

	if (job_ptr->details && job_ptr->comment &&
	    strstr(job_ptr->comment, "on:")) {
		char *new_depend = job_ptr->details->dependency;

		/* Hand the accumulated text back through the normal parser. */
		job_ptr->details->dependency = NULL;
		update_job_dependency(job_ptr, new_depend, false);
		xfree(new_depend);

		tok = strstr(job_ptr->comment, "on:");
		cnt = strtol(tok + 3, &end_ptr, 10);
	}

	if (cnt == 0)
		set_job_prio(job_ptr);
	unlock_slurmctld(job_write_lock);
	return NULL;
}
/*
 * Apply one PBS "before"-style dependency of job my_job_id to the jobs it
 * names.
 *
 * depend     - one dependency token such as "beforeok:123:456"; it is
 *              tokenized in place with strtok_r().
 * submit_uid - uid making the request. A referenced job is changed only if
 *              it belongs to this uid or the uid passes validate_super_user().
 * my_job_id  - id of the job that carries the "before" dependency.
 *
 * "beforeX:J" on job M is implemented as "afterX:M" on job J. The type is
 * mapped (before->after, beforeany->afterany, beforenotok->afternotok,
 * beforeok->afterok), and the result is appended to each referenced pending
 * job's dependency string. That job's "on:" count is then decremented and
 * a detached _dep_agent() thread is started to apply the new dependency.
 * Unknown types and invalid, foreign or non-pending jobs are logged and
 * skipped.
 */
static void _xlate_before(char *depend, uint32_t submit_uid, uint32_t my_job_id)
{
	static const struct {
		const char *pbs;
		const char *slurm;
	} type_map[] = {
		{ "before",      "after" },
		{ "beforeany",   "afterany" },
		{ "beforenotok", "afternotok" },
		{ "beforeok",    "afterok" },
	};
	const char *type = NULL;
	char *last_ptr = NULL, *new_dep = NULL, *tok;
	uint32_t job_id;
	job_record_t *job_ptr;
	size_t i;

	tok = strtok_r(depend, ":", &last_ptr);
	for (i = 0; i < sizeof(type_map) / sizeof(type_map[0]); i++) {
		if (!xstrcmp(tok, type_map[i].pbs)) {
			type = type_map[i].slurm;
			break;
		}
	}
	if (!type) {
		info("%s: discarding invalid job dependency option %s",
		     plugin_type, tok);
		return;
	}

	/* NOTE: We are updating a job record here in order to implement
	 * the depend=before option. We are doing so without the write lock
	 * on the job record, but using a local mutex to prevent multiple
	 * updates on the same job when multiple jobs satisfying the dependency
	 * are being processed at the same time (all with read locks). The
	 * job read lock will prevent anyone else from getting a job write
	 * lock and using a job write lock causes serious performance problems
	 * for slow job_submit plugins. Not an ideal solution, but the best
	 * option that we see. */
	slurm_mutex_lock(&depend_mutex);
	for (tok = strtok_r(NULL, ":", &last_ptr); tok;
	     tok = strtok_r(NULL, ":", &last_ptr)) {
		/*
		 * strtoul() instead of atoi(): the same leading-number parse
		 * for normal input, but defined behavior when the value is out
		 * of range.
		 */
		job_id = (uint32_t) strtoul(tok, NULL, 10);
		job_ptr = find_job_record(job_id);
		if (!job_ptr) {
			info("%s: discarding invalid job dependency before %s",
			     plugin_type, tok);
		} else if ((submit_uid != job_ptr->user_id) &&
			   !validate_super_user(submit_uid)) {
			error("%s: Security violation: uid %u trying to alter "
			      "job %u belonging to uid %u",
			      plugin_type, submit_uid, job_ptr->job_id,
			      job_ptr->user_id);
		} else if ((!IS_JOB_PENDING(job_ptr)) ||
			   (job_ptr->details == NULL)) {
			info("%s: discarding job before dependency on "
			     "non-pending job %u",
			     plugin_type, job_ptr->job_id);
		} else {
			/* Append "<type>:<my_job_id>" to the existing list. */
			if (job_ptr->details->dependency) {
				xstrcat(new_dep, job_ptr->details->dependency);
				xstrcat(new_dep, ",");
			}
			xstrfmtcat(new_dep, "%s:%u", type, my_job_id);
			xfree(job_ptr->details->dependency);
			job_ptr->details->dependency = new_dep;
			new_dep = NULL;	/* ownership moved to the job */
			_decr_depend_cnt(job_ptr);
			slurm_thread_create_detached(_dep_agent, job_ptr);
		}
	}
	slurm_mutex_unlock(&depend_mutex);
}
/* Translate PBS job dependencies to Slurm equivalents to the extent possible.
 *
 * PBS option   Slurm handling
 * ===========  ========================
 * after*       kept verbatim (after, afterok, afternotok, afterany, ...)
 * before       (set after in referenced job and release as needed)
 * beforeok     (set afterok in referenced job and release as needed)
 * beforenotok  (set afternotok in referenced job and release as needed)
 * beforeany    (set afterany in referenced job and release as needed)
 * N/A          expand     (kept verbatim)
 * on           (append "on:<count>" to the job comment and hold the job)
 * N/A          singleton  (kept verbatim)
 *
 * job_desc   - request being submitted or modified. Its dependency string
 *              is tokenized in place, freed and replaced with the
 *              comma-separated list of kept tokens (NULL if none were
 *              kept). An "on:" token also sets priority to 0 and extends
 *              the comment.
 * submit_uid - requesting uid, passed to _xlate_before() for its
 *              ownership check.
 * my_job_id  - id of the job described by job_desc.
 */
static void _xlate_dependency(job_desc_msg_t *job_desc, uint32_t submit_uid,
			      uint32_t my_job_id)
{
	char *result = NULL;
	char *last_ptr = NULL, *tok;

	if (!job_desc->dependency)
		return;

#if _DEBUG
	info("%s: dependency in:%s", plugin_type, job_desc->dependency);
#endif
	tok = strtok_r(job_desc->dependency, ",", &last_ptr);
	while (tok) {
		if (!xstrncmp(tok, "after", 5) ||
		    !xstrncmp(tok, "expand", 6) ||
		    !xstrncmp(tok, "singleton", 9)) {
			/* Already Slurm syntax: keep as is. */
			if (result)
				xstrcat(result, ",");
			xstrcat(result, tok);
		} else if (!xstrncmp(tok, "on:", 3)) {
			job_desc->priority = 0; /* Job is held */
			if (job_desc->comment)
				xstrcat(job_desc->comment, ",");
			xstrcat(job_desc->comment, tok);
		} else if (!xstrncmp(tok, "before", 6)) {
			/* Applied to the referenced jobs, not to this one. */
			_xlate_before(tok, submit_uid, my_job_id);
		} else {
			info("%s: discarding unknown job dependency option %s",
			     plugin_type, tok);
		}
		tok = strtok_r(NULL, ",", &last_ptr);
	}
#if _DEBUG
	info("%s: dependency out:%s", plugin_type, result);
#endif
	xfree(job_desc->dependency);
	job_desc->dependency = result;
}
/*
 * job_submit plugin hook: adapt a new job request to PBS conventions.
 *
 * - PBS job dependencies are translated via _xlate_dependency().
 * - PBS_ACCOUNT / PBS_QUEUE are added to the job environment when an
 *   account / partition was requested.
 * - "stdout=<path>" is appended to the job comment. <path> is the job's
 *   standard output file (default "slurm-%j.out"), prefixed with the work
 *   directory when relative, with every "%j" replaced by my_job_id.
 *
 * Always returns SLURM_SUCCESS.
 */
extern int job_submit(job_desc_msg_t *job_desc, uint32_t submit_uid)
{
	const char *std_out, *seg, *hit;
	uint32_t my_job_id;

	/* NOTE(review): presumably peeks at the id this job will receive
	 * without consuming it - confirm get_next_job_id() semantics. */
	my_job_id = get_next_job_id(true);
	_xlate_dependency(job_desc, submit_uid, my_job_id);

	if (job_desc->account)
		_add_env2(job_desc, "PBS_ACCOUNT", job_desc->account);
	/*
	 * PBS_ENVIRONMENT is deliberately not set: setting it causes Intel MPI
	 * to believe that it is running on a PBS system, which isn't the case
	 * here. Interactive jobs lack an environment in the job submit RPC
	 * anyway, so for them it would need to be handled by a SPANK plugin.
	 */
	if (job_desc->partition)
		_add_env2(job_desc, "PBS_QUEUE", job_desc->partition);

	std_out = job_desc->std_out ? job_desc->std_out : "slurm-%j.out";

	if (job_desc->comment)
		xstrcat(job_desc->comment, ",");
	xstrcat(job_desc->comment, "stdout=");
	if ((std_out[0] != '/') && job_desc->work_dir) {
		xstrcat(job_desc->comment, job_desc->work_dir);
		xstrcat(job_desc->comment, "/");
	}
	/* Expand every "%j" (not just the first), as Slurm does when it
	 * builds output file names from a pattern. */
	for (seg = std_out; (hit = strstr(seg, "%j")); seg = hit + 2)
		xstrfmtcat(job_desc->comment, "%.*s%u",
			   (int) (hit - seg), seg, my_job_id);
	xstrcat(job_desc->comment, seg);

	return SLURM_SUCCESS;
}
/*
 * job_modify plugin hook: adapt a job update request to PBS conventions.
 *
 * - PBS job dependencies in the request are translated via
 *   _xlate_dependency() on behalf of job_ptr->job_id.
 * - If a new standard output path is requested, "stdout=<path>" is appended
 *   to the job's comment. <path> is prefixed with the job's work directory
 *   when relative, with every "%j" replaced by the job id. The path is then
 *   removed from the request (job_desc->std_out is freed).
 *
 * err_msg is not used. Always returns SLURM_SUCCESS.
 */
extern int job_modify(job_desc_msg_t *job_desc, job_record_t *job_ptr,
		      uint32_t submit_uid, char **err_msg)
{
	/* Locks: Read config, write job, read node, read partition
	 * HAVE BEEN SET ON ENTRY TO THIS FUNCTION */
	const char *seg, *hit;

	xassert(job_ptr);

	_xlate_dependency(job_desc, submit_uid, job_ptr->job_id);

	if (!job_desc->std_out)
		return SLURM_SUCCESS;

	if (job_ptr->comment)
		xstrcat(job_ptr->comment, ",");
	xstrcat(job_ptr->comment, "stdout=");
	if ((job_desc->std_out[0] != '/') && job_ptr->details &&
	    job_ptr->details->work_dir) {
		xstrcat(job_ptr->comment, job_ptr->details->work_dir);
		xstrcat(job_ptr->comment, "/");
	}
	/* Expand every "%j" (not just the first), as Slurm does when it
	 * builds output file names from a pattern. */
	for (seg = job_desc->std_out; (hit = strstr(seg, "%j")); seg = hit + 2)
		xstrfmtcat(job_ptr->comment, "%.*s%u",
			   (int) (hit - seg), seg, job_ptr->job_id);
	xstrcat(job_ptr->comment, seg);

	/* NOTE(review): the requested path is recorded only in the comment
	 * and dropped from the update request; presumably std_out itself is
	 * not meant to be changed here - confirm. */
	xfree(job_desc->std_out);

	return SLURM_SUCCESS;
}