blob: defd2b9a21d54db6186cf0de69ec8dbad8236ae1 [file] [log] [blame] [edit]
/*****************************************************************************\
* srun_comm.c - srun communications
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>
* UCRL-CODE-226842.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <string.h>
#include "src/common/node_select.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/srun_comm.h"
#define SRUN_LAUNCH_MSG 0
/*
 * _srun_agent_launch - build an agent request for one srun and queue it
 * IN addr     - address of the srun to contact, stored in the request as-is
 * IN host     - host name of that srun, used to create the request hostlist
 * IN type     - type of the message to send
 * IN msg_args - body of the message, stored in the request as-is
 *
 * NOTE: retry is always zero since we don't want to clog the system up
 * with messages destined for defunct srun processes.
 * NOTE(review): nothing in this file releases addr, msg_args or the
 * request after it is queued, so agent_queue_request() presumably takes
 * ownership of them - confirm against the agent code.
 */
static void _srun_agent_launch(slurm_addr *addr, char *host,
			       slurm_msg_type_t type, void *msg_args)
{
	agent_arg_t *request = xmalloc(sizeof(agent_arg_t));

	/* Destination: exactly one srun */
	request->node_count = 1;
	request->addr       = addr;
	request->hostlist   = hostlist_create(host);

	/* Payload */
	request->msg_type   = type;
	request->msg_args   = msg_args;

	/* Single attempt only */
	request->retry      = 0;

	agent_queue_request(request);
}
/*
 * srun_allocate - notify srun of a resource allocation
 * IN job_id - id of the job allocated resource
 *
 * A RESPONSE_RESOURCE_ALLOCATION message describing the job's nodes and
 * CPU groups is queued for the job's allocation response address. Nothing
 * is sent if the job is not found or has no response port or no
 * (non-empty) response host recorded.
 */
extern void srun_allocate (uint32_t job_id)
{
	struct job_record *job_ptr = find_job_record (job_id);
	resource_allocation_response_msg_t *resp;
	slurm_addr *dest;
	size_t group_bytes;

	xassert(job_ptr);
	if (!job_ptr || !job_ptr->alloc_resp_port)
		return;
	if (!job_ptr->alloc_resp_host || !job_ptr->alloc_resp_host[0])
		return;

	dest = xmalloc(sizeof(struct sockaddr_in));
	slurm_set_addr(dest, job_ptr->alloc_resp_port,
		       job_ptr->alloc_resp_host);

	resp = xmalloc(sizeof(resource_allocation_response_msg_t));
	resp->job_id         = job_ptr->job_id;
	resp->node_list      = xstrdup(job_ptr->nodes);
	resp->num_cpu_groups = job_ptr->num_cpu_groups;

	/* Both per-group arrays hold num_cpu_groups uint32_t entries */
	group_bytes = sizeof(uint32_t) * job_ptr->num_cpu_groups;
	resp->cpus_per_node  = xmalloc(group_bytes);
	memcpy(resp->cpus_per_node, job_ptr->cpus_per_node, group_bytes);
	resp->cpu_count_reps = xmalloc(group_bytes);
	memcpy(resp->cpu_count_reps, job_ptr->cpu_count_reps, group_bytes);

	resp->node_cnt       = job_ptr->node_cnt;
	resp->select_jobinfo = select_g_copy_jobinfo(job_ptr->select_jobinfo);
	resp->error_code     = SLURM_SUCCESS;

	_srun_agent_launch(dest, job_ptr->alloc_resp_host,
			   RESPONSE_RESOURCE_ALLOCATION, resp);
}
/*
* srun_allocate_abort - notify srun of a resource allocation failure
* IN job_id - id of the job allocated resource
*/
extern void srun_allocate_abort(struct job_record *job_ptr)
{
if (job_ptr && job_ptr->alloc_resp_port
&& job_ptr->alloc_resp_host && job_ptr->alloc_resp_host[0]) {
slurm_addr * addr;
srun_job_complete_msg_t *msg_arg;
addr = xmalloc(sizeof(struct sockaddr_in));
slurm_set_addr(addr, job_ptr->alloc_resp_port,
job_ptr->alloc_resp_host);
msg_arg = xmalloc(sizeof(srun_timeout_msg_t));
msg_arg->job_id = job_ptr->job_id;
msg_arg->step_id = NO_VAL;
_srun_agent_launch(addr, job_ptr->alloc_resp_host,
SRUN_JOB_COMPLETE,
msg_arg);
}
}
/*
 * srun_node_fail - notify srun of a node's failure
 * IN job_id    - id of job to notify
 * IN node_name - name of failed node
 *
 * Only running jobs are notified. One SRUN_NODE_FAIL message goes to the
 * job's own srun (step_id NO_VAL) when it has a port and host recorded,
 * and one to the srun of every non-batch step whose node bitmap includes
 * the failed node and which has a port and host recorded.
 */
extern void srun_node_fail (uint32_t job_id, char *node_name)
{
	struct job_record *job_ptr = find_job_record (job_id);
	struct node_record *failed_node;
	struct step_record *step_ptr;
	ListIterator step_iter;
	srun_node_fail_msg_t *notice;
	slurm_addr *dest;
	int node_inx;

	xassert(job_ptr);
	xassert(node_name);
	if (!job_ptr || (job_ptr->job_state != JOB_RUNNING))
		return;
	if (!node_name)
		return;
	failed_node = find_node_record(node_name);
	if (failed_node == NULL)
		return;

	/* Position of the failed node within the node record table */
	node_inx = failed_node - node_record_table_ptr;

	/* The srun that owns the job as a whole */
	if (job_ptr->other_port
	&&  job_ptr->other_host && job_ptr->other_host[0]) {
		dest = xmalloc(sizeof(struct sockaddr_in));
		slurm_set_addr(dest, job_ptr->other_port,
			       job_ptr->other_host);
		notice = xmalloc(sizeof(srun_node_fail_msg_t));
		notice->job_id   = job_id;
		notice->step_id  = NO_VAL;
		notice->nodelist = xstrdup(node_name);
		_srun_agent_launch(dest, job_ptr->other_host,
				   SRUN_NODE_FAIL, notice);
	}

	/* The srun of each affected job step */
	step_iter = list_iterator_create(job_ptr->step_list);
	for (step_ptr = (struct step_record *) list_next(step_iter);
	     step_ptr;
	     step_ptr = (struct step_record *) list_next(step_iter)) {
		if (!bit_test(step_ptr->step_node_bitmap, node_inx))
			continue;	/* job step not on this node */
		if (!step_ptr->port || !step_ptr->host)
			continue;	/* no srun to contact */
		if (step_ptr->batch_step || !step_ptr->host[0])
			continue;

		dest = xmalloc(sizeof(struct sockaddr_in));
		slurm_set_addr(dest, step_ptr->port, step_ptr->host);
		notice = xmalloc(sizeof(srun_node_fail_msg_t));
		notice->job_id   = job_ptr->job_id;
		notice->step_id  = step_ptr->step_id;
		notice->nodelist = xstrdup(node_name);
		_srun_agent_launch(dest, step_ptr->host,
				   SRUN_NODE_FAIL, notice);
	}
	list_iterator_destroy(step_iter);
}
/*
 * srun_ping - ping all srun commands that have not been heard from recently
 *
 * "Recently" means within half of slurmctld_conf.inactive_limit. Only
 * running jobs with a port and non-empty host recorded are pinged. When
 * inactive_limit is zero no pings are sent at all.
 */
extern void srun_ping (void)
{
	ListIterator job_iter;
	struct job_record *job_ptr;
	srun_ping_msg_t *ping;
	slurm_addr *dest;
	time_t now = time(NULL);
	time_t old = now - (slurmctld_conf.inactive_limit / 2);

	if (slurmctld_conf.inactive_limit == 0)
		return;		/* No limit, don't bother pinging */

	job_iter = list_iterator_create(job_list);
	for (job_ptr = (struct job_record *) list_next(job_iter);
	     job_ptr;
	     job_ptr = (struct job_record *) list_next(job_iter)) {
		xassert (job_ptr->magic == JOB_MAGIC);

		if (job_ptr->job_state != JOB_RUNNING)
			continue;
		if (job_ptr->time_last_active > old)
			continue;	/* heard from recently enough */
		if (!job_ptr->other_port)
			continue;
		if (!job_ptr->other_host || !job_ptr->other_host[0])
			continue;

		dest = xmalloc(sizeof(struct sockaddr_in));
		slurm_set_addr(dest, job_ptr->other_port,
			       job_ptr->other_host);
		ping = xmalloc(sizeof(srun_ping_msg_t));
		ping->job_id  = job_ptr->job_id;
		ping->step_id = NO_VAL;
		_srun_agent_launch(dest, job_ptr->other_host,
				   SRUN_PING, ping);
	}
	list_iterator_destroy(job_iter);
}
/*
 * srun_timeout - notify srun of a job's imminent timeout
 * IN job_ptr - pointer to the slurmctld job record
 *
 * Only running jobs are notified. One SRUN_TIMEOUT message carrying the
 * job's end_time goes to the job's own srun (step_id NO_VAL) when it has
 * a port and host recorded, and one to the srun of every non-batch step
 * that has a port and host recorded.
 */
extern void srun_timeout (struct job_record *job_ptr)
{
	struct step_record *step_ptr;
	ListIterator step_iter;
	srun_timeout_msg_t *warning;
	slurm_addr *dest;

	xassert(job_ptr);
	if (job_ptr->job_state != JOB_RUNNING)
		return;

	/* The srun that owns the job as a whole */
	if (job_ptr->other_port
	&&  job_ptr->other_host && job_ptr->other_host[0]) {
		dest = xmalloc(sizeof(struct sockaddr_in));
		slurm_set_addr(dest, job_ptr->other_port,
			       job_ptr->other_host);
		warning = xmalloc(sizeof(srun_timeout_msg_t));
		warning->job_id  = job_ptr->job_id;
		warning->step_id = NO_VAL;
		warning->timeout = job_ptr->end_time;
		_srun_agent_launch(dest, job_ptr->other_host,
				   SRUN_TIMEOUT, warning);
	}

	/* The srun of each job step */
	step_iter = list_iterator_create(job_ptr->step_list);
	for (step_ptr = (struct step_record *) list_next(step_iter);
	     step_ptr;
	     step_ptr = (struct step_record *) list_next(step_iter)) {
		if (!step_ptr->port || !step_ptr->host)
			continue;	/* no srun to contact */
		if (step_ptr->batch_step || !step_ptr->host[0])
			continue;

		dest = xmalloc(sizeof(struct sockaddr_in));
		slurm_set_addr(dest, step_ptr->port, step_ptr->host);
		warning = xmalloc(sizeof(srun_timeout_msg_t));
		warning->job_id  = job_ptr->job_id;
		warning->step_id = step_ptr->step_id;
		warning->timeout = job_ptr->end_time;
		_srun_agent_launch(dest, step_ptr->host,
				   SRUN_TIMEOUT, warning);
	}
	list_iterator_destroy(step_iter);
}
/*
 * srun_user_message - Send arbitrary message to an srun job (no job steps)
 * IN job_ptr - pointer to the slurmctld job record
 * IN msg     - text to deliver; a copy is placed in the outgoing message
 *
 * The message is sent only when the job is pending or running and has a
 * port and non-empty host recorded.
 */
extern void srun_user_message(struct job_record *job_ptr, char *msg)
{
	srun_user_msg_t *note;
	slurm_addr *dest;

	xassert(job_ptr);
	if (!((job_ptr->job_state == JOB_PENDING) ||
	      (job_ptr->job_state == JOB_RUNNING)))
		return;
	if (!job_ptr->other_port)
		return;
	if (!job_ptr->other_host || !job_ptr->other_host[0])
		return;

	dest = xmalloc(sizeof(struct sockaddr_in));
	slurm_set_addr(dest, job_ptr->other_port, job_ptr->other_host);
	note = xmalloc(sizeof(srun_user_msg_t));
	note->job_id = job_ptr->job_id;
	note->msg    = xstrdup(msg);
	_srun_agent_launch(dest, job_ptr->other_host, SRUN_USER_MSG, note);
}
/*
* srun_job_complete - notify srun of a job's termination
* IN job_ptr - pointer to the slurmctld job record
*/
extern void srun_job_complete (struct job_record *job_ptr)
{
slurm_addr * addr;
srun_job_complete_msg_t *msg_arg;
ListIterator step_iterator;
struct step_record *step_ptr;
xassert(job_ptr);
if (job_ptr->other_port
&& job_ptr->other_host && job_ptr->other_host[0]) {
addr = xmalloc(sizeof(struct sockaddr_in));
slurm_set_addr(addr, job_ptr->other_port, job_ptr->other_host);
msg_arg = xmalloc(sizeof(srun_timeout_msg_t));
msg_arg->job_id = job_ptr->job_id;
msg_arg->step_id = NO_VAL;
_srun_agent_launch(addr, job_ptr->other_host,
SRUN_JOB_COMPLETE,
msg_arg);
}
step_iterator = list_iterator_create(job_ptr->step_list);
while ((step_ptr = (struct step_record *) list_next(step_iterator))) {
if (step_ptr->batch_step) /* batch script itself */
continue;
srun_step_complete(step_ptr);
}
list_iterator_destroy(step_iterator);
}
/*
 * srun_step_complete - notify srun of a job step's termination
 * IN step_ptr - pointer to the slurmctld job step record
 *
 * A SRUN_JOB_COMPLETE message identifying the job and step is queued for
 * the step's srun. Nothing is sent if the step has no port or no
 * (non-empty) host recorded.
 */
extern void srun_step_complete (struct step_record *step_ptr)
{
	srun_job_complete_msg_t *done;
	slurm_addr *dest;

	xassert(step_ptr);
	if (!step_ptr->port || !step_ptr->host || !step_ptr->host[0])
		return;

	dest = xmalloc(sizeof(struct sockaddr_in));
	slurm_set_addr(dest, step_ptr->port, step_ptr->host);
	done = xmalloc(sizeof(srun_job_complete_msg_t));
	done->job_id  = step_ptr->job_ptr->job_id;
	done->step_id = step_ptr->step_id;
	_srun_agent_launch(dest, step_ptr->host, SRUN_JOB_COMPLETE, done);
}
/*
 * srun_exec - request that srun execute a specific command
 *	and route its output to stdout
 * IN step_ptr - pointer to the slurmctld job step record
 * IN argv - command and arguments to execute, NULL terminated
 *
 * The argc placed in the message counts the trailing NULL entry as well
 * as the arguments. If the step has no port or no (non-empty) host
 * recorded, an error is logged and nothing is sent.
 */
extern void srun_exec(struct step_record *step_ptr, char **argv)
{
	srun_exec_msg_t *exec_req;
	slurm_addr *dest;
	int slots, i;

	xassert(step_ptr);
	if (!step_ptr->port || !step_ptr->host || !step_ptr->host[0]) {
		error("srun_exec %u.%u lacks communication channel",
		      step_ptr->job_ptr->job_id, step_ptr->step_id);
		return;
	}

	/* One slot per argument, plus one for the trailing NULL */
	slots = 1;
	while (argv[slots - 1])
		slots++;

	dest = xmalloc(sizeof(struct sockaddr_in));
	slurm_set_addr(dest, step_ptr->port, step_ptr->host);

	exec_req = xmalloc(sizeof(srun_exec_msg_t));
	exec_req->job_id  = step_ptr->job_ptr->job_id;
	exec_req->step_id = step_ptr->step_id;
	exec_req->argc    = slots;
	exec_req->argv    = xmalloc(sizeof(char *) * slots);
	/* The last pass hands argv's NULL terminator to xstrdup();
	 * NOTE(review): presumably that yields NULL - confirm in xstring */
	for (i = 0; i < slots; i++)
		exec_req->argv[i] = xstrdup(argv[i]);

	_srun_agent_launch(dest, step_ptr->host, SRUN_EXEC, exec_req);
}
/*
 * srun_response - note that srun has responded
 * IN job_id  - id of job responding
 * IN step_id - id of step responding or NO_VAL if not a step
 *
 * The job's time_last_active is set to the current time. step_id is not
 * used here. An unknown job_id is silently ignored.
 */
extern void srun_response(uint32_t job_id, uint32_t step_id)
{
	struct job_record *job_ptr = find_job_record (job_id);
	time_t now = time(NULL);

	if (job_ptr != NULL)
		job_ptr->time_last_active = now;
}