blob: 138c5104fc260d428314bf966d1adef5caaed9ed [file] [log] [blame]
/*****************************************************************************\
* checkpoint_aix.c - AIX slurm checkpoint plugin.
* $Id$
*****************************************************************************
* Copyright (C) 2004 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>
* UCRL-CODE-226842.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#if HAVE_STDINT_H
# include <stdint.h>
#endif
#if HAVE_INTTYPES_H
# include <inttypes.h>
#endif
#ifdef WITH_PTHREADS
# include <pthread.h>
#endif
#include <signal.h>
#include <stdio.h>
#include <time.h>
#include <slurm/slurm.h>
#include <slurm/slurm_errno.h>
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/pack.h"
#include "src/common/xassert.h"
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/slurmctld.h"
/* Per-step checkpoint state. The functions in this file obtain it by
 * casting step_record.check_job to this type. */
struct check_job_info {
uint16_t disabled; /* counter, checkpointable only if zero */
uint16_t node_cnt; /* nodes in the step, re-calculated by _step_sig() */
uint16_t reply_cnt; /* completion replies counted by slurm_ckpt_comp() */
uint16_t wait_time; /* wait value recorded by _step_sig() */
time_t time_stamp; /* begin or end checkpoint time */
uint32_t error_code; /* largest error code reported since the last reset */
char *error_msg; /* xstrdup'ed message stored along with error_code */
};
/* Forward declarations of the signalling helpers defined later in this file */
static void _send_sig(uint32_t job_id, uint32_t step_id, uint16_t signal,
char *node_name, slurm_addr node_addr);
static int _step_sig(struct step_record * step_ptr, uint16_t wait,
uint16_t signal, uint16_t sig_timeout);
/* checkpoint request timeout processing */
/* id of the thread created by init() and cancelled by fini() */
static pthread_t ckpt_agent_tid = 0;
/* locked around list_next/list_delete/list_enqueue on ckpt_timeout_list */
static pthread_mutex_t ckpt_agent_mutex = PTHREAD_MUTEX_INITIALIZER;
/* pending timeout records; created by the first _ckpt_enqueue_timeout() */
static List ckpt_timeout_list = NULL;
/* One queued checkpoint timeout, stored on ckpt_timeout_list */
struct ckpt_timeout_info {
uint32_t job_id; /* job of the step being checkpointed */
uint32_t step_id; /* step being checkpointed */
uint16_t signal; /* signal sent by _ckpt_signal_step() on timeout */
time_t start_time; /* request time, matched by _ckpt_dequeue_timeout() */
time_t end_time; /* start_time + wait_time; record expires at this time */
char *node_name; /* xstrdup'ed copy, released by _ckpt_timeout_free() */
slurm_addr node_addr; /* address copied from the enqueue request */
};
/* Forward declarations of the timeout thread and its queue helpers */
static void *_ckpt_agent_thr(void *arg);
static void _ckpt_enqueue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time, uint16_t signal, uint16_t wait_time,
char *node_name, slurm_addr node_addr);
static void _ckpt_dequeue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time);
static void _ckpt_timeout_free(void *rec);
static void _ckpt_signal_step(struct ckpt_timeout_info *rec);
/*
* These variables are required by the generic plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it.
*
* plugin_name - a string giving a human-readable description of the
* plugin. There is no maximum length, but the symbol must refer to
* a valid string.
*
* plugin_type - a string suggesting the type of the plugin or its
* applicability to a particular form of data or method of data handling.
* If the low-level plugin API is used, the contents of this string are
* unimportant and may be anything. SLURM uses the higher-level plugin
* interface which requires this string to be of the form
*
* <application>/<method>
*
* where <application> is a description of the intended application of
* the plugin (e.g., "checkpoint" for SLURM checkpoint) and <method>
* is a description of how this plugin satisfies that application. SLURM will
* only load checkpoint plugins if the plugin_type string has a
* prefix of "checkpoint/".
*
* plugin_version - an unsigned 32-bit integer giving the version number
* of the plugin. If major and minor revisions are desired, the major
* version number may be multiplied by a suitable magnitude constant such
* as 100 or 1000. Various SLURM versions will likely require a certain
* minimum version for their plugins as the checkpoint API matures.
*/
const char plugin_name[] = "Checkpoint AIX plugin"; /* human-readable description */
const char plugin_type[] = "checkpoint/aix"; /* <application>/<method> */
const uint32_t plugin_version = 90; /* plugin version number */
/*
 * init() is called when the plugin is loaded, before any other functions
 * are called. Put global initialization here.
 *
 * Creates the detached thread (_ckpt_agent_thr) that processes checkpoint
 * timeouts and records its id in ckpt_agent_tid.
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR if the thread could not be created
 */
extern int init ( void )
{
	pthread_attr_t attr;
	int rc = SLURM_SUCCESS;

	slurm_attr_init(&attr);
	if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
		error("pthread_attr_setdetachstate: %m");
	if (pthread_create(&ckpt_agent_tid, &attr, _ckpt_agent_thr, NULL)) {
		error("pthread_create: %m");
		rc = SLURM_ERROR;
	}
	/* Release the attribute object on the failure path as well */
	slurm_attr_destroy(&attr);

	return rc;
}
/*
 * fini() cancels the checkpoint timeout thread started by init().
 *
 * pthread_cancel() is issued up to four times, 1 msec apart. A non-zero
 * return from pthread_cancel() is treated as "thread no longer exists",
 * at which point ckpt_agent_tid is cleared.
 *
 * RET SLURM_SUCCESS if no thread is recorded or it has gone away,
 *     SLURM_ERROR if it still accepts cancel requests after all attempts
 */
extern int fini ( void )
{
	int i;

	/* Test the thread id itself; the address of the variable is never
	 * NULL, so testing that could not detect "no thread" */
	if (!ckpt_agent_tid)
		return SLURM_SUCCESS;

	for (i=0; i<4; i++) {
		if (pthread_cancel(ckpt_agent_tid)) {
			ckpt_agent_tid = 0;
			return SLURM_SUCCESS;
		}
		usleep(1000);
	}
	error("Could not kill checkpoint pthread");
	return SLURM_ERROR;
}
/*
* The remainder of this file implements the standard SLURM checkpoint API.
*/
/*
 * slurm_ckpt_op - perform a checkpoint operation on a job step
 * IN op - CHECK_ABLE, CHECK_DISABLE, CHECK_ENABLE, CHECK_CREATE,
 *	CHECK_VACATE, CHECK_RESTART or CHECK_ERROR
 * IN data - for CHECK_CREATE and CHECK_VACATE, handed to _step_sig() as
 *	its wait argument; unused by the other operations
 * IN step_ptr - the step to act upon, its check_job field must be set
 * OUT event_time - for CHECK_ABLE, receives the stored time stamp while
 *	fewer replies than nodes have been counted; may be NULL
 * OUT error_code - for CHECK_ERROR, receives the stored error code
 * OUT error_msg - for CHECK_ERROR, any existing *error_msg is xfree'd and
 *	replaced with an xstrdup'ed copy of the stored message
 * RET SLURM_SUCCESS, ESLURM_DISABLED, ESLURM_NOT_SUPPORTED, EINVAL or the
 *	return code of _step_sig()
 */
extern int slurm_ckpt_op ( uint16_t op, uint16_t data,
		struct step_record * step_ptr, time_t * event_time,
		uint32_t *error_code, char **error_msg )
{
	int rc = SLURM_SUCCESS;
	struct check_job_info *check_ptr;

	xassert(step_ptr);
	check_ptr = (struct check_job_info *) step_ptr->check_job;
	xassert(check_ptr);

	switch (op) {
		case CHECK_ABLE:
			if (check_ptr->disabled)
				rc = ESLURM_DISABLED;
			else {
				if ((check_ptr->reply_cnt < check_ptr->node_cnt)
				&&  event_time) {
					/* Return time of last event */
					*event_time = check_ptr->time_stamp;
				}
				rc = SLURM_SUCCESS;
			}
			break;
		case CHECK_DISABLE:
			check_ptr->disabled++;
			break;
		case CHECK_ENABLE:
			/* The counter is unsigned: an unbalanced enable must
			 * not wrap it from 0 to 65535, which would leave the
			 * step marked as disabled */
			if (check_ptr->disabled)
				check_ptr->disabled--;
			break;
		case CHECK_CREATE:
			/* start from a clean slate for the new request */
			check_ptr->time_stamp = time(NULL);
			check_ptr->reply_cnt = 0;
			check_ptr->error_code = 0;
			xfree(check_ptr->error_msg);
#ifdef SIGSOUND
			rc = _step_sig(step_ptr, data, SIGSOUND, SIGWINCH);
#else
			/* No checkpoint, SIGWINCH for testing purposes */
			info("Checkpoint not supported, sending SIGWINCH");
			rc = _step_sig(step_ptr, data, SIGWINCH, SIGWINCH);
#endif
			break;
		case CHECK_VACATE:
			/* start from a clean slate for the new request */
			check_ptr->time_stamp = time(NULL);
			check_ptr->reply_cnt = 0;
			check_ptr->error_code = 0;
			xfree(check_ptr->error_msg);
#ifdef SIGMIGRATE
			rc = _step_sig(step_ptr, data, SIGMIGRATE, SIGTERM);
#else
			/* No checkpoint, kill job now, useful for testing */
			info("Checkpoint not supported, sending SIGTERM");
			rc = _step_sig(step_ptr, data, SIGTERM, SIGTERM);
#endif
			break;
		case CHECK_RESTART:
			rc = ESLURM_NOT_SUPPORTED;
			break;
		case CHECK_ERROR:
			xassert(error_code);
			xassert(error_msg);
			*error_code = check_ptr->error_code;
			xfree(*error_msg);
			*error_msg = xstrdup(check_ptr->error_msg);
			break;
		default:
			error("Invalid checkpoint operation: %d", op);
			rc = EINVAL;
	}

	return rc;
}
/*
 * slurm_ckpt_comp - note a checkpoint completion reply for a job step
 * IN step_ptr - the step, its check_job field must be set
 * IN event_time - time stamp of the request being answered; zero skips
 *	the comparison against the stored time stamp
 * IN error_code - error reported with this reply
 * IN error_msg - message reported with this reply
 * RET ESLURM_ALREADY_DONE if event_time is non-zero and differs from the
 *	stored time stamp, otherwise SLURM_SUCCESS
 */
extern int slurm_ckpt_comp ( struct step_record * step_ptr, time_t event_time,
		uint32_t error_code, char *error_msg )
{
	struct check_job_info *check_ptr;
	uint16_t replies_before;
	time_t done_time;
	long elapsed;

	xassert(step_ptr);
	check_ptr = (struct check_job_info *) step_ptr->check_job;
	xassert(check_ptr);

	if ((event_time != 0) && (check_ptr->time_stamp != event_time))
		return ESLURM_ALREADY_DONE;

	/* Only an error larger than the one already stored is recorded,
	 * and such a reply is not counted as a completion */
	if (check_ptr->error_code < error_code) {
		info("slurm_ckpt_comp for step %u.%u error %u: %s",
			step_ptr->job_ptr->job_id, step_ptr->step_id,
			error_code, error_msg);
		check_ptr->error_code = error_code;
		xfree(check_ptr->error_msg);
		check_ptr->error_msg = xstrdup(error_msg);
		return SLURM_SUCCESS;
	}

	/* We need an error-free reply from each compute node,
	 * plus POE itself to note completion */
	replies_before = check_ptr->reply_cnt;
	check_ptr->reply_cnt++;
	if (replies_before != check_ptr->node_cnt)
		return SLURM_SUCCESS;

	done_time = time(NULL);
	elapsed = (long) difftime(done_time, check_ptr->time_stamp);
	info("slurm_ckpt_comp for step %u.%u in %ld secs",
		step_ptr->job_ptr->job_id, step_ptr->step_id,
		elapsed);
	check_ptr->time_stamp = done_time;
	_ckpt_dequeue_timeout(step_ptr->job_ptr->job_id,
		step_ptr->step_id, event_time);

	return SLURM_SUCCESS;
}
/*
 * slurm_ckpt_alloc_job - allocate a checkpoint state record
 * OUT jobinfo - receives the xmalloc'ed struct check_job_info
 * RET SLURM_SUCCESS
 */
extern int slurm_ckpt_alloc_job(check_jobinfo_t *jobinfo)
{
	struct check_job_info *fresh;

	fresh = xmalloc(sizeof(struct check_job_info));
	*jobinfo = (check_jobinfo_t) fresh;

	return SLURM_SUCCESS;
}
/*
 * slurm_ckpt_free_job - release a checkpoint state record
 * IN jobinfo - record obtained from slurm_ckpt_alloc_job(), may be NULL
 * RET SLURM_SUCCESS
 */
extern int slurm_ckpt_free_job(check_jobinfo_t jobinfo)
{
	struct check_job_info *check_ptr = (struct check_job_info *)jobinfo;

	if (check_ptr) {
		/* error_msg is separately allocated (xstrdup or unpack), so
		 * it must be released before the record that points to it */
		xfree(check_ptr->error_msg);
		xfree(check_ptr);
	}

	return SLURM_SUCCESS;
}
/*
 * slurm_ckpt_pack_job - write a checkpoint state record into a buffer
 * IN jobinfo - record to pack
 * IN/OUT buffer - destination buffer
 * RET SLURM_SUCCESS
 *
 * The field order here must stay in step with slurm_ckpt_unpack_job().
 */
extern int slurm_ckpt_pack_job(check_jobinfo_t jobinfo, Buf buffer)
{
	struct check_job_info *state;

	state = (struct check_job_info *) jobinfo;

	/* four 16-bit counters */
	pack16(state->disabled, buffer);
	pack16(state->node_cnt, buffer);
	pack16(state->reply_cnt, buffer);
	pack16(state->wait_time, buffer);

	/* error report, then the time stamp */
	pack32(state->error_code, buffer);
	packstr(state->error_msg, buffer);
	pack_time(state->time_stamp, buffer);

	return SLURM_SUCCESS;
}
/*
 * slurm_ckpt_unpack_job - read a checkpoint state record from a buffer
 * IN/OUT jobinfo - record to fill in
 * IN/OUT buffer - source buffer
 * RET SLURM_SUCCESS, or SLURM_ERROR via the unpack_error label
 *
 * Fields are read in the order slurm_ckpt_pack_job() writes them.
 */
extern int slurm_ckpt_unpack_job(check_jobinfo_t jobinfo, Buf buffer)
{
/* second output of safe_unpackstr_xmalloc (presumably the string
 * length - confirm in pack.h); not otherwise used */
uint16_t uint16_tmp;
struct check_job_info *check_ptr =
(struct check_job_info *)jobinfo;
safe_unpack16(&check_ptr->disabled, buffer);
safe_unpack16(&check_ptr->node_cnt, buffer);
safe_unpack16(&check_ptr->reply_cnt, buffer);
safe_unpack16(&check_ptr->wait_time, buffer);
safe_unpack32(&check_ptr->error_code, buffer);
safe_unpackstr_xmalloc(&check_ptr->error_msg, &uint16_tmp, buffer);
safe_unpack_time(&check_ptr->time_stamp, buffer);
return SLURM_SUCCESS;
/* NOTE(review): nothing in this function jumps here explicitly, so the
 * safe_unpack* macros presumably do so on failure - confirm in pack.h */
unpack_error:
/* drop any message that was unpacked before the failure */
xfree(check_ptr->error_msg);
return SLURM_ERROR;
}
/* Queue a REQUEST_SIGNAL_TASKS RPC for one node.
 * IN job_id, step_id - step whose tasks are to be signalled
 * IN signal - signal number placed in the message
 * IN node_name - used to build the single-entry host list
 * IN node_addr - accepted for the caller's convenience, not used here
 * The message and agent argument are xmalloc'ed here and handed to
 * agent_queue_request(). */
static void _send_sig(uint32_t job_id, uint32_t step_id, uint16_t signal,
		char *node_name, slurm_addr node_addr)
{
	kill_tasks_msg_t *sig_msg;
	agent_arg_t *request;

	/* message body */
	sig_msg = xmalloc(sizeof(kill_tasks_msg_t));
	sig_msg->signal      = signal;
	sig_msg->job_step_id = step_id;
	sig_msg->job_id      = job_id;

	/* agent request addressed to the one node */
	request = xmalloc(sizeof(agent_arg_t));
	request->msg_type   = REQUEST_SIGNAL_TASKS;
	request->retry      = 1;
	request->msg_args   = sig_msg;
	request->hostlist   = hostlist_create(node_name);
	request->node_count = 1;

	agent_queue_request(request);
}
/* Send specified signal only to the process launched on node 0.
 * If the request times out, send sig_timeout.
 * IN step_ptr - step to signal, its check_job and job_ptr must be set
 * IN wait - seconds before sig_timeout is sent; stored as wait_time
 * IN signal - signal sent immediately to the step's first node
 * IN sig_timeout - signal queued for delivery when the wait expires
 * RET SLURM_SUCCESS, ESLURM_ALREADY_DONE if the job is finished,
 *	ESLURM_DISABLED if checkpointing is disabled for the step, or
 *	ESLURM_INVALID_NODE_NAME if the step has no nodes
 * Also re-calculates node_cnt from the step's node bitmap. */
static int _step_sig(struct step_record * step_ptr, uint16_t wait,
		uint16_t signal, uint16_t sig_timeout)
{
	struct check_job_info *check_ptr;
	struct job_record *job_ptr;
	time_t now;
	int i;

	xassert(step_ptr);
	check_ptr = (struct check_job_info *) step_ptr->check_job;
	xassert(check_ptr);
	job_ptr = step_ptr->job_ptr;
	xassert(job_ptr);

	if (IS_JOB_FINISHED(job_ptr))
		return ESLURM_ALREADY_DONE;

	if (check_ptr->disabled)
		return ESLURM_DISABLED;

	/* Read the clock once. The queued timeout record and time_stamp
	 * must carry the same value, because _ckpt_dequeue_timeout()
	 * matches records on start_time; two separate time() calls can
	 * differ and leave a stale timeout on the queue. */
	now = time(NULL);

	check_ptr->node_cnt = 0;	/* re-calculate below */
	for (i = 0; i < node_record_count; i++) {
		if (bit_test(step_ptr->step_node_bitmap, i) == 0)
			continue;
		/* every node is counted, only the first one is signalled */
		if (check_ptr->node_cnt++ > 0)
			continue;
		_send_sig(step_ptr->job_ptr->job_id, step_ptr->step_id,
			signal, node_record_table_ptr[i].name,
			node_record_table_ptr[i].slurm_addr);
		_ckpt_enqueue_timeout(step_ptr->job_ptr->job_id,
			step_ptr->step_id, now,
			sig_timeout, wait, node_record_table_ptr[i].name,
			node_record_table_ptr[i].slurm_addr);
	}

	if (!check_ptr->node_cnt) {
		error("_step_sig: job %u.%u has no nodes", job_ptr->job_id,
			step_ptr->step_id);
		return ESLURM_INVALID_NODE_NAME;
	}

	check_ptr->time_stamp = now;
	check_ptr->wait_time  = wait;
	info("checkpoint requested for job %u.%u", job_ptr->job_id,
		step_ptr->step_id);
	return SLURM_SUCCESS;
}
/* Checkpoint timeout thread.
 * Once a second, walks ckpt_timeout_list (if it exists) and, for every
 * record whose end_time has been reached, sends the record's signal and
 * removes the record from the list.
 * Never returns, but is cancelled on plugin termination.
 * IN arg - unused */
static void *_ckpt_agent_thr(void *arg)
{
	struct ckpt_timeout_info *pending;
	ListIterator scan;
	time_t wake_time;

	for (;;) {
		sleep(1);
		if (ckpt_timeout_list == NULL)
			continue;	/* nothing has been queued yet */

		wake_time = time(NULL);
		scan = list_iterator_create(ckpt_timeout_list);
		slurm_mutex_lock(&ckpt_agent_mutex);
		/* signal and discard every record that has expired */
		while ((pending = list_next(scan)) != NULL) {
			if (pending->end_time <= wake_time) {
				info("checkpoint timeout for %u.%u",
					pending->job_id, pending->step_id);
				_ckpt_signal_step(pending);
				list_delete(scan);
			}
		}
		slurm_mutex_unlock(&ckpt_agent_mutex);
		list_iterator_destroy(scan);
	}
}
/* Deliver the signal stored in a timeout record to the node named in it.
 * IN rec - timeout record; it is read here but not freed or modified */
static void _ckpt_signal_step(struct ckpt_timeout_info *rec)
{
	uint32_t job_id  = rec->job_id;
	uint32_t step_id = rec->step_id;

	_send_sig(job_id, step_id, rec->signal, rec->node_name,
		rec->node_addr);
}
/* Queue a checkpoint request timeout.
 * IN job_id, step_id - step the timeout belongs to
 * IN start_time - time stamp of the request, used later to match on dequeue
 * IN signal - signal to send when the timeout expires
 * IN wait_time - seconds after start_time at which the timeout expires
 * IN node_name - node to signal, copied into the record
 * IN node_addr - address stored in the record
 * Nothing is queued when either wait_time or signal is zero. The list
 * itself is created on first use. */
static void _ckpt_enqueue_timeout(uint32_t job_id, uint32_t step_id,
		time_t start_time, uint16_t signal, uint16_t wait_time,
		char *node_name, slurm_addr node_addr)
{
	struct ckpt_timeout_info *entry;

	if (!wait_time || !signal)
		return;

	slurm_mutex_lock(&ckpt_agent_mutex);
	if (ckpt_timeout_list == NULL)
		ckpt_timeout_list = list_create(_ckpt_timeout_free);

	entry = xmalloc(sizeof(struct ckpt_timeout_info));
	/* which step, and what to send it */
	entry->job_id     = job_id;
	entry->step_id    = step_id;
	entry->signal     = signal;
	/* when the request began and when it expires */
	entry->start_time = start_time;
	entry->end_time   = start_time + wait_time;
	/* where to send the signal */
	entry->node_name  = xstrdup(node_name);
	entry->node_addr  = node_addr;

	list_enqueue(ckpt_timeout_list, entry);
	slurm_mutex_unlock(&ckpt_agent_mutex);
}
/* Release one timeout record; registered with list_create() as the
 * deletion function of ckpt_timeout_list.
 * IN rec - struct ckpt_timeout_info to release, NULL is ignored */
static void _ckpt_timeout_free(void *rec)
{
	struct ckpt_timeout_info *doomed = rec;

	if (doomed == NULL)
		return;

	/* the node name is a separate allocation owned by the record */
	xfree(doomed->node_name);
	xfree(doomed);
}
/* De-queue a checkpoint timeout request. The operation completed.
 * IN job_id, step_id - step whose timeout is to be removed
 * IN start_time - request time stamp the record must carry; zero matches
 *	a record with any start_time
 * At most one record, the first match, is removed. */
static void _ckpt_dequeue_timeout(uint32_t job_id, uint32_t step_id,
		time_t start_time)
{
	struct ckpt_timeout_info *entry;
	ListIterator scan;

	slurm_mutex_lock(&ckpt_agent_mutex);
	if (ckpt_timeout_list != NULL) {
		scan = list_iterator_create(ckpt_timeout_list);
		while ((entry = list_next(scan)) != NULL) {
			if ((entry->job_id == job_id)
			&&  (entry->step_id == step_id)
			&&  (!start_time
			||   (entry->start_time == start_time))) {
				list_delete(scan);
				break;
			}
		}
		list_iterator_destroy(scan);
	}
	slurm_mutex_unlock(&ckpt_agent_mutex);
}