/*****************************************************************************\
** mpichmx.c - srun support for MPICH-MX (based upon MPICH-GM code)
*****************************************************************************
* Copyright (C) 2004 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Takao Hatazaki <takao.hatazaki@hp.com>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <https://computing.llnl.gov/linux/slurm/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#ifdef WITH_PTHREADS
# include <pthread.h>
#endif
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include "src/common/slurm_xlator.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/net.h"
#include "src/common/mpi.h"
#include "src/plugins/mpi/mpichmx/mpichmx.h"
typedef struct {
int defined;
unsigned int port_board_id;
unsigned int unique_high_id;
unsigned int unique_low_id;
unsigned int numanode;
unsigned int remote_pid;
unsigned int remote_port;
} gm_slave_t;
#define GMPI_RECV_BUF_LEN 65536
struct gmpi_state {
pthread_t tid;
int fd; /* = -1 */
mpi_plugin_client_info_t *job;
};
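
/*
 * Each GMPI slave process connects back to the master and sends a single
 * init message.  Judging from the sscanf() format below, the message has
 * the form
 *
 *   <<<magic:id:port_board_id:unique_high_id:unique_low_id:numanode:
 *      remote_pid::remote_port>>>
 *
 * (all on one line), where magic must equal the SLURM jobid.  A made-up
 * example for illustration: <<<4242:3:1:65537:131073:0:12345::9876>>>
 */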
static int _gmpi_parse_init_recv_msg(mpi_plugin_client_info_t *job, char *rbuf,
gm_slave_t *slave_data, int *ii)
{
unsigned int magic, id, port_board_id, unique_high_id,
unique_low_id, numanode, remote_pid, remote_port;
int got;
gm_slave_t *dp;
	got = sscanf(rbuf, "<<<%u:%u:%u:%u:%u:%u:%u::%u>>>",
		     &magic, &id, &port_board_id, &unique_high_id,
		     &unique_low_id, &numanode, &remote_pid, &remote_port);
	if (got != 8) {
		error("GMPI master received invalid init message");
		return -1;
	}
	/* Report the id only after it is known to be initialized. */
	*ii = id;
if (magic != job->jobid) {
error("GMPI master received invalid magic number");
return -1;
}
if (id >= job->step_layout->task_cnt)
fatal("GMPI id is out of range");
#if 0
/* Unlike GM ports, MX endpoints can be 0,
* Pere Munt, BSC-CMS */
if (port_board_id == 0)
fatal("MPI id=%d was unable to open a GM port", id);
#endif
dp = &slave_data[id];
if (dp->defined) {
error("Ignoring the message from MPI id=%d", id);
return -1;
}
dp->defined = 1;
dp->port_board_id = port_board_id;
dp->unique_high_id = unique_high_id;
dp->unique_low_id = unique_low_id;
dp->numanode = numanode;
dp->remote_pid = remote_pid;
dp->remote_port = remote_port;
debug3("slave_data[%d]: <<<%u:%u:%u:%u:%u:%u:%u::%u>>>",
id, magic, id, port_board_id,
dp->unique_high_id, dp->unique_low_id, dp->numanode,
dp->remote_pid, dp->remote_port);
return 0;
}
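
/*
 * Once every rank has checked in, each one is sent the same global map
 * plus a per-rank local map.  Based on the composition code below, a
 * reply for a two-rank job might look like this (values invented for
 * illustration):
 *
 *   [[[<1:65537:131073:0><1:65538:131074:0>|||<0><1>]]]
 *
 * The part between "[[[" and "|||" is the global map, one
 * <port_board_id:unique_high_id:unique_low_id:numanode> tuple per rank;
 * the part before "]]]" is the local map, listing the ranks that share
 * the receiver's IP address and NUMA node.
 */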
static int _gmpi_establish_map(gmpi_state_t *st)
{
mpi_plugin_client_info_t *job = st->job;
struct sockaddr_in addr;
in_addr_t *iaddrs;
socklen_t addrlen;
int accfd, newfd, rlen, nprocs, i, j, id;
size_t gmaplen, lmaplen, maplen;
char *p, *rbuf = NULL, *gmap = NULL, *lmap = NULL, *map = NULL;
char tmp[128];
gm_slave_t *slave_data = NULL, *dp;
/*
* Collect info from slaves.
* Will never finish unless slaves are GMPI processes.
*/
accfd = st->fd;
addrlen = sizeof(addr);
nprocs = job->step_layout->task_cnt;
iaddrs = (in_addr_t *)xmalloc(sizeof(*iaddrs)*nprocs);
slave_data = (gm_slave_t *)xmalloc(sizeof(*slave_data)*nprocs);
for (i=0; i<nprocs; i++)
slave_data[i].defined = 0;
i = 0;
rbuf = (char *)xmalloc(GMPI_RECV_BUF_LEN);
while (i < nprocs) {
newfd = accept(accfd, (struct sockaddr *)&addr, &addrlen);
if (newfd == -1) {
error("accept(2) in GMPI master thread: %m");
continue;
}
		/* Leave room for the NUL terminator appended below. */
		rlen = recv(newfd, rbuf, GMPI_RECV_BUF_LEN - 1, 0);
if (rlen <= 0) {
error("GMPI master recv returned %d", rlen);
close(newfd);
continue;
} else {
rbuf[rlen] = 0;
}
if (_gmpi_parse_init_recv_msg(job, rbuf, slave_data,
&id) == 0) {
i++;
iaddrs[id] = ntohl(addr.sin_addr.s_addr);
}
close(newfd);
}
xfree(rbuf);
debug2("Received data from all of %d GMPI processes.", i);
/*
* Compose the global map string.
*/
gmap = (char *)xmalloc(128*nprocs);
p = gmap;
strcpy(p, "[[[");
p += 3;
for (i=0; i<nprocs; i++) {
dp = &slave_data[i];
sprintf(tmp, "<%u:%u:%u:%u>", dp->port_board_id,
dp->unique_high_id, dp->unique_low_id, dp->numanode);
strcpy(p, tmp);
p += strlen(tmp);
}
strcpy(p, "|||");
p += 3;
gmaplen = (size_t)(p - gmap);
/*
* Respond to slaves.
*/
lmap = (char *)xmalloc(128*nprocs);
for (i=0; i<nprocs; i++) {
/*
* Compose the string to send.
*/
dp = &slave_data[i];
p = lmap;
for (j=0; j<nprocs; j++) {
if (iaddrs[i] == iaddrs[j] &&
(dp->numanode == slave_data[j].numanode)) {
sprintf(tmp, "<%u>", j);
strcpy(p, tmp);
p += strlen(tmp);
}
}
lmaplen = (size_t)(p - lmap);
map = (char *)xmalloc(gmaplen+lmaplen+4);
strcpy(map, gmap);
strcpy(map+gmaplen, lmap);
strcpy(map+gmaplen+lmaplen, "]]]");
maplen = gmaplen + lmaplen + 3;
/*
* Send it.
*/
		if ((newfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
			fatal("GMPI master failed to create a socket: %m");
		}
j = 1;
if (setsockopt(newfd, SOL_SOCKET, SO_REUSEADDR,
(void *)&j, sizeof(j)))
error("setsockopt in GMPI master: %m");
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(iaddrs[i]);
addr.sin_port = htons(dp->remote_port);
if (connect(newfd, (struct sockaddr *)&addr, sizeof(addr)))
fatal("GMPI master failed to connect");
send(newfd, map, maplen, 0);
close(newfd);
xfree(map);
}
xfree(slave_data);
xfree(lmap);
xfree(gmap);
xfree(iaddrs);
debug2("GMPI master responded to all GMPI processes");
return 0;
}
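
/*
 * After the map exchange, the master thread blocks forever waiting for
 * abort messages.  Per the sscanf() below, an abort message has the form
 * <<<ABORT_magic_ABORT>>> with magic again equal to the jobid, e.g.
 * <<<ABORT_4242_ABORT>>> (jobid invented for illustration).
 */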
static void _gmpi_wait_abort(gmpi_state_t *st)
{
mpi_plugin_client_info_t *job = st->job;
struct sockaddr_in addr;
socklen_t addrlen;
int newfd, rlen;
unsigned int magic;
char *rbuf;
rbuf = (char *)xmalloc(GMPI_RECV_BUF_LEN);
addrlen = sizeof(addr);
while (1) {
newfd = accept(st->fd, (struct sockaddr *)&addr,
&addrlen);
if (newfd == -1) {
fatal("GMPI master failed to accept (abort-wait)");
}
		/* Leave room for the NUL terminator appended below. */
		rlen = recv(newfd, rbuf, GMPI_RECV_BUF_LEN - 1, 0);
if (rlen <= 0) {
error("GMPI recv (abort-wait) returned %d", rlen);
close(newfd);
continue;
} else {
rbuf[rlen] = 0;
}
if (sscanf(rbuf, "<<<ABORT_%u_ABORT>>>", &magic) != 1) {
error("GMPI (abort-wait) received spurious message.");
close(newfd);
continue;
}
if (magic != job->jobid) {
error("GMPI (abort-wait) received bad magic number.");
close(newfd);
continue;
}
close(newfd);
debug("Received ABORT message from an MPI process.");
slurm_signal_job_step(job->jobid, job->stepid, SIGKILL);
#if 0
		xfree(rbuf);
		close(st->fd);
		st->fd = -1;
		return;
#endif
}
}
static void *_gmpi_thr(void *arg)
{
gmpi_state_t *st;
mpi_plugin_client_info_t *job;
st = (gmpi_state_t *) arg;
job = st->job;
debug3("GMPI master thread pid=%lu", (unsigned long) getpid());
_gmpi_establish_map(st);
debug3("GMPI master thread is waiting for ABORT message.");
_gmpi_wait_abort(st);
return (void *)0;
}
static gmpi_state_t *
gmpi_state_create(const mpi_plugin_client_info_t *job)
{
gmpi_state_t *state;
state = (gmpi_state_t *)xmalloc(sizeof(gmpi_state_t));
state->tid = (pthread_t)-1;
state->fd = -1;
state->job = (mpi_plugin_client_info_t *) job;
return state;
}
static void
gmpi_state_destroy(gmpi_state_t *st)
{
xfree(st);
}
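
/*
 * gmpi_thr_create() opens the master listen port, starts the master
 * thread, and exports the GMPI_ and MXMPI_ environment variables (port,
 * magic/jobid, process count, board) that the MPICH-GM and MPICH-MX
 * runtimes read to find the master.
 */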
extern gmpi_state_t *
gmpi_thr_create(const mpi_plugin_client_info_t *job, char ***env)
{
short port;
pthread_attr_t attr;
gmpi_state_t *st = NULL;
st = gmpi_state_create(job);
	/*
	 * It is possible to modify the mpirun command in the MPICH-GM
	 * distribution so that it calls srun, instead of rsh, for remote
	 * process invocations.  In that case, we should not override the
	 * environment variables nor open the master port.
	 */
if (getenv("GMPI_PORT"))
return st;
if (net_stream_listen (&st->fd, &port) < 0) {
error ("Unable to create GMPI listen port: %m");
gmpi_state_destroy(st);
return NULL;
}
/*
* Accept in a separate thread.
*/
slurm_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (pthread_create(&st->tid, &attr, &_gmpi_thr, (void *)st)) {
slurm_attr_destroy(&attr);
gmpi_state_destroy(st);
return NULL;
}
slurm_attr_destroy(&attr);
env_array_overwrite_fmt(env, "GMPI_PORT", "%hu", port);
env_array_overwrite_fmt(env, "GMPI_MAGIC", "%u", job->jobid);
env_array_overwrite_fmt(env, "GMPI_NP", "%d",
job->step_layout->task_cnt);
env_array_overwrite_fmt(env, "GMPI_SHMEM", "1");
/* FIXME for multi-board config. */
env_array_overwrite_fmt(env, "GMPI_BOARD", "-1");
/* For new MX version */
env_array_overwrite_fmt(env, "MXMPI_PORT", "%hu", port);
env_array_overwrite_fmt(env, "MXMPI_MAGIC", "%u", job->jobid);
env_array_overwrite_fmt(env, "MXMPI_NP", "%d",
job->step_layout->task_cnt);
/* FIXME for multi-board config. */
env_array_overwrite_fmt(env, "MXMPI_BOARD", "-1");
/* for MACOSX to override default malloc */
env_array_overwrite_fmt(env, "DYLD_FORCE_FLAT_NAMESPACE", "1");
debug("Started GMPI master thread (%lu)", (unsigned long) st->tid);
return st;
}
/*
 * Warning: This pthread_cancel/pthread_join sequence is a little unsafe.
 * The thread was created detached, so on most systems the join will fail
 * and the thread's state will be destroyed, possibly before the thread
 * has actually stopped.  In practice the thread is usually blocked in an
 * accept() call when it is cancelled.  If the MPI thread holds a mutex
 * when it is cancelled (e.g. while logging through the "info" or "error"
 * functions), the caller will deadlock.  See mpich1_p4.c or mvapich.c for
 * code that shuts down cleanly by having the MPI thread wait in poll() on
 * both its socket and a pipe, which the main thread writes to when it
 * wants the MPI thread to exit.  Also see rev 18654 of mpichmx.c, on
 * branches/slurm-2.1.mpi.plugin.cleanup, for an implementation.  There
 * were no Myrinet systems available for testing, which is why I could not
 * complete the patch for this plugin.  -djb
 */
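/*
 * A minimal sketch of that pipe-based shutdown, left inert under #if 0
 * because it is untested here (and would need <poll.h>).  The
 * shutdown_pipe[2] field is hypothetical: it would have to be added to
 * gmpi_state_t, filled in with pipe(), and the thread created joinable
 * rather than detached.
 */
#if 0
static void _gmpi_accept_loop(gmpi_state_t *st)
{
	struct pollfd fds[2];

	fds[0].fd = st->fd;			/* listening socket */
	fds[0].events = POLLIN;
	fds[1].fd = st->shutdown_pipe[0];	/* read end of the pipe */
	fds[1].events = POLLIN;

	while (poll(fds, 2, -1) > 0) {
		if (fds[1].revents & POLLIN)
			return;	/* main thread wrote to the pipe: exit */
		if (fds[0].revents & POLLIN) {
			/* accept() will not block here */
		}
	}
}

/*
 * gmpi_thr_destroy() would then replace pthread_cancel() with:
 *	write(st->shutdown_pipe[1], "", 1);
 *	pthread_join(st->tid, NULL);
 */
#endif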
extern int gmpi_thr_destroy(gmpi_state_t *st)
{
if (st != NULL) {
if (st->tid != (pthread_t)-1) {
pthread_cancel(st->tid);
pthread_join(st->tid, NULL);
}
gmpi_state_destroy(st);
}
return SLURM_SUCCESS;
}