blob: ab05d30c2a4efaabc5cf3bb35a645db2e86f2792 [file] [log] [blame]
/*****************************************************************************\
** agent.c - PMI2 handling thread
*****************************************************************************
* Copyright (C) 2011-2012 National University of Defense Technology.
* Written by Hongjia Cao <hjcao@nudt.edu.cn>.
* All rights reserved.
* Portions copyright (C) 2015 Mellanox Technologies Inc.
* Written by Artem Y. Polyakov <artemp@mellanox.com>.
* All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <poll.h>
#include "src/common/slurm_xlator.h"
#include "src/common/eio.h"
#include "src/common/macros.h"
#include "src/interfaces/mpi.h"
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#include "src/slurmd/slurmstepd/slurmstepd_job.h"
#include "client.h"
#include "pmi.h"
#include "setup.h"
/*
 * Per-local-task flags, indexed by local rank (lrank). Both arrays are
 * allocated by the agent thread, and only when running inside a stepd;
 * otherwise they stay NULL.
 */
/* initialized[lrank] becomes 1 once the PMI1 init handshake was attempted */
static int *initialized = NULL;
/* finalized[lrank] is set to 1 by task_finalize() */
static int *finalized = NULL;
/* eio handle created, run and destroyed by the agent thread */
static eio_handle_t *pmi2_handle;
/*
 * agent_mutex is held by pmi2_start_agent() and pmi2_stop_agent();
 * agent_running_cond is signaled by the agent thread right before it enters
 * its eio main loop.
 */
static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t agent_running_cond = PTHREAD_COND_INITIALIZER;
/* id of the agent thread; 0 until pmi2_start_agent() creates the thread */
static pthread_t _agent_tid = 0;
/*
 * eio callbacks attached to the tree listening socket (tree_sock). Only the
 * read side is handled; no write callbacks are registered.
 */
static bool _tree_listen_readable(eio_obj_t *obj);
static int _tree_listen_read(eio_obj_t *obj, list_t *objs);
static struct io_operations tree_listen_ops = {
.readable = &_tree_listen_readable,
.handle_read = &_tree_listen_read,
};
/*
 * eio callbacks attached to each local task's PMI socket. Only the read side
 * is handled; the write callbacks below are declared in comments only and are
 * not registered.
 */
static bool _task_readable(eio_obj_t *obj);
static int _task_read(eio_obj_t *obj, list_t *objs);
/* static bool _task_writable(eio_obj_t *obj); */
/* static int _task_write(eio_obj_t *obj, list_t *objs); */
static struct io_operations task_ops = {
.readable = &_task_readable,
.handle_read = &_task_read,
};
/* Forward declaration: used by _handle_task_request() before its definition */
static int _handle_pmi1_init(int fd, int lrank);
/*********************************************************************/
/*
 * Dispatch one request arriving on a local task's PMI socket.
 *
 * fd    - socket connected to the task
 * lrank - local rank of the task; index into initialized[]
 *
 * The first request seen for a task is handled as the PMI1 init command, and
 * the task is flagged as initialized whatever that handler returns. Every
 * later request goes to handle_pmi1_cmd() when is_pmi11() holds, or to
 * handle_pmi2_cmd() when is_pmi20() holds; anything else is fatal.
 *
 * Returns the return code of the handler that was invoked.
 */
static int
_handle_task_request(int fd, int lrank)
{
	debug3("mpi/pmi2: in _handle_task_request, lrank=%d", lrank);

	if (initialized[lrank] == 0) {
		int init_rc = _handle_pmi1_init(fd, lrank);

		initialized[lrank] = 1;
		return init_rc;
	}

	if (is_pmi11())
		return handle_pmi1_cmd(fd, lrank);

	if (is_pmi20())
		return handle_pmi2_cmd(fd, lrank);

	fatal("this is impossible");
	return SLURM_SUCCESS;
}
/*
 * Serve one request received on an accepted tree connection.
 *
 * fd - connected socket to read the request from
 *
 * Inside a stepd the request is preceded by a 32-bit uid in network byte
 * order (passed on by slurmd); it is consumed and only logged. The remainder
 * is handed to handle_tree_cmd().
 *
 * Returns the result of handle_tree_cmd(), or SLURM_ERROR when reading the
 * uid fails (safe_read() jumps to rwfail).
 */
static int
_handle_tree_request(int fd)
{
	if (in_stepd()) {
		uint32_t uid;

		safe_read(fd, &uid, sizeof(uint32_t));
		uid = ntohl(uid);
		debug3("mpi/pmi2: _handle_tree_request: req from uid %u", uid);
	}

	return handle_tree_cmd(fd);

rwfail:
	return SLURM_ERROR;
}
/*********************************************************************/
/*
 * Check whether `fd` has data (or a pending connection) to read.
 *
 * Polls the descriptor for POLLIN with a 10 millisecond timeout.
 *
 * Returns true only when poll() reports exactly this one descriptor ready and
 * POLLIN is set in its returned events; timeouts and poll() errors yield
 * false.
 */
static bool
_is_fd_ready(int fd)
{
	struct pollfd probe = {
		.fd = fd,
		.events = POLLIN,
	};

	if (poll(&probe, 1, 10) != 1)
		return false;

	return (probe.revents & POLLIN) != 0;
}
/*
 * eio "readable" callback for the tree listening socket.
 *
 * Returns true while the object is not shut down. Once shutdown is flagged,
 * the socket is closed (if still open), its fd is reset to -1 and false is
 * returned.
 */
static bool
_tree_listen_readable(eio_obj_t *obj)
{
	debug2("mpi/pmi2: _tree_listen_readable");

	if (obj->shutdown != true)
		return true;

	if (obj->fd != -1) {
		close(obj->fd);
		obj->fd = -1;
	}
	debug2(" false, shutdown");
	return false;
}
/*
 * eio read handler for the tree listening socket: accept and serve pending
 * tree connections one at a time until none is ready.
 *
 * obj  - eio object wrapping the listening socket
 * objs - not used by this handler
 *
 * Every accepted connection is passed to _handle_tree_request() and then
 * closed. The function always returns 0: it stops when the socket is no
 * longer ready or when accept4() fails; failures other than EAGAIN,
 * EWOULDBLOCK and ECONNABORTED are logged (EINTR is retried).
 */
static int _tree_listen_read(eio_obj_t *obj, list_t *objs)
{
	int sd;
	slurm_addr_t addr;
	socklen_t size;

	debug2("mpi/pmi2: _tree_listen_read");

	while (1) {
		/*
		 * Return early if fd is not now ready
		 */
		if (!_is_fd_ready(obj->fd))
			return 0;

		while (1) {
			/*
			 * The address length is a value-result argument:
			 * accept4() overwrites it with the size of the peer
			 * address, so it must be reset before every call or a
			 * later, longer address would be truncated.
			 */
			size = sizeof(addr);
			sd = accept4(obj->fd, (struct sockaddr *) &addr,
				     &size, SOCK_CLOEXEC);
			if (sd >= 0)
				break;
			if (errno == EINTR)
				continue;
			/* No more connections, or a transient condition */
			if ((errno != EAGAIN) &&
			    (errno != EWOULDBLOCK) &&
			    (errno != ECONNABORTED))
				error("mpi/pmi2: unable to accept new connection: %m");
			return 0;
		}

		if (! in_stepd()) {
			debug3("mpi/pmi2: accepted tree connection: ip=%pA sd=%d",
			       &addr, sd);
		}

		/* read command from socket and handle it */
		if (_handle_tree_request(sd) != SLURM_SUCCESS)
			debug2("mpi/pmi2: tree request on sd=%d returned an error",
			       sd);
		close(sd);
	}
}
/*********************************************************************/
/*
 * eio "readable" callback for a local task's PMI socket.
 *
 * The task's local rank is carried in obj->arg. Returns false once the task
 * has been flagged in finalized[], or once the object is shut down (in which
 * case the socket is closed and its fd reset to -1); returns true otherwise.
 */
static bool
_task_readable(eio_obj_t *obj)
{
	int lrank = (int)(long)(obj->arg);

	debug2("mpi/pmi2: _task_readable");

	if (finalized[lrank] == 1) {
		debug2(" false, finalized");
		return false;
	}

	if (obj->shutdown != true)
		return true;

	if (obj->fd != -1) {
		close(obj->fd);
		obj->fd = -1;
	}
	debug2(" false, shutdown");
	return false;
}
/*
 * eio read handler for a local task's PMI socket.
 *
 * Recovers the local rank stored in obj->arg and forwards the socket to
 * _handle_task_request(), whose return code is passed back unchanged.
 * `objs` is not used.
 */
static int _task_read(eio_obj_t *obj, list_t *objs)
{
	return _handle_task_request(obj->fd, (int)(long)(obj->arg));
}
/*********************************************************************/
/*
 * Handle the PMI1 "init" command, the first message sent by a task.
 *
 * fd    - socket connected to the task
 * lrank - local rank of the task (not used here)
 *
 * Reads a single message of fewer than 64 bytes, parses the requested
 * version/subversion and applies them with set_pmi_version(). A
 * "cmd=response_to_init" line is then written back:
 *   - rc=0 with the requested version when set_pmi_version() succeeds;
 *   - the set_pmi_version() error code with the version reported by
 *     get_pmi_version() when it fails;
 *   - rc=1 with version 2.0 when the command could not be parsed.
 *
 * Returns SLURM_ERROR when the read fails, the message fills the whole buffer
 * or the write fails; SLURM_SUCCESS otherwise, including when the response
 * carries a non-zero rc.
 */
static int
_handle_pmi1_init(int fd, int lrank)
{
	char msg[64];
	int version, subversion;
	int nbytes, fields, rc;

	debug3("mpi/pmi2: in _handle_pmi1_init");

	/* retry the read when interrupted by a signal */
	do {
		nbytes = read(fd, msg, sizeof(msg));
	} while ((nbytes < 0) && (errno == EINTR));

	if ((nbytes < 0) || (nbytes >= (int) sizeof(msg))) {
		error("mpi/pmi2: failed to read PMI1 init command");
		return SLURM_ERROR;
	}
	msg[nbytes] = '\0';

	fields = sscanf(msg, "cmd=init pmi_version=%d pmi_subversion=%d\n",
			&version, &subversion);
	if (fields == 2) {
		rc = set_pmi_version(version, subversion);
		if (rc != SLURM_SUCCESS) {
			/* answer with the version reported by get_pmi_version() */
			get_pmi_version(&version, &subversion);
		} else {
			rc = 0;
		}
	} else {
		error("mpi/pmi2: invalid PMI1 init command: `%s'", msg);
		rc = 1;
		version = 2;
		subversion = 0;
	}

	snprintf(msg, sizeof(msg), "cmd=response_to_init rc=%d pmi_version=%d "
		 "pmi_subversion=%d\n", rc, version, subversion);

	/* retry the write when interrupted by a signal */
	do {
		nbytes = write(fd, msg, strlen(msg));
	} while ((nbytes < 0) && (errno == EINTR));

	if (nbytes < 0) {
		error ("mpi/pmi2: failed to write PMI1 init response");
		return SLURM_ERROR;
	}

	debug3("mpi/pmi2: out _handle_pmi1_init");
	return SLURM_SUCCESS;
}
/*********************************************************************/
/*
* main loop of agent thread
*/
static void *
_agent(void * unused)
{
eio_obj_t *tree_listen_obj, *task_obj;
int i;
pmi2_handle = eio_handle_create(0);
//fd_set_nonblocking(tree_sock);
tree_listen_obj = eio_obj_create(tree_sock, &tree_listen_ops,
(void *)(-1));
eio_new_initial_obj(pmi2_handle, tree_listen_obj);
/* for stepd, add the sockets to tasks */
if (in_stepd()) {
for (i = 0; i < job_info.ltasks; i ++) {
task_obj = eio_obj_create(STEPD_PMI_SOCK(i), &task_ops,
(void*)(long)(i));
eio_new_initial_obj(pmi2_handle, task_obj);
}
initialized = xmalloc(job_info.ltasks * sizeof(int));
finalized = xmalloc(job_info.ltasks * sizeof(int));
}
slurm_mutex_lock(&agent_mutex);
slurm_cond_signal(&agent_running_cond);
slurm_mutex_unlock(&agent_mutex);
eio_handle_mainloop(pmi2_handle);
debug("mpi/pmi2: agent thread exit");
eio_handle_destroy(pmi2_handle);
return NULL;
}
/*
 * Start the PMI2 agent thread.
 *
 * Only the first call creates the thread; it then blocks on
 * agent_running_cond until the agent signals that its setup is done. Later
 * calls return immediately. agent_mutex is held for the whole call.
 *
 * NOTE(review): the condition wait is not wrapped in a predicate loop, so a
 * spurious wakeup would let this return before the agent finished its setup
 * - confirm whether that matters to callers.
 *
 * Always returns SLURM_SUCCESS.
 */
extern int
pmi2_start_agent(void)
{
	static bool started = false;

	slurm_mutex_lock(&agent_mutex);

	if (!started) {
		started = true;

		slurm_thread_create(&_agent_tid, _agent, NULL);
		slurm_cond_wait(&agent_running_cond, &agent_mutex);

		debug("mpi/pmi2: started agent thread");
	}

	slurm_mutex_unlock(&agent_mutex);
	return SLURM_SUCCESS;
}
/*
 * Stop the PMI2 agent thread.
 *
 * When an agent thread id is recorded, requests shutdown of the eio handle
 * and joins the thread; otherwise does nothing. agent_mutex is held for the
 * whole call.
 *
 * Always returns SLURM_SUCCESS.
 */
extern int
pmi2_stop_agent(void)
{
	slurm_mutex_lock(&agent_mutex);

	if (!_agent_tid) {
		/* no agent thread recorded: nothing to stop */
		slurm_mutex_unlock(&agent_mutex);
		return SLURM_SUCCESS;
	}

	eio_signal_shutdown(pmi2_handle);
	/* wait for the agent thread to stop */
	slurm_thread_join(_agent_tid);

	slurm_mutex_unlock(&agent_mutex);
	return SLURM_SUCCESS;
}
/*
 * Flag local task `lrank` as finalized.
 *
 * Sets finalized[lrank] to 1; _task_readable() then reports the task's
 * socket as not readable. No bounds or NULL check is made: finalized[] must
 * already have been allocated and lrank must be a valid index.
 */
extern void
task_finalize(int lrank)
{
	int *flag = finalized + lrank;

	*flag = 1;
}