/*****************************************************************************\
* src/salloc/msg.c - Message handler for salloc
*****************************************************************************
* Copyright (C) 2006 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Christopher J. Morrone <morrone2@llnl.gov>
* UCRL-CODE-226842.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <slurm/slurm.h>
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_common.h"
#include "src/common/net.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/xmalloc.h"
#include "src/common/slurm_auth.h"
#include "src/common/eio.h"
#include "src/common/xsignal.h"
#include "src/salloc/salloc.h"
#include "src/salloc/opt.h"
#include "src/salloc/msg.h"
struct salloc_msg_thread {
eio_handle_t *handle;
pthread_t id;
};
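/* UID of the configured SlurmUser; used to validate incoming messages */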
static uid_t slurm_uid;
static void _handle_msg(slurm_msg_t *msg);
static bool _message_socket_readable(eio_obj_t *obj);
static int _message_socket_accept(eio_obj_t *obj, List objs);
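/* Used by msg_thr_create() to wait until the message thread has
 * started and blocked its signals. */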
static pthread_mutex_t msg_thr_start_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t msg_thr_start_cond = PTHREAD_COND_INITIALIZER;
static struct io_operations message_socket_ops = {
readable: &_message_socket_readable,
handle_read: &_message_socket_accept
};
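/*
 * Body of the salloc message handler thread.  Blocks the signals
 * handled by the main thread, notifies msg_thr_create() that startup
 * is complete, then runs the eio main loop until shutdown.
 */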
static void *_msg_thr_internal(void *arg)
{
int signals[] = {SIGHUP, SIGINT, SIGQUIT, SIGPIPE, SIGTERM,
SIGUSR1, SIGUSR2, 0};
debug("Entering _msg_thr_internal");
xsignal_block(signals);
pthread_mutex_lock(&msg_thr_start_lock);
pthread_cond_signal(&msg_thr_start_cond);
pthread_mutex_unlock(&msg_thr_start_lock);
eio_handle_mainloop((eio_handle_t *)arg);
debug("Leaving _msg_thr_internal");
return NULL;
}
extern salloc_msg_thread_t *msg_thr_create(uint16_t *port)
{
int sock = -1;
eio_obj_t *obj;
salloc_msg_thread_t *msg_thr = NULL;
debug("Entering _msg_thr_create()");
slurm_uid = (uid_t) slurm_get_slurm_user_id();
msg_thr = (salloc_msg_thread_t *)xmalloc(sizeof(salloc_msg_thread_t));
if (net_stream_listen(&sock, (short *)port) < 0) {
error("unable to intialize step launch listening socket: %m");
xfree(msg_thr);
return NULL;
}
debug("port from net_stream_listen is %hu", *port);
obj = eio_obj_create(sock, &message_socket_ops, NULL);
msg_thr->handle = eio_handle_create();
eio_new_initial_obj(msg_thr->handle, obj);
pthread_mutex_lock(&msg_thr_start_lock);
if (pthread_create(&msg_thr->id, NULL,
_msg_thr_internal, (void *)msg_thr->handle) != 0) {
error("pthread_create of message thread: %m");
eio_handle_destroy(msg_thr->handle);
xfree(msg_thr);
return NULL;
}
/* Wait until the message thread has blocked signals
before continuing. */
pthread_cond_wait(&msg_thr_start_cond, &msg_thr_start_lock);
pthread_mutex_unlock(&msg_thr_start_lock);
return msg_thr;
}
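/*
 * Shut down the message handler thread: signal the eio loop to exit,
 * wait for the thread to finish, then release its resources.
 */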
extern void msg_thr_destroy(salloc_msg_thread_t *msg_thr)
{
if (msg_thr == NULL)
return;
eio_signal_shutdown(msg_thr->handle);
pthread_join(msg_thr->id, NULL);
eio_handle_destroy(msg_thr->handle);
xfree(msg_thr);
}
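/*
 * eio "readable" callback: report whether the listening socket should
 * still be polled, closing it once shutdown has been requested.
 */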
static bool _message_socket_readable(eio_obj_t *obj)
{
debug3("Called _message_socket_readable");
if (obj->shutdown == true) {
if (obj->fd != -1) {
debug2(" false, shutdown");
close(obj->fd);
obj->fd = -1;
/*_wait_for_connections();*/
} else {
debug2(" false");
}
return false;
} else {
return true;
}
}
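/*
 * eio "handle_read" callback: accept one incoming connection on the
 * message socket, read a single slurm message from it, dispatch the
 * message, then close the connection.
 */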
static int _message_socket_accept(eio_obj_t *obj, List objs)
{
int fd;
unsigned char *uc;
short port;
struct sockaddr_un addr;
slurm_msg_t *msg = NULL;
	socklen_t len = sizeof(addr);
debug3("Called _msg_socket_accept");
while ((fd = accept(obj->fd, (struct sockaddr *)&addr,
(socklen_t *)&len)) < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN
|| errno == ECONNABORTED
|| errno == EWOULDBLOCK) {
return SLURM_SUCCESS;
}
error("Error on msg accept socket: %m");
obj->shutdown = true;
return SLURM_SUCCESS;
}
fd_set_close_on_exec(fd);
fd_set_blocking(fd);
/* Should not call slurm_get_addr() because the IP may not be
in /etc/hosts. */
uc = (unsigned char *)&((struct sockaddr_in *)&addr)->sin_addr.s_addr;
port = ((struct sockaddr_in *)&addr)->sin_port;
debug2("got message connection from %u.%u.%u.%u:%hu",
uc[0], uc[1], uc[2], uc[3], ntohs(port));
fflush(stdout);
msg = xmalloc(sizeof(slurm_msg_t));
slurm_msg_t_init(msg);
again:
	if (slurm_receive_msg(fd, msg, 0) != 0) {
		printf("error on slurm_receive_msg\n");
fflush(stdout);
if (errno == EINTR) {
goto again;
}
error("slurm_receive_msg[%u.%u.%u.%u]: %m",
uc[0],uc[1],uc[2],uc[3]);
goto cleanup;
}
	_handle_msg(msg); /* _handle_msg() frees msg->data */
cleanup:
if ((msg->conn_fd >= 0) && slurm_close_accepted_conn(msg->conn_fd) < 0)
error ("close(%d): %m", msg->conn_fd);
slurm_free_msg(msg);
return SLURM_SUCCESS;
}
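/* Report nodes that have failed within this job's allocation. */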
static void _handle_node_fail(slurm_msg_t *msg)
{
srun_node_fail_msg_t *nf = (srun_node_fail_msg_t *)msg->data;
error("Node failure on %s", nf->nodelist);
slurm_free_srun_node_fail_msg(msg->data);
}
/*
 * Job has been notified of its approaching time limit.
* Job will be killed shortly after timeout.
* This RPC can arrive multiple times with the same or updated timeouts.
*/
static void _handle_timeout(slurm_msg_t *msg)
{
static time_t last_timeout = 0;
srun_timeout_msg_t *to = (srun_timeout_msg_t *)msg->data;
debug3("received timeout message");
if (to->timeout != last_timeout) {
last_timeout = to->timeout;
info("Job allocation time limit to be reached at %s",
ctime(&to->timeout));
}
slurm_free_srun_timeout_msg(msg->data);
}
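/* Print a message forwarded to the job from the controller. */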
static void _handle_user_msg(slurm_msg_t *msg)
{
srun_user_msg_t *um;
um = msg->data;
info("%s", um->msg);
slurm_free_srun_user_msg(msg->data);
}
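/*
 * Handle notification that the job, or one of its steps, is complete.
 * If the whole allocation has been revoked, optionally signal the
 * command spawned by salloc and update the allocation state.
 */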
static void _handle_job_complete(slurm_msg_t *msg)
{
srun_job_complete_msg_t *comp = (srun_job_complete_msg_t *)msg->data;
debug3("job complete message received");
if (comp->step_id == NO_VAL) {
pthread_mutex_lock(&allocation_state_lock);
if (allocation_state != REVOKED) {
/* If the allocation_state is already REVOKED, then
* no need to print this message. We probably
			 * relinquished the allocation ourselves.
*/
info("Job allocation %u has been revoked.",
comp->job_id);
}
if (allocation_state == GRANTED
&& command_pid > -1
&& opt.kill_command_signal_set) {
verbose("Sending signal %d to command \"%s\", pid %d",
opt.kill_command_signal,
command_argv[0], command_pid);
kill(command_pid, opt.kill_command_signal);
}
allocation_state = REVOKED;
pthread_mutex_unlock(&allocation_state_lock);
} else {
verbose("Job step %u.%u is finished.",
comp->job_id, comp->step_id);
}
slurm_free_srun_job_complete_msg(msg->data);
}
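/*
 * Verify the credential on an incoming message and dispatch it to the
 * appropriate handler based on message type.
 */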
static void
_handle_msg(slurm_msg_t *msg)
{
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
uid_t uid = getuid();
if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) {
error ("Security violation, slurm message from uid %u",
(unsigned int) req_uid);
return;
}
switch (msg->msg_type) {
case SRUN_PING:
debug("received ping message");
slurm_send_rc_msg(msg, SLURM_SUCCESS);
slurm_free_srun_ping_msg(msg->data);
break;
case SRUN_JOB_COMPLETE:
_handle_job_complete(msg);
break;
case SRUN_TIMEOUT:
_handle_timeout(msg);
break;
case SRUN_USER_MSG:
_handle_user_msg(msg);
break;
case SRUN_NODE_FAIL:
_handle_node_fail(msg);
break;
default:
error("received spurious message type: %d\n",
msg->msg_type);
break;
}
return;
}